Panda3D
|
00001 // Filename: cConnectionRepository.cxx 00002 // Created by: drose (17May04) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "cConnectionRepository.h" 00016 #include "dcmsgtypes.h" 00017 #include "dcClass.h" 00018 #include "dcPacker.h" 00019 00020 #include "config_distributed.h" 00021 #include "config_downloader.h" 00022 #include "httpChannel.h" 00023 #include "urlSpec.h" 00024 #include "datagramIterator.h" 00025 #include "throw_event.h" 00026 #include "pStatTimer.h" 00027 00028 #ifdef HAVE_PYTHON 00029 #ifndef CPPPARSER 00030 #include "py_panda.h" 00031 IMPORT_THIS struct Dtool_PyTypedObject Dtool_DatagramIterator; 00032 IMPORT_THIS struct Dtool_PyTypedObject Dtool_DCClass; 00033 #endif 00034 #endif 00035 00036 const string CConnectionRepository::_overflow_event_name = "CRDatagramOverflow"; 00037 00038 #ifndef CPPPARSER 00039 PStatCollector CConnectionRepository::_update_pcollector("App:Show code:readerPollTask:Update"); 00040 #endif // CPPPARSER 00041 00042 //////////////////////////////////////////////////////////////////// 00043 // Function: CConnectionRepository::Constructor 00044 // Access: Published 00045 // Description: 00046 //////////////////////////////////////////////////////////////////// 00047 CConnectionRepository:: 00048 CConnectionRepository(bool has_owner_view, bool threaded_net) : 00049 _lock("CConnectionRepository::_lock"), 00050 #ifdef HAVE_PYTHON 00051 _python_repository(NULL), 00052 _python_ai_datagramiterator(NULL), 00053 #endif 00054 #ifdef HAVE_OPENSSL 00055 _http_conn(NULL), 
00056 #endif 00057 #ifdef HAVE_NET 00058 _cw(&_qcm, threaded_net ? 1 : 0), 00059 _qcr(&_qcm, threaded_net ? 1 : 0), 00060 #endif 00061 #ifdef WANT_NATIVE_NET 00062 _bdc(4096000,4096000,1400), 00063 _native(false), 00064 #endif 00065 _client_datagram(true), 00066 _handle_datagrams_internally(handle_datagrams_internally), 00067 _simulated_disconnect(false), 00068 _verbose(distributed_cat.is_spam()), 00069 _time_warning(0.0), 00070 // _msg_channels(), 00071 _msg_sender(0), 00072 _msg_type(0), 00073 _has_owner_view(has_owner_view), 00074 _handle_c_updates(true), 00075 _want_message_bundling(true), 00076 _bundling_msgs(0), 00077 _in_quiet_zone(0) 00078 { 00079 #if defined(HAVE_NET) && defined(SIMULATE_NETWORK_DELAY) 00080 if (min_lag != 0.0 || max_lag != 0.0) { 00081 _qcr.start_delay(min_lag, max_lag); 00082 } 00083 #endif 00084 _tcp_header_size = tcp_header_size; 00085 00086 #ifdef HAVE_PYTHON 00087 PyObject * PyDitterator = DTool_CreatePyInstance(&_di,Dtool_DatagramIterator,false,false); 00088 if(PyDitterator != NULL) 00089 _python_ai_datagramiterator = Py_BuildValue("(O)",PyDitterator); 00090 #endif 00091 00092 } 00093 00094 //////////////////////////////////////////////////////////////////// 00095 // Function: CConnectionRepository::Destructor 00096 // Access: Published 00097 // Description: 00098 //////////////////////////////////////////////////////////////////// 00099 CConnectionRepository:: 00100 ~CConnectionRepository() { 00101 disconnect(); 00102 } 00103 00104 //////////////////////////////////////////////////////////////////// 00105 // Function: CConnectionRepository::set_tcp_header_size 00106 // Access: Public 00107 // Description: Sets the header size of TCP packets. At the present, 00108 // legal values for this are 0, 2, or 4; this specifies 00109 // the number of bytes to use encode the datagram length 00110 // at the start of each TCP datagram. Sender and 00111 // receiver must independently agree on this. 
00112 //////////////////////////////////////////////////////////////////// 00113 void CConnectionRepository:: 00114 set_tcp_header_size(int tcp_header_size) { 00115 _tcp_header_size = tcp_header_size; 00116 00117 #ifdef HAVE_OPENSSL 00118 if (_http_conn != (SocketStream *)NULL) { 00119 _http_conn->set_tcp_header_size(tcp_header_size); 00120 } 00121 #endif 00122 00123 #ifdef HAVE_NET 00124 _cw.set_tcp_header_size(tcp_header_size); 00125 _qcr.set_tcp_header_size(tcp_header_size); 00126 #endif 00127 } 00128 00129 #ifdef HAVE_OPENSSL 00130 //////////////////////////////////////////////////////////////////// 00131 // Function: CConnectionRepository::set_connection_http 00132 // Access: Published 00133 // Description: Once a connection has been established via the HTTP 00134 // interface, gets the connection and uses it. The 00135 // supplied HTTPChannel object must have a connection 00136 // available via get_connection(). 00137 //////////////////////////////////////////////////////////////////// 00138 void CConnectionRepository:: 00139 set_connection_http(HTTPChannel *channel) { 00140 ReMutexHolder holder(_lock); 00141 00142 disconnect(); 00143 nassertv(channel->is_connection_ready()); 00144 _http_conn = channel->get_connection(); 00145 _http_conn->set_tcp_header_size(_tcp_header_size); 00146 #ifdef SIMULATE_NETWORK_DELAY 00147 if (min_lag != 0.0 || max_lag != 0.0) { 00148 _http_conn->start_delay(min_lag, max_lag); 00149 } 00150 #endif 00151 } 00152 #endif // HAVE_OPENSSL 00153 00154 #ifdef HAVE_OPENSSL 00155 //////////////////////////////////////////////////////////////////// 00156 // Function: CConnectionRepository::get_stream 00157 // Access: Published 00158 // Description: Returns the SocketStream that internally represents 00159 // the already-established HTTP connection. Returns 00160 // NULL if there is no current HTTP connection. 
00161 //////////////////////////////////////////////////////////////////// 00162 SocketStream *CConnectionRepository:: 00163 get_stream() { 00164 ReMutexHolder holder(_lock); 00165 00166 return _http_conn; 00167 } 00168 #endif // HAVE_OPENSSL 00169 00170 00171 #ifdef HAVE_NET 00172 //////////////////////////////////////////////////////////////////// 00173 // Function: CConnectionRepository::try_connect_net 00174 // Access: Published 00175 // Description: Uses Panda's "net" library to try to connect to the 00176 // server and port named in the indicated URL. Returns 00177 // true if successful, false otherwise. 00178 //////////////////////////////////////////////////////////////////// 00179 bool CConnectionRepository:: 00180 try_connect_net(const URLSpec &url) { 00181 ReMutexHolder holder(_lock); 00182 00183 disconnect(); 00184 00185 _net_conn = 00186 _qcm.open_TCP_client_connection(url.get_server(), url.get_port(), 00187 game_server_timeout_ms); 00188 00189 if (_net_conn != (Connection *)NULL) { 00190 _net_conn->set_no_delay(true); 00191 _qcr.add_connection(_net_conn); 00192 return true; 00193 } 00194 00195 return false; 00196 } 00197 #endif // HAVE_NET 00198 00199 #ifdef WANT_NATIVE_NET 00200 //////////////////////////////////////////////////////////////////// 00201 // Function: CConnectionRepository::connect_native 00202 // Access: Published 00203 // Description: Connects to the server using Panda's low-level and 00204 // fast "native net" library. 
00205 //////////////////////////////////////////////////////////////////// 00206 bool CConnectionRepository:: 00207 connect_native(const URLSpec &url) { 00208 ReMutexHolder holder(_lock); 00209 00210 _native=true; 00211 Socket_Address addr; 00212 addr.set_host(url.get_server(),url.get_port()); 00213 _bdc.ClearAddresses(); 00214 _bdc.AddAddress(addr); 00215 return _bdc.DoConnect(); 00216 } 00217 00218 #endif //WANT NATIVE NET 00219 00220 #ifdef SIMULATE_NETWORK_DELAY 00221 //////////////////////////////////////////////////////////////////// 00222 // Function: CConnectionRepository::start_delay 00223 // Access: Published 00224 // Description: Enables a simulated network latency. All datagrams 00225 // received from this point on will be held for a random 00226 // interval of least min_delay seconds, and no more than 00227 // max_delay seconds, before being visible. It is as if 00228 // datagrams suddenly took much longer to arrive. 00229 // 00230 // This should *only* be called if the underlying socket 00231 // is non-blocking. If you call this on a blocking 00232 // socket, it will force all datagrams to be held up 00233 // until the socket closes. 00234 // 00235 // This has no effect if the connection method is via 00236 // the "native net" library. 
00237 //////////////////////////////////////////////////////////////////// 00238 void CConnectionRepository:: 00239 start_delay(double min_delay, double max_delay) { 00240 ReMutexHolder holder(_lock); 00241 00242 if (min_delay != 0.0 || max_delay != 0.0) { 00243 #ifdef HAVE_NET 00244 _qcr.start_delay(min_delay, max_delay); 00245 #endif // HAVE_NET 00246 #ifdef HAVE_OPENSSL 00247 if (_http_conn != (SocketStream *)NULL) { 00248 _http_conn->start_delay(min_delay, max_delay); 00249 } 00250 #endif // HAVE_OPENSSL 00251 } else { 00252 stop_delay(); 00253 } 00254 } 00255 #endif // SIMULATE_NETWORK_DELAY 00256 00257 #ifdef SIMULATE_NETWORK_DELAY 00258 //////////////////////////////////////////////////////////////////// 00259 // Function: CConnectionRepository::stop_delay 00260 // Access: Published 00261 // Description: Disables the simulated network latency started by a 00262 // previous call to start_delay(). Datagrams will once 00263 // again be visible as soon as they are received. 00264 //////////////////////////////////////////////////////////////////// 00265 void CConnectionRepository:: 00266 stop_delay() { 00267 ReMutexHolder holder(_lock); 00268 00269 #ifdef HAVE_NET 00270 _qcr.stop_delay(); 00271 #endif // HAVE_NET 00272 #ifdef HAVE_OPENSSL 00273 if (_http_conn != (SocketStream *)NULL) { 00274 _http_conn->stop_delay(); 00275 } 00276 #endif // HAVE_OPENSSL 00277 } 00278 #endif // SIMULATE_NETWORK_DELAY 00279 00280 //////////////////////////////////////////////////////////////////// 00281 // Function: CConnectionRepository::check_datagram 00282 // Access: Published 00283 // Description: Returns true if a new datagram is available, false 00284 // otherwise. If the return value is true, the new 00285 // datagram may be retrieved via get_datagram(), or 00286 // preferably, with get_datagram_iterator() and 00287 // get_msg_type(). 
00288 //////////////////////////////////////////////////////////////////// 00289 bool CConnectionRepository:: 00290 check_datagram() { 00291 ReMutexHolder holder(_lock); 00292 00293 if (_simulated_disconnect) { 00294 return false; 00295 } 00296 #ifdef WANT_NATIVE_NET 00297 if(_native) 00298 _bdc.Flush(); 00299 #endif //WANT_NATIVE_NET 00300 00301 while (do_check_datagram()) { 00302 if (get_verbose()) { 00303 describe_message(nout, "RECV", _dg); 00304 } 00305 00306 // Start breaking apart the datagram. 00307 _di = DatagramIterator(_dg); 00308 00309 if (!_client_datagram) { 00310 unsigned char wc_cnt; 00311 wc_cnt = _di.get_uint8(); 00312 _msg_channels.clear(); 00313 for (unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) { 00314 CHANNEL_TYPE schan = _di.get_uint64(); 00315 _msg_channels.push_back(schan); 00316 } 00317 _msg_sender = _di.get_uint64(); 00318 00319 #ifdef HAVE_PYTHON 00320 // For now, we need to stuff this field onto the Python 00321 // structure, to support legacy code that expects to find it 00322 // there. 00323 if (_python_repository != (PyObject *)NULL) { 00324 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00325 PyGILState_STATE gstate; 00326 gstate = PyGILState_Ensure(); 00327 #endif 00328 PyObject *value = PyLong_FromUnsignedLongLong(_msg_sender); 00329 PyObject_SetAttrString(_python_repository, "msgSender", value); 00330 Py_DECREF(value); 00331 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00332 PyGILState_Release(gstate); 00333 #endif 00334 } 00335 #endif // HAVE_PYTHON 00336 } 00337 00338 _msg_type = _di.get_uint16(); 00339 // Is this a message that we can process directly? 
00340 if (!_handle_datagrams_internally) { 00341 return true; 00342 } 00343 00344 switch (_msg_type) { 00345 #ifdef HAVE_PYTHON 00346 case CLIENT_OBJECT_UPDATE_FIELD: 00347 case STATESERVER_OBJECT_UPDATE_FIELD: 00348 if (_handle_c_updates) { 00349 if (_has_owner_view) { 00350 if (!handle_update_field_owner()) { 00351 return false; 00352 } 00353 } else { 00354 if (!handle_update_field()) { 00355 return false; 00356 } 00357 } 00358 } else { 00359 // Let the caller (Python) deal with this update. 00360 return true; 00361 } 00362 break; 00363 #endif // HAVE_PYTHON 00364 00365 default: 00366 // Some unknown message; let the caller deal with it. 00367 return true; 00368 } 00369 } 00370 00371 // No datagrams available. 00372 return false; 00373 } 00374 00375 00376 00377 00378 //////////////////////////////////////////////////////////////////// 00379 // Function: CConnectionRepository::is_connected 00380 // Access: Published 00381 // Description: Returns true if the connection to the gameserver is 00382 // established and still good, false if we are not 00383 // connected. A false value means either (a) we never 00384 // successfully connected, (b) we explicitly called 00385 // disconnect(), or (c) we were connected, but the 00386 // connection was spontaneously lost. 00387 //////////////////////////////////////////////////////////////////// 00388 bool CConnectionRepository:: 00389 is_connected() { 00390 ReMutexHolder holder(_lock); 00391 00392 #ifdef WANT_NATIVE_NET 00393 if(_native) 00394 return (_bdc.IsConnected()); 00395 #endif 00396 00397 #ifdef HAVE_NET 00398 if (_net_conn) { 00399 if (_qcm.reset_connection_available()) { 00400 PT(Connection) reset_connection; 00401 if (_qcm.get_reset_connection(reset_connection)) { 00402 _qcm.close_connection(reset_connection); 00403 if (reset_connection == _net_conn) { 00404 // Whoops, lost our connection. 
00405 _net_conn = NULL; 00406 return false; 00407 } 00408 } 00409 } 00410 return true; 00411 } 00412 #endif // HAVE_NET 00413 00414 #ifdef HAVE_OPENSSL 00415 if (_http_conn) { 00416 if (!_http_conn->is_closed()) { 00417 return true; 00418 } 00419 00420 // Connection lost. 00421 delete _http_conn; 00422 _http_conn = NULL; 00423 } 00424 #endif // HAVE_OPENSSL 00425 00426 return false; 00427 } 00428 00429 //////////////////////////////////////////////////////////////////// 00430 // Function: CConnectionRepository::send_datagram 00431 // Access: Published 00432 // Description: Queues the indicated datagram for sending to the 00433 // server. It may not get sent immediately if 00434 // collect_tcp is in effect; call flush() to guarantee 00435 // it is sent now. 00436 //////////////////////////////////////////////////////////////////// 00437 bool CConnectionRepository:: 00438 send_datagram(const Datagram &dg) { 00439 ReMutexHolder holder(_lock); 00440 00441 if (_simulated_disconnect) { 00442 distributed_cat.warning() 00443 << "Unable to send datagram during simulated disconnect.\n"; 00444 return false; 00445 } 00446 00447 if (get_verbose()) { 00448 describe_message(nout, "SEND", dg); 00449 } 00450 00451 if (is_bundling_messages() && get_want_message_bundling()) { 00452 bundle_msg(dg); 00453 return true; 00454 } 00455 00456 #ifdef WANT_NATIVE_NET 00457 if(_native) 00458 return _bdc.SendMessage(dg); 00459 #endif 00460 00461 #ifdef HAVE_NET 00462 if (_net_conn) { 00463 _cw.send(dg, _net_conn); 00464 return true; 00465 } 00466 #endif // HAVE_NET 00467 00468 #ifdef HAVE_OPENSSL 00469 if (_http_conn) { 00470 if (!_http_conn->send_datagram(dg)) { 00471 distributed_cat.warning() 00472 << "Could not send datagram.\n"; 00473 return false; 00474 } 00475 00476 return true; 00477 } 00478 #endif // HAVE_OPENSSL 00479 00480 distributed_cat.warning() 00481 << "Unable to send datagram after connection is closed.\n"; 00482 return false; 00483 } 00484 00485 
//////////////////////////////////////////////////////////////////// 00486 // Function: CConnectionRepository::start_message_bundle 00487 // Access: Published 00488 // Description: Send a set of messages to the state server that will 00489 // be processed atomically. For instance, you can do a 00490 // combined setLocation/setPos and prevent race 00491 // conditions where clients briefly get the setLocation 00492 // but not the setPos, because the state server hasn't 00493 // processed the setPos yet 00494 //////////////////////////////////////////////////////////////////// 00495 void CConnectionRepository:: 00496 start_message_bundle() { 00497 ReMutexHolder holder(_lock); 00498 00499 // store up network messages until sendMessageBundle is called 00500 // all updates in between must be sent from the same doId (updates 00501 // must all affect the same DistributedObject) 00502 // it is an error to call this again before calling sendMessageBundle 00503 if (get_verbose()) { 00504 nout << "CR::SEND:BUNDLE_START(" << _bundling_msgs << ")" << endl; 00505 } 00506 if (_bundling_msgs == 0) { 00507 _bundle_msgs.clear(); 00508 } 00509 ++_bundling_msgs; 00510 } 00511 00512 //////////////////////////////////////////////////////////////////// 00513 // Function: CConnectionRepository::send_message_bundle 00514 // Access: Published 00515 // Description: Send network messages queued up since 00516 // startMessageBundle was called. 
00517 //////////////////////////////////////////////////////////////////// 00518 void CConnectionRepository:: 00519 send_message_bundle(unsigned int channel, unsigned int sender_channel) { 00520 ReMutexHolder holder(_lock); 00521 nassertv(_bundling_msgs); 00522 00523 --_bundling_msgs; 00524 00525 if (get_verbose()) { 00526 nout << "CR::SEND:BUNDLE_FINISH(" << _bundling_msgs << ")" << endl; 00527 } 00528 00529 // if _bundling_msgs ref count is zero, send the bundle out 00530 if (_bundling_msgs == 0 && get_want_message_bundling()) { 00531 Datagram dg; 00532 // add server header (see PyDatagram.addServerHeader) 00533 dg.add_int8(1); 00534 dg.add_uint64(channel); 00535 dg.add_uint64(sender_channel); 00536 dg.add_uint16(STATESERVER_BOUNCE_MESSAGE); 00537 // add each bundled message 00538 BundledMsgVector::const_iterator bmi; 00539 for (bmi = _bundle_msgs.begin(); bmi != _bundle_msgs.end(); bmi++) { 00540 dg.add_string(*bmi); 00541 } 00542 00543 send_datagram(dg); 00544 } 00545 } 00546 00547 //////////////////////////////////////////////////////////////////// 00548 // Function: CConnectionRepository::abandon_message_bundles 00549 // Access: Published 00550 // Description: throw out any msgs that have been queued up for 00551 // message bundles 00552 //////////////////////////////////////////////////////////////////// 00553 void CConnectionRepository:: 00554 abandon_message_bundles() { 00555 ReMutexHolder holder(_lock); 00556 00557 nassertv(is_bundling_messages()); 00558 _bundling_msgs = 0; 00559 _bundle_msgs.clear(); 00560 } 00561 00562 //////////////////////////////////////////////////////////////////// 00563 // Function: CConnectionRepository::bundle_msg 00564 // Access: Published 00565 // Description: 00566 //////////////////////////////////////////////////////////////////// 00567 void CConnectionRepository:: 00568 bundle_msg(const Datagram &dg) { 00569 ReMutexHolder holder(_lock); 00570 00571 nassertv(is_bundling_messages()); 00572 
_bundle_msgs.push_back(dg.get_message()); 00573 } 00574 00575 //////////////////////////////////////////////////////////////////// 00576 // Function: CConnectionRepository::consider_flush 00577 // Access: Published 00578 // Description: Sends the most recently queued data if enough time 00579 // has elapsed. This only has meaning if 00580 // set_collect_tcp() has been set to true. 00581 //////////////////////////////////////////////////////////////////// 00582 bool CConnectionRepository:: 00583 consider_flush() { 00584 ReMutexHolder holder(_lock); 00585 00586 if (_simulated_disconnect) { 00587 return false; 00588 } 00589 00590 #ifdef WANT_NATIVE_NET 00591 if(_native) 00592 return true; //Maybe we should just flush here for now? 00593 #endif 00594 00595 #ifdef HAVE_NET 00596 if (_net_conn) { 00597 return _net_conn->consider_flush(); 00598 } 00599 #endif // HAVE_NET 00600 00601 #ifdef HAVE_OPENSSL 00602 if (_http_conn) { 00603 return _http_conn->consider_flush(); 00604 } 00605 #endif // HAVE_OPENSSL 00606 00607 return false; 00608 } 00609 00610 //////////////////////////////////////////////////////////////////// 00611 // Function: CConnectionRepository::flush 00612 // Access: Published 00613 // Description: Sends the most recently queued data now. This only 00614 // has meaning if set_collect_tcp() has been set to 00615 // true. 
00616 //////////////////////////////////////////////////////////////////// 00617 bool CConnectionRepository:: 00618 flush() { 00619 ReMutexHolder holder(_lock); 00620 00621 if (_simulated_disconnect) { 00622 return false; 00623 } 00624 #ifdef WANT_NATIVE_NET 00625 if(_native) 00626 return _bdc.Flush(); 00627 #endif 00628 00629 #ifdef HAVE_NET 00630 if (_net_conn) { 00631 return _net_conn->flush(); 00632 } 00633 #endif // HAVE_NET 00634 00635 #ifdef HAVE_OPENSSL 00636 if (_http_conn) { 00637 return _http_conn->flush(); 00638 } 00639 #endif // HAVE_OPENSSL 00640 00641 return false; 00642 } 00643 00644 //////////////////////////////////////////////////////////////////// 00645 // Function: CConnectionRepository::disconnect 00646 // Access: Published 00647 // Description: Closes the connection to the server. 00648 //////////////////////////////////////////////////////////////////// 00649 void CConnectionRepository:: 00650 disconnect() { 00651 ReMutexHolder holder(_lock); 00652 00653 #ifdef WANT_NATIVE_NET 00654 if(_native) { 00655 _bdc.Reset(); 00656 _bdc.ClearAddresses(); 00657 } 00658 #endif 00659 #ifdef HAVE_NET 00660 if (_net_conn) { 00661 _qcm.close_connection(_net_conn); 00662 _net_conn = NULL; 00663 } 00664 #endif // HAVE_NET 00665 00666 #ifdef HAVE_OPENSSL 00667 if (_http_conn) { 00668 _http_conn->close(); 00669 delete _http_conn; 00670 _http_conn = NULL; 00671 } 00672 #endif // HAVE_OPENSSL 00673 00674 _simulated_disconnect = false; 00675 } 00676 00677 //////////////////////////////////////////////////////////////////// 00678 // Function: CConnectionRepository::shutdown 00679 // Access: Published 00680 // Description: May be called at application shutdown to ensure all 00681 // threads are cleaned up. 
00682 //////////////////////////////////////////////////////////////////// 00683 void CConnectionRepository:: 00684 shutdown() { 00685 disconnect(); 00686 00687 #ifdef HAVE_NET 00688 _cw.shutdown(); 00689 _qcr.shutdown(); 00690 #endif // HAVE_NET 00691 } 00692 00693 //////////////////////////////////////////////////////////////////// 00694 // Function: CConnectionRepository::do_check_datagram 00695 // Access: Private 00696 // Description: The private implementation of check_datagram(), this 00697 // gets one datagram if it is available. 00698 //////////////////////////////////////////////////////////////////// 00699 bool CConnectionRepository:: 00700 do_check_datagram() { 00701 #ifdef WANT_NATIVE_NET 00702 if(_native) { 00703 return _bdc.GetMessage(_dg); 00704 } 00705 #endif 00706 #ifdef HAVE_NET 00707 if (_net_conn) { 00708 _net_conn->consider_flush(); 00709 if (_qcr.get_overflow_flag()) { 00710 throw_event(get_overflow_event_name()); 00711 _qcr.reset_overflow_flag(); 00712 } 00713 return (_qcr.data_available() && _qcr.get_data(_dg)); 00714 } 00715 #endif // HAVE_NET 00716 00717 #ifdef HAVE_OPENSSL 00718 if (_http_conn) { 00719 _http_conn->consider_flush(); 00720 return _http_conn->receive_datagram(_dg); 00721 } 00722 #endif // HAVE_OPENSSL 00723 00724 00725 return false; 00726 } 00727 00728 //////////////////////////////////////////////////////////////////// 00729 // Function: CConnectionRepository::handle_update_field 00730 // Access: Private 00731 // Description: Directly handles an update message on a field. 00732 // Python never touches the datagram; it just gets its 00733 // distributed method called with the appropriate 00734 // parameters. Returns true if everything is ok, false 00735 // if there was an error processing the field's update 00736 // method. 
00737 //////////////////////////////////////////////////////////////////// 00738 bool CConnectionRepository:: 00739 handle_update_field() { 00740 #ifdef HAVE_PYTHON 00741 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00742 PyGILState_STATE gstate; 00743 gstate = PyGILState_Ensure(); 00744 #endif 00745 00746 PStatTimer timer(_update_pcollector); 00747 unsigned int do_id = _di.get_uint32(); 00748 if (_python_repository != (PyObject *)NULL) 00749 { 00750 PyObject *doId2do = 00751 PyObject_GetAttrString(_python_repository, "doId2do"); 00752 nassertr(doId2do != NULL, false); 00753 00754 #ifdef USE_PYTHON_2_2_OR_EARLIER 00755 PyObject *doId = PyInt_FromLong(do_id); 00756 #else 00757 PyObject *doId = PyLong_FromUnsignedLong(do_id); 00758 #endif 00759 PyObject *distobj = PyDict_GetItem(doId2do, doId); 00760 Py_DECREF(doId); 00761 Py_DECREF(doId2do); 00762 00763 if (distobj != NULL) { 00764 PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass"); 00765 nassertr(dclass_obj != NULL, false); 00766 00767 00768 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this"); 00769 Py_DECREF(dclass_obj); 00770 nassertr(dclass_this != NULL, false); 00771 00772 DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this); 00773 Py_DECREF(dclass_this); 00774 00775 // If in quiet zone mode, throw update away unless distobj 00776 // has 'neverDisable' attribute set to non-zero 00777 if (_in_quiet_zone) { 00778 PyObject *neverDisable = PyObject_GetAttrString(distobj, "neverDisable"); 00779 nassertr(neverDisable != NULL, false); 00780 00781 unsigned int cNeverDisable = PyInt_AsLong(neverDisable); 00782 if (!cNeverDisable) { 00783 // in quiet zone and distobj is disable-able 00784 // drop update on the floor 00785 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00786 PyGILState_Release(gstate); 00787 #endif 00788 return true; 00789 } 00790 } 00791 00792 // It's a good idea to ensure the reference count to distobj is 00793 // raised while we call the update 
method--otherwise, the update 00794 // method might get into trouble if it tried to delete the 00795 // object from the doId2do map. 00796 Py_INCREF(distobj); 00797 dclass->receive_update(distobj, _di); 00798 Py_DECREF(distobj); 00799 00800 if (PyErr_Occurred()) { 00801 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00802 PyGILState_Release(gstate); 00803 #endif 00804 return false; 00805 } 00806 } 00807 00808 } 00809 00810 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00811 PyGILState_Release(gstate); 00812 #endif 00813 #endif // HAVE_PYTHON 00814 return true; 00815 } 00816 00817 00818 //////////////////////////////////////////////////////////////////// 00819 // Function: CConnectionRepository::handle_update_field_owner 00820 // Access: Private 00821 // Description: Directly handles an update message on a field. 00822 // Supports 'owner' views of objects, separate from 'visible' 00823 // view, and forwards fields to the appropriate view(s) based 00824 // on DC flags. Python never touches the datagram; it just 00825 // gets its distributed method called with the appropriate 00826 // parameters. Returns true if everything is ok, false if 00827 // there was an error processing the field's update method. 
00828 //////////////////////////////////////////////////////////////////// 00829 bool CConnectionRepository:: 00830 handle_update_field_owner() { 00831 #ifdef HAVE_PYTHON 00832 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00833 PyGILState_STATE gstate; 00834 gstate = PyGILState_Ensure(); 00835 #endif 00836 00837 PStatTimer timer(_update_pcollector); 00838 unsigned int do_id = _di.get_uint32(); 00839 if (_python_repository != (PyObject *)NULL) { 00840 PyObject *doId2do = 00841 PyObject_GetAttrString(_python_repository, "doId2do"); 00842 nassertr(doId2do != NULL, false); 00843 00844 PyObject *doId2ownerView = 00845 PyObject_GetAttrString(_python_repository, "doId2ownerView"); 00846 nassertr(doId2ownerView != NULL, false); 00847 00848 #ifdef USE_PYTHON_2_2_OR_EARLIER 00849 PyObject *doId = PyInt_FromLong(do_id); 00850 #else 00851 PyObject *doId = PyLong_FromUnsignedLong(do_id); 00852 #endif 00853 00854 // pass the update to the owner view first 00855 PyObject *distobjOV = PyDict_GetItem(doId2ownerView, doId); 00856 Py_DECREF(doId2ownerView); 00857 00858 if (distobjOV != NULL) { 00859 PyObject *dclass_obj = PyObject_GetAttrString(distobjOV, "dclass"); 00860 nassertr(dclass_obj != NULL, false); 00861 00862 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this"); 00863 Py_DECREF(dclass_obj); 00864 nassertr(dclass_this != NULL, false); 00865 00866 DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this); 00867 Py_DECREF(dclass_this); 00868 00869 // check if we should forward this update to the owner view 00870 DCPacker packer; 00871 packer.set_unpack_data(_di.get_remaining_bytes()); 00872 int field_id = packer.raw_unpack_uint16(); 00873 DCField *field = dclass->get_field_by_index(field_id); 00874 if (field->is_ownrecv()) { 00875 // It's a good idea to ensure the reference count to distobjOV is 00876 // raised while we call the update method--otherwise, the update 00877 // method might get into trouble if it tried to delete the 00878 // object from the 
doId2do map. 00879 Py_INCREF(distobjOV); 00880 // make a copy of the datagram iterator so that we can use the main 00881 // iterator for the non-owner update 00882 DatagramIterator _odi(_di); 00883 dclass->receive_update(distobjOV, _odi); 00884 Py_DECREF(distobjOV); 00885 00886 if (PyErr_Occurred()) { 00887 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00888 PyGILState_Release(gstate); 00889 #endif 00890 return false; 00891 } 00892 } 00893 } 00894 00895 // now pass the update to the visible view 00896 PyObject *distobj = PyDict_GetItem(doId2do, doId); 00897 Py_DECREF(doId); 00898 Py_DECREF(doId2do); 00899 00900 if (distobj != NULL) { 00901 PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass"); 00902 nassertr(dclass_obj != NULL, false); 00903 00904 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this"); 00905 Py_DECREF(dclass_obj); 00906 nassertr(dclass_this != NULL, false); 00907 00908 DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this); 00909 Py_DECREF(dclass_this); 00910 00911 // check if we should forward this update to the owner view 00912 DCPacker packer; 00913 packer.set_unpack_data(_di.get_remaining_bytes()); 00914 int field_id = packer.raw_unpack_uint16(); 00915 DCField *field = dclass->get_field_by_index(field_id); 00916 if (true) {//field->is_broadcast()) { 00917 // It's a good idea to ensure the reference count to distobj is 00918 // raised while we call the update method--otherwise, the update 00919 // method might get into trouble if it tried to delete the 00920 // object from the doId2do map. 
00921 Py_INCREF(distobj); 00922 dclass->receive_update(distobj, _di); 00923 Py_DECREF(distobj); 00924 00925 if (PyErr_Occurred()) { 00926 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00927 PyGILState_Release(gstate); 00928 #endif 00929 return false; 00930 } 00931 } 00932 } 00933 } 00934 00935 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 00936 PyGILState_Release(gstate); 00937 #endif 00938 #endif // HAVE_PYTHON 00939 00940 return true; 00941 } 00942 00943 //////////////////////////////////////////////////////////////////// 00944 // Function: CConnectionRepository::describe_message 00945 // Access: Private 00946 // Description: Unpacks the message and reformats it for user 00947 // consumption, writing a description on the indicated 00948 // output stream. 00949 //////////////////////////////////////////////////////////////////// 00950 void CConnectionRepository:: 00951 describe_message(ostream &out, const string &prefix, 00952 const Datagram &dg) const { 00953 DCPacker packer; 00954 00955 packer.set_unpack_data(dg.get_message()); 00956 CHANNEL_TYPE do_id; 00957 int msg_type; 00958 bool is_update = false; 00959 string full_prefix = "CR::" + prefix; 00960 00961 if (!_client_datagram) 00962 { 00963 unsigned char mcnt = packer.raw_unpack_uint8(); 00964 for( ;mcnt > 0; mcnt--) 00965 packer.RAW_UNPACK_CHANNEL(); // msg_channel 00966 00967 packer.RAW_UNPACK_CHANNEL(); // msg_sender 00968 msg_type = packer.raw_unpack_uint16(); 00969 is_update = (msg_type == STATESERVER_OBJECT_UPDATE_FIELD); 00970 00971 } else { 00972 msg_type = packer.raw_unpack_uint16(); 00973 is_update = (msg_type == CLIENT_OBJECT_UPDATE_FIELD); 00974 } 00975 00976 if (!is_update) { 00977 // figure out the name of the message 00978 // TODO: print out the arguments to the message 00979 string msgName; 00980 00981 #ifdef HAVE_PYTHON 00982 if (_python_repository != (PyObject *)NULL) { 00983 PyObject *msgId = PyLong_FromLong(msg_type); 00984 nassertv(msgId != NULL); 00985 PyObject 
*methodName = PyString_FromString("_getMsgName"); 00986 nassertv(methodName != NULL); 00987 00988 PyObject *result = PyObject_CallMethodObjArgs(_python_repository, methodName, 00989 msgId, NULL); 00990 nassertv(result != NULL); 00991 00992 msgName += string(PyString_AsString(result)); 00993 00994 Py_DECREF(methodName); 00995 Py_DECREF(msgId); 00996 Py_DECREF(result); 00997 } 00998 #endif 00999 if (msgName.length() == 0) { 01000 msgName += "unknown message "; 01001 msgName += msg_type; 01002 msgName += "\n"; 01003 } 01004 out << full_prefix << ":" << msgName << "\n"; 01005 dg.dump_hex(out, 2); 01006 01007 } else { 01008 // It's an update message. Figure out what dclass the object is 01009 // based on its doId, so we can decode the rest of the message. 01010 do_id = packer.raw_unpack_uint32(); 01011 DCClass *dclass = NULL; 01012 01013 #ifdef HAVE_PYTHON 01014 if (_python_repository != (PyObject *)NULL) { 01015 PyObject *doId2do = 01016 PyObject_GetAttrString(_python_repository, "doId2do"); 01017 nassertv(doId2do != NULL); 01018 01019 #ifdef USE_PYTHON_2_2_OR_EARLIER 01020 PyObject *doId = PyInt_FromLong(do_id); 01021 #else 01022 PyObject *doId = PyLong_FromUnsignedLong(do_id); 01023 #endif 01024 PyObject *distobj = PyDict_GetItem(doId2do, doId); 01025 Py_DECREF(doId); 01026 Py_DECREF(doId2do); 01027 01028 if (distobj != NULL) { 01029 PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass"); 01030 nassertv(dclass_obj != NULL); 01031 01032 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this"); 01033 Py_DECREF(dclass_obj); 01034 nassertv(dclass_this != NULL); 01035 01036 dclass = (DCClass *)PyInt_AsLong(dclass_this); 01037 Py_DECREF(dclass_this); 01038 } 01039 } 01040 #endif // HAVE_PYTHON 01041 01042 int field_id = packer.raw_unpack_uint16(); 01043 01044 if (dclass == (DCClass *)NULL) { 01045 out << full_prefix << "update for unknown object " << do_id 01046 << ", field " << field_id << "\n"; 01047 01048 } else { 01049 out << full_prefix << 01050 
":" << dclass->get_name() << "(" << do_id << ")."; 01051 DCField *field = dclass->get_field_by_index(field_id); 01052 if (field == (DCField *)NULL) { 01053 out << "unknown field " << field_id << "\n"; 01054 01055 } else { 01056 out << field->get_name(); 01057 packer.begin_unpack(field); 01058 packer.unpack_and_format(out); 01059 packer.end_unpack(); 01060 out << "\n"; 01061 } 01062 } 01063 } 01064 } 01065 01066 01067 01068 01069 #ifdef HAVE_PYTHON 01070 #ifdef WANT_NATIVE_NET 01071 01072 bool CConnectionRepository::network_based_reader_and_yielder(PyObject *PycallBackFunction,ClockObject &clock, float returnBy) 01073 { 01074 ReMutexHolder holder(_lock); 01075 while(is_connected()) 01076 { 01077 check_datagram_ai(PycallBackFunction); 01078 if(is_connected()) 01079 _bdc.Flush(); 01080 float currentTime = clock.get_real_time(); 01081 float dif_time = returnBy - currentTime; 01082 if(dif_time <= 0.001) // to avoi over runs.. 01083 break; 01084 if(is_connected()) 01085 _bdc.WaitForNetworkReadEvent(dif_time); 01086 } 01087 return false; 01088 } 01089 01090 bool CConnectionRepository::check_datagram_ai(PyObject *PycallBackFunction) 01091 { 01092 ReMutexHolder holder(_lock); 01093 // these could be static .. not 01094 PyObject *doId2do = NULL; 01095 float startTime =0; 01096 float endTime = 0; 01097 // this seems weird...here 01098 _bdc.Flush(); 01099 while (_bdc.GetMessage(_dg)) 01100 { 01101 if (get_verbose()) 01102 describe_message(nout, "RECV", _dg); 01103 01104 if (_time_warning > 0) 01105 startTime = ClockObject::get_global_clock()->get_real_time(); 01106 01107 // Start breaking apart the datagram. 
01108 _di.assign(_dg); 01109 unsigned char wc_cnt = _di.get_uint8(); 01110 _msg_channels.clear(); 01111 for(unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) 01112 _msg_channels.push_back(_di.get_uint64()); 01113 01114 _msg_sender = _di.get_uint64(); 01115 _msg_type = _di.get_uint16(); 01116 01117 if( _msg_type == STATESERVER_OBJECT_UPDATE_FIELD) 01118 { 01119 if(doId2do == NULL) 01120 { 01121 // this is my attemp to take it out of the inner loop RHH 01122 doId2do =PyObject_GetAttrString(_python_repository, "doId2do"); 01123 nassertr(doId2do != NULL, false); 01124 } 01125 01126 if (!handle_update_field_ai(doId2do)) 01127 { 01128 Py_XDECREF(doId2do); 01129 if (_time_warning > 0) { 01130 endTime = ClockObject::get_global_clock()->get_real_time(); 01131 if ( _time_warning < (endTime - startTime)) { 01132 nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<< (endTime-startTime) << "secs to process\n"; 01133 _dg.dump_hex(nout,2); 01134 } 01135 } 01136 return false; 01137 } 01138 } 01139 else 01140 { 01141 PyObject * result = PyEval_CallObject(PycallBackFunction, _python_ai_datagramiterator); 01142 if (PyErr_Occurred()) 01143 { 01144 Py_XDECREF(doId2do); 01145 if (_time_warning > 0) { 01146 endTime = ClockObject::get_global_clock()->get_real_time(); 01147 if ( _time_warning < (endTime - startTime)) { 01148 nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<< (endTime-startTime) << "secs to process\n"; 01149 _dg.dump_hex(nout,2); 01150 } 01151 } 01152 return true; 01153 } 01154 } 01155 01156 if (_time_warning > 0) { 01157 endTime = ClockObject::get_global_clock()->get_real_time(); 01158 if ( _time_warning < (endTime - startTime)) { 01159 nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<< (endTime-startTime) << "secs to process\n"; 01160 _dg.dump_hex(nout,2); 01161 } 01162 } 01163 01164 } 01165 01166 01167 Py_XDECREF(doId2do); 01168 return false; 01169 } 01170 01171 #endif // #ifdef WANT_NATIVE_NET 01172 #endif // #ifdef HAVE_PYTHON 


#ifdef HAVE_PYTHON
#ifdef WANT_NATIVE_NET


////////////////////////////////////////////////////////////////////
//     Function: CConnectionRepository::handle_update_field_ai
//  Description: Reads a 32-bit doId from _di, looks the object up in
//               the supplied doId2do dictionary and, if it is
//               present, passes the remainder of the datagram to the
//               object's DCClass via receive_update().
//
//               Returns false if the "dclass" attribute does not
//               yield a DCClass pointer or if a Python error is set
//               after the update; returns true otherwise, including
//               when the doId is not in the dictionary.
////////////////////////////////////////////////////////////////////
bool CConnectionRepository::handle_update_field_ai(PyObject *doId2do)
{
  PStatTimer timer(_update_pcollector);
  unsigned int do_id = _di.get_uint32();

  PyObject *doId = PyLong_FromUnsignedLong(do_id);
  // PyDict_GetItem() returns a borrowed reference.
  PyObject *distobj = PyDict_GetItem(doId2do, doId);
  Py_DECREF(doId);

  if (distobj != NULL)
  {
    // PyObject_GetAttrString() returns a new reference, which must be
    // released on every path out of this scope.
    PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
    nassertr(dclass_obj != NULL, false);

    DCClass *dclass = NULL;
    DTOOL_Call_ExtractThisPointerForType(dclass_obj, &Dtool_DCClass, (void **) &dclass);
    if (dclass == NULL)
    {
      Py_DECREF(dclass_obj);
      return false;
    }

    // Hold a reference to distobj across the update so it stays alive
    // even if the update removes it from the doId2do map.  The
    // reference to dclass_obj is likewise kept until the call is done.
    Py_INCREF(distobj);
    dclass->receive_update(distobj, _di);
    Py_DECREF(distobj);
    Py_DECREF(dclass_obj);

    if (PyErr_Occurred())
      return false;
  }
  return true;
}

#endif  // #ifdef WANT_NATIVE_NET
#endif  // #ifdef HAVE_PYTHON