00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "cConnectionRepository.h"
00016 #include "dcmsgtypes.h"
00017 #include "dcClass.h"
00018 #include "dcPacker.h"
00019
00020 #include "config_distributed.h"
00021 #include "config_downloader.h"
00022 #include "httpChannel.h"
00023 #include "urlSpec.h"
00024 #include "datagramIterator.h"
00025 #include "throw_event.h"
00026 #include "pStatTimer.h"
00027
00028 #ifdef HAVE_PYTHON
00029 #ifndef CPPPARSER
00030 #include "py_panda.h"
00031 IMPORT_THIS struct Dtool_PyTypedObject Dtool_DatagramIterator;
00032 IMPORT_THIS struct Dtool_PyTypedObject Dtool_DCClass;
00033 #endif
00034 #endif
00035
00036 const string CConnectionRepository::_overflow_event_name = "CRDatagramOverflow";
00037
00038 #ifndef CPPPARSER
00039 PStatCollector CConnectionRepository::_update_pcollector("App:Show code:readerPollTask:Update");
00040 #endif // CPPPARSER
00041
00042
00043
00044
00045
00046
00047 CConnectionRepository::
00048 CConnectionRepository(bool has_owner_view, bool threaded_net) :
00049 _lock("CConnectionRepository::_lock"),
00050 #ifdef HAVE_PYTHON
00051 _python_repository(NULL),
00052 _python_ai_datagramiterator(NULL),
00053 #endif
00054 #ifdef HAVE_OPENSSL
00055 _http_conn(NULL),
00056 #endif
00057 #ifdef HAVE_NET
00058 _cw(&_qcm, threaded_net ? 1 : 0),
00059 _qcr(&_qcm, threaded_net ? 1 : 0),
00060 #endif
00061 #ifdef WANT_NATIVE_NET
00062 _bdc(4096000,4096000,1400),
00063 _native(false),
00064 #endif
00065 _client_datagram(true),
00066 _handle_datagrams_internally(handle_datagrams_internally),
00067 _simulated_disconnect(false),
00068 _verbose(distributed_cat.is_spam()),
00069 _time_warning(0.0),
00070
00071 _msg_sender(0),
00072 _msg_type(0),
00073 _has_owner_view(has_owner_view),
00074 _handle_c_updates(true),
00075 _want_message_bundling(true),
00076 _bundling_msgs(0),
00077 _in_quiet_zone(0)
00078 {
00079 #if defined(HAVE_NET) && defined(SIMULATE_NETWORK_DELAY)
00080 if (min_lag != 0.0 || max_lag != 0.0) {
00081 _qcr.start_delay(min_lag, max_lag);
00082 }
00083 #endif
00084 _tcp_header_size = tcp_header_size;
00085
00086 #ifdef HAVE_PYTHON
00087 PyObject * PyDitterator = DTool_CreatePyInstance(&_di,Dtool_DatagramIterator,false,false);
00088 if(PyDitterator != NULL)
00089 _python_ai_datagramiterator = Py_BuildValue("(O)",PyDitterator);
00090 #endif
00091
00092 }
00093
00094
00095
00096
00097
00098
00099 CConnectionRepository::
00100 ~CConnectionRepository() {
00101 disconnect();
00102 }
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113 void CConnectionRepository::
00114 set_tcp_header_size(int tcp_header_size) {
00115 _tcp_header_size = tcp_header_size;
00116
00117 #ifdef HAVE_OPENSSL
00118 if (_http_conn != (SocketStream *)NULL) {
00119 _http_conn->set_tcp_header_size(tcp_header_size);
00120 }
00121 #endif
00122
00123 #ifdef HAVE_NET
00124 _cw.set_tcp_header_size(tcp_header_size);
00125 _qcr.set_tcp_header_size(tcp_header_size);
00126 #endif
00127 }
00128
00129 #ifdef HAVE_OPENSSL
00130
00131
00132
00133
00134
00135
00136
00137
00138 void CConnectionRepository::
00139 set_connection_http(HTTPChannel *channel) {
00140 ReMutexHolder holder(_lock);
00141
00142 disconnect();
00143 nassertv(channel->is_connection_ready());
00144 _http_conn = channel->get_connection();
00145 _http_conn->set_tcp_header_size(_tcp_header_size);
00146 #ifdef SIMULATE_NETWORK_DELAY
00147 if (min_lag != 0.0 || max_lag != 0.0) {
00148 _http_conn->start_delay(min_lag, max_lag);
00149 }
00150 #endif
00151 }
00152 #endif // HAVE_OPENSSL
00153
00154 #ifdef HAVE_OPENSSL
00155
00156
00157
00158
00159
00160
00161
00162 SocketStream *CConnectionRepository::
00163 get_stream() {
00164 ReMutexHolder holder(_lock);
00165
00166 return _http_conn;
00167 }
00168 #endif // HAVE_OPENSSL
00169
00170
00171 #ifdef HAVE_NET
00172
00173
00174
00175
00176
00177
00178
00179 bool CConnectionRepository::
00180 try_connect_net(const URLSpec &url) {
00181 ReMutexHolder holder(_lock);
00182
00183 disconnect();
00184
00185 _net_conn =
00186 _qcm.open_TCP_client_connection(url.get_server(), url.get_port(),
00187 game_server_timeout_ms);
00188
00189 if (_net_conn != (Connection *)NULL) {
00190 _net_conn->set_no_delay(true);
00191 _qcr.add_connection(_net_conn);
00192 return true;
00193 }
00194
00195 return false;
00196 }
00197 #endif // HAVE_NET
00198
00199 #ifdef WANT_NATIVE_NET
00200
00201
00202
00203
00204
00205
00206 bool CConnectionRepository::
00207 connect_native(const URLSpec &url) {
00208 ReMutexHolder holder(_lock);
00209
00210 _native=true;
00211 Socket_Address addr;
00212 addr.set_host(url.get_server(),url.get_port());
00213 _bdc.ClearAddresses();
00214 _bdc.AddAddress(addr);
00215 return _bdc.DoConnect();
00216 }
00217
00218 #endif //WANT NATIVE NET
00219
00220 #ifdef SIMULATE_NETWORK_DELAY
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238 void CConnectionRepository::
00239 start_delay(double min_delay, double max_delay) {
00240 ReMutexHolder holder(_lock);
00241
00242 if (min_delay != 0.0 || max_delay != 0.0) {
00243 #ifdef HAVE_NET
00244 _qcr.start_delay(min_delay, max_delay);
00245 #endif // HAVE_NET
00246 #ifdef HAVE_OPENSSL
00247 if (_http_conn != (SocketStream *)NULL) {
00248 _http_conn->start_delay(min_delay, max_delay);
00249 }
00250 #endif // HAVE_OPENSSL
00251 } else {
00252 stop_delay();
00253 }
00254 }
00255 #endif // SIMULATE_NETWORK_DELAY
00256
00257 #ifdef SIMULATE_NETWORK_DELAY
00258
00259
00260
00261
00262
00263
00264
00265 void CConnectionRepository::
00266 stop_delay() {
00267 ReMutexHolder holder(_lock);
00268
00269 #ifdef HAVE_NET
00270 _qcr.stop_delay();
00271 #endif // HAVE_NET
00272 #ifdef HAVE_OPENSSL
00273 if (_http_conn != (SocketStream *)NULL) {
00274 _http_conn->stop_delay();
00275 }
00276 #endif // HAVE_OPENSSL
00277 }
00278 #endif // SIMULATE_NETWORK_DELAY
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289 bool CConnectionRepository::
00290 check_datagram() {
00291 ReMutexHolder holder(_lock);
00292
00293 if (_simulated_disconnect) {
00294 return false;
00295 }
00296 #ifdef WANT_NATIVE_NET
00297 if(_native)
00298 _bdc.Flush();
00299 #endif //WANT_NATIVE_NET
00300
00301 while (do_check_datagram()) {
00302 if (get_verbose()) {
00303 describe_message(nout, "RECV", _dg);
00304 }
00305
00306
00307 _di = DatagramIterator(_dg);
00308
00309 if (!_client_datagram) {
00310 unsigned char wc_cnt;
00311 wc_cnt = _di.get_uint8();
00312 _msg_channels.clear();
00313 for (unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) {
00314 CHANNEL_TYPE schan = _di.get_uint64();
00315 _msg_channels.push_back(schan);
00316 }
00317 _msg_sender = _di.get_uint64();
00318
00319 #ifdef HAVE_PYTHON
00320
00321
00322
00323 if (_python_repository != (PyObject *)NULL) {
00324 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00325 PyGILState_STATE gstate;
00326 gstate = PyGILState_Ensure();
00327 #endif
00328 PyObject *value = PyLong_FromUnsignedLongLong(_msg_sender);
00329 PyObject_SetAttrString(_python_repository, "msgSender", value);
00330 Py_DECREF(value);
00331 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00332 PyGILState_Release(gstate);
00333 #endif
00334 }
00335 #endif // HAVE_PYTHON
00336 }
00337
00338 _msg_type = _di.get_uint16();
00339
00340 if (!_handle_datagrams_internally) {
00341 return true;
00342 }
00343
00344 switch (_msg_type) {
00345 #ifdef HAVE_PYTHON
00346 case CLIENT_OBJECT_UPDATE_FIELD:
00347 case STATESERVER_OBJECT_UPDATE_FIELD:
00348 if (_handle_c_updates) {
00349 if (_has_owner_view) {
00350 if (!handle_update_field_owner()) {
00351 return false;
00352 }
00353 } else {
00354 if (!handle_update_field()) {
00355 return false;
00356 }
00357 }
00358 } else {
00359
00360 return true;
00361 }
00362 break;
00363 #endif // HAVE_PYTHON
00364
00365 default:
00366
00367 return true;
00368 }
00369 }
00370
00371
00372 return false;
00373 }
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388 bool CConnectionRepository::
00389 is_connected() {
00390 ReMutexHolder holder(_lock);
00391
00392 #ifdef WANT_NATIVE_NET
00393 if(_native)
00394 return (_bdc.IsConnected());
00395 #endif
00396
00397 #ifdef HAVE_NET
00398 if (_net_conn) {
00399 if (_qcm.reset_connection_available()) {
00400 PT(Connection) reset_connection;
00401 if (_qcm.get_reset_connection(reset_connection)) {
00402 _qcm.close_connection(reset_connection);
00403 if (reset_connection == _net_conn) {
00404
00405 _net_conn = NULL;
00406 return false;
00407 }
00408 }
00409 }
00410 return true;
00411 }
00412 #endif // HAVE_NET
00413
00414 #ifdef HAVE_OPENSSL
00415 if (_http_conn) {
00416 if (!_http_conn->is_closed()) {
00417 return true;
00418 }
00419
00420
00421 delete _http_conn;
00422 _http_conn = NULL;
00423 }
00424 #endif // HAVE_OPENSSL
00425
00426 return false;
00427 }
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437 bool CConnectionRepository::
00438 send_datagram(const Datagram &dg) {
00439 ReMutexHolder holder(_lock);
00440
00441 if (_simulated_disconnect) {
00442 distributed_cat.warning()
00443 << "Unable to send datagram during simulated disconnect.\n";
00444 return false;
00445 }
00446
00447 if (get_verbose()) {
00448 describe_message(nout, "SEND", dg);
00449 }
00450
00451 if (is_bundling_messages() && get_want_message_bundling()) {
00452 bundle_msg(dg);
00453 return true;
00454 }
00455
00456 #ifdef WANT_NATIVE_NET
00457 if(_native)
00458 return _bdc.SendMessage(dg);
00459 #endif
00460
00461 #ifdef HAVE_NET
00462 if (_net_conn) {
00463 _cw.send(dg, _net_conn);
00464 return true;
00465 }
00466 #endif // HAVE_NET
00467
00468 #ifdef HAVE_OPENSSL
00469 if (_http_conn) {
00470 if (!_http_conn->send_datagram(dg)) {
00471 distributed_cat.warning()
00472 << "Could not send datagram.\n";
00473 return false;
00474 }
00475
00476 return true;
00477 }
00478 #endif // HAVE_OPENSSL
00479
00480 distributed_cat.warning()
00481 << "Unable to send datagram after connection is closed.\n";
00482 return false;
00483 }
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495 void CConnectionRepository::
00496 start_message_bundle() {
00497 ReMutexHolder holder(_lock);
00498
00499
00500
00501
00502
00503 if (get_verbose()) {
00504 nout << "CR::SEND:BUNDLE_START(" << _bundling_msgs << ")" << endl;
00505 }
00506 if (_bundling_msgs == 0) {
00507 _bundle_msgs.clear();
00508 }
00509 ++_bundling_msgs;
00510 }
00511
00512
00513
00514
00515
00516
00517
00518 void CConnectionRepository::
00519 send_message_bundle(unsigned int channel, unsigned int sender_channel) {
00520 ReMutexHolder holder(_lock);
00521 nassertv(_bundling_msgs);
00522
00523 --_bundling_msgs;
00524
00525 if (get_verbose()) {
00526 nout << "CR::SEND:BUNDLE_FINISH(" << _bundling_msgs << ")" << endl;
00527 }
00528
00529
00530 if (_bundling_msgs == 0 && get_want_message_bundling()) {
00531 Datagram dg;
00532
00533 dg.add_int8(1);
00534 dg.add_uint64(channel);
00535 dg.add_uint64(sender_channel);
00536 dg.add_uint16(STATESERVER_BOUNCE_MESSAGE);
00537
00538 BundledMsgVector::const_iterator bmi;
00539 for (bmi = _bundle_msgs.begin(); bmi != _bundle_msgs.end(); bmi++) {
00540 dg.add_string(*bmi);
00541 }
00542
00543 send_datagram(dg);
00544 }
00545 }
00546
00547
00548
00549
00550
00551
00552
00553 void CConnectionRepository::
00554 abandon_message_bundles() {
00555 ReMutexHolder holder(_lock);
00556
00557 nassertv(is_bundling_messages());
00558 _bundling_msgs = 0;
00559 _bundle_msgs.clear();
00560 }
00561
00562
00563
00564
00565
00566
00567 void CConnectionRepository::
00568 bundle_msg(const Datagram &dg) {
00569 ReMutexHolder holder(_lock);
00570
00571 nassertv(is_bundling_messages());
00572 _bundle_msgs.push_back(dg.get_message());
00573 }
00574
00575
00576
00577
00578
00579
00580
00581
00582 bool CConnectionRepository::
00583 consider_flush() {
00584 ReMutexHolder holder(_lock);
00585
00586 if (_simulated_disconnect) {
00587 return false;
00588 }
00589
00590 #ifdef WANT_NATIVE_NET
00591 if(_native)
00592 return true;
00593 #endif
00594
00595 #ifdef HAVE_NET
00596 if (_net_conn) {
00597 return _net_conn->consider_flush();
00598 }
00599 #endif // HAVE_NET
00600
00601 #ifdef HAVE_OPENSSL
00602 if (_http_conn) {
00603 return _http_conn->consider_flush();
00604 }
00605 #endif // HAVE_OPENSSL
00606
00607 return false;
00608 }
00609
00610
00611
00612
00613
00614
00615
00616
00617 bool CConnectionRepository::
00618 flush() {
00619 ReMutexHolder holder(_lock);
00620
00621 if (_simulated_disconnect) {
00622 return false;
00623 }
00624 #ifdef WANT_NATIVE_NET
00625 if(_native)
00626 return _bdc.Flush();
00627 #endif
00628
00629 #ifdef HAVE_NET
00630 if (_net_conn) {
00631 return _net_conn->flush();
00632 }
00633 #endif // HAVE_NET
00634
00635 #ifdef HAVE_OPENSSL
00636 if (_http_conn) {
00637 return _http_conn->flush();
00638 }
00639 #endif // HAVE_OPENSSL
00640
00641 return false;
00642 }
00643
00644
00645
00646
00647
00648
00649 void CConnectionRepository::
00650 disconnect() {
00651 ReMutexHolder holder(_lock);
00652
00653 #ifdef WANT_NATIVE_NET
00654 if(_native) {
00655 _bdc.Reset();
00656 _bdc.ClearAddresses();
00657 }
00658 #endif
00659 #ifdef HAVE_NET
00660 if (_net_conn) {
00661 _qcm.close_connection(_net_conn);
00662 _net_conn = NULL;
00663 }
00664 #endif // HAVE_NET
00665
00666 #ifdef HAVE_OPENSSL
00667 if (_http_conn) {
00668 _http_conn->close();
00669 delete _http_conn;
00670 _http_conn = NULL;
00671 }
00672 #endif // HAVE_OPENSSL
00673
00674 _simulated_disconnect = false;
00675 }
00676
00677
00678
00679
00680
00681
00682
00683 void CConnectionRepository::
00684 shutdown() {
00685 disconnect();
00686
00687 #ifdef HAVE_NET
00688 _cw.shutdown();
00689 _qcr.shutdown();
00690 #endif // HAVE_NET
00691 }
00692
00693
00694
00695
00696
00697
00698
00699 bool CConnectionRepository::
00700 do_check_datagram() {
00701 #ifdef WANT_NATIVE_NET
00702 if(_native) {
00703 return _bdc.GetMessage(_dg);
00704 }
00705 #endif
00706 #ifdef HAVE_NET
00707 if (_net_conn) {
00708 _net_conn->consider_flush();
00709 if (_qcr.get_overflow_flag()) {
00710 throw_event(get_overflow_event_name());
00711 _qcr.reset_overflow_flag();
00712 }
00713 return (_qcr.data_available() && _qcr.get_data(_dg));
00714 }
00715 #endif // HAVE_NET
00716
00717 #ifdef HAVE_OPENSSL
00718 if (_http_conn) {
00719 _http_conn->consider_flush();
00720 return _http_conn->receive_datagram(_dg);
00721 }
00722 #endif // HAVE_OPENSSL
00723
00724
00725 return false;
00726 }
00727
00728
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738 bool CConnectionRepository::
00739 handle_update_field() {
00740 #ifdef HAVE_PYTHON
00741 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00742 PyGILState_STATE gstate;
00743 gstate = PyGILState_Ensure();
00744 #endif
00745
00746 PStatTimer timer(_update_pcollector);
00747 unsigned int do_id = _di.get_uint32();
00748 if (_python_repository != (PyObject *)NULL)
00749 {
00750 PyObject *doId2do =
00751 PyObject_GetAttrString(_python_repository, "doId2do");
00752 nassertr(doId2do != NULL, false);
00753
00754 #ifdef USE_PYTHON_2_2_OR_EARLIER
00755 PyObject *doId = PyInt_FromLong(do_id);
00756 #else
00757 PyObject *doId = PyLong_FromUnsignedLong(do_id);
00758 #endif
00759 PyObject *distobj = PyDict_GetItem(doId2do, doId);
00760 Py_DECREF(doId);
00761 Py_DECREF(doId2do);
00762
00763 if (distobj != NULL) {
00764 PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
00765 nassertr(dclass_obj != NULL, false);
00766
00767
00768 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
00769 Py_DECREF(dclass_obj);
00770 nassertr(dclass_this != NULL, false);
00771
00772 DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this);
00773 Py_DECREF(dclass_this);
00774
00775
00776
00777 if (_in_quiet_zone) {
00778 PyObject *neverDisable = PyObject_GetAttrString(distobj, "neverDisable");
00779 nassertr(neverDisable != NULL, false);
00780
00781 unsigned int cNeverDisable = PyInt_AsLong(neverDisable);
00782 if (!cNeverDisable) {
00783
00784
00785 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00786 PyGILState_Release(gstate);
00787 #endif
00788 return true;
00789 }
00790 }
00791
00792
00793
00794
00795
00796 Py_INCREF(distobj);
00797 dclass->receive_update(distobj, _di);
00798 Py_DECREF(distobj);
00799
00800 if (PyErr_Occurred()) {
00801 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00802 PyGILState_Release(gstate);
00803 #endif
00804 return false;
00805 }
00806 }
00807
00808 }
00809
00810 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00811 PyGILState_Release(gstate);
00812 #endif
00813 #endif // HAVE_PYTHON
00814 return true;
00815 }
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825
00826
00827
00828
00829 bool CConnectionRepository::
00830 handle_update_field_owner() {
00831 #ifdef HAVE_PYTHON
00832 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00833 PyGILState_STATE gstate;
00834 gstate = PyGILState_Ensure();
00835 #endif
00836
00837 PStatTimer timer(_update_pcollector);
00838 unsigned int do_id = _di.get_uint32();
00839 if (_python_repository != (PyObject *)NULL) {
00840 PyObject *doId2do =
00841 PyObject_GetAttrString(_python_repository, "doId2do");
00842 nassertr(doId2do != NULL, false);
00843
00844 PyObject *doId2ownerView =
00845 PyObject_GetAttrString(_python_repository, "doId2ownerView");
00846 nassertr(doId2ownerView != NULL, false);
00847
00848 #ifdef USE_PYTHON_2_2_OR_EARLIER
00849 PyObject *doId = PyInt_FromLong(do_id);
00850 #else
00851 PyObject *doId = PyLong_FromUnsignedLong(do_id);
00852 #endif
00853
00854
00855 PyObject *distobjOV = PyDict_GetItem(doId2ownerView, doId);
00856 Py_DECREF(doId2ownerView);
00857
00858 if (distobjOV != NULL) {
00859 PyObject *dclass_obj = PyObject_GetAttrString(distobjOV, "dclass");
00860 nassertr(dclass_obj != NULL, false);
00861
00862 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
00863 Py_DECREF(dclass_obj);
00864 nassertr(dclass_this != NULL, false);
00865
00866 DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this);
00867 Py_DECREF(dclass_this);
00868
00869
00870 DCPacker packer;
00871 packer.set_unpack_data(_di.get_remaining_bytes());
00872 int field_id = packer.raw_unpack_uint16();
00873 DCField *field = dclass->get_field_by_index(field_id);
00874 if (field->is_ownrecv()) {
00875
00876
00877
00878
00879 Py_INCREF(distobjOV);
00880
00881
00882 DatagramIterator _odi(_di);
00883 dclass->receive_update(distobjOV, _odi);
00884 Py_DECREF(distobjOV);
00885
00886 if (PyErr_Occurred()) {
00887 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00888 PyGILState_Release(gstate);
00889 #endif
00890 return false;
00891 }
00892 }
00893 }
00894
00895
00896 PyObject *distobj = PyDict_GetItem(doId2do, doId);
00897 Py_DECREF(doId);
00898 Py_DECREF(doId2do);
00899
00900 if (distobj != NULL) {
00901 PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
00902 nassertr(dclass_obj != NULL, false);
00903
00904 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
00905 Py_DECREF(dclass_obj);
00906 nassertr(dclass_this != NULL, false);
00907
00908 DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this);
00909 Py_DECREF(dclass_this);
00910
00911
00912 DCPacker packer;
00913 packer.set_unpack_data(_di.get_remaining_bytes());
00914 int field_id = packer.raw_unpack_uint16();
00915 DCField *field = dclass->get_field_by_index(field_id);
00916 if (true) {
00917
00918
00919
00920
00921 Py_INCREF(distobj);
00922 dclass->receive_update(distobj, _di);
00923 Py_DECREF(distobj);
00924
00925 if (PyErr_Occurred()) {
00926 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00927 PyGILState_Release(gstate);
00928 #endif
00929 return false;
00930 }
00931 }
00932 }
00933 }
00934
00935 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00936 PyGILState_Release(gstate);
00937 #endif
00938 #endif // HAVE_PYTHON
00939
00940 return true;
00941 }
00942
00943
00944
00945
00946
00947
00948
00949
00950 void CConnectionRepository::
00951 describe_message(ostream &out, const string &prefix,
00952 const Datagram &dg) const {
00953 DCPacker packer;
00954
00955 packer.set_unpack_data(dg.get_message());
00956 CHANNEL_TYPE do_id;
00957 int msg_type;
00958 bool is_update = false;
00959 string full_prefix = "CR::" + prefix;
00960
00961 if (!_client_datagram)
00962 {
00963 unsigned char mcnt = packer.raw_unpack_uint8();
00964 for( ;mcnt > 0; mcnt--)
00965 packer.RAW_UNPACK_CHANNEL();
00966
00967 packer.RAW_UNPACK_CHANNEL();
00968 msg_type = packer.raw_unpack_uint16();
00969 is_update = (msg_type == STATESERVER_OBJECT_UPDATE_FIELD);
00970
00971 } else {
00972 msg_type = packer.raw_unpack_uint16();
00973 is_update = (msg_type == CLIENT_OBJECT_UPDATE_FIELD);
00974 }
00975
00976 if (!is_update) {
00977
00978
00979 string msgName;
00980
00981 #ifdef HAVE_PYTHON
00982 if (_python_repository != (PyObject *)NULL) {
00983 PyObject *msgId = PyLong_FromLong(msg_type);
00984 nassertv(msgId != NULL);
00985 PyObject *methodName = PyString_FromString("_getMsgName");
00986 nassertv(methodName != NULL);
00987
00988 PyObject *result = PyObject_CallMethodObjArgs(_python_repository, methodName,
00989 msgId, NULL);
00990 nassertv(result != NULL);
00991
00992 msgName += string(PyString_AsString(result));
00993
00994 Py_DECREF(methodName);
00995 Py_DECREF(msgId);
00996 Py_DECREF(result);
00997 }
00998 #endif
00999 if (msgName.length() == 0) {
01000 msgName += "unknown message ";
01001 msgName += msg_type;
01002 msgName += "\n";
01003 }
01004 out << full_prefix << ":" << msgName << "\n";
01005 dg.dump_hex(out, 2);
01006
01007 } else {
01008
01009
01010 do_id = packer.raw_unpack_uint32();
01011 DCClass *dclass = NULL;
01012
01013 #ifdef HAVE_PYTHON
01014 if (_python_repository != (PyObject *)NULL) {
01015 PyObject *doId2do =
01016 PyObject_GetAttrString(_python_repository, "doId2do");
01017 nassertv(doId2do != NULL);
01018
01019 #ifdef USE_PYTHON_2_2_OR_EARLIER
01020 PyObject *doId = PyInt_FromLong(do_id);
01021 #else
01022 PyObject *doId = PyLong_FromUnsignedLong(do_id);
01023 #endif
01024 PyObject *distobj = PyDict_GetItem(doId2do, doId);
01025 Py_DECREF(doId);
01026 Py_DECREF(doId2do);
01027
01028 if (distobj != NULL) {
01029 PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
01030 nassertv(dclass_obj != NULL);
01031
01032 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
01033 Py_DECREF(dclass_obj);
01034 nassertv(dclass_this != NULL);
01035
01036 dclass = (DCClass *)PyInt_AsLong(dclass_this);
01037 Py_DECREF(dclass_this);
01038 }
01039 }
01040 #endif // HAVE_PYTHON
01041
01042 int field_id = packer.raw_unpack_uint16();
01043
01044 if (dclass == (DCClass *)NULL) {
01045 out << full_prefix << "update for unknown object " << do_id
01046 << ", field " << field_id << "\n";
01047
01048 } else {
01049 out << full_prefix <<
01050 ":" << dclass->get_name() << "(" << do_id << ").";
01051 DCField *field = dclass->get_field_by_index(field_id);
01052 if (field == (DCField *)NULL) {
01053 out << "unknown field " << field_id << "\n";
01054
01055 } else {
01056 out << field->get_name();
01057 packer.begin_unpack(field);
01058 packer.unpack_and_format(out);
01059 packer.end_unpack();
01060 out << "\n";
01061 }
01062 }
01063 }
01064 }
01065
01066
01067
01068
01069 #ifdef HAVE_PYTHON
01070 #ifdef WANT_NATIVE_NET
01071
01072 bool CConnectionRepository::network_based_reader_and_yielder(PyObject *PycallBackFunction,ClockObject &clock, float returnBy)
01073 {
01074 ReMutexHolder holder(_lock);
01075 while(is_connected())
01076 {
01077 check_datagram_ai(PycallBackFunction);
01078 if(is_connected())
01079 _bdc.Flush();
01080 float currentTime = clock.get_real_time();
01081 float dif_time = returnBy - currentTime;
01082 if(dif_time <= 0.001)
01083 break;
01084 if(is_connected())
01085 _bdc.WaitForNetworkReadEvent(dif_time);
01086 }
01087 return false;
01088 }
01089
01090 bool CConnectionRepository::check_datagram_ai(PyObject *PycallBackFunction)
01091 {
01092 ReMutexHolder holder(_lock);
01093
01094 PyObject *doId2do = NULL;
01095 float startTime =0;
01096 float endTime = 0;
01097
01098 _bdc.Flush();
01099 while (_bdc.GetMessage(_dg))
01100 {
01101 if (get_verbose())
01102 describe_message(nout, "RECV", _dg);
01103
01104 if (_time_warning > 0)
01105 startTime = ClockObject::get_global_clock()->get_real_time();
01106
01107
01108 _di.assign(_dg);
01109 unsigned char wc_cnt = _di.get_uint8();
01110 _msg_channels.clear();
01111 for(unsigned char lp1 = 0; lp1 < wc_cnt; lp1++)
01112 _msg_channels.push_back(_di.get_uint64());
01113
01114 _msg_sender = _di.get_uint64();
01115 _msg_type = _di.get_uint16();
01116
01117 if( _msg_type == STATESERVER_OBJECT_UPDATE_FIELD)
01118 {
01119 if(doId2do == NULL)
01120 {
01121
01122 doId2do =PyObject_GetAttrString(_python_repository, "doId2do");
01123 nassertr(doId2do != NULL, false);
01124 }
01125
01126 if (!handle_update_field_ai(doId2do))
01127 {
01128 Py_XDECREF(doId2do);
01129 if (_time_warning > 0) {
01130 endTime = ClockObject::get_global_clock()->get_real_time();
01131 if ( _time_warning < (endTime - startTime)) {
01132 nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<< (endTime-startTime) << "secs to process\n";
01133 _dg.dump_hex(nout,2);
01134 }
01135 }
01136 return false;
01137 }
01138 }
01139 else
01140 {
01141 PyObject * result = PyEval_CallObject(PycallBackFunction, _python_ai_datagramiterator);
01142 if (PyErr_Occurred())
01143 {
01144 Py_XDECREF(doId2do);
01145 if (_time_warning > 0) {
01146 endTime = ClockObject::get_global_clock()->get_real_time();
01147 if ( _time_warning < (endTime - startTime)) {
01148 nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<< (endTime-startTime) << "secs to process\n";
01149 _dg.dump_hex(nout,2);
01150 }
01151 }
01152 return true;
01153 }
01154 }
01155
01156 if (_time_warning > 0) {
01157 endTime = ClockObject::get_global_clock()->get_real_time();
01158 if ( _time_warning < (endTime - startTime)) {
01159 nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<< (endTime-startTime) << "secs to process\n";
01160 _dg.dump_hex(nout,2);
01161 }
01162 }
01163
01164 }
01165
01166
01167 Py_XDECREF(doId2do);
01168 return false;
01169 }
01170
01171 #endif // #ifdef WANT_NATIVE_NET
01172 #endif // #ifdef HAVE_PYTHON
01173
01174
01175 #ifdef HAVE_PYTHON
01176 #ifdef WANT_NATIVE_NET
01177
01178
01179 bool CConnectionRepository::handle_update_field_ai(PyObject *doId2do)
01180 {
01181 PStatTimer timer(_update_pcollector);
01182 unsigned int do_id = _di.get_uint32();
01183
01184 PyObject *doId = PyLong_FromUnsignedLong(do_id);
01185 PyObject *distobj = PyDict_GetItem(doId2do, doId);
01186 Py_DECREF(doId);
01187
01188 if (distobj != NULL)
01189 {
01190 PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
01191 nassertr(dclass_obj != NULL, false);
01192
01193 DCClass *dclass = NULL;
01194 DTOOL_Call_ExtractThisPointerForType(dclass_obj, &Dtool_DCClass, (void **) &dclass);
01195 if(dclass == NULL)
01196 return false;
01197
01198 Py_INCREF(distobj);
01199 dclass->receive_update(distobj, _di);
01200 Py_DECREF(distobj);
01201
01202 if (PyErr_Occurred())
01203 return false;
01204 }
01205 return true;
01206 }
01207
01208 #endif // #ifdef WANT_NATIVE_NET
01209 #endif // #ifdef HAVE_PYTHON