34 const string CConnectionRepository::_overflow_event_name =
"CRDatagramOverflow";
37 PStatCollector CConnectionRepository::_update_pcollector(
"App:Show code:readerPollTask:Update");
43 CConnectionRepository::
44 CConnectionRepository(
bool has_owner_view,
bool threaded_net) :
45 _lock(
"CConnectionRepository::_lock"),
47 _python_repository(nullptr),
53 _cw(&_qcm, threaded_net ? 1 : 0),
54 _qcr(&_qcm, threaded_net ? 1 : 0),
56 #ifdef WANT_NATIVE_NET
57 _bdc(4096000,4096000,1400),
60 _client_datagram(true),
61 _handle_datagrams_internally(handle_datagrams_internally),
62 _simulated_disconnect(false),
63 _verbose(distributed_cat.is_spam()),
68 _has_owner_view(has_owner_view),
69 _handle_c_updates(true),
70 _want_message_bundling(true),
74 #if defined(HAVE_NET) && defined(SIMULATE_NETWORK_DELAY)
75 if (min_lag != 0.0 || max_lag != 0.0) {
76 _qcr.start_delay(min_lag, max_lag);
79 _tcp_header_size = tcp_header_size;
85 CConnectionRepository::
86 ~CConnectionRepository() {
98 _tcp_header_size = tcp_header_size;
101 if (_http_conn !=
nullptr) {
102 _http_conn->set_tcp_header_size(tcp_header_size);
107 _cw.set_tcp_header_size(tcp_header_size);
108 _qcr.set_tcp_header_size(tcp_header_size);
118 void CConnectionRepository::
119 set_connection_http(HTTPChannel *channel) {
123 nassertv(channel->is_connection_ready());
124 _http_conn = channel->get_connection();
125 _http_conn->set_tcp_header_size(_tcp_header_size);
126 #ifdef SIMULATE_NETWORK_DELAY
127 if (min_lag != 0.0 || max_lag != 0.0) {
128 _http_conn->start_delay(min_lag, max_lag);
132 #endif // HAVE_OPENSSL
139 SocketStream *CConnectionRepository::
145 #endif // HAVE_OPENSSL
153 bool CConnectionRepository::
154 try_connect_net(
const URLSpec &url) {
161 game_server_timeout_ms);
163 if (_net_conn !=
nullptr) {
164 _net_conn->set_no_delay(
true);
165 _qcr.add_connection(_net_conn);
173 #ifdef WANT_NATIVE_NET
178 bool CConnectionRepository::
179 connect_native(
const URLSpec &url) {
185 _bdc.ClearAddresses();
186 _bdc.AddAddress(addr);
187 return _bdc.DoConnect();
190 #endif //WANT NATIVE NET
192 #ifdef SIMULATE_NETWORK_DELAY
206 void CConnectionRepository::
207 start_delay(
double min_delay,
double max_delay) {
210 if (min_delay != 0.0 || max_delay != 0.0) {
212 _qcr.start_delay(min_delay, max_delay);
215 if (_http_conn !=
nullptr) {
216 _http_conn->start_delay(min_delay, max_delay);
218 #endif // HAVE_OPENSSL
223 #endif // SIMULATE_NETWORK_DELAY
225 #ifdef SIMULATE_NETWORK_DELAY
231 void CConnectionRepository::
239 if (_http_conn !=
nullptr) {
240 _http_conn->stop_delay();
242 #endif // HAVE_OPENSSL
244 #endif // SIMULATE_NETWORK_DELAY
255 if (_simulated_disconnect) {
258 #ifdef WANT_NATIVE_NET
261 #endif //WANT_NATIVE_NET
263 while (do_check_datagram()) {
265 describe_message(nout,
"RECV", _dg);
271 if (!_client_datagram) {
272 unsigned char wc_cnt;
274 _msg_channels.clear();
275 for (
unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) {
277 _msg_channels.push_back(schan);
284 if (_python_repository !=
nullptr) {
285 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
286 PyGILState_STATE gstate;
287 gstate = PyGILState_Ensure();
289 PyObject *value = PyLong_FromUnsignedLongLong(_msg_sender);
290 PyObject_SetAttrString(_python_repository,
"msgSender", value);
292 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
293 PyGILState_Release(gstate);
296 #endif // HAVE_PYTHON
301 if (!_handle_datagrams_internally) {
307 case CLIENT_OBJECT_SET_FIELD:
308 case STATESERVER_OBJECT_SET_FIELD:
309 if (_handle_c_updates) {
310 if (_has_owner_view) {
311 if (!handle_update_field_owner()) {
315 if (!handle_update_field()) {
324 #endif // HAVE_PYTHON
346 #ifdef WANT_NATIVE_NET
348 return (_bdc.IsConnected());
353 if (_qcm.reset_connection_available()) {
355 if (_qcm.get_reset_connection(reset_connection)) {
356 _qcm.close_connection(reset_connection);
357 if (reset_connection == _net_conn) {
370 if (!_http_conn->is_closed()) {
376 _http_conn =
nullptr;
378 #endif // HAVE_OPENSSL
392 if (_simulated_disconnect) {
393 distributed_cat.warning()
394 <<
"Unable to send datagram during simulated disconnect.\n";
399 describe_message(nout,
"SEND", dg);
407 #ifdef WANT_NATIVE_NET
409 bool result = _bdc.SendMessage(dg);
410 if (!result && _bdc.IsConnected()) {
412 std::ostringstream s;
414 #if PY_VERSION_HEX >= 0x03030000
415 PyObject *exc_type = PyExc_ConnectionError;
417 PyObject *exc_type = PyExc_OSError;
420 s << endl <<
"Error sending message: " << endl;
422 s <<
"Message data: " << dg.
get_data() << endl;
424 string message = s.str();
425 PyErr_SetString(exc_type, message.c_str());
434 _cw.send(dg, _net_conn);
441 if (!_http_conn->send_datagram(dg)) {
442 distributed_cat.warning()
443 <<
"Could not send datagram.\n";
449 #endif // HAVE_OPENSSL
451 distributed_cat.warning()
452 <<
"Unable to send datagram after connection is closed.\n";
471 nout <<
"CR::SEND:BUNDLE_START(" << _bundling_msgs <<
")" << endl;
473 if (_bundling_msgs == 0) {
474 _bundle_msgs.clear();
485 nassertv(_bundling_msgs);
490 nout <<
"CR::SEND:BUNDLE_FINISH(" << _bundling_msgs <<
")" << endl;
502 BundledMsgVector::const_iterator bmi;
503 for (bmi = _bundle_msgs.begin(); bmi != _bundle_msgs.end(); bmi++) {
520 _bundle_msgs.clear();
526 void CConnectionRepository::
542 if (_simulated_disconnect) {
546 #ifdef WANT_NATIVE_NET
553 return _net_conn->consider_flush();
559 return _http_conn->consider_flush();
561 #endif // HAVE_OPENSSL
574 if (_simulated_disconnect) {
577 #ifdef WANT_NATIVE_NET
584 return _net_conn->flush();
590 return _http_conn->flush();
592 #endif // HAVE_OPENSSL
604 #ifdef WANT_NATIVE_NET
607 _bdc.ClearAddresses();
612 _qcm.close_connection(_net_conn);
621 _http_conn =
nullptr;
623 #endif // HAVE_OPENSSL
625 _simulated_disconnect =
false;
645 bool CConnectionRepository::
646 do_check_datagram() {
647 #ifdef WANT_NATIVE_NET
649 return _bdc.GetMessage(_dg);
654 _net_conn->consider_flush();
655 if (_qcr.get_overflow_flag()) {
657 _qcr.reset_overflow_flag();
659 return (_qcr.data_available() && _qcr.get_data(_dg));
665 _http_conn->consider_flush();
666 return _http_conn->receive_datagram(_dg);
668 #endif // HAVE_OPENSSL
680 bool CConnectionRepository::
681 handle_update_field() {
683 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
684 PyGILState_STATE gstate;
685 gstate = PyGILState_Ensure();
690 if (_python_repository !=
nullptr)
693 PyObject_GetAttrString(_python_repository,
"doId2do");
694 nassertr(doId2do !=
nullptr,
false);
696 #ifdef USE_PYTHON_2_2_OR_EARLIER
697 PyObject *doId = PyInt_FromLong(do_id);
699 PyObject *doId = PyLong_FromUnsignedLong(do_id);
701 PyObject *distobj = PyDict_GetItem(doId2do, doId);
705 if (distobj !=
nullptr) {
706 PyObject *dclass_obj = PyObject_GetAttrString(distobj,
"dclass");
707 nassertr(dclass_obj !=
nullptr,
false);
710 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
711 Py_DECREF(dclass_obj);
712 nassertr(dclass_this !=
nullptr,
false);
715 Py_DECREF(dclass_this);
719 if (_in_quiet_zone) {
720 PyObject *neverDisable = PyObject_GetAttrString(distobj,
"neverDisable");
721 nassertr(neverDisable !=
nullptr,
false);
723 unsigned int cNeverDisable = PyLong_AsLong(neverDisable);
724 if (!cNeverDisable) {
727 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
728 PyGILState_Release(gstate);
739 dclass->receive_update(distobj, _di);
742 if (PyErr_Occurred()) {
743 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
744 PyGILState_Release(gstate);
752 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
753 PyGILState_Release(gstate);
755 #endif // HAVE_PYTHON
768 bool CConnectionRepository::
769 handle_update_field_owner() {
771 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
772 PyGILState_STATE gstate;
773 gstate = PyGILState_Ensure();
778 if (_python_repository !=
nullptr) {
780 PyObject_GetAttrString(_python_repository,
"doId2do");
781 nassertr(doId2do !=
nullptr,
false);
783 PyObject *doId2ownerView =
784 PyObject_GetAttrString(_python_repository,
"doId2ownerView");
785 nassertr(doId2ownerView !=
nullptr,
false);
787 #ifdef USE_PYTHON_2_2_OR_EARLIER
788 PyObject *doId = PyInt_FromLong(do_id);
790 PyObject *doId = PyLong_FromUnsignedLong(do_id);
794 PyObject *distobjOV = PyDict_GetItem(doId2ownerView, doId);
795 Py_DECREF(doId2ownerView);
797 if (distobjOV !=
nullptr) {
798 PyObject *dclass_obj = PyObject_GetAttrString(distobjOV,
"dclass");
799 nassertr(dclass_obj !=
nullptr,
false);
801 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
802 Py_DECREF(dclass_obj);
803 nassertr(dclass_this !=
nullptr,
false);
806 Py_DECREF(dclass_this);
819 Py_INCREF(distobjOV);
823 dclass->receive_update(distobjOV, _odi);
824 Py_DECREF(distobjOV);
826 if (PyErr_Occurred()) {
827 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
828 PyGILState_Release(gstate);
836 PyObject *distobj = PyDict_GetItem(doId2do, doId);
840 if (distobj !=
nullptr) {
841 PyObject *dclass_obj = PyObject_GetAttrString(distobj,
"dclass");
842 nassertr(dclass_obj !=
nullptr,
false);
844 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
845 Py_DECREF(dclass_obj);
846 nassertr(dclass_this !=
nullptr,
false);
849 Py_DECREF(dclass_this);
864 dclass->receive_update(distobj, _di);
867 if (PyErr_Occurred()) {
868 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
869 PyGILState_Release(gstate);
877 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
878 PyGILState_Release(gstate);
880 #endif // HAVE_PYTHON
889 void CConnectionRepository::
890 describe_message(std::ostream &out,
const string &prefix,
897 bool is_update =
false;
898 string full_prefix =
"CR::" + prefix;
900 if (!_client_datagram)
903 for( ;mcnt > 0; mcnt--)
904 packer.RAW_UNPACK_CHANNEL();
906 packer.RAW_UNPACK_CHANNEL();
908 is_update = (msg_type == STATESERVER_OBJECT_SET_FIELD);
912 is_update = (msg_type == CLIENT_OBJECT_SET_FIELD);
921 if (_python_repository !=
nullptr) {
922 PyObject *msgId = PyLong_FromLong(msg_type);
923 nassertv(msgId !=
nullptr);
924 #if PY_MAJOR_VERSION >= 3
925 PyObject *methodName = PyUnicode_FromString(
"_getMsgName");
927 PyObject *methodName = PyString_FromString(
"_getMsgName");
929 nassertv(methodName !=
nullptr);
931 PyObject *result = PyObject_CallMethodObjArgs(_python_repository, methodName,
933 nassertv(result !=
nullptr);
935 #if PY_MAJOR_VERSION >= 3
936 msgName += string(PyUnicode_AsUTF8(result));
938 msgName += string(PyString_AsString(result));
941 Py_DECREF(methodName);
946 if (msgName.length() == 0) {
947 msgName +=
"unknown message ";
951 out << full_prefix <<
":" << msgName <<
"\n";
961 if (_python_repository !=
nullptr) {
963 PyObject_GetAttrString(_python_repository,
"doId2do");
964 nassertv(doId2do !=
nullptr);
966 #ifdef USE_PYTHON_2_2_OR_EARLIER
967 PyObject *doId = PyInt_FromLong(do_id);
969 PyObject *doId = PyLong_FromUnsignedLong(do_id);
971 PyObject *distobj = PyDict_GetItem(doId2do, doId);
975 if (distobj !=
nullptr) {
976 PyObject *dclass_obj = PyObject_GetAttrString(distobj,
"dclass");
977 nassertv(dclass_obj !=
nullptr);
979 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
980 Py_DECREF(dclass_obj);
981 nassertv(dclass_this !=
nullptr);
983 dclass = (
DCClass *)PyLong_AsVoidPtr(dclass_this);
984 Py_DECREF(dclass_this);
987 #endif // HAVE_PYTHON
991 if (dclass ==
nullptr) {
992 out << full_prefix <<
"update for unknown object " << do_id
993 <<
", field " << field_id <<
"\n";
996 out << full_prefix <<
997 ":" << dclass->
get_name() <<
"(" << do_id <<
").";
999 if (field ==
nullptr) {
1000 out <<
"unknown field " << field_id <<
"\n";