Panda3D
|
00001 // Filename: connectionManager.cxx 00002 // Created by: jns (07Feb00) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "connectionManager.h" 00016 #include "connection.h" 00017 #include "connectionReader.h" 00018 #include "connectionWriter.h" 00019 #include "netAddress.h" 00020 #include "config_net.h" 00021 #include "lightMutexHolder.h" 00022 #include "trueClock.h" 00023 00024 #if defined(WIN32_VC) || defined(WIN64_VC) 00025 #include <winsock2.h> // For gethostname() 00026 #endif 00027 00028 //////////////////////////////////////////////////////////////////// 00029 // Function: ConnectionManager::Constructor 00030 // Access: Published 00031 // Description: 00032 //////////////////////////////////////////////////////////////////// 00033 ConnectionManager:: 00034 ConnectionManager() : _set_mutex("ConnectionManager::_set_mutex") 00035 { 00036 } 00037 00038 //////////////////////////////////////////////////////////////////// 00039 // Function: ConnectionManager::Destructor 00040 // Access: Published, Virtual 00041 // Description: 00042 //////////////////////////////////////////////////////////////////// 00043 ConnectionManager:: 00044 ~ConnectionManager() { 00045 // Notify all of our associated readers and writers that we're gone. 00046 Readers::iterator ri; 00047 for (ri = _readers.begin(); ri != _readers.end(); ++ri) { 00048 (*ri)->clear_manager(); 00049 } 00050 Writers::iterator wi; 00051 for (wi = _writers.begin(); wi != _writers.end(); ++wi) { 00052 (*wi)->clear_manager(); 00053 } 00054 } 00055 00056 00057 //////////////////////////////////////////////////////////////////// 00058 // Function: ConnectionManager::open_UDP_connection 00059 // Access: Published 00060 // Description: Opens a socket for sending and/or receiving UDP 00061 // packets. If the port number is greater than zero, 00062 // the UDP connection will be opened for listening on 00063 // the indicated port; otherwise, it will be useful only 00064 // for sending. 00065 // 00066 // Use a ConnectionReader and ConnectionWriter to handle 00067 // the actual communication. 00068 //////////////////////////////////////////////////////////////////// 00069 PT(Connection) ConnectionManager:: 00070 open_UDP_connection(int port) { 00071 Socket_UDP *socket = new Socket_UDP; 00072 00073 if (port > 0) { 00074 NetAddress address; 00075 address.set_any(port); 00076 00077 if (!socket->OpenForInput(address.get_addr())) { 00078 net_cat.error() 00079 << "Unable to bind to port " << port << " for UDP.\n"; 00080 delete socket; 00081 return PT(Connection)(); 00082 } 00083 00084 net_cat.info() 00085 << "Creating UDP connection for port " << port << "\n"; 00086 00087 } else { 00088 if (!socket->InitNoAddress()) { 00089 net_cat.error() 00090 << "Unable to initialize outgoing UDP.\n"; 00091 delete socket; 00092 return PT(Connection)(); 00093 } 00094 00095 net_cat.info() 00096 << "Creating outgoing UDP connection\n"; 00097 } 00098 00099 PT(Connection) connection = new Connection(this, socket); 00100 new_connection(connection); 00101 return connection; 00102 } 00103 00104 00105 00106 //////////////////////////////////////////////////////////////////// 00107 // Function: ConnectionManager::open_TCP_server_rendezvous 00108 // Access: Published 00109 // Description: Creates a socket to be used as a rendezvous socket 00110 // for a server to listen for TCP connections. The 00111 // socket returned by this call should only be added to 00112 // a ConnectionListener (not to a generic 00113 // ConnectionReader). 00114 // 00115 // This variant of this method accepts a single port, 00116 // and will listen to that port on all available 00117 // interfaces. 00118 // 00119 // backlog is the maximum length of the queue of pending 00120 // connections. 00121 //////////////////////////////////////////////////////////////////// 00122 PT(Connection) ConnectionManager:: 00123 open_TCP_server_rendezvous(int port, int backlog) { 00124 NetAddress address; 00125 address.set_any(port); 00126 return open_TCP_server_rendezvous(address, backlog); 00127 } 00128 00129 //////////////////////////////////////////////////////////////////// 00130 // Function: ConnectionManager::open_TCP_server_rendezvous 00131 // Access: Published 00132 // Description: Creates a socket to be used as a rendezvous socket 00133 // for a server to listen for TCP connections. The 00134 // socket returned by this call should only be added to 00135 // a ConnectionListener (not to a generic 00136 // ConnectionReader). 00137 // 00138 // This variant of this method accepts a "hostname", 00139 // which is usually just an IP address in dotted 00140 // notation, and a port number. It will listen on the 00141 // interface indicated by the IP address. If the IP 00142 // address is empty string, it will listen on all 00143 // interfaces. 00144 // 00145 // backlog is the maximum length of the queue of pending 00146 // connections. 00147 //////////////////////////////////////////////////////////////////// 00148 PT(Connection) ConnectionManager:: 00149 open_TCP_server_rendezvous(const string &hostname, int port, int backlog) { 00150 NetAddress address; 00151 if (hostname.empty()) { 00152 address.set_any(port); 00153 } else { 00154 address.set_host(hostname, port); 00155 } 00156 return open_TCP_server_rendezvous(address, backlog); 00157 } 00158 00159 //////////////////////////////////////////////////////////////////// 00160 // Function: ConnectionManager::open_TCP_server_rendezvous 00161 // Access: Published 00162 // Description: Creates a socket to be used as a rendezvous socket 00163 // for a server to listen for TCP connections. The 00164 // socket returned by this call should only be added to 00165 // a ConnectionListener (not to a generic 00166 // ConnectionReader). 00167 // 00168 // This variant of this method accepts a NetAddress, 00169 // which allows you to specify a specific interface to 00170 // listen to. 00171 // 00172 // backlog is the maximum length of the queue of pending 00173 // connections. 00174 //////////////////////////////////////////////////////////////////// 00175 PT(Connection) ConnectionManager:: 00176 open_TCP_server_rendezvous(const NetAddress &address, int backlog) { 00177 ostringstream strm; 00178 if (address.get_ip() == 0) { 00179 strm << "port " << address.get_port(); 00180 } else { 00181 strm << address.get_ip_string() << ":" << address.get_port(); 00182 } 00183 00184 Socket_TCP_Listen *socket = new Socket_TCP_Listen; 00185 bool okflag = socket->OpenForListen(address.get_addr(), backlog); 00186 if (!okflag) { 00187 net_cat.info() 00188 << "Unable to listen to " << strm.str() << " for TCP.\n"; 00189 delete socket; 00190 return PT(Connection)(); 00191 } 00192 00193 net_cat.info() 00194 << "Listening for TCP connections on " << strm.str() << "\n"; 00195 00196 PT(Connection) connection = new Connection(this, socket); 00197 new_connection(connection); 00198 return connection; 00199 } 00200 00201 //////////////////////////////////////////////////////////////////// 00202 // Function: ConnectionManager::open_TCP_client_connection 00203 // Access: Published 00204 // Description: Attempts to establish a TCP client connection to a 00205 // server at the indicated address. If the connection 00206 // is not established within timeout_ms milliseconds, a 00207 // null connection is returned. 00208 //////////////////////////////////////////////////////////////////// 00209 PT(Connection) ConnectionManager:: 00210 open_TCP_client_connection(const NetAddress &address, int timeout_ms) { 00211 Socket_TCP *socket = new Socket_TCP; 00212 00213 // We always open the connection with non-blocking mode first, so we 00214 // can implement the timeout. 00215 bool okflag = socket->ActiveOpenNonBlocking(address.get_addr()); 00216 if (okflag && socket->GetLastError() == LOCAL_CONNECT_BLOCKING) { 00217 // Now wait for the socket to connect. 00218 TrueClock *clock = TrueClock::get_global_ptr(); 00219 double start = clock->get_short_time(); 00220 Thread::force_yield(); 00221 Socket_fdset fset; 00222 fset.setForSocket(*socket); 00223 int ready = fset.WaitForWrite(true, 0); 00224 while (ready == 0) { 00225 double elapsed = clock->get_short_time() - start; 00226 if (elapsed * 1000.0 > timeout_ms) { 00227 // Timeout. 00228 okflag = false; 00229 break; 00230 } 00231 Thread::force_yield(); 00232 fset.setForSocket(*socket); 00233 ready = fset.WaitForWrite(true, 0); 00234 } 00235 } 00236 00237 if (okflag) { 00238 // So, the connect() operation finished, but did it succeed or fail? 00239 if (socket->GetPeerName().GetIPAddressRaw() == 0) { 00240 // No peer means it failed. 00241 okflag = false; 00242 } 00243 } 00244 00245 if (!okflag) { 00246 net_cat.error() 00247 << "Unable to open TCP connection to server " 00248 << address.get_ip_string() << " on port " << address.get_port() << "\n"; 00249 delete socket; 00250 return PT(Connection)(); 00251 } 00252 00253 #if !defined(HAVE_THREADS) || !defined(SIMPLE_THREADS) 00254 // Now we have opened the socket in nonblocking mode. Unless we're 00255 // using SIMPLE_THREADS, though, we really want the socket in 00256 // blocking mode (since that's what we support here). Change it. 00257 socket->SetBlocking(); 00258 00259 #endif // SIMPLE_THREADS 00260 00261 net_cat.info() 00262 << "Opened TCP connection to server " << address.get_ip_string() << " " 00263 << " on port " << address.get_port() << "\n"; 00264 00265 PT(Connection) connection = new Connection(this, socket); 00266 new_connection(connection); 00267 return connection; 00268 } 00269 00270 //////////////////////////////////////////////////////////////////// 00271 // Function: ConnectionManager::open_TCP_client_connection 00272 // Access: Published 00273 // Description: This is a shorthand version of the function to 00274 // directly establish communications to a named host and 00275 // port. 00276 //////////////////////////////////////////////////////////////////// 00277 PT(Connection) ConnectionManager:: 00278 open_TCP_client_connection(const string &hostname, int port, 00279 int timeout_ms) { 00280 NetAddress address; 00281 if (!address.set_host(hostname, port)) { 00282 return PT(Connection)(); 00283 } 00284 00285 return open_TCP_client_connection(address, timeout_ms); 00286 } 00287 00288 //////////////////////////////////////////////////////////////////// 00289 // Function: ConnectionManager::close_connection 00290 // Access: Published 00291 // Description: Terminates a UDP or TCP socket previously opened. 00292 // This also removes it from any associated 00293 // ConnectionReader or ConnectionListeners. 00294 // 00295 // The socket itself may not be immediately closed--it 00296 // will not be closed until all outstanding pointers to 00297 // it are cleared, including any pointers remaining in 00298 // NetDatagrams recently received from the socket. 00299 // 00300 // The return value is true if the connection was marked 00301 // to be closed, or false if close_connection() had 00302 // already been called (or the connection did not belong 00303 // to this ConnectionManager). In neither case can you 00304 // infer anything about whether the connection has 00305 // *actually* been closed yet based on the return value. 00306 //////////////////////////////////////////////////////////////////// 00307 bool ConnectionManager:: 00308 close_connection(const PT(Connection) &connection) { 00309 if (connection != (Connection *)NULL) { 00310 connection->flush(); 00311 } 00312 00313 { 00314 LightMutexHolder holder(_set_mutex); 00315 Connections::iterator ci = _connections.find(connection); 00316 if (ci == _connections.end()) { 00317 // Already closed, or not part of this ConnectionManager. 00318 return false; 00319 } 00320 _connections.erase(ci); 00321 00322 Readers::iterator ri; 00323 for (ri = _readers.begin(); ri != _readers.end(); ++ri) { 00324 (*ri)->remove_connection(connection); 00325 } 00326 } 00327 00328 Socket_IP *socket = connection->get_socket(); 00329 00330 // We can't *actually* close the connection right now, because 00331 // there might be outstanding pointers to it. But we can at least 00332 // shut it down. It will be eventually closed when all the 00333 // pointers let go. 00334 00335 net_cat.info() 00336 << "Shutting down connection " << (void *)connection 00337 << " locally.\n"; 00338 socket->Close(); 00339 00340 return true; 00341 } 00342 00343 00344 //////////////////////////////////////////////////////////////////// 00345 // Function: ConnectionManager::wait_for_readers 00346 // Access: Published 00347 // Description: Blocks the process for timeout number of seconds, or 00348 // until any data is available on any of the 00349 // non-threaded ConnectionReaders or 00350 // ConnectionListeners, whichever comes first. The 00351 // return value is true if there is data available (but 00352 // you have to iterate through all readers to find it), 00353 // or false if the timeout occurred without any data. 00354 // 00355 // If the timeout value is negative, this will block 00356 // forever or until data is available. 00357 // 00358 // This only works if all ConnectionReaders and 00359 // ConnectionListeners are non-threaded. If any 00360 // threaded ConnectionReaders are part of the 00361 // ConnectionManager, the timeout value is implicitly 00362 // treated as 0. 00363 //////////////////////////////////////////////////////////////////// 00364 bool ConnectionManager:: 00365 wait_for_readers(double timeout) { 00366 bool block_forever = false; 00367 if (timeout < 0.0) { 00368 block_forever = true; 00369 timeout = 0.0; 00370 } 00371 00372 TrueClock *clock = TrueClock::get_global_ptr(); 00373 double now = clock->get_short_time(); 00374 double stop = now + timeout; 00375 do { 00376 Socket_fdset fdset; 00377 fdset.clear(); 00378 bool any_threaded = false; 00379 00380 { 00381 LightMutexHolder holder(_set_mutex); 00382 00383 Readers::iterator ri; 00384 for (ri = _readers.begin(); ri != _readers.end(); ++ri) { 00385 ConnectionReader *reader = (*ri); 00386 if (reader->is_polling()) { 00387 // If it's a polling reader, we can wait for its socket. 00388 // (If it's a threaded reader, we can't do anything here.) 00389 reader->accumulate_fdset(fdset); 00390 } else { 00391 any_threaded = true; 00392 stop = now; 00393 block_forever = false; 00394 } 00395 } 00396 } 00397 00398 double wait_timeout = get_net_max_block(); 00399 if (!block_forever) { 00400 wait_timeout = min(wait_timeout, stop - now); 00401 } 00402 00403 PN_uint32 wait_timeout_ms = (PN_uint32)(wait_timeout * 1000.0); 00404 if (any_threaded) { 00405 // If there are any threaded ConnectionReaders, we can't block 00406 // at all. 00407 wait_timeout_ms = 0; 00408 } 00409 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00410 // In the presence of SIMPLE_THREADS, we never wait at all, 00411 // but rather we yield the thread if we come up empty (so that 00412 // we won't block the entire process). 00413 wait_timeout_ms = 0; 00414 #endif 00415 int num_results = fdset.WaitForRead(false, wait_timeout_ms); 00416 if (num_results != 0) { 00417 // If we got an answer (or an error), return success. The 00418 // caller can then figure out what happened. 00419 if (num_results < 0) { 00420 // Go ahead and yield the timeslice if we got an error. 00421 Thread::force_yield(); 00422 } 00423 return true; 00424 } 00425 00426 // No answer yet, so yield and wait some more. We don't actually 00427 // block forever, even in the threaded case, so we can detect 00428 // ConnectionReaders being added and removed and such. 00429 Thread::force_yield(); 00430 00431 now = clock->get_short_time(); 00432 } while (now < stop || block_forever); 00433 00434 // Timeout occurred; no data. 00435 return false; 00436 } 00437 00438 //////////////////////////////////////////////////////////////////// 00439 // Function: ConnectionManager::get_host_name 00440 // Access: Published, Static 00441 // Description: Returns the name of this particular machine on the 00442 // network, if available, or the empty string if the 00443 // hostname cannot be determined. 00444 //////////////////////////////////////////////////////////////////// 00445 string ConnectionManager:: 00446 get_host_name() { 00447 char temp_buff[1024]; 00448 if (gethostname(temp_buff, 1024) == 0) { 00449 return string(temp_buff); 00450 } 00451 00452 return string(); 00453 } 00454 00455 //////////////////////////////////////////////////////////////////// 00456 // Function: ConnectionManager::new_connection 00457 // Access: Protected 00458 // Description: This internal function is called whenever a new 00459 // connection is established. It allows the 00460 // ConnectionManager to save all of the pointers to open 00461 // connections so they can't be inadvertently deleted 00462 // until close_connection() is called. 00463 //////////////////////////////////////////////////////////////////// 00464 void ConnectionManager:: 00465 new_connection(const PT(Connection) &connection) { 00466 LightMutexHolder holder(_set_mutex); 00467 _connections.insert(connection); 00468 } 00469 00470 //////////////////////////////////////////////////////////////////// 00471 // Function: ConnectionManager::flush_read_connection 00472 // Access: Protected, Virtual 00473 // Description: An internal function called by ConnectionWriter only 00474 // when a write failure has occurred. This method 00475 // ensures that all of the read data has been flushed 00476 // from the pipe before the connection is fully removed. 00477 //////////////////////////////////////////////////////////////////// 00478 void ConnectionManager:: 00479 flush_read_connection(Connection *connection) { 00480 Readers readers; 00481 { 00482 LightMutexHolder holder(_set_mutex); 00483 Connections::iterator ci = _connections.find(connection); 00484 if (ci == _connections.end()) { 00485 // Already closed, or not part of this ConnectionManager. 00486 return; 00487 } 00488 _connections.erase(ci); 00489 00490 // Get a copy first, so we can release the lock before traversing. 00491 readers = _readers; 00492 } 00493 Readers::iterator ri; 00494 for (ri = readers.begin(); ri != readers.end(); ++ri) { 00495 (*ri)->flush_read_connection(connection); 00496 } 00497 00498 Socket_IP *socket = connection->get_socket(); 00499 socket->Close(); 00500 } 00501 00502 //////////////////////////////////////////////////////////////////// 00503 // Function: ConnectionManager::connection_reset 00504 // Access: Protected, Virtual 00505 // Description: An internal function called by the ConnectionReader, 00506 // ConnectionWriter, or ConnectionListener when a 00507 // connection has been externally reset. This adds the 00508 // connection to the queue of those which have recently 00509 // been reset. 00510 //////////////////////////////////////////////////////////////////// 00511 void ConnectionManager:: 00512 connection_reset(const PT(Connection) &connection, bool okflag) { 00513 if (net_cat.is_info()) { 00514 if (okflag) { 00515 net_cat.info() 00516 << "Connection " << (void *)connection 00517 << " was closed normally by the other end"; 00518 00519 } else { 00520 net_cat.info() 00521 << "Lost connection " << (void *)connection 00522 << " unexpectedly\n"; 00523 } 00524 } 00525 00526 // Turns out we do need to explicitly mark the connection as closed 00527 // immediately, rather than waiting for the user to do it, since 00528 // otherwise we'll keep trying to listen for noise on the socket and 00529 // we'll always hear a "yes" answer. 00530 close_connection(connection); 00531 } 00532 00533 //////////////////////////////////////////////////////////////////// 00534 // Function: ConnectionManager::add_reader 00535 // Access: Protected 00536 // Description: This internal function is called by ConnectionReader 00537 // when it is constructed. 00538 //////////////////////////////////////////////////////////////////// 00539 void ConnectionManager:: 00540 add_reader(ConnectionReader *reader) { 00541 LightMutexHolder holder(_set_mutex); 00542 _readers.insert(reader); 00543 } 00544 00545 //////////////////////////////////////////////////////////////////// 00546 // Function: ConnectionManager::remove_reader 00547 // Access: Protected 00548 // Description: This internal function is called by ConnectionReader 00549 // when it is destructed. 00550 //////////////////////////////////////////////////////////////////// 00551 void ConnectionManager:: 00552 remove_reader(ConnectionReader *reader) { 00553 LightMutexHolder holder(_set_mutex); 00554 _readers.erase(reader); 00555 } 00556 00557 //////////////////////////////////////////////////////////////////// 00558 // Function: ConnectionManager::add_writer 00559 // Access: Protected 00560 // Description: This internal function is called by ConnectionWriter 00561 // when it is constructed. 00562 //////////////////////////////////////////////////////////////////// 00563 void ConnectionManager:: 00564 add_writer(ConnectionWriter *writer) { 00565 LightMutexHolder holder(_set_mutex); 00566 _writers.insert(writer); 00567 } 00568 00569 //////////////////////////////////////////////////////////////////// 00570 // Function: ConnectionManager::remove_writer 00571 // Access: Protected 00572 // Description: This internal function is called by ConnectionWriter 00573 // when it is destructed. 00574 //////////////////////////////////////////////////////////////////// 00575 void ConnectionManager:: 00576 remove_writer(ConnectionWriter *writer) { 00577 LightMutexHolder holder(_set_mutex); 00578 _writers.erase(writer); 00579 }