Panda3D
 All Classes Functions Variables Enumerations
cConnectionRepository.cxx
00001 // Filename: cConnectionRepository.cxx
00002 // Created by:  drose (17May04)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "cConnectionRepository.h"
00016 #include "dcmsgtypes.h"
00017 #include "dcClass.h"
00018 #include "dcPacker.h"
00019 
00020 #include "config_distributed.h"
00021 #include "config_downloader.h"
00022 #include "httpChannel.h"
00023 #include "urlSpec.h"
00024 #include "datagramIterator.h"
00025 #include "throw_event.h"
00026 #include "pStatTimer.h"
00027 
00028 #ifdef HAVE_PYTHON
00029 #ifndef CPPPARSER
00030 #include "py_panda.h"  
00031 IMPORT_THIS struct   Dtool_PyTypedObject Dtool_DatagramIterator;
00032 IMPORT_THIS struct   Dtool_PyTypedObject Dtool_DCClass;
00033 #endif
00034 #endif
00035 
00036 const string CConnectionRepository::_overflow_event_name = "CRDatagramOverflow";
00037 
00038 #ifndef CPPPARSER
00039 PStatCollector CConnectionRepository::_update_pcollector("App:Show code:readerPollTask:Update");
00040 #endif  // CPPPARSER
00041 
00042 ////////////////////////////////////////////////////////////////////
00043 //     Function: CConnectionRepository::Constructor
00044 //       Access: Published
00045 //  Description: 
00046 ////////////////////////////////////////////////////////////////////
00047 CConnectionRepository::
00048 CConnectionRepository(bool has_owner_view, bool threaded_net) :
00049   _lock("CConnectionRepository::_lock"),
00050 #ifdef HAVE_PYTHON
00051   _python_repository(NULL),
00052   _python_ai_datagramiterator(NULL),
00053 #endif
00054 #ifdef HAVE_OPENSSL
00055   _http_conn(NULL),
00056 #endif
00057 #ifdef HAVE_NET
00058   _cw(&_qcm, threaded_net ? 1 : 0),
00059   _qcr(&_qcm, threaded_net ? 1 : 0),
00060 #endif
00061 #ifdef WANT_NATIVE_NET
00062   _bdc(4096000,4096000,1400),
00063   _native(false),
00064 #endif
00065   _client_datagram(true),
00066   _handle_datagrams_internally(handle_datagrams_internally),
00067   _simulated_disconnect(false),
00068   _verbose(distributed_cat.is_spam()),
00069   _time_warning(0.0),
00070 //  _msg_channels(),
00071   _msg_sender(0),
00072   _msg_type(0),
00073   _has_owner_view(has_owner_view),
00074   _handle_c_updates(true),
00075   _want_message_bundling(true),
00076   _bundling_msgs(0),
00077   _in_quiet_zone(0)
00078 {
00079 #if defined(HAVE_NET) && defined(SIMULATE_NETWORK_DELAY)
00080   if (min_lag != 0.0 || max_lag != 0.0) {
00081     _qcr.start_delay(min_lag, max_lag);
00082   }
00083 #endif
00084   _tcp_header_size = tcp_header_size;
00085 
00086 #ifdef HAVE_PYTHON
00087   PyObject *  PyDitterator = DTool_CreatePyInstance(&_di,Dtool_DatagramIterator,false,false);
00088   if(PyDitterator != NULL)
00089       _python_ai_datagramiterator = Py_BuildValue("(O)",PyDitterator);
00090 #endif
00091 
00092 }
00093 
00094 ////////////////////////////////////////////////////////////////////
00095 //     Function: CConnectionRepository::Destructor
00096 //       Access: Published
00097 //  Description: 
00098 ////////////////////////////////////////////////////////////////////
00099 CConnectionRepository::
00100 ~CConnectionRepository() {
00101   disconnect();
00102 }
00103 
00104 ////////////////////////////////////////////////////////////////////
00105 //     Function: CConnectionRepository::set_tcp_header_size
00106 //       Access: Public
00107 //  Description: Sets the header size of TCP packets.  At the present,
00108 //               legal values for this are 0, 2, or 4; this specifies
00109 //               the number of bytes to use encode the datagram length
00110 //               at the start of each TCP datagram.  Sender and
00111 //               receiver must independently agree on this.
00112 ////////////////////////////////////////////////////////////////////
00113 void CConnectionRepository::
00114 set_tcp_header_size(int tcp_header_size) {
00115   _tcp_header_size = tcp_header_size;
00116 
00117 #ifdef HAVE_OPENSSL
00118   if (_http_conn != (SocketStream *)NULL) {
00119     _http_conn->set_tcp_header_size(tcp_header_size);
00120   }
00121 #endif
00122 
00123 #ifdef HAVE_NET
00124   _cw.set_tcp_header_size(tcp_header_size);
00125   _qcr.set_tcp_header_size(tcp_header_size);
00126 #endif
00127 }
00128 
00129 #ifdef HAVE_OPENSSL
00130 ////////////////////////////////////////////////////////////////////
00131 //     Function: CConnectionRepository::set_connection_http
00132 //       Access: Published
00133 //  Description: Once a connection has been established via the HTTP
00134 //               interface, gets the connection and uses it.  The
00135 //               supplied HTTPChannel object must have a connection
00136 //               available via get_connection().
00137 ////////////////////////////////////////////////////////////////////
00138 void CConnectionRepository::
00139 set_connection_http(HTTPChannel *channel) {
00140   ReMutexHolder holder(_lock);
00141 
00142   disconnect();
00143   nassertv(channel->is_connection_ready());
00144   _http_conn = channel->get_connection();
00145   _http_conn->set_tcp_header_size(_tcp_header_size);
00146 #ifdef SIMULATE_NETWORK_DELAY
00147   if (min_lag != 0.0 || max_lag != 0.0) {
00148     _http_conn->start_delay(min_lag, max_lag);
00149   }
00150 #endif
00151 }
00152 #endif  // HAVE_OPENSSL
00153 
00154 #ifdef HAVE_OPENSSL
00155 ////////////////////////////////////////////////////////////////////
00156 //     Function: CConnectionRepository::get_stream
00157 //       Access: Published
00158 //  Description: Returns the SocketStream that internally represents
00159 //               the already-established HTTP connection.  Returns
00160 //               NULL if there is no current HTTP connection.
00161 ////////////////////////////////////////////////////////////////////
00162 SocketStream *CConnectionRepository::
00163 get_stream() {
00164   ReMutexHolder holder(_lock);
00165 
00166   return _http_conn;
00167 }
00168 #endif  // HAVE_OPENSSL
00169 
00170 
00171 #ifdef HAVE_NET
00172 ////////////////////////////////////////////////////////////////////
00173 //     Function: CConnectionRepository::try_connect_net
00174 //       Access: Published
00175 //  Description: Uses Panda's "net" library to try to connect to the
00176 //               server and port named in the indicated URL.  Returns
00177 //               true if successful, false otherwise.
00178 ////////////////////////////////////////////////////////////////////
00179 bool CConnectionRepository::
00180 try_connect_net(const URLSpec &url) {
00181   ReMutexHolder holder(_lock);
00182 
00183   disconnect();
00184 
00185   _net_conn = 
00186     _qcm.open_TCP_client_connection(url.get_server(), url.get_port(),
00187                                     game_server_timeout_ms);
00188 
00189   if (_net_conn != (Connection *)NULL) {
00190     _net_conn->set_no_delay(true);
00191     _qcr.add_connection(_net_conn);
00192     return true;
00193   }
00194 
00195   return false;
00196 }
00197 #endif  // HAVE_NET
00198 
00199 #ifdef WANT_NATIVE_NET
00200 ////////////////////////////////////////////////////////////////////
00201 //     Function: CConnectionRepository::connect_native
00202 //       Access: Published
00203 //  Description: Connects to the server using Panda's low-level and
00204 //               fast "native net" library.
00205 ////////////////////////////////////////////////////////////////////
00206 bool CConnectionRepository::
00207 connect_native(const URLSpec &url) {
00208   ReMutexHolder holder(_lock);
00209 
00210   _native=true;
00211   Socket_Address addr;
00212   addr.set_host(url.get_server(),url.get_port());
00213   _bdc.ClearAddresses();
00214   _bdc.AddAddress(addr);
00215   return _bdc.DoConnect();
00216 }
00217 
00218 #endif //WANT NATIVE NET
00219 
00220 #ifdef SIMULATE_NETWORK_DELAY
00221 ////////////////////////////////////////////////////////////////////
00222 //     Function: CConnectionRepository::start_delay
00223 //       Access: Published
00224 //  Description: Enables a simulated network latency.  All datagrams
00225 //               received from this point on will be held for a random
00226 //               interval of least min_delay seconds, and no more than
00227 //               max_delay seconds, before being visible.  It is as if
00228 //               datagrams suddenly took much longer to arrive.
00229 //
00230 //               This should *only* be called if the underlying socket
00231 //               is non-blocking.  If you call this on a blocking
00232 //               socket, it will force all datagrams to be held up
00233 //               until the socket closes.
00234 //
00235 //               This has no effect if the connection method is via
00236 //               the "native net" library.
00237 ////////////////////////////////////////////////////////////////////
00238 void CConnectionRepository::
00239 start_delay(double min_delay, double max_delay) {
00240   ReMutexHolder holder(_lock);
00241 
00242   if (min_delay != 0.0 || max_delay != 0.0) {
00243 #ifdef HAVE_NET
00244     _qcr.start_delay(min_delay, max_delay);
00245 #endif  // HAVE_NET
00246 #ifdef HAVE_OPENSSL
00247     if (_http_conn != (SocketStream *)NULL) {
00248       _http_conn->start_delay(min_delay, max_delay);
00249     }
00250 #endif  // HAVE_OPENSSL
00251   } else {
00252     stop_delay();
00253   }
00254 }
00255 #endif  // SIMULATE_NETWORK_DELAY
00256 
00257 #ifdef SIMULATE_NETWORK_DELAY
00258 ////////////////////////////////////////////////////////////////////
00259 //     Function: CConnectionRepository::stop_delay
00260 //       Access: Published
00261 //  Description: Disables the simulated network latency started by a
00262 //               previous call to start_delay().  Datagrams will once
00263 //               again be visible as soon as they are received.
00264 ////////////////////////////////////////////////////////////////////
00265 void CConnectionRepository::
00266 stop_delay() {
00267   ReMutexHolder holder(_lock);
00268 
00269 #ifdef HAVE_NET
00270   _qcr.stop_delay();
00271 #endif  // HAVE_NET
00272 #ifdef HAVE_OPENSSL
00273   if (_http_conn != (SocketStream *)NULL) {
00274     _http_conn->stop_delay();
00275   }
00276 #endif  // HAVE_OPENSSL
00277 }
00278 #endif  // SIMULATE_NETWORK_DELAY
00279 
00280 ////////////////////////////////////////////////////////////////////
00281 //     Function: CConnectionRepository::check_datagram
00282 //       Access: Published
00283 //  Description: Returns true if a new datagram is available, false
00284 //               otherwise.  If the return value is true, the new
00285 //               datagram may be retrieved via get_datagram(), or
00286 //               preferably, with get_datagram_iterator() and
00287 //               get_msg_type().
00288 ////////////////////////////////////////////////////////////////////
00289 bool CConnectionRepository::
00290 check_datagram() {
00291   ReMutexHolder holder(_lock);
00292 
00293   if (_simulated_disconnect) {
00294     return false;
00295   }
00296   #ifdef WANT_NATIVE_NET
00297   if(_native)
00298     _bdc.Flush();
00299   #endif //WANT_NATIVE_NET
00300 
00301   while (do_check_datagram()) {
00302     if (get_verbose()) {
00303       describe_message(nout, "RECV", _dg);
00304     }
00305 
00306     // Start breaking apart the datagram.
00307     _di = DatagramIterator(_dg);
00308 
00309     if (!_client_datagram) {
00310       unsigned char  wc_cnt;
00311       wc_cnt = _di.get_uint8();
00312       _msg_channels.clear();
00313       for (unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) {
00314         CHANNEL_TYPE  schan  = _di.get_uint64();
00315         _msg_channels.push_back(schan);
00316       }
00317       _msg_sender = _di.get_uint64();
00318       
00319 #ifdef HAVE_PYTHON
00320       // For now, we need to stuff this field onto the Python
00321       // structure, to support legacy code that expects to find it
00322       // there.
00323       if (_python_repository != (PyObject *)NULL) {
00324 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00325         PyGILState_STATE gstate;
00326         gstate = PyGILState_Ensure();
00327 #endif
00328         PyObject *value = PyLong_FromUnsignedLongLong(_msg_sender);
00329         PyObject_SetAttrString(_python_repository, "msgSender", value);
00330         Py_DECREF(value);
00331 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00332         PyGILState_Release(gstate);
00333 #endif
00334       }
00335 #endif  // HAVE_PYTHON
00336     }
00337 
00338     _msg_type = _di.get_uint16();
00339     // Is this a message that we can process directly?
00340     if (!_handle_datagrams_internally) {
00341       return true;
00342     }
00343 
00344     switch (_msg_type) {
00345 #ifdef HAVE_PYTHON
00346     case CLIENT_OBJECT_UPDATE_FIELD:
00347     case STATESERVER_OBJECT_UPDATE_FIELD:
00348       if (_handle_c_updates) {
00349         if (_has_owner_view) {
00350           if (!handle_update_field_owner()) {
00351             return false;
00352           }
00353         } else {
00354           if (!handle_update_field()) {
00355             return false;
00356           }
00357         }
00358       } else {
00359         // Let the caller (Python) deal with this update.
00360         return true;
00361       }
00362       break;
00363 #endif  // HAVE_PYTHON
00364       
00365     default:
00366       // Some unknown message; let the caller deal with it.
00367       return true;
00368     }
00369   }
00370 
00371   // No datagrams available.
00372   return false;
00373 }
00374 
00375 
00376 
00377 
00378 ////////////////////////////////////////////////////////////////////
00379 //     Function: CConnectionRepository::is_connected
00380 //       Access: Published
00381 //  Description: Returns true if the connection to the gameserver is
00382 //               established and still good, false if we are not
00383 //               connected.  A false value means either (a) we never
00384 //               successfully connected, (b) we explicitly called
00385 //               disconnect(), or (c) we were connected, but the
00386 //               connection was spontaneously lost.
00387 ////////////////////////////////////////////////////////////////////
00388 bool CConnectionRepository::
00389 is_connected() {
00390   ReMutexHolder holder(_lock);
00391 
00392 #ifdef WANT_NATIVE_NET
00393   if(_native)
00394     return (_bdc.IsConnected());
00395 #endif
00396 
00397 #ifdef HAVE_NET
00398   if (_net_conn) {
00399     if (_qcm.reset_connection_available()) {
00400       PT(Connection) reset_connection;
00401       if (_qcm.get_reset_connection(reset_connection)) {
00402         _qcm.close_connection(reset_connection);
00403         if (reset_connection == _net_conn) {
00404           // Whoops, lost our connection.
00405           _net_conn = NULL;
00406           return false;
00407         }
00408       }
00409     }
00410     return true;
00411   }
00412 #endif  // HAVE_NET
00413 
00414 #ifdef HAVE_OPENSSL
00415   if (_http_conn) {
00416     if (!_http_conn->is_closed()) {
00417       return true;
00418     }
00419 
00420     // Connection lost.
00421     delete _http_conn;
00422     _http_conn = NULL;
00423   }
00424 #endif  // HAVE_OPENSSL
00425 
00426   return false;
00427 }
00428 
00429 ////////////////////////////////////////////////////////////////////
00430 //     Function: CConnectionRepository::send_datagram
00431 //       Access: Published
00432 //  Description: Queues the indicated datagram for sending to the
00433 //               server.  It may not get sent immediately if
00434 //               collect_tcp is in effect; call flush() to guarantee
00435 //               it is sent now.
00436 ////////////////////////////////////////////////////////////////////
00437 bool CConnectionRepository::
00438 send_datagram(const Datagram &dg) {
00439   ReMutexHolder holder(_lock);
00440 
00441   if (_simulated_disconnect) {
00442     distributed_cat.warning()
00443       << "Unable to send datagram during simulated disconnect.\n";
00444     return false;
00445   }
00446 
00447   if (get_verbose()) {
00448     describe_message(nout, "SEND", dg);
00449   }
00450 
00451   if (is_bundling_messages() && get_want_message_bundling()) {
00452     bundle_msg(dg);
00453     return true;
00454   }
00455 
00456 #ifdef WANT_NATIVE_NET
00457   if(_native)
00458     return _bdc.SendMessage(dg);
00459 #endif
00460 
00461 #ifdef HAVE_NET
00462   if (_net_conn) {
00463     _cw.send(dg, _net_conn);
00464     return true;
00465   }
00466 #endif  // HAVE_NET
00467 
00468 #ifdef HAVE_OPENSSL
00469   if (_http_conn) {
00470     if (!_http_conn->send_datagram(dg)) {
00471       distributed_cat.warning()
00472         << "Could not send datagram.\n";
00473       return false;
00474     }
00475 
00476     return true;
00477   }
00478 #endif  // HAVE_OPENSSL
00479 
00480   distributed_cat.warning()
00481     << "Unable to send datagram after connection is closed.\n";
00482   return false;
00483 }
00484 
00485 ////////////////////////////////////////////////////////////////////
00486 //     Function: CConnectionRepository::start_message_bundle
00487 //       Access: Published
00488 //  Description: Send a set of messages to the state server that will
00489 //               be processed atomically.  For instance, you can do a
00490 //               combined setLocation/setPos and prevent race
00491 //               conditions where clients briefly get the setLocation
00492 //               but not the setPos, because the state server hasn't
00493 //               processed the setPos yet
00494 ////////////////////////////////////////////////////////////////////
00495 void CConnectionRepository::
00496 start_message_bundle() {
00497   ReMutexHolder holder(_lock);
00498 
00499   // store up network messages until sendMessageBundle is called
00500   // all updates in between must be sent from the same doId (updates
00501   // must all affect the same DistributedObject)
00502   // it is an error to call this again before calling sendMessageBundle
00503   if (get_verbose()) {
00504     nout << "CR::SEND:BUNDLE_START(" << _bundling_msgs << ")" << endl;
00505   }
00506   if (_bundling_msgs == 0) {
00507     _bundle_msgs.clear();
00508   }
00509   ++_bundling_msgs;
00510 }
00511 
00512 ////////////////////////////////////////////////////////////////////
00513 //     Function: CConnectionRepository::send_message_bundle
00514 //       Access: Published
00515 //  Description: Send network messages queued up since
00516 //               startMessageBundle was called.
00517 ////////////////////////////////////////////////////////////////////
00518 void CConnectionRepository::
00519 send_message_bundle(unsigned int channel, unsigned int sender_channel) {
00520   ReMutexHolder holder(_lock);
00521   nassertv(_bundling_msgs);
00522 
00523   --_bundling_msgs;
00524 
00525   if (get_verbose()) {
00526     nout << "CR::SEND:BUNDLE_FINISH(" << _bundling_msgs << ")" << endl;
00527   }
00528 
00529   // if _bundling_msgs ref count is zero, send the bundle out
00530   if (_bundling_msgs == 0 && get_want_message_bundling()) {
00531     Datagram dg;
00532     // add server header (see PyDatagram.addServerHeader)
00533     dg.add_int8(1);
00534     dg.add_uint64(channel);
00535     dg.add_uint64(sender_channel);
00536     dg.add_uint16(STATESERVER_BOUNCE_MESSAGE);
00537     // add each bundled message
00538     BundledMsgVector::const_iterator bmi;
00539     for (bmi = _bundle_msgs.begin(); bmi != _bundle_msgs.end(); bmi++) {
00540       dg.add_string(*bmi);
00541     }
00542 
00543     send_datagram(dg);
00544   }
00545 }
00546 
00547 ////////////////////////////////////////////////////////////////////
00548 //     Function: CConnectionRepository::abandon_message_bundles
00549 //       Access: Published
00550 //  Description: throw out any msgs that have been queued up for
00551 //               message bundles
00552 ////////////////////////////////////////////////////////////////////
00553 void CConnectionRepository::
00554 abandon_message_bundles() {
00555   ReMutexHolder holder(_lock);
00556 
00557   nassertv(is_bundling_messages());
00558   _bundling_msgs = 0;
00559   _bundle_msgs.clear();
00560 }
00561 
00562 ////////////////////////////////////////////////////////////////////
00563 //     Function: CConnectionRepository::bundle_msg
00564 //       Access: Published
00565 //  Description:
00566 ////////////////////////////////////////////////////////////////////
00567 void CConnectionRepository::
00568 bundle_msg(const Datagram &dg) {
00569   ReMutexHolder holder(_lock);
00570 
00571   nassertv(is_bundling_messages());
00572   _bundle_msgs.push_back(dg.get_message());
00573 }
00574 
00575 ////////////////////////////////////////////////////////////////////
00576 //     Function: CConnectionRepository::consider_flush
00577 //       Access: Published
00578 //  Description: Sends the most recently queued data if enough time
00579 //               has elapsed.  This only has meaning if
00580 //               set_collect_tcp() has been set to true.
00581 ////////////////////////////////////////////////////////////////////
00582 bool CConnectionRepository::
00583 consider_flush() {
00584   ReMutexHolder holder(_lock);
00585 
00586   if (_simulated_disconnect) {
00587     return false;
00588   }
00589 
00590 #ifdef WANT_NATIVE_NET
00591   if(_native)
00592     return true;  //Maybe we should just flush here for now?
00593 #endif
00594 
00595 #ifdef HAVE_NET
00596   if (_net_conn) {
00597     return _net_conn->consider_flush();
00598   }
00599 #endif  // HAVE_NET
00600 
00601 #ifdef HAVE_OPENSSL
00602   if (_http_conn) {
00603     return _http_conn->consider_flush();
00604   }
00605 #endif  // HAVE_OPENSSL
00606 
00607   return false;
00608 }
00609 
00610 ////////////////////////////////////////////////////////////////////
00611 //     Function: CConnectionRepository::flush
00612 //       Access: Published
00613 //  Description: Sends the most recently queued data now.  This only
00614 //               has meaning if set_collect_tcp() has been set to
00615 //               true.
00616 ////////////////////////////////////////////////////////////////////
00617 bool CConnectionRepository::
00618 flush() {
00619   ReMutexHolder holder(_lock);
00620 
00621   if (_simulated_disconnect) {
00622     return false;
00623   }
00624   #ifdef WANT_NATIVE_NET
00625   if(_native)
00626     return _bdc.Flush();
00627   #endif
00628 
00629   #ifdef HAVE_NET
00630   if (_net_conn) {
00631     return _net_conn->flush();
00632   }
00633   #endif  // HAVE_NET
00634 
00635   #ifdef HAVE_OPENSSL
00636   if (_http_conn) {
00637     return _http_conn->flush();
00638   }
00639   #endif  // HAVE_OPENSSL
00640 
00641   return false;
00642 }
00643 
00644 ////////////////////////////////////////////////////////////////////
00645 //     Function: CConnectionRepository::disconnect
00646 //       Access: Published
00647 //  Description: Closes the connection to the server.
00648 ////////////////////////////////////////////////////////////////////
00649 void CConnectionRepository::
00650 disconnect() {
00651   ReMutexHolder holder(_lock);
00652 
00653   #ifdef WANT_NATIVE_NET
00654   if(_native) {
00655     _bdc.Reset();
00656     _bdc.ClearAddresses();
00657   }
00658   #endif
00659   #ifdef HAVE_NET
00660   if (_net_conn) {
00661     _qcm.close_connection(_net_conn);
00662     _net_conn = NULL;
00663   }
00664   #endif  // HAVE_NET
00665 
00666   #ifdef HAVE_OPENSSL
00667   if (_http_conn) {
00668     _http_conn->close();
00669     delete _http_conn;
00670     _http_conn = NULL;
00671   }
00672   #endif  // HAVE_OPENSSL
00673 
00674   _simulated_disconnect = false;
00675 }
00676 
00677 ////////////////////////////////////////////////////////////////////
00678 //     Function: CConnectionRepository::shutdown
00679 //       Access: Published
00680 //  Description: May be called at application shutdown to ensure all
00681 //               threads are cleaned up.
00682 ////////////////////////////////////////////////////////////////////
00683 void CConnectionRepository::
00684 shutdown() {
00685   disconnect();
00686 
00687   #ifdef HAVE_NET
00688   _cw.shutdown();
00689   _qcr.shutdown();
00690   #endif  // HAVE_NET
00691 }
00692 
00693 ////////////////////////////////////////////////////////////////////
00694 //     Function: CConnectionRepository::do_check_datagram
00695 //       Access: Private
00696 //  Description: The private implementation of check_datagram(), this
00697 //               gets one datagram if it is available.
00698 ////////////////////////////////////////////////////////////////////
00699 bool CConnectionRepository::
00700 do_check_datagram() {
00701   #ifdef WANT_NATIVE_NET
00702   if(_native) {
00703     return _bdc.GetMessage(_dg);
00704   }
00705   #endif
00706   #ifdef HAVE_NET
00707   if (_net_conn) {
00708     _net_conn->consider_flush();
00709     if (_qcr.get_overflow_flag()) {
00710       throw_event(get_overflow_event_name());
00711       _qcr.reset_overflow_flag();
00712     }
00713     return (_qcr.data_available() && _qcr.get_data(_dg));
00714   }
00715   #endif  // HAVE_NET
00716 
00717   #ifdef HAVE_OPENSSL
00718   if (_http_conn) {
00719     _http_conn->consider_flush();
00720     return _http_conn->receive_datagram(_dg);
00721   }
00722   #endif  // HAVE_OPENSSL
00723 
00724 
00725   return false;
00726 }
00727 
00728 ////////////////////////////////////////////////////////////////////
00729 //     Function: CConnectionRepository::handle_update_field
00730 //       Access: Private
00731 //  Description: Directly handles an update message on a field.
00732 //               Python never touches the datagram; it just gets its
00733 //               distributed method called with the appropriate
00734 //               parameters.  Returns true if everything is ok, false
00735 //               if there was an error processing the field's update
00736 //               method.
00737 ////////////////////////////////////////////////////////////////////
00738 bool CConnectionRepository::
00739 handle_update_field() {
00740 #ifdef HAVE_PYTHON
00741 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00742   PyGILState_STATE gstate;
00743   gstate = PyGILState_Ensure();
00744 #endif
00745 
00746   PStatTimer timer(_update_pcollector);
00747   unsigned int do_id = _di.get_uint32();
00748   if (_python_repository != (PyObject *)NULL) 
00749   {
00750     PyObject *doId2do =
00751       PyObject_GetAttrString(_python_repository, "doId2do");
00752     nassertr(doId2do != NULL, false);
00753 
00754     #ifdef USE_PYTHON_2_2_OR_EARLIER
00755     PyObject *doId = PyInt_FromLong(do_id);
00756     #else
00757     PyObject *doId = PyLong_FromUnsignedLong(do_id);
00758     #endif
00759     PyObject *distobj = PyDict_GetItem(doId2do, doId);
00760     Py_DECREF(doId);
00761     Py_DECREF(doId2do);
00762 
00763     if (distobj != NULL) {
00764       PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
00765       nassertr(dclass_obj != NULL, false);
00766 
00767 
00768       PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
00769       Py_DECREF(dclass_obj);
00770       nassertr(dclass_this != NULL, false);
00771 
00772       DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this);
00773       Py_DECREF(dclass_this);
00774 
00775       // If in quiet zone mode, throw update away unless distobj
00776       // has 'neverDisable' attribute set to non-zero
00777       if (_in_quiet_zone) {
00778         PyObject *neverDisable = PyObject_GetAttrString(distobj, "neverDisable");
00779         nassertr(neverDisable != NULL, false);
00780 
00781         unsigned int cNeverDisable = PyInt_AsLong(neverDisable);
00782         if (!cNeverDisable) {
00783           // in quiet zone and distobj is disable-able
00784           // drop update on the floor
00785 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00786           PyGILState_Release(gstate);
00787 #endif
00788           return true;
00789         }
00790       }
00791 
00792       // It's a good idea to ensure the reference count to distobj is
00793       // raised while we call the update method--otherwise, the update
00794       // method might get into trouble if it tried to delete the
00795       // object from the doId2do map.
00796       Py_INCREF(distobj);
00797       dclass->receive_update(distobj, _di); 
00798       Py_DECREF(distobj);
00799       
00800       if (PyErr_Occurred()) {
00801 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00802         PyGILState_Release(gstate);
00803 #endif
00804         return false;
00805       }
00806     }
00807 
00808   }
00809 
00810 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00811   PyGILState_Release(gstate);
00812 #endif
00813   #endif  // HAVE_PYTHON  
00814   return true;
00815 }
00816 
00817 
00818 ////////////////////////////////////////////////////////////////////
00819 //     Function: CConnectionRepository::handle_update_field_owner
00820 //       Access: Private
00821 //  Description: Directly handles an update message on a field.
00822 //               Supports 'owner' views of objects, separate from 'visible'
00823 //               view, and forwards fields to the appropriate view(s) based
00824 //               on DC flags.  Python never touches the datagram; it just
00825 //               gets its distributed method called with the appropriate
00826 //               parameters.  Returns true if everything is ok, false if
00827 //               there was an error processing the field's update method.
00828 ////////////////////////////////////////////////////////////////////
00829 bool CConnectionRepository::
00830 handle_update_field_owner() {
00831 #ifdef HAVE_PYTHON
00832 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00833   PyGILState_STATE gstate;
00834   gstate = PyGILState_Ensure();
00835 #endif
00836 
00837   PStatTimer timer(_update_pcollector);
00838   unsigned int do_id = _di.get_uint32();
00839   if (_python_repository != (PyObject *)NULL) {
00840     PyObject *doId2do =
00841       PyObject_GetAttrString(_python_repository, "doId2do");
00842     nassertr(doId2do != NULL, false);
00843 
00844     PyObject *doId2ownerView =
00845       PyObject_GetAttrString(_python_repository, "doId2ownerView");
00846     nassertr(doId2ownerView != NULL, false);
00847 
00848     #ifdef USE_PYTHON_2_2_OR_EARLIER
00849     PyObject *doId = PyInt_FromLong(do_id);
00850     #else
00851     PyObject *doId = PyLong_FromUnsignedLong(do_id);
00852     #endif
00853 
00854     // pass the update to the owner view first
00855     PyObject *distobjOV = PyDict_GetItem(doId2ownerView, doId);
00856     Py_DECREF(doId2ownerView);
00857 
00858     if (distobjOV != NULL) {
00859       PyObject *dclass_obj = PyObject_GetAttrString(distobjOV, "dclass");
00860       nassertr(dclass_obj != NULL, false);
00861 
00862       PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
00863       Py_DECREF(dclass_obj);
00864       nassertr(dclass_this != NULL, false);
00865 
00866       DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this);
00867       Py_DECREF(dclass_this);
00868 
00869       // check if we should forward this update to the owner view
00870       DCPacker packer;
00871       packer.set_unpack_data(_di.get_remaining_bytes());
00872       int field_id = packer.raw_unpack_uint16();
00873       DCField *field = dclass->get_field_by_index(field_id);
00874       if (field->is_ownrecv()) {
00875         // It's a good idea to ensure the reference count to distobjOV is
00876         // raised while we call the update method--otherwise, the update
00877         // method might get into trouble if it tried to delete the
00878         // object from the doId2do map.
00879         Py_INCREF(distobjOV);
00880         // make a copy of the datagram iterator so that we can use the main
00881         // iterator for the non-owner update
00882         DatagramIterator _odi(_di);
00883         dclass->receive_update(distobjOV, _odi); 
00884         Py_DECREF(distobjOV);
00885       
00886         if (PyErr_Occurred()) {
00887 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00888           PyGILState_Release(gstate);
00889 #endif
00890           return false;
00891         }
00892       }
00893     }
00894 
00895     // now pass the update to the visible view
00896     PyObject *distobj = PyDict_GetItem(doId2do, doId);
00897     Py_DECREF(doId);
00898     Py_DECREF(doId2do);
00899 
00900     if (distobj != NULL) {
00901       PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
00902       nassertr(dclass_obj != NULL, false);
00903 
00904       PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
00905       Py_DECREF(dclass_obj);
00906       nassertr(dclass_this != NULL, false);
00907 
00908       DCClass *dclass = (DCClass *)PyInt_AsLong(dclass_this);
00909       Py_DECREF(dclass_this);
00910 
00911       // check if we should forward this update to the owner view
00912       DCPacker packer;
00913       packer.set_unpack_data(_di.get_remaining_bytes());
00914       int field_id = packer.raw_unpack_uint16();
00915       DCField *field = dclass->get_field_by_index(field_id);
00916       if (true) {//field->is_broadcast()) {
00917         // It's a good idea to ensure the reference count to distobj is
00918         // raised while we call the update method--otherwise, the update
00919         // method might get into trouble if it tried to delete the
00920         // object from the doId2do map.
00921         Py_INCREF(distobj);
00922         dclass->receive_update(distobj, _di); 
00923         Py_DECREF(distobj);
00924       
00925         if (PyErr_Occurred()) {
00926 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00927           PyGILState_Release(gstate);
00928 #endif
00929           return false;
00930         }
00931       }
00932     }
00933   }
00934 
00935 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
00936   PyGILState_Release(gstate);
00937 #endif
00938 #endif  // HAVE_PYTHON  
00939 
00940   return true;
00941 }
00942 
00943 ////////////////////////////////////////////////////////////////////
00944 //     Function: CConnectionRepository::describe_message
00945 //       Access: Private
00946 //  Description: Unpacks the message and reformats it for user
00947 //               consumption, writing a description on the indicated
00948 //               output stream.
00949 ////////////////////////////////////////////////////////////////////
00950 void CConnectionRepository::
00951 describe_message(ostream &out, const string &prefix, 
00952                  const Datagram &dg) const {
00953   DCPacker packer;
00954   
00955   packer.set_unpack_data(dg.get_message());
00956   CHANNEL_TYPE do_id;
00957   int msg_type;
00958   bool is_update = false;
00959   string full_prefix = "CR::" + prefix;
00960 
00961   if (!_client_datagram) 
00962   {
00963     unsigned char mcnt = packer.raw_unpack_uint8();
00964     for( ;mcnt > 0; mcnt--)
00965         packer.RAW_UNPACK_CHANNEL();  // msg_channel
00966 
00967     packer.RAW_UNPACK_CHANNEL();  // msg_sender
00968     msg_type = packer.raw_unpack_uint16();
00969     is_update = (msg_type == STATESERVER_OBJECT_UPDATE_FIELD);
00970     
00971   } else {
00972     msg_type = packer.raw_unpack_uint16();
00973     is_update = (msg_type == CLIENT_OBJECT_UPDATE_FIELD);
00974   }
00975 
00976   if (!is_update) {
00977     // figure out the name of the message
00978     // TODO: print out the arguments to the message
00979     string msgName;
00980 
00981     #ifdef HAVE_PYTHON
00982     if (_python_repository != (PyObject *)NULL) {
00983       PyObject *msgId = PyLong_FromLong(msg_type);
00984       nassertv(msgId != NULL);
00985       PyObject *methodName = PyString_FromString("_getMsgName");
00986       nassertv(methodName != NULL);
00987 
00988       PyObject *result = PyObject_CallMethodObjArgs(_python_repository, methodName,
00989                                                     msgId, NULL);
00990       nassertv(result != NULL);
00991 
00992       msgName += string(PyString_AsString(result));
00993 
00994       Py_DECREF(methodName);
00995       Py_DECREF(msgId);
00996       Py_DECREF(result);
00997     }
00998     #endif
00999     if (msgName.length() == 0) {
01000       msgName += "unknown message ";
01001       msgName += msg_type;
01002       msgName += "\n";
01003     }
01004     out << full_prefix << ":" << msgName << "\n";
01005     dg.dump_hex(out, 2);
01006 
01007   } else {
01008     // It's an update message.  Figure out what dclass the object is
01009     // based on its doId, so we can decode the rest of the message.
01010     do_id = packer.raw_unpack_uint32();
01011     DCClass *dclass = NULL;
01012 
01013     #ifdef HAVE_PYTHON
01014     if (_python_repository != (PyObject *)NULL) {
01015       PyObject *doId2do =
01016         PyObject_GetAttrString(_python_repository, "doId2do");
01017       nassertv(doId2do != NULL);
01018 
01019       #ifdef USE_PYTHON_2_2_OR_EARLIER
01020       PyObject *doId = PyInt_FromLong(do_id);
01021       #else
01022       PyObject *doId = PyLong_FromUnsignedLong(do_id);
01023       #endif
01024       PyObject *distobj = PyDict_GetItem(doId2do, doId);
01025       Py_DECREF(doId);
01026       Py_DECREF(doId2do);
01027 
01028       if (distobj != NULL) {
01029         PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
01030         nassertv(dclass_obj != NULL);
01031 
01032         PyObject *dclass_this = PyObject_GetAttrString(dclass_obj, "this");
01033         Py_DECREF(dclass_obj);
01034         nassertv(dclass_this != NULL);
01035         
01036         dclass = (DCClass *)PyInt_AsLong(dclass_this);
01037         Py_DECREF(dclass_this);
01038       }
01039     }
01040     #endif  // HAVE_PYTHON  
01041 
01042     int field_id = packer.raw_unpack_uint16();
01043 
01044     if (dclass == (DCClass *)NULL) {
01045       out << full_prefix << "update for unknown object " << do_id 
01046           << ", field " << field_id << "\n";
01047 
01048     } else {
01049       out << full_prefix <<
01050         ":" << dclass->get_name() << "(" << do_id << ").";
01051       DCField *field = dclass->get_field_by_index(field_id);
01052       if (field == (DCField *)NULL) {
01053         out << "unknown field " << field_id << "\n";
01054         
01055       } else {
01056         out << field->get_name();
01057         packer.begin_unpack(field);
01058         packer.unpack_and_format(out);
01059         packer.end_unpack();
01060         out << "\n";
01061       }
01062     }
01063   }
01064 }
01065 
01066 
01067 
01068 
01069 #ifdef HAVE_PYTHON
01070 #ifdef WANT_NATIVE_NET
01071 
01072 bool CConnectionRepository::network_based_reader_and_yielder(PyObject *PycallBackFunction,ClockObject &clock, float returnBy)
01073 {
01074   ReMutexHolder holder(_lock);
01075     while(is_connected())
01076     {        
01077         check_datagram_ai(PycallBackFunction);
01078         if(is_connected())
01079             _bdc.Flush();
01080         float currentTime = clock.get_real_time();
01081         float dif_time = returnBy - currentTime;
01082         if(dif_time <= 0.001) // to avoi over runs..
01083             break;
01084         if(is_connected())
01085             _bdc.WaitForNetworkReadEvent(dif_time);
01086     }
01087     return false;
01088 }
01089 
01090 bool CConnectionRepository::check_datagram_ai(PyObject *PycallBackFunction)
01091 {
01092   ReMutexHolder holder(_lock);
01093     // these could be static .. not 
01094   PyObject *doId2do = NULL; 
01095   float startTime =0;
01096   float endTime = 0;
01097   // this seems weird...here
01098   _bdc.Flush();
01099   while (_bdc.GetMessage(_dg))
01100   { 
01101       if (get_verbose()) 
01102           describe_message(nout, "RECV", _dg);
01103 
01104       if (_time_warning > 0) 
01105         startTime = ClockObject::get_global_clock()->get_real_time();
01106 
01107       // Start breaking apart the datagram.
01108       _di.assign(_dg);
01109       unsigned char  wc_cnt = _di.get_uint8();
01110       _msg_channels.clear();
01111       for(unsigned char lp1 = 0; lp1 < wc_cnt; lp1++)
01112           _msg_channels.push_back(_di.get_uint64());
01113 
01114       _msg_sender = _di.get_uint64();
01115       _msg_type = _di.get_uint16();
01116 
01117       if( _msg_type == STATESERVER_OBJECT_UPDATE_FIELD)
01118       {
01119           if(doId2do == NULL)
01120           {
01121               // this is my attemp to take it out of the inner loop  RHH
01122               doId2do =PyObject_GetAttrString(_python_repository, "doId2do");
01123               nassertr(doId2do != NULL, false);
01124           }
01125 
01126           if (!handle_update_field_ai(doId2do)) 
01127           {
01128               Py_XDECREF(doId2do);
01129               if (_time_warning > 0) {
01130                 endTime = ClockObject::get_global_clock()->get_real_time(); 
01131                 if ( _time_warning < (endTime - startTime)) {
01132                   nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<<  (endTime-startTime) << "secs to process\n";
01133                   _dg.dump_hex(nout,2);
01134                 }
01135               }
01136               return false; 
01137           }
01138       }
01139       else
01140       {
01141           PyObject * result = PyEval_CallObject(PycallBackFunction, _python_ai_datagramiterator);
01142           if (PyErr_Occurred()) 
01143           {        
01144               Py_XDECREF(doId2do);
01145               if (_time_warning > 0) {
01146                 endTime = ClockObject::get_global_clock()->get_real_time(); 
01147                 if ( _time_warning < (endTime - startTime)) {
01148                   nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<<  (endTime-startTime) << "secs to process\n";
01149                   _dg.dump_hex(nout,2);                
01150                 }
01151               }
01152               return true;
01153           }
01154       }
01155 
01156       if (_time_warning > 0) {
01157         endTime = ClockObject::get_global_clock()->get_real_time(); 
01158         if ( _time_warning < (endTime - startTime)) {
01159           nout << "msg " << _msg_type <<" from " << _msg_sender << " took "<<  (endTime-startTime) << "secs to process\n";
01160           _dg.dump_hex(nout,2);   
01161         }
01162       }
01163              
01164   }
01165 
01166 
01167   Py_XDECREF(doId2do);
01168   return false;
01169 }
01170 
01171 #endif  // #ifdef WANT_NATIVE_NET
01172 #endif  // #ifdef HAVE_PYTHON
01173 
01174 
01175 #ifdef HAVE_PYTHON
01176 #ifdef WANT_NATIVE_NET
01177 
01178 
01179 bool CConnectionRepository::handle_update_field_ai(PyObject *doId2do) 
01180 {
01181   PStatTimer timer(_update_pcollector);
01182   unsigned int do_id = _di.get_uint32();
01183  
01184   PyObject *doId = PyLong_FromUnsignedLong(do_id);
01185   PyObject *distobj = PyDict_GetItem(doId2do, doId);
01186   Py_DECREF(doId);
01187 
01188   if (distobj != NULL)
01189   {
01190       PyObject *dclass_obj = PyObject_GetAttrString(distobj, "dclass");
01191       nassertr(dclass_obj != NULL, false);
01192 
01193       DCClass *dclass = NULL;
01194       DTOOL_Call_ExtractThisPointerForType(dclass_obj, &Dtool_DCClass, (void **) &dclass);
01195       if(dclass == NULL)
01196           return false;
01197 
01198       Py_INCREF(distobj);
01199       dclass->receive_update(distobj, _di); 
01200       Py_DECREF(distobj);
01201 
01202       if (PyErr_Occurred()) 
01203           return false;
01204   }
01205   return true;
01206 }
01207 
01208 #endif  // #ifdef WANT_NATIVE_NET
01209 #endif  // #ifdef HAVE_PYTHON
 All Classes Functions Variables Enumerations