Panda3D
 All Classes Functions Variables Enumerations
connectionManager.cxx
00001 // Filename: connectionManager.cxx
00002 // Created by:  jns (07Feb00)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "connectionManager.h"
00016 #include "connection.h"
00017 #include "connectionReader.h"
00018 #include "connectionWriter.h"
00019 #include "netAddress.h"
00020 #include "config_net.h"
00021 #include "lightMutexHolder.h"
00022 #include "trueClock.h"
00023 
00024 #if defined(WIN32_VC) || defined(WIN64_VC)
00025 #include <winsock2.h>  // For gethostname()
00026 #endif
00027 
00028 ////////////////////////////////////////////////////////////////////
00029 //     Function: ConnectionManager::Constructor
00030 //       Access: Published
00031 //  Description:
00032 ////////////////////////////////////////////////////////////////////
00033 ConnectionManager::
00034 ConnectionManager() : _set_mutex("ConnectionManager::_set_mutex") 
00035 {
00036 }
00037 
00038 ////////////////////////////////////////////////////////////////////
00039 //     Function: ConnectionManager::Destructor
00040 //       Access: Published, Virtual
00041 //  Description:
00042 ////////////////////////////////////////////////////////////////////
00043 ConnectionManager::
00044 ~ConnectionManager() {
00045   // Notify all of our associated readers and writers that we're gone.
00046   Readers::iterator ri;
00047   for (ri = _readers.begin(); ri != _readers.end(); ++ri) {
00048     (*ri)->clear_manager();
00049   }
00050   Writers::iterator wi;
00051   for (wi = _writers.begin(); wi != _writers.end(); ++wi) {
00052     (*wi)->clear_manager();
00053   }
00054 }
00055 
00056 
00057 ////////////////////////////////////////////////////////////////////
00058 //     Function: ConnectionManager::open_UDP_connection
00059 //       Access: Published
00060 //  Description: Opens a socket for sending and/or receiving UDP
00061 //               packets.  If the port number is greater than zero,
00062 //               the UDP connection will be opened for listening on
00063 //               the indicated port; otherwise, it will be useful only
00064 //               for sending.
00065 //
00066 //               Use a ConnectionReader and ConnectionWriter to handle
00067 //               the actual communication.
00068 ////////////////////////////////////////////////////////////////////
00069 PT(Connection) ConnectionManager::
00070 open_UDP_connection(int port) {
00071   Socket_UDP *socket = new Socket_UDP;
00072 
00073   if (port > 0) {
00074     NetAddress address;
00075     address.set_any(port);
00076     
00077     if (!socket->OpenForInput(address.get_addr())) {
00078       net_cat.error()
00079         << "Unable to bind to port " << port << " for UDP.\n";
00080       delete socket;
00081       return PT(Connection)();
00082     }
00083 
00084     net_cat.info()
00085       << "Creating UDP connection for port " << port << "\n";
00086 
00087   } else {
00088     if (!socket->InitNoAddress()) {
00089       net_cat.error()
00090         << "Unable to initialize outgoing UDP.\n";
00091       delete socket;
00092       return PT(Connection)();
00093     }
00094 
00095     net_cat.info()
00096       << "Creating outgoing UDP connection\n";
00097   }
00098 
00099   PT(Connection) connection = new Connection(this, socket);
00100   new_connection(connection);
00101   return connection;
00102 }
00103 
00104 
00105 
00106 ////////////////////////////////////////////////////////////////////
00107 //     Function: ConnectionManager::open_TCP_server_rendezvous
00108 //       Access: Published
00109 //  Description: Creates a socket to be used as a rendezvous socket
00110 //               for a server to listen for TCP connections.  The
00111 //               socket returned by this call should only be added to
00112 //               a ConnectionListener (not to a generic
00113 //               ConnectionReader).
00114 //
00115 //               This variant of this method accepts a single port,
00116 //               and will listen to that port on all available
00117 //               interfaces.
00118 //
00119 //               backlog is the maximum length of the queue of pending
00120 //               connections.
00121 ////////////////////////////////////////////////////////////////////
00122 PT(Connection) ConnectionManager::
00123 open_TCP_server_rendezvous(int port, int backlog) {
00124   NetAddress address;
00125   address.set_any(port);
00126   return open_TCP_server_rendezvous(address, backlog);
00127 }
00128 
00129 ////////////////////////////////////////////////////////////////////
00130 //     Function: ConnectionManager::open_TCP_server_rendezvous
00131 //       Access: Published
00132 //  Description: Creates a socket to be used as a rendezvous socket
00133 //               for a server to listen for TCP connections.  The
00134 //               socket returned by this call should only be added to
00135 //               a ConnectionListener (not to a generic
00136 //               ConnectionReader).
00137 //
00138 //               This variant of this method accepts a "hostname",
00139 //               which is usually just an IP address in dotted
00140 //               notation, and a port number.  It will listen on the
00141 //               interface indicated by the IP address.  If the IP
00142 //               address is empty string, it will listen on all
00143 //               interfaces.
00144 //
00145 //               backlog is the maximum length of the queue of pending
00146 //               connections.
00147 ////////////////////////////////////////////////////////////////////
00148 PT(Connection) ConnectionManager::
00149 open_TCP_server_rendezvous(const string &hostname, int port, int backlog) {
00150   NetAddress address;
00151   if (hostname.empty()) {
00152     address.set_any(port);
00153   } else {
00154     address.set_host(hostname, port);
00155   }
00156   return open_TCP_server_rendezvous(address, backlog);
00157 }
00158 
00159 ////////////////////////////////////////////////////////////////////
00160 //     Function: ConnectionManager::open_TCP_server_rendezvous
00161 //       Access: Published
00162 //  Description: Creates a socket to be used as a rendezvous socket
00163 //               for a server to listen for TCP connections.  The
00164 //               socket returned by this call should only be added to
00165 //               a ConnectionListener (not to a generic
00166 //               ConnectionReader).
00167 //
00168 //               This variant of this method accepts a NetAddress,
00169 //               which allows you to specify a specific interface to
00170 //               listen to.
00171 //
00172 //               backlog is the maximum length of the queue of pending
00173 //               connections.
00174 ////////////////////////////////////////////////////////////////////
00175 PT(Connection) ConnectionManager::
00176 open_TCP_server_rendezvous(const NetAddress &address, int backlog) {
00177   ostringstream strm;
00178   if (address.get_ip() == 0) {
00179     strm << "port " << address.get_port();  
00180   } else {
00181     strm << address.get_ip_string() << ":" << address.get_port();
00182   }
00183 
00184   Socket_TCP_Listen *socket = new Socket_TCP_Listen;
00185   bool okflag = socket->OpenForListen(address.get_addr(), backlog);
00186   if (!okflag) {
00187     net_cat.info()
00188       << "Unable to listen to " << strm.str() << " for TCP.\n";
00189     delete socket;
00190     return PT(Connection)();
00191   }
00192 
00193   net_cat.info()
00194     << "Listening for TCP connections on " << strm.str() << "\n";
00195 
00196   PT(Connection) connection = new Connection(this, socket);
00197   new_connection(connection);
00198   return connection;
00199 }
00200 
00201 ////////////////////////////////////////////////////////////////////
00202 //     Function: ConnectionManager::open_TCP_client_connection
00203 //       Access: Published
00204 //  Description: Attempts to establish a TCP client connection to a
00205 //               server at the indicated address.  If the connection
00206 //               is not established within timeout_ms milliseconds, a
00207 //               null connection is returned.
00208 ////////////////////////////////////////////////////////////////////
00209 PT(Connection) ConnectionManager::
00210 open_TCP_client_connection(const NetAddress &address, int timeout_ms) {
00211   Socket_TCP *socket = new Socket_TCP;
00212 
00213   // We always open the connection with non-blocking mode first, so we
00214   // can implement the timeout.
00215   bool okflag = socket->ActiveOpenNonBlocking(address.get_addr());
00216   if (okflag && socket->GetLastError() == LOCAL_CONNECT_BLOCKING) {
00217     // Now wait for the socket to connect.
00218     TrueClock *clock = TrueClock::get_global_ptr();
00219     double start = clock->get_short_time();
00220     Thread::force_yield();
00221     Socket_fdset fset;
00222     fset.setForSocket(*socket);
00223     int ready = fset.WaitForWrite(true, 0);
00224     while (ready == 0) {
00225       double elapsed = clock->get_short_time() - start;
00226       if (elapsed * 1000.0 > timeout_ms) {
00227         // Timeout.
00228         okflag = false;
00229         break;
00230       }
00231       Thread::force_yield();
00232       fset.setForSocket(*socket);
00233       ready = fset.WaitForWrite(true, 0);
00234     }
00235   }
00236 
00237   if (okflag) {
00238     // So, the connect() operation finished, but did it succeed or fail?
00239     if (socket->GetPeerName().GetIPAddressRaw() == 0) {
00240       // No peer means it failed.
00241       okflag = false;
00242     }
00243   }
00244 
00245   if (!okflag) {
00246     net_cat.error()
00247       << "Unable to open TCP connection to server "
00248       << address.get_ip_string() << " on port " << address.get_port() << "\n";
00249     delete socket;
00250     return PT(Connection)();
00251   }
00252 
00253 #if !defined(HAVE_THREADS) || !defined(SIMPLE_THREADS)
00254   // Now we have opened the socket in nonblocking mode.  Unless we're
00255   // using SIMPLE_THREADS, though, we really want the socket in
00256   // blocking mode (since that's what we support here).  Change it.
00257   socket->SetBlocking();
00258 
00259 #endif  // SIMPLE_THREADS
00260 
00261   net_cat.info()
00262     << "Opened TCP connection to server " << address.get_ip_string() << " "
00263     << " on port " << address.get_port() << "\n";
00264 
00265   PT(Connection) connection = new Connection(this, socket);
00266   new_connection(connection);
00267   return connection;
00268 }
00269 
00270 ////////////////////////////////////////////////////////////////////
00271 //     Function: ConnectionManager::open_TCP_client_connection
00272 //       Access: Published
00273 //  Description: This is a shorthand version of the function to
00274 //               directly establish communications to a named host and
00275 //               port.
00276 ////////////////////////////////////////////////////////////////////
00277 PT(Connection) ConnectionManager::
00278 open_TCP_client_connection(const string &hostname, int port,
00279                            int timeout_ms) {
00280   NetAddress address;
00281   if (!address.set_host(hostname, port)) {
00282     return PT(Connection)();
00283   }
00284 
00285   return open_TCP_client_connection(address, timeout_ms);
00286 }
00287 
00288 ////////////////////////////////////////////////////////////////////
00289 //     Function: ConnectionManager::close_connection
00290 //       Access: Published
00291 //  Description: Terminates a UDP or TCP socket previously opened.
00292 //               This also removes it from any associated
00293 //               ConnectionReader or ConnectionListeners.
00294 //
00295 //               The socket itself may not be immediately closed--it
00296 //               will not be closed until all outstanding pointers to
00297 //               it are cleared, including any pointers remaining in
00298 //               NetDatagrams recently received from the socket.
00299 //
00300 //               The return value is true if the connection was marked
00301 //               to be closed, or false if close_connection() had
00302 //               already been called (or the connection did not belong
00303 //               to this ConnectionManager).  In neither case can you
00304 //               infer anything about whether the connection has
00305 //               *actually* been closed yet based on the return value.
00306 ////////////////////////////////////////////////////////////////////
00307 bool ConnectionManager::
00308 close_connection(const PT(Connection) &connection) {
00309   if (connection != (Connection *)NULL) {
00310     connection->flush();
00311   }
00312 
00313   {
00314     LightMutexHolder holder(_set_mutex);
00315     Connections::iterator ci = _connections.find(connection);
00316     if (ci == _connections.end()) {
00317       // Already closed, or not part of this ConnectionManager.
00318       return false;
00319     }
00320     _connections.erase(ci);
00321     
00322     Readers::iterator ri;
00323     for (ri = _readers.begin(); ri != _readers.end(); ++ri) {
00324       (*ri)->remove_connection(connection);
00325     }
00326   }
00327 
00328   Socket_IP *socket = connection->get_socket();
00329 
00330   // We can't *actually* close the connection right now, because
00331   // there might be outstanding pointers to it.  But we can at least
00332   // shut it down.  It will be eventually closed when all the
00333   // pointers let go.
00334   
00335   net_cat.info()
00336     << "Shutting down connection " << (void *)connection
00337     << " locally.\n";
00338   socket->Close();
00339 
00340   return true;
00341 }
00342 
00343 
00344 ////////////////////////////////////////////////////////////////////
00345 //     Function: ConnectionManager::wait_for_readers
00346 //       Access: Published
00347 //  Description: Blocks the process for timeout number of seconds, or
00348 //               until any data is available on any of the
00349 //               non-threaded ConnectionReaders or
00350 //               ConnectionListeners, whichever comes first.  The
00351 //               return value is true if there is data available (but
00352 //               you have to iterate through all readers to find it),
00353 //               or false if the timeout occurred without any data.
00354 //
00355 //               If the timeout value is negative, this will block
00356 //               forever or until data is available.
00357 //
00358 //               This only works if all ConnectionReaders and
00359 //               ConnectionListeners are non-threaded.  If any
00360 //               threaded ConnectionReaders are part of the
00361 //               ConnectionManager, the timeout value is implicitly
00362 //               treated as 0.
00363 ////////////////////////////////////////////////////////////////////
00364 bool ConnectionManager::
00365 wait_for_readers(double timeout) {
00366   bool block_forever = false;
00367   if (timeout < 0.0) {
00368     block_forever = true;
00369     timeout = 0.0;
00370   }
00371 
00372   TrueClock *clock = TrueClock::get_global_ptr();
00373   double now = clock->get_short_time();
00374   double stop = now + timeout;
00375   do {
00376     Socket_fdset fdset;
00377     fdset.clear();
00378     bool any_threaded = false;
00379     
00380     {
00381       LightMutexHolder holder(_set_mutex);
00382       
00383       Readers::iterator ri;
00384       for (ri = _readers.begin(); ri != _readers.end(); ++ri) {
00385         ConnectionReader *reader = (*ri);
00386         if (reader->is_polling()) {
00387           // If it's a polling reader, we can wait for its socket.
00388           // (If it's a threaded reader, we can't do anything here.)
00389           reader->accumulate_fdset(fdset);
00390         } else {
00391           any_threaded = true;
00392           stop = now;
00393           block_forever = false;
00394         }
00395       }
00396     }
00397 
00398     double wait_timeout = get_net_max_block();
00399     if (!block_forever) { 
00400       wait_timeout = min(wait_timeout, stop - now);
00401     }
00402 
00403     PN_uint32 wait_timeout_ms = (PN_uint32)(wait_timeout * 1000.0);
00404     if (any_threaded) {
00405       // If there are any threaded ConnectionReaders, we can't block
00406       // at all.
00407       wait_timeout_ms = 0;
00408     }
00409 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00410     // In the presence of SIMPLE_THREADS, we never wait at all,
00411     // but rather we yield the thread if we come up empty (so that
00412     // we won't block the entire process).
00413     wait_timeout_ms = 0;
00414 #endif
00415     int num_results = fdset.WaitForRead(false, wait_timeout_ms);
00416     if (num_results != 0) {
00417       // If we got an answer (or an error), return success.  The
00418       // caller can then figure out what happened.
00419       if (num_results < 0) {
00420         // Go ahead and yield the timeslice if we got an error.
00421         Thread::force_yield();
00422       }
00423       return true;
00424     }
00425 
00426     // No answer yet, so yield and wait some more.  We don't actually
00427     // block forever, even in the threaded case, so we can detect
00428     // ConnectionReaders being added and removed and such.
00429     Thread::force_yield();
00430 
00431     now = clock->get_short_time();
00432   } while (now < stop || block_forever);
00433 
00434   // Timeout occurred; no data.
00435   return false;
00436 }
00437 
00438 ////////////////////////////////////////////////////////////////////
00439 //     Function: ConnectionManager::get_host_name
00440 //       Access: Published, Static
00441 //  Description: Returns the name of this particular machine on the
00442 //               network, if available, or the empty string if the
00443 //               hostname cannot be determined.
00444 ////////////////////////////////////////////////////////////////////
00445 string ConnectionManager::
00446 get_host_name() {
00447   char temp_buff[1024];
00448   if (gethostname(temp_buff, 1024) == 0) {
00449     return string(temp_buff);
00450   }
00451 
00452   return string();
00453 }
00454 
00455 ////////////////////////////////////////////////////////////////////
00456 //     Function: ConnectionManager::new_connection
00457 //       Access: Protected
00458 //  Description: This internal function is called whenever a new
00459 //               connection is established.  It allows the
00460 //               ConnectionManager to save all of the pointers to open
00461 //               connections so they can't be inadvertently deleted
00462 //               until close_connection() is called.
00463 ////////////////////////////////////////////////////////////////////
00464 void ConnectionManager::
00465 new_connection(const PT(Connection) &connection) {
00466   LightMutexHolder holder(_set_mutex);
00467   _connections.insert(connection);
00468 }
00469 
00470 ////////////////////////////////////////////////////////////////////
00471 //     Function: ConnectionManager::flush_read_connection
00472 //       Access: Protected, Virtual
00473 //  Description: An internal function called by ConnectionWriter only
00474 //               when a write failure has occurred.  This method
00475 //               ensures that all of the read data has been flushed
00476 //               from the pipe before the connection is fully removed.
00477 ////////////////////////////////////////////////////////////////////
00478 void ConnectionManager::
00479 flush_read_connection(Connection *connection) {
00480   Readers readers;
00481   {
00482     LightMutexHolder holder(_set_mutex);
00483     Connections::iterator ci = _connections.find(connection);
00484     if (ci == _connections.end()) {
00485       // Already closed, or not part of this ConnectionManager.
00486       return;
00487     }
00488     _connections.erase(ci);
00489 
00490     // Get a copy first, so we can release the lock before traversing.
00491     readers = _readers;
00492   }
00493   Readers::iterator ri;
00494   for (ri = readers.begin(); ri != readers.end(); ++ri) {
00495     (*ri)->flush_read_connection(connection);
00496   }
00497 
00498   Socket_IP *socket = connection->get_socket();
00499   socket->Close();
00500 }
00501 
00502 ////////////////////////////////////////////////////////////////////
00503 //     Function: ConnectionManager::connection_reset
00504 //       Access: Protected, Virtual
00505 //  Description: An internal function called by the ConnectionReader,
00506 //               ConnectionWriter, or ConnectionListener when a
00507 //               connection has been externally reset.  This adds the
00508 //               connection to the queue of those which have recently
00509 //               been reset.
00510 ////////////////////////////////////////////////////////////////////
00511 void ConnectionManager::
00512 connection_reset(const PT(Connection) &connection, bool okflag) {
00513   if (net_cat.is_info()) {
00514     if (okflag) {
00515       net_cat.info()
00516         << "Connection " << (void *)connection
00517         << " was closed normally by the other end";
00518 
00519     } else {
00520       net_cat.info()
00521         << "Lost connection " << (void *)connection
00522         << " unexpectedly\n";
00523     }
00524   }
00525 
00526   // Turns out we do need to explicitly mark the connection as closed
00527   // immediately, rather than waiting for the user to do it, since
00528   // otherwise we'll keep trying to listen for noise on the socket and
00529   // we'll always hear a "yes" answer.
00530   close_connection(connection);
00531 }
00532 
00533 ////////////////////////////////////////////////////////////////////
00534 //     Function: ConnectionManager::add_reader
00535 //       Access: Protected
00536 //  Description: This internal function is called by ConnectionReader
00537 //               when it is constructed.
00538 ////////////////////////////////////////////////////////////////////
00539 void ConnectionManager::
00540 add_reader(ConnectionReader *reader) {
00541   LightMutexHolder holder(_set_mutex);
00542   _readers.insert(reader);
00543 }
00544 
00545 ////////////////////////////////////////////////////////////////////
00546 //     Function: ConnectionManager::remove_reader
00547 //       Access: Protected
00548 //  Description: This internal function is called by ConnectionReader
00549 //               when it is destructed.
00550 ////////////////////////////////////////////////////////////////////
00551 void ConnectionManager::
00552 remove_reader(ConnectionReader *reader) {
00553   LightMutexHolder holder(_set_mutex);
00554   _readers.erase(reader);
00555 }
00556 
00557 ////////////////////////////////////////////////////////////////////
00558 //     Function: ConnectionManager::add_writer
00559 //       Access: Protected
00560 //  Description: This internal function is called by ConnectionWriter
00561 //               when it is constructed.
00562 ////////////////////////////////////////////////////////////////////
00563 void ConnectionManager::
00564 add_writer(ConnectionWriter *writer) {
00565   LightMutexHolder holder(_set_mutex);
00566   _writers.insert(writer);
00567 }
00568 
00569 ////////////////////////////////////////////////////////////////////
00570 //     Function: ConnectionManager::remove_writer
00571 //       Access: Protected
00572 //  Description: This internal function is called by ConnectionWriter
00573 //               when it is destructed.
00574 ////////////////////////////////////////////////////////////////////
00575 void ConnectionManager::
00576 remove_writer(ConnectionWriter *writer) {
00577   LightMutexHolder holder(_set_mutex);
00578   _writers.erase(writer);
00579 }
 All Classes Functions Variables Enumerations