Panda3D
|
00001 // Filename: connectionReader.cxx 00002 // Created by: drose (08Feb00) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "connectionReader.h" 00016 #include "connectionManager.h" 00017 #include "netDatagram.h" 00018 #include "datagramTCPHeader.h" 00019 #include "datagramUDPHeader.h" 00020 #include "config_net.h" 00021 #include "trueClock.h" 00022 #include "socket_udp.h" 00023 #include "socket_tcp.h" 00024 #include "mutexHolder.h" 00025 #include "lightMutexHolder.h" 00026 #include "pnotify.h" 00027 #include "atomicAdjust.h" 00028 #include "config_downloader.h" 00029 00030 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size; 00031 00032 static const int max_timeout_ms = 100; 00033 00034 //////////////////////////////////////////////////////////////////// 00035 // Function: ConnectionReader::SocketInfo::Constructor 00036 // Access: Public 00037 // Description: 00038 //////////////////////////////////////////////////////////////////// 00039 ConnectionReader::SocketInfo:: 00040 SocketInfo(const PT(Connection) &connection) : 00041 _connection(connection) 00042 { 00043 _busy = false; 00044 _error = false; 00045 } 00046 00047 //////////////////////////////////////////////////////////////////// 00048 // Function: ConnectionReader::SocketInfo::is_udp 00049 // Access: Public 00050 // Description: 00051 //////////////////////////////////////////////////////////////////// 00052 bool ConnectionReader::SocketInfo:: 00053 is_udp() const { 00054 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type())); 00055 } 00056 00057 //////////////////////////////////////////////////////////////////// 00058 // Function: ConnectionReader::SocketInfo::get_socket 00059 // Access: Public 00060 // Description: 00061 //////////////////////////////////////////////////////////////////// 00062 Socket_IP *ConnectionReader::SocketInfo:: 00063 get_socket() const { 00064 return _connection->get_socket(); 00065 } 00066 00067 //////////////////////////////////////////////////////////////////// 00068 // Function: ConnectionReader::ReaderThread::Constructor 00069 // Access: Public 00070 // Description: 00071 //////////////////////////////////////////////////////////////////// 00072 ConnectionReader::ReaderThread:: 00073 ReaderThread(ConnectionReader *reader, const string &thread_name, 00074 int thread_index) : 00075 Thread(make_thread_name(thread_name, thread_index), 00076 make_thread_name(thread_name, thread_index)), 00077 _reader(reader), 00078 _thread_index(thread_index) 00079 { 00080 } 00081 00082 //////////////////////////////////////////////////////////////////// 00083 // Function: ConnectionReader::ReaderThread::thread_main 00084 // Access: Public, Virtual 00085 // Description: 00086 //////////////////////////////////////////////////////////////////// 00087 void ConnectionReader::ReaderThread:: 00088 thread_main() { 00089 _reader->thread_run(_thread_index); 00090 } 00091 00092 //////////////////////////////////////////////////////////////////// 00093 // Function: ConnectionReader::Constructor 00094 // Access: Published 00095 // Description: Creates a new ConnectionReader with the indicated 00096 // number of threads to handle requests. If num_threads 00097 // is 0, the sockets will only be read by polling, 00098 // during an explicit poll() call. 00099 // (QueuedConnectionReader will do this automatically.) 00100 //////////////////////////////////////////////////////////////////// 00101 ConnectionReader:: 00102 ConnectionReader(ConnectionManager *manager, int num_threads, 00103 const string &thread_name) : 00104 _manager(manager) 00105 { 00106 if (!Thread::is_threading_supported()) { 00107 #ifndef NDEBUG 00108 if (num_threads != 0) { 00109 if (net_cat.is_debug()) { 00110 net_cat.debug() 00111 << "Threading support is not available.\n"; 00112 } 00113 } 00114 #endif // NDEBUG 00115 num_threads = 0; 00116 } 00117 00118 _raw_mode = false; 00119 _tcp_header_size = tcp_header_size; 00120 _polling = (num_threads <= 0); 00121 00122 _shutdown = false; 00123 00124 _next_index = 0; 00125 _num_results = 0; 00126 00127 _currently_polling_thread = -1; 00128 00129 string reader_thread_name = thread_name; 00130 if (thread_name.empty()) { 00131 reader_thread_name = "ReaderThread"; 00132 } 00133 int i; 00134 for (i = 0; i < num_threads; i++) { 00135 PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i); 00136 _threads.push_back(thread); 00137 } 00138 for (i = 0; i < num_threads; i++) { 00139 _threads[i]->start(net_thread_priority, true); 00140 } 00141 00142 _manager->add_reader(this); 00143 } 00144 00145 //////////////////////////////////////////////////////////////////// 00146 // Function: ConnectionReader::Destructor 00147 // Access: Published, Virtual 00148 // Description: 00149 //////////////////////////////////////////////////////////////////// 00150 ConnectionReader:: 00151 ~ConnectionReader() { 00152 if (_manager != (ConnectionManager *)NULL) { 00153 _manager->remove_reader(this); 00154 } 00155 00156 shutdown(); 00157 00158 // Delete all of our old sockets. 00159 Sockets::iterator si; 00160 for (si = _sockets.begin(); si != _sockets.end(); ++si) { 00161 delete (*si); 00162 } 00163 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) { 00164 SocketInfo *sinfo = (*si); 00165 if (!sinfo->_busy) { 00166 delete sinfo; 00167 } else { 00168 net_cat.error() 00169 << "Reentrant deletion of ConnectionReader--don't delete these\n" 00170 << "in response to connection_reset().\n"; 00171 00172 // We'll have to do the best we can to recover. 00173 sinfo->_connection.clear(); 00174 } 00175 } 00176 } 00177 00178 //////////////////////////////////////////////////////////////////// 00179 // Function: ConnectionReader::add_connection 00180 // Access: Published 00181 // Description: Adds a new socket to the list of sockets the 00182 // ConnectionReader will monitor. A datagram that comes 00183 // in on any of the monitored sockets will be reported. 00184 // In the case of a ConnectionListener, this adds a new 00185 // rendezvous socket; any activity on any of the 00186 // monitored sockets will cause a connection to be 00187 // accepted. 00188 // 00189 // The return value is true if the connection was added, 00190 // false if it was already there. 00191 // 00192 // add_connection() is thread-safe, and may be called at 00193 // will by any thread. 00194 //////////////////////////////////////////////////////////////////// 00195 bool ConnectionReader:: 00196 add_connection(Connection *connection) { 00197 nassertr(connection != (Connection *)NULL, false); 00198 00199 LightMutexHolder holder(_sockets_mutex); 00200 00201 // Make sure it's not already on the _sockets list. 00202 Sockets::const_iterator si; 00203 for (si = _sockets.begin(); si != _sockets.end(); ++si) { 00204 if ((*si)->_connection == connection) { 00205 // Whoops, already there. 00206 return false; 00207 } 00208 } 00209 00210 _sockets.push_back(new SocketInfo(connection)); 00211 00212 return true; 00213 } 00214 00215 //////////////////////////////////////////////////////////////////// 00216 // Function: ConnectionReader::remove_connection 00217 // Access: Published 00218 // Description: Removes a socket from the list of sockets being 00219 // monitored. Returns true if the socket was correctly 00220 // removed, false if it was not on the list in the first 00221 // place. 00222 // 00223 // remove_connection() is thread-safe, and may be called 00224 // at will by any thread. 00225 //////////////////////////////////////////////////////////////////// 00226 bool ConnectionReader:: 00227 remove_connection(Connection *connection) { 00228 LightMutexHolder holder(_sockets_mutex); 00229 00230 // Walk through the list of sockets to find the one we're removing. 00231 Sockets::iterator si; 00232 si = _sockets.begin(); 00233 while (si != _sockets.end() && (*si)->_connection != connection) { 00234 ++si; 00235 } 00236 if (si == _sockets.end()) { 00237 return false; 00238 } 00239 00240 _removed_sockets.push_back(*si); 00241 _sockets.erase(si); 00242 00243 return true; 00244 } 00245 00246 //////////////////////////////////////////////////////////////////// 00247 // Function: ConnectionReader::is_connection_ok 00248 // Access: Published 00249 // Description: Returns true if the indicated connection has been 00250 // added to the ConnectionReader and is being monitored 00251 // properly, false if it is not known, or if there was 00252 // some error condition detected on the connection. (If 00253 // there was an error condition, normally the 00254 // ConnectionManager would have been informed and closed 00255 // the connection.) 00256 //////////////////////////////////////////////////////////////////// 00257 bool ConnectionReader:: 00258 is_connection_ok(Connection *connection) { 00259 LightMutexHolder holder(_sockets_mutex); 00260 00261 // Walk through the list of sockets to find the one we're asking 00262 // about. 00263 Sockets::iterator si; 00264 si = _sockets.begin(); 00265 while (si != _sockets.end() && (*si)->_connection != connection) { 00266 ++si; 00267 } 00268 if (si == _sockets.end()) { 00269 // Don't know that connection. 00270 return false; 00271 } 00272 00273 SocketInfo *sinfo = (*si); 00274 bool is_ok = !sinfo->_error; 00275 00276 return is_ok; 00277 } 00278 00279 //////////////////////////////////////////////////////////////////// 00280 // Function: ConnectionReader::poll 00281 // Access: Published 00282 // Description: Explicitly polls the available sockets to see if any 00283 // of them have any noise. This function does nothing 00284 // unless this is a polling-type ConnectionReader, 00285 // i.e. it was created with zero threads (and 00286 // is_polling() will return true). 00287 // 00288 // It is not necessary to call this explicitly for a 00289 // QueuedConnectionReader. 00290 //////////////////////////////////////////////////////////////////// 00291 void ConnectionReader:: 00292 poll() { 00293 if (!_polling) { 00294 return; 00295 } 00296 00297 SocketInfo *sinfo = get_next_available_socket(false, -2); 00298 if (sinfo != (SocketInfo *)NULL) { 00299 double max_poll_cycle = get_max_poll_cycle(); 00300 if (max_poll_cycle < 0.0) { 00301 // Continue to read all data. 00302 while (sinfo != (SocketInfo *)NULL) { 00303 process_incoming_data(sinfo); 00304 sinfo = get_next_available_socket(false, -2); 00305 } 00306 00307 } else { 00308 // Read only until a certain amount of time has elapsed. 00309 TrueClock *global_clock = TrueClock::get_global_ptr(); 00310 double stop = global_clock->get_short_time() + max_poll_cycle; 00311 00312 while (sinfo != (SocketInfo *)NULL) { 00313 process_incoming_data(sinfo); 00314 if (global_clock->get_short_time() >= stop) { 00315 return; 00316 } 00317 sinfo = get_next_available_socket(false, -2); 00318 } 00319 } 00320 } 00321 } 00322 00323 //////////////////////////////////////////////////////////////////// 00324 // Function: ConnectionReader::get_manager 00325 // Access: Published 00326 // Description: Returns a pointer to the ConnectionManager object 00327 // that serves this ConnectionReader. 00328 //////////////////////////////////////////////////////////////////// 00329 ConnectionManager *ConnectionReader:: 00330 get_manager() const { 00331 return _manager; 00332 } 00333 00334 //////////////////////////////////////////////////////////////////// 00335 // Function: ConnectionReader::get_num_threads 00336 // Access: Published 00337 // Description: Returns the number of threads the ConnectionReader 00338 // has been created with. 00339 //////////////////////////////////////////////////////////////////// 00340 int ConnectionReader:: 00341 get_num_threads() const { 00342 return _threads.size(); 00343 } 00344 00345 //////////////////////////////////////////////////////////////////// 00346 // Function: ConnectionReader::set_raw_mode 00347 // Access: Published 00348 // Description: Sets the ConnectionReader into raw mode (or turns off 00349 // raw mode). In raw mode, datagram headers are not 00350 // expected; instead, all the data available on the pipe 00351 // is treated as a single datagram. 00352 // 00353 // This is similar to set_tcp_header_size(0), except that it 00354 // also turns off headers for UDP packets. 00355 //////////////////////////////////////////////////////////////////// 00356 void ConnectionReader:: 00357 set_raw_mode(bool mode) { 00358 _raw_mode = mode; 00359 } 00360 00361 //////////////////////////////////////////////////////////////////// 00362 // Function: ConnectionReader::get_raw_mode 00363 // Access: Published 00364 // Description: Returns the current setting of the raw mode flag. 00365 // See set_raw_mode(). 00366 //////////////////////////////////////////////////////////////////// 00367 bool ConnectionReader:: 00368 get_raw_mode() const { 00369 return _raw_mode; 00370 } 00371 00372 //////////////////////////////////////////////////////////////////// 00373 // Function: ConnectionReader::set_tcp_header_size 00374 // Access: Published 00375 // Description: Sets the header size of TCP packets. At the present, 00376 // legal values for this are 0, 2, or 4; this specifies 00377 // the number of bytes to use encode the datagram length 00378 // at the start of each TCP datagram. Sender and 00379 // receiver must independently agree on this. 00380 //////////////////////////////////////////////////////////////////// 00381 void ConnectionReader:: 00382 set_tcp_header_size(int tcp_header_size) { 00383 _tcp_header_size = tcp_header_size; 00384 } 00385 00386 //////////////////////////////////////////////////////////////////// 00387 // Function: ConnectionReader::get_tcp_header_size 00388 // Access: Published 00389 // Description: Returns the current setting of TCP header size. 00390 // See set_tcp_header_size(). 00391 //////////////////////////////////////////////////////////////////// 00392 int ConnectionReader:: 00393 get_tcp_header_size() const { 00394 return _tcp_header_size; 00395 } 00396 00397 //////////////////////////////////////////////////////////////////// 00398 // Function: ConnectionReader::shutdown 00399 // Access: Published 00400 // Description: Terminates all threads cleanly. Normally this is 00401 // only called by the destructor, but it may be called 00402 // explicitly before destruction. 00403 //////////////////////////////////////////////////////////////////// 00404 void ConnectionReader:: 00405 shutdown() { 00406 if (_shutdown) { 00407 return; 00408 } 00409 00410 // First, begin the shutdown. This will tell our threads we want 00411 // them to quit. 00412 _shutdown = true; 00413 00414 // Now wait for all of our threads to terminate. 00415 Threads::iterator ti; 00416 for (ti = _threads.begin(); ti != _threads.end(); ++ti) { 00417 (*ti)->join(); 00418 } 00419 } 00420 00421 //////////////////////////////////////////////////////////////////// 00422 // Function: ConnectionReader::flush_read_connection 00423 // Access: Protected, Virtual 00424 // Description: Attempts to read all the possible data from the 00425 // indicated connection, which has just delivered a 00426 // write error (and has therefore already been closed). 00427 // If the connection is not monitered by this reader, 00428 // does nothing. 00429 //////////////////////////////////////////////////////////////////// 00430 void ConnectionReader:: 00431 flush_read_connection(Connection *connection) { 00432 // Ensure it doesn't get deleted. 00433 SocketInfo sinfo(connection); 00434 00435 if (!remove_connection(connection)) { 00436 // Not already in the reader. 00437 return; 00438 } 00439 00440 // The connection was previously in the reader, but has now been 00441 // removed. Now we can flush it completely. We check if there is 00442 // any read data available on just this one socket; we can do this 00443 // right here in this thread, since we've already removed this 00444 // connection from the reader. 00445 00446 Socket_fdset fdset; 00447 fdset.clear(); 00448 fdset.setForSocket(*(sinfo.get_socket())); 00449 int num_results = fdset.WaitForRead(true, 0); 00450 while (num_results != 0) { 00451 sinfo._busy = true; 00452 if (!process_incoming_data(&sinfo)) { 00453 break; 00454 } 00455 fdset.setForSocket(*(sinfo.get_socket())); 00456 num_results = fdset.WaitForRead(true, 0); 00457 } 00458 } 00459 00460 //////////////////////////////////////////////////////////////////// 00461 // Function: ConnectionReader::clear_manager 00462 // Access: Protected 00463 // Description: This should normally only be called when the 00464 // associated ConnectionManager destructs. It resets 00465 // the ConnectionManager pointer to NULL so we don't 00466 // have a floating pointer. This makes the 00467 // ConnectionReader invalid; presumably it also will be 00468 // destructed momentarily. 00469 //////////////////////////////////////////////////////////////////// 00470 void ConnectionReader:: 00471 clear_manager() { 00472 _manager = (ConnectionManager *)NULL; 00473 } 00474 00475 //////////////////////////////////////////////////////////////////// 00476 // Function: ConnectionReader::finish_socket 00477 // Access: Protected 00478 // Description: To be called when a socket has been fully read and is 00479 // ready for polling for additional data. 00480 //////////////////////////////////////////////////////////////////// 00481 void ConnectionReader:: 00482 finish_socket(SocketInfo *sinfo) { 00483 nassertv(sinfo->_busy); 00484 00485 // By marking the SocketInfo nonbusy, we make it available for 00486 // future polls. 00487 sinfo->_busy = false; 00488 } 00489 00490 //////////////////////////////////////////////////////////////////// 00491 // Function: ConnectionReader::process_incoming_data 00492 // Access: Protected, Virtual 00493 // Description: This is run within a thread when the call to 00494 // select() indicates there is data available on a 00495 // socket. Returns true if the data is read 00496 // successfully, false on failure (for instance, because 00497 // the connection is closed). 00498 //////////////////////////////////////////////////////////////////// 00499 bool ConnectionReader:: 00500 process_incoming_data(SocketInfo *sinfo) { 00501 if (_raw_mode) { 00502 if (sinfo->is_udp()) { 00503 return process_raw_incoming_udp_data(sinfo); 00504 } else { 00505 return process_raw_incoming_tcp_data(sinfo); 00506 } 00507 } else { 00508 if (sinfo->is_udp()) { 00509 return process_incoming_udp_data(sinfo); 00510 } else { 00511 return process_incoming_tcp_data(sinfo); 00512 } 00513 } 00514 } 00515 00516 //////////////////////////////////////////////////////////////////// 00517 // Function: ConnectionReader::process_incoming_udp_data 00518 // Access: Protected 00519 // Description: 00520 //////////////////////////////////////////////////////////////////// 00521 bool ConnectionReader:: 00522 process_incoming_udp_data(SocketInfo *sinfo) { 00523 Socket_UDP *socket; 00524 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00525 Socket_Address addr; 00526 00527 // Read as many bytes as we can. 00528 char buffer[read_buffer_size]; 00529 int bytes_read = read_buffer_size; 00530 00531 bool okflag = socket->GetPacket(buffer, &bytes_read, addr); 00532 00533 if (!okflag) { 00534 finish_socket(sinfo); 00535 return false; 00536 00537 } else if (bytes_read == 0) { 00538 // The socket was closed (!). This shouldn't happen with a UDP 00539 // connection. Oh well. Report that and return. 00540 if (_manager != (ConnectionManager *)NULL) { 00541 _manager->connection_reset(sinfo->_connection, 0); 00542 } 00543 finish_socket(sinfo); 00544 return false; 00545 } 00546 00547 // Since we are not running in raw mode, we decode the header to 00548 // determine how big the datagram is. This means we must have read 00549 // at least a full header. 00550 if (bytes_read < datagram_udp_header_size) { 00551 net_cat.error() 00552 << "Did not read entire header, discarding UDP datagram.\n"; 00553 finish_socket(sinfo); 00554 return true; 00555 } 00556 00557 DatagramUDPHeader header(buffer); 00558 00559 char *dp = buffer + datagram_udp_header_size; 00560 bytes_read -= datagram_udp_header_size; 00561 00562 NetDatagram datagram(dp, bytes_read); 00563 00564 // Now that we've read all the data, it's time to finish the socket 00565 // so another thread can read the next datagram. 00566 finish_socket(sinfo); 00567 00568 if (_shutdown) { 00569 return false; 00570 } 00571 00572 // And now do whatever we need to do to process the datagram. 00573 if (!header.verify_datagram(datagram)) { 00574 net_cat.error() 00575 << "Ignoring invalid UDP datagram.\n"; 00576 } else { 00577 datagram.set_connection(sinfo->_connection); 00578 datagram.set_address(NetAddress(addr)); 00579 00580 if (net_cat.is_spam()) { 00581 net_cat.spam() 00582 << "Received UDP datagram with " 00583 << datagram_udp_header_size + datagram.get_length() 00584 << " bytes on " << (void *)datagram.get_connection() 00585 << " from " << datagram.get_address() << "\n"; 00586 } 00587 00588 receive_datagram(datagram); 00589 } 00590 00591 return true; 00592 } 00593 00594 //////////////////////////////////////////////////////////////////// 00595 // Function: ConnectionReader::process_incoming_tcp_data 00596 // Access: Protected 00597 // Description: 00598 //////////////////////////////////////////////////////////////////// 00599 bool ConnectionReader:: 00600 process_incoming_tcp_data(SocketInfo *sinfo) { 00601 Socket_TCP *socket; 00602 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00603 00604 // Read only the header bytes to start with. 00605 char buffer[read_buffer_size]; 00606 int header_bytes_read = 0; 00607 00608 // First, we have to read the first _tcp_header_size bytes. 00609 while (header_bytes_read < _tcp_header_size) { 00610 int bytes_read = 00611 socket->RecvData(buffer + header_bytes_read, 00612 _tcp_header_size - header_bytes_read); 00613 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00614 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 00615 socket->Active()) { 00616 Thread::force_yield(); 00617 bytes_read = socket->RecvData(buffer + header_bytes_read, 00618 _tcp_header_size - header_bytes_read); 00619 } 00620 #endif // SIMPLE_THREADS 00621 00622 if (bytes_read <= 0) { 00623 // The socket was closed. Report that and return. 00624 if (_manager != (ConnectionManager *)NULL) { 00625 _manager->connection_reset(sinfo->_connection, 0); 00626 } 00627 finish_socket(sinfo); 00628 return false; 00629 } 00630 00631 header_bytes_read += bytes_read; 00632 Thread::consider_yield(); 00633 } 00634 00635 // Now we must decode the header to determine how big the datagram 00636 // is. This means we must have read at least a full header. 00637 if (header_bytes_read != _tcp_header_size) { 00638 // This should actually be impossible, by the read-loop logic 00639 // above. 00640 net_cat.error() 00641 << "Did not read entire header, discarding TCP datagram.\n"; 00642 finish_socket(sinfo); 00643 return true; 00644 } 00645 00646 DatagramTCPHeader header(buffer, _tcp_header_size); 00647 int size = header.get_datagram_size(_tcp_header_size); 00648 00649 // We have to loop until the entire datagram is read. 00650 NetDatagram datagram; 00651 00652 while (!_shutdown && (int)datagram.get_length() < size) { 00653 int bytes_read; 00654 00655 int read_bytes = read_buffer_size; 00656 #ifdef SIMPLE_THREADS 00657 // In the SIMPLE_THREADS case, we want to limit the number of 00658 // bytes we read in a single epoch, to minimize the impact on the 00659 // other threads. 00660 read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch); 00661 #endif 00662 00663 bytes_read = 00664 socket->RecvData(buffer, min(read_bytes, 00665 (int)(size - datagram.get_length()))); 00666 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00667 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 00668 socket->Active()) { 00669 Thread::force_yield(); 00670 bytes_read = 00671 socket->RecvData(buffer, min(read_bytes, 00672 (int)(size - datagram.get_length()))); 00673 } 00674 #endif // SIMPLE_THREADS 00675 00676 char *dp = buffer; 00677 00678 if (bytes_read <= 0) { 00679 // The socket was closed. Report that and return. 00680 if (_manager != (ConnectionManager *)NULL) { 00681 _manager->connection_reset(sinfo->_connection, 0); 00682 } 00683 finish_socket(sinfo); 00684 return false; 00685 } 00686 00687 int datagram_bytes = 00688 min(bytes_read, (int)(size - datagram.get_length())); 00689 datagram.append_data(dp, datagram_bytes); 00690 00691 if (bytes_read > datagram_bytes) { 00692 // There were some extra bytes at the end of the datagram. Maybe 00693 // the beginning of the next datagram? Huh. 00694 net_cat.error() 00695 << "Discarding " << bytes_read - datagram_bytes 00696 << " bytes following TCP datagram.\n"; 00697 } 00698 Thread::consider_yield(); 00699 } 00700 00701 // Now that we've read all the data, it's time to finish the socket 00702 // so another thread can read the next datagram. 00703 finish_socket(sinfo); 00704 00705 if (_shutdown) { 00706 return false; 00707 } 00708 00709 // And now do whatever we need to do to process the datagram. 00710 if (!header.verify_datagram(datagram, _tcp_header_size)) { 00711 net_cat.error() 00712 << "Ignoring invalid TCP datagram.\n"; 00713 } else { 00714 datagram.set_connection(sinfo->_connection); 00715 datagram.set_address(NetAddress(socket->GetPeerName())); 00716 00717 if (net_cat.is_spam()) { 00718 net_cat.spam() 00719 << "Received TCP datagram with " 00720 << _tcp_header_size + datagram.get_length() 00721 << " bytes on " << (void *)datagram.get_connection() 00722 << " from " << datagram.get_address() << "\n"; 00723 } 00724 00725 receive_datagram(datagram); 00726 } 00727 00728 return true; 00729 } 00730 00731 //////////////////////////////////////////////////////////////////// 00732 // Function: ConnectionReader::process_raw_incoming_udp_data 00733 // Access: Protected 00734 // Description: 00735 //////////////////////////////////////////////////////////////////// 00736 bool ConnectionReader:: 00737 process_raw_incoming_udp_data(SocketInfo *sinfo) { 00738 Socket_UDP *socket; 00739 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00740 Socket_Address addr; 00741 00742 // Read as many bytes as we can. 00743 char buffer[read_buffer_size]; 00744 int bytes_read = read_buffer_size; 00745 00746 bool okflag = socket->GetPacket(buffer, &bytes_read, addr); 00747 00748 if (!okflag) { 00749 finish_socket(sinfo); 00750 return false; 00751 00752 } else if (bytes_read == 0) { 00753 // The socket was closed (!). This shouldn't happen with a UDP 00754 // connection. Oh well. Report that and return. 00755 if (_manager != (ConnectionManager *)NULL) { 00756 _manager->connection_reset(sinfo->_connection, 0); 00757 } 00758 finish_socket(sinfo); 00759 return false; 00760 } 00761 00762 // In raw mode, we simply extract all the bytes and make that a 00763 // datagram. 00764 NetDatagram datagram(buffer, bytes_read); 00765 00766 // Now that we've read all the data, it's time to finish the socket 00767 // so another thread can read the next datagram. 00768 finish_socket(sinfo); 00769 00770 if (_shutdown) { 00771 return false; 00772 } 00773 00774 datagram.set_connection(sinfo->_connection); 00775 datagram.set_address(NetAddress(addr)); 00776 00777 if (net_cat.is_spam()) { 00778 net_cat.spam() 00779 << "Received raw UDP datagram with " << datagram.get_length() 00780 << " bytes on " << (void *)datagram.get_connection() 00781 << " from " << datagram.get_address() << "\n"; 00782 } 00783 00784 receive_datagram(datagram); 00785 00786 return true; 00787 } 00788 00789 //////////////////////////////////////////////////////////////////// 00790 // Function: ConnectionReader::process_raw_incoming_tcp_data 00791 // Access: Protected 00792 // Description: 00793 //////////////////////////////////////////////////////////////////// 00794 bool ConnectionReader:: 00795 process_raw_incoming_tcp_data(SocketInfo *sinfo) { 00796 Socket_TCP *socket; 00797 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00798 00799 // Read as many bytes as we can. 00800 char buffer[read_buffer_size]; 00801 int bytes_read = socket->RecvData(buffer, read_buffer_size); 00802 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00803 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 00804 socket->Active()) { 00805 Thread::force_yield(); 00806 bytes_read = socket->RecvData(buffer, read_buffer_size); 00807 } 00808 #endif // SIMPLE_THREADS 00809 00810 if (bytes_read <= 0) { 00811 // The socket was closed. Report that and return. 00812 if (_manager != (ConnectionManager *)NULL) { 00813 _manager->connection_reset(sinfo->_connection, 0); 00814 } 00815 finish_socket(sinfo); 00816 return false; 00817 } 00818 00819 // In raw mode, we simply extract all the bytes and make that a 00820 // datagram. 00821 NetDatagram datagram(buffer, bytes_read); 00822 00823 // Now that we've read all the data, it's time to finish the socket 00824 // so another thread can read the next datagram. 00825 finish_socket(sinfo); 00826 00827 if (_shutdown) { 00828 return false; 00829 } 00830 00831 datagram.set_connection(sinfo->_connection); 00832 datagram.set_address(NetAddress(socket->GetPeerName())); 00833 00834 if (net_cat.is_spam()) { 00835 net_cat.spam() 00836 << "Received raw TCP datagram with " << datagram.get_length() 00837 << " bytes on " << (void *)datagram.get_connection() 00838 << " from " << datagram.get_address() << "\n"; 00839 } 00840 00841 receive_datagram(datagram); 00842 00843 return true; 00844 } 00845 00846 //////////////////////////////////////////////////////////////////// 00847 // Function: ConnectionReader::thread_run 00848 // Access: Private 00849 // Description: This is the actual executing function for each 00850 // thread. 00851 //////////////////////////////////////////////////////////////////// 00852 void ConnectionReader:: 00853 thread_run(int thread_index) { 00854 nassertv(!_polling); 00855 nassertv(_threads[thread_index] == Thread::get_current_thread()); 00856 00857 while (!_shutdown) { 00858 SocketInfo *sinfo = 00859 get_next_available_socket(true, thread_index); 00860 if (sinfo != (SocketInfo *)NULL) { 00861 process_incoming_data(sinfo); 00862 Thread::consider_yield(); 00863 } else { 00864 Thread::force_yield(); 00865 } 00866 } 00867 } 00868 00869 00870 //////////////////////////////////////////////////////////////////// 00871 // Function: ConnectionReader::get_next_available_socket 00872 // Access: Private 00873 // Description: Polls the known connections for activity and returns 00874 // the next one known to have activity, or NULL if no 00875 // activity is detected within the timeout interval. 00876 // 00877 // This function may block indefinitely if it is being 00878 // called by multiple threads; if there are no other 00879 // threads, it may block only if allow_block is true. 00880 //////////////////////////////////////////////////////////////////// 00881 ConnectionReader::SocketInfo *ConnectionReader:: 00882 get_next_available_socket(bool allow_block, int current_thread_index) { 00883 // Go to sleep on the select() mutex. This guarantees that only one 00884 // thread is in this function at a time. 00885 MutexHolder holder(_select_mutex); 00886 00887 do { 00888 // First, check the result from the previous select call. If 00889 // there are any sockets remaining there, process them first. 00890 while (!_shutdown && _num_results > 0) { 00891 nassertr(_next_index < (int)_selecting_sockets.size(), NULL); 00892 int i = _next_index; 00893 _next_index++; 00894 00895 if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) { 00896 _num_results--; 00897 SocketInfo *sinfo = _selecting_sockets[i]; 00898 00899 // Some noise on this socket. 00900 sinfo->_busy = true; 00901 return sinfo; 00902 } 00903 } 00904 00905 bool interrupted; 00906 do { 00907 interrupted = false; 00908 00909 // Ok, no results from previous select calls. Prepare to set up 00910 // for a new select. 00911 00912 // First, report to anyone else who cares that we're the thread 00913 // about to do the poll. That way, if any new sockets come 00914 // available while we're polling, we can service them. 00915 AtomicAdjust::set(_currently_polling_thread, current_thread_index); 00916 00917 rebuild_select_list(); 00918 00919 // Now we can execute the select. 00920 _num_results = 0; 00921 _next_index = 0; 00922 00923 if (!_shutdown) { 00924 PN_uint32 timeout = max_timeout_ms; 00925 if (!allow_block) { 00926 timeout = 0; 00927 } 00928 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00929 // In the presence of SIMPLE_THREADS, we never wait at all, 00930 // but rather we yield the thread if we come up empty (so that 00931 // we won't block the entire process). 00932 timeout = 0; 00933 #endif 00934 00935 _num_results = _fdset.WaitForRead(false, timeout); 00936 } 00937 00938 if (_num_results == 0 && allow_block) { 00939 // If we reached max_timeout_ms, go back and reconsider. (We 00940 // never timeout indefinitely, so we can check the shutdown 00941 // flag every once in a while.) 00942 interrupted = true; 00943 Thread::force_yield(); 00944 00945 } else if (_num_results < 0) { 00946 // If we had an error, just return. 00947 return (SocketInfo *)NULL; 00948 } 00949 } while (!_shutdown && interrupted); 00950 00951 AtomicAdjust::set(_currently_polling_thread, current_thread_index); 00952 00953 // Repeat the above until we (a) find a socket with actual noise 00954 // on it, or (b) return from PR_Poll() with no sockets available. 00955 } while (!_shutdown && _num_results > 0); 00956 00957 return (SocketInfo *)NULL; 00958 } 00959 00960 00961 //////////////////////////////////////////////////////////////////// 00962 // Function: ConnectionReader::rebuild_select_list 00963 // Access: Private 00964 // Description: Rebuilds the _fdset and _selecting_sockets arrays 00965 // based on the sockets that are currently available for 00966 // selecting. 00967 //////////////////////////////////////////////////////////////////// 00968 void ConnectionReader:: 00969 rebuild_select_list() { 00970 _fdset.clear(); 00971 _selecting_sockets.clear(); 00972 00973 LightMutexHolder holder(_sockets_mutex); 00974 Sockets::const_iterator si; 00975 for (si = _sockets.begin(); si != _sockets.end(); ++si) { 00976 SocketInfo *sinfo = (*si); 00977 if (!sinfo->_busy && !sinfo->_error) { 00978 _fdset.setForSocket(*sinfo->get_socket()); 00979 _selecting_sockets.push_back(sinfo); 00980 } 00981 } 00982 00983 // This is also a fine time to delete the contents of the 00984 // _removed_sockets list. 00985 if (!_removed_sockets.empty()) { 00986 Sockets still_busy_sockets; 00987 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) { 00988 SocketInfo *sinfo = (*si); 00989 if (sinfo->_busy) { 00990 still_busy_sockets.push_back(sinfo); 00991 } else { 00992 delete sinfo; 00993 } 00994 } 00995 _removed_sockets.swap(still_busy_sockets); 00996 } 00997 }