Panda3D
|
00001 // Filename: connectionReader.cxx 00002 // Created by: drose (08Feb00) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "connectionReader.h" 00016 #include "connectionManager.h" 00017 #include "netDatagram.h" 00018 #include "datagramTCPHeader.h" 00019 #include "datagramUDPHeader.h" 00020 #include "config_net.h" 00021 #include "trueClock.h" 00022 #include "socket_udp.h" 00023 #include "socket_tcp.h" 00024 #include "mutexHolder.h" 00025 #include "lightMutexHolder.h" 00026 #include "pnotify.h" 00027 #include "atomicAdjust.h" 00028 #include "config_downloader.h" 00029 00030 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size; 00031 00032 //////////////////////////////////////////////////////////////////// 00033 // Function: ConnectionReader::SocketInfo::Constructor 00034 // Access: Public 00035 // Description: 00036 //////////////////////////////////////////////////////////////////// 00037 ConnectionReader::SocketInfo:: 00038 SocketInfo(const PT(Connection) &connection) : 00039 _connection(connection) 00040 { 00041 _busy = false; 00042 _error = false; 00043 } 00044 00045 //////////////////////////////////////////////////////////////////// 00046 // Function: ConnectionReader::SocketInfo::is_udp 00047 // Access: Public 00048 // Description: 00049 //////////////////////////////////////////////////////////////////// 00050 bool ConnectionReader::SocketInfo:: 00051 is_udp() const { 00052 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type())); 00053 } 00054 00055 //////////////////////////////////////////////////////////////////// 00056 // Function: ConnectionReader::SocketInfo::get_socket 00057 // Access: Public 00058 // Description: 00059 //////////////////////////////////////////////////////////////////// 00060 Socket_IP *ConnectionReader::SocketInfo:: 00061 get_socket() const { 00062 return _connection->get_socket(); 00063 } 00064 00065 //////////////////////////////////////////////////////////////////// 00066 // Function: ConnectionReader::ReaderThread::Constructor 00067 // Access: Public 00068 // Description: 00069 //////////////////////////////////////////////////////////////////// 00070 ConnectionReader::ReaderThread:: 00071 ReaderThread(ConnectionReader *reader, const string &thread_name, 00072 int thread_index) : 00073 Thread(make_thread_name(thread_name, thread_index), 00074 make_thread_name(thread_name, thread_index)), 00075 _reader(reader), 00076 _thread_index(thread_index) 00077 { 00078 } 00079 00080 //////////////////////////////////////////////////////////////////// 00081 // Function: ConnectionReader::ReaderThread::thread_main 00082 // Access: Public, Virtual 00083 // Description: 00084 //////////////////////////////////////////////////////////////////// 00085 void ConnectionReader::ReaderThread:: 00086 thread_main() { 00087 _reader->thread_run(_thread_index); 00088 } 00089 00090 //////////////////////////////////////////////////////////////////// 00091 // Function: ConnectionReader::Constructor 00092 // Access: Published 00093 // Description: Creates a new ConnectionReader with the indicated 00094 // number of threads to handle requests. If num_threads 00095 // is 0, the sockets will only be read by polling, 00096 // during an explicit poll() call. 00097 // (QueuedConnectionReader will do this automatically.) 00098 //////////////////////////////////////////////////////////////////// 00099 ConnectionReader:: 00100 ConnectionReader(ConnectionManager *manager, int num_threads, 00101 const string &thread_name) : 00102 _manager(manager) 00103 { 00104 if (!Thread::is_threading_supported()) { 00105 #ifndef NDEBUG 00106 if (num_threads != 0) { 00107 if (net_cat.is_debug()) { 00108 net_cat.debug() 00109 << "Threading support is not available.\n"; 00110 } 00111 } 00112 #endif // NDEBUG 00113 num_threads = 0; 00114 } 00115 00116 _raw_mode = false; 00117 _tcp_header_size = tcp_header_size; 00118 _polling = (num_threads <= 0); 00119 00120 _shutdown = false; 00121 00122 _next_index = 0; 00123 _num_results = 0; 00124 00125 _currently_polling_thread = -1; 00126 00127 string reader_thread_name = thread_name; 00128 if (thread_name.empty()) { 00129 reader_thread_name = "ReaderThread"; 00130 } 00131 int i; 00132 for (i = 0; i < num_threads; i++) { 00133 PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i); 00134 _threads.push_back(thread); 00135 } 00136 for (i = 0; i < num_threads; i++) { 00137 _threads[i]->start(net_thread_priority, true); 00138 } 00139 00140 _manager->add_reader(this); 00141 } 00142 00143 //////////////////////////////////////////////////////////////////// 00144 // Function: ConnectionReader::Destructor 00145 // Access: Published, Virtual 00146 // Description: 00147 //////////////////////////////////////////////////////////////////// 00148 ConnectionReader:: 00149 ~ConnectionReader() { 00150 if (_manager != (ConnectionManager *)NULL) { 00151 _manager->remove_reader(this); 00152 } 00153 00154 shutdown(); 00155 00156 // Delete all of our old sockets. 00157 Sockets::iterator si; 00158 for (si = _sockets.begin(); si != _sockets.end(); ++si) { 00159 delete (*si); 00160 } 00161 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) { 00162 SocketInfo *sinfo = (*si); 00163 if (!sinfo->_busy) { 00164 delete sinfo; 00165 } else { 00166 net_cat.error() 00167 << "Reentrant deletion of ConnectionReader--don't delete these\n" 00168 << "in response to connection_reset().\n"; 00169 00170 // We'll have to do the best we can to recover. 00171 sinfo->_connection.clear(); 00172 } 00173 } 00174 } 00175 00176 //////////////////////////////////////////////////////////////////// 00177 // Function: ConnectionReader::add_connection 00178 // Access: Published 00179 // Description: Adds a new socket to the list of sockets the 00180 // ConnectionReader will monitor. A datagram that comes 00181 // in on any of the monitored sockets will be reported. 00182 // In the case of a ConnectionListener, this adds a new 00183 // rendezvous socket; any activity on any of the 00184 // monitored sockets will cause a connection to be 00185 // accepted. 00186 // 00187 // The return value is true if the connection was added, 00188 // false if it was already there. 00189 // 00190 // add_connection() is thread-safe, and may be called at 00191 // will by any thread. 00192 //////////////////////////////////////////////////////////////////// 00193 bool ConnectionReader:: 00194 add_connection(Connection *connection) { 00195 nassertr(connection != (Connection *)NULL, false); 00196 00197 LightMutexHolder holder(_sockets_mutex); 00198 00199 // Make sure it's not already on the _sockets list. 00200 Sockets::const_iterator si; 00201 for (si = _sockets.begin(); si != _sockets.end(); ++si) { 00202 if ((*si)->_connection == connection) { 00203 // Whoops, already there. 00204 return false; 00205 } 00206 } 00207 00208 _sockets.push_back(new SocketInfo(connection)); 00209 00210 return true; 00211 } 00212 00213 //////////////////////////////////////////////////////////////////// 00214 // Function: ConnectionReader::remove_connection 00215 // Access: Published 00216 // Description: Removes a socket from the list of sockets being 00217 // monitored. Returns true if the socket was correctly 00218 // removed, false if it was not on the list in the first 00219 // place. 00220 // 00221 // remove_connection() is thread-safe, and may be called 00222 // at will by any thread. 00223 //////////////////////////////////////////////////////////////////// 00224 bool ConnectionReader:: 00225 remove_connection(Connection *connection) { 00226 LightMutexHolder holder(_sockets_mutex); 00227 00228 // Walk through the list of sockets to find the one we're removing. 00229 Sockets::iterator si; 00230 si = _sockets.begin(); 00231 while (si != _sockets.end() && (*si)->_connection != connection) { 00232 ++si; 00233 } 00234 if (si == _sockets.end()) { 00235 return false; 00236 } 00237 00238 _removed_sockets.push_back(*si); 00239 _sockets.erase(si); 00240 00241 return true; 00242 } 00243 00244 //////////////////////////////////////////////////////////////////// 00245 // Function: ConnectionReader::is_connection_ok 00246 // Access: Published 00247 // Description: Returns true if the indicated connection has been 00248 // added to the ConnectionReader and is being monitored 00249 // properly, false if it is not known, or if there was 00250 // some error condition detected on the connection. (If 00251 // there was an error condition, normally the 00252 // ConnectionManager would have been informed and closed 00253 // the connection.) 00254 //////////////////////////////////////////////////////////////////// 00255 bool ConnectionReader:: 00256 is_connection_ok(Connection *connection) { 00257 LightMutexHolder holder(_sockets_mutex); 00258 00259 // Walk through the list of sockets to find the one we're asking 00260 // about. 00261 Sockets::iterator si; 00262 si = _sockets.begin(); 00263 while (si != _sockets.end() && (*si)->_connection != connection) { 00264 ++si; 00265 } 00266 if (si == _sockets.end()) { 00267 // Don't know that connection. 00268 return false; 00269 } 00270 00271 SocketInfo *sinfo = (*si); 00272 bool is_ok = !sinfo->_error; 00273 00274 return is_ok; 00275 } 00276 00277 //////////////////////////////////////////////////////////////////// 00278 // Function: ConnectionReader::poll 00279 // Access: Published 00280 // Description: Explicitly polls the available sockets to see if any 00281 // of them have any noise. This function does nothing 00282 // unless this is a polling-type ConnectionReader, 00283 // i.e. it was created with zero threads (and 00284 // is_polling() will return true). 00285 // 00286 // It is not necessary to call this explicitly for a 00287 // QueuedConnectionReader. 00288 //////////////////////////////////////////////////////////////////// 00289 void ConnectionReader:: 00290 poll() { 00291 if (!_polling) { 00292 return; 00293 } 00294 00295 SocketInfo *sinfo = get_next_available_socket(false, -2); 00296 if (sinfo != (SocketInfo *)NULL) { 00297 double max_poll_cycle = get_net_max_poll_cycle(); 00298 if (max_poll_cycle < 0.0) { 00299 // Continue to read all data. 00300 while (sinfo != (SocketInfo *)NULL) { 00301 process_incoming_data(sinfo); 00302 sinfo = get_next_available_socket(false, -2); 00303 } 00304 00305 } else { 00306 // Read only until a certain amount of time has elapsed. 00307 TrueClock *global_clock = TrueClock::get_global_ptr(); 00308 double stop = global_clock->get_short_time() + max_poll_cycle; 00309 00310 while (sinfo != (SocketInfo *)NULL) { 00311 process_incoming_data(sinfo); 00312 if (global_clock->get_short_time() >= stop) { 00313 return; 00314 } 00315 sinfo = get_next_available_socket(false, -2); 00316 } 00317 } 00318 } 00319 } 00320 00321 //////////////////////////////////////////////////////////////////// 00322 // Function: ConnectionReader::get_manager 00323 // Access: Published 00324 // Description: Returns a pointer to the ConnectionManager object 00325 // that serves this ConnectionReader. 00326 //////////////////////////////////////////////////////////////////// 00327 ConnectionManager *ConnectionReader:: 00328 get_manager() const { 00329 return _manager; 00330 } 00331 00332 //////////////////////////////////////////////////////////////////// 00333 // Function: ConnectionReader::get_num_threads 00334 // Access: Published 00335 // Description: Returns the number of threads the ConnectionReader 00336 // has been created with. 00337 //////////////////////////////////////////////////////////////////// 00338 int ConnectionReader:: 00339 get_num_threads() const { 00340 return _threads.size(); 00341 } 00342 00343 //////////////////////////////////////////////////////////////////// 00344 // Function: ConnectionReader::set_raw_mode 00345 // Access: Published 00346 // Description: Sets the ConnectionReader into raw mode (or turns off 00347 // raw mode). In raw mode, datagram headers are not 00348 // expected; instead, all the data available on the pipe 00349 // is treated as a single datagram. 00350 // 00351 // This is similar to set_tcp_header_size(0), except that it 00352 // also turns off headers for UDP packets. 00353 //////////////////////////////////////////////////////////////////// 00354 void ConnectionReader:: 00355 set_raw_mode(bool mode) { 00356 _raw_mode = mode; 00357 } 00358 00359 //////////////////////////////////////////////////////////////////// 00360 // Function: ConnectionReader::get_raw_mode 00361 // Access: Published 00362 // Description: Returns the current setting of the raw mode flag. 00363 // See set_raw_mode(). 00364 //////////////////////////////////////////////////////////////////// 00365 bool ConnectionReader:: 00366 get_raw_mode() const { 00367 return _raw_mode; 00368 } 00369 00370 //////////////////////////////////////////////////////////////////// 00371 // Function: ConnectionReader::set_tcp_header_size 00372 // Access: Published 00373 // Description: Sets the header size of TCP packets. At the present, 00374 // legal values for this are 0, 2, or 4; this specifies 00375 // the number of bytes to use encode the datagram length 00376 // at the start of each TCP datagram. Sender and 00377 // receiver must independently agree on this. 00378 //////////////////////////////////////////////////////////////////// 00379 void ConnectionReader:: 00380 set_tcp_header_size(int tcp_header_size) { 00381 _tcp_header_size = tcp_header_size; 00382 } 00383 00384 //////////////////////////////////////////////////////////////////// 00385 // Function: ConnectionReader::get_tcp_header_size 00386 // Access: Published 00387 // Description: Returns the current setting of TCP header size. 00388 // See set_tcp_header_size(). 00389 //////////////////////////////////////////////////////////////////// 00390 int ConnectionReader:: 00391 get_tcp_header_size() const { 00392 return _tcp_header_size; 00393 } 00394 00395 //////////////////////////////////////////////////////////////////// 00396 // Function: ConnectionReader::shutdown 00397 // Access: Published 00398 // Description: Terminates all threads cleanly. Normally this is 00399 // only called by the destructor, but it may be called 00400 // explicitly before destruction. 00401 //////////////////////////////////////////////////////////////////// 00402 void ConnectionReader:: 00403 shutdown() { 00404 if (_shutdown) { 00405 return; 00406 } 00407 00408 // First, begin the shutdown. This will tell our threads we want 00409 // them to quit. 00410 _shutdown = true; 00411 00412 // Now wait for all of our threads to terminate. 00413 Threads::iterator ti; 00414 for (ti = _threads.begin(); ti != _threads.end(); ++ti) { 00415 (*ti)->join(); 00416 } 00417 } 00418 00419 //////////////////////////////////////////////////////////////////// 00420 // Function: ConnectionReader::flush_read_connection 00421 // Access: Protected, Virtual 00422 // Description: Attempts to read all the possible data from the 00423 // indicated connection, which has just delivered a 00424 // write error (and has therefore already been closed). 00425 // If the connection is not monitered by this reader, 00426 // does nothing. 00427 //////////////////////////////////////////////////////////////////// 00428 void ConnectionReader:: 00429 flush_read_connection(Connection *connection) { 00430 // Ensure it doesn't get deleted. 00431 SocketInfo sinfo(connection); 00432 00433 if (!remove_connection(connection)) { 00434 // Not already in the reader. 00435 return; 00436 } 00437 00438 // The connection was previously in the reader, but has now been 00439 // removed. Now we can flush it completely. We check if there is 00440 // any read data available on just this one socket; we can do this 00441 // right here in this thread, since we've already removed this 00442 // connection from the reader. 00443 00444 Socket_fdset fdset; 00445 fdset.clear(); 00446 fdset.setForSocket(*(sinfo.get_socket())); 00447 int num_results = fdset.WaitForRead(true, 0); 00448 while (num_results != 0) { 00449 sinfo._busy = true; 00450 if (!process_incoming_data(&sinfo)) { 00451 break; 00452 } 00453 fdset.setForSocket(*(sinfo.get_socket())); 00454 num_results = fdset.WaitForRead(true, 0); 00455 } 00456 } 00457 00458 //////////////////////////////////////////////////////////////////// 00459 // Function: ConnectionReader::clear_manager 00460 // Access: Protected 00461 // Description: This should normally only be called when the 00462 // associated ConnectionManager destructs. It resets 00463 // the ConnectionManager pointer to NULL so we don't 00464 // have a floating pointer. This makes the 00465 // ConnectionReader invalid; presumably it also will be 00466 // destructed momentarily. 00467 //////////////////////////////////////////////////////////////////// 00468 void ConnectionReader:: 00469 clear_manager() { 00470 _manager = (ConnectionManager *)NULL; 00471 } 00472 00473 //////////////////////////////////////////////////////////////////// 00474 // Function: ConnectionReader::finish_socket 00475 // Access: Protected 00476 // Description: To be called when a socket has been fully read and is 00477 // ready for polling for additional data. 00478 //////////////////////////////////////////////////////////////////// 00479 void ConnectionReader:: 00480 finish_socket(SocketInfo *sinfo) { 00481 nassertv(sinfo->_busy); 00482 00483 // By marking the SocketInfo nonbusy, we make it available for 00484 // future polls. 00485 sinfo->_busy = false; 00486 } 00487 00488 //////////////////////////////////////////////////////////////////// 00489 // Function: ConnectionReader::process_incoming_data 00490 // Access: Protected, Virtual 00491 // Description: This is run within a thread when the call to 00492 // select() indicates there is data available on a 00493 // socket. Returns true if the data is read 00494 // successfully, false on failure (for instance, because 00495 // the connection is closed). 00496 //////////////////////////////////////////////////////////////////// 00497 bool ConnectionReader:: 00498 process_incoming_data(SocketInfo *sinfo) { 00499 if (_raw_mode) { 00500 if (sinfo->is_udp()) { 00501 return process_raw_incoming_udp_data(sinfo); 00502 } else { 00503 return process_raw_incoming_tcp_data(sinfo); 00504 } 00505 } else { 00506 if (sinfo->is_udp()) { 00507 return process_incoming_udp_data(sinfo); 00508 } else { 00509 return process_incoming_tcp_data(sinfo); 00510 } 00511 } 00512 } 00513 00514 //////////////////////////////////////////////////////////////////// 00515 // Function: ConnectionReader::process_incoming_udp_data 00516 // Access: Protected 00517 // Description: 00518 //////////////////////////////////////////////////////////////////// 00519 bool ConnectionReader:: 00520 process_incoming_udp_data(SocketInfo *sinfo) { 00521 Socket_UDP *socket; 00522 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00523 Socket_Address addr; 00524 00525 // Read as many bytes as we can. 00526 char buffer[read_buffer_size]; 00527 int bytes_read = read_buffer_size; 00528 00529 bool okflag = socket->GetPacket(buffer, &bytes_read, addr); 00530 00531 if (!okflag) { 00532 finish_socket(sinfo); 00533 return false; 00534 00535 } else if (bytes_read == 0) { 00536 // The socket was closed (!). This shouldn't happen with a UDP 00537 // connection. Oh well. Report that and return. 00538 if (_manager != (ConnectionManager *)NULL) { 00539 _manager->connection_reset(sinfo->_connection, 0); 00540 } 00541 finish_socket(sinfo); 00542 return false; 00543 } 00544 00545 // Since we are not running in raw mode, we decode the header to 00546 // determine how big the datagram is. This means we must have read 00547 // at least a full header. 00548 if (bytes_read < datagram_udp_header_size) { 00549 net_cat.error() 00550 << "Did not read entire header, discarding UDP datagram.\n"; 00551 finish_socket(sinfo); 00552 return true; 00553 } 00554 00555 DatagramUDPHeader header(buffer); 00556 00557 char *dp = buffer + datagram_udp_header_size; 00558 bytes_read -= datagram_udp_header_size; 00559 00560 NetDatagram datagram(dp, bytes_read); 00561 00562 // Now that we've read all the data, it's time to finish the socket 00563 // so another thread can read the next datagram. 00564 finish_socket(sinfo); 00565 00566 if (_shutdown) { 00567 return false; 00568 } 00569 00570 // And now do whatever we need to do to process the datagram. 00571 if (!header.verify_datagram(datagram)) { 00572 net_cat.error() 00573 << "Ignoring invalid UDP datagram.\n"; 00574 } else { 00575 datagram.set_connection(sinfo->_connection); 00576 datagram.set_address(NetAddress(addr)); 00577 00578 if (net_cat.is_spam()) { 00579 net_cat.spam() 00580 << "Received UDP datagram with " 00581 << datagram_udp_header_size + datagram.get_length() 00582 << " bytes on " << (void *)datagram.get_connection() 00583 << " from " << datagram.get_address() << "\n"; 00584 } 00585 00586 receive_datagram(datagram); 00587 } 00588 00589 return true; 00590 } 00591 00592 //////////////////////////////////////////////////////////////////// 00593 // Function: ConnectionReader::process_incoming_tcp_data 00594 // Access: Protected 00595 // Description: 00596 //////////////////////////////////////////////////////////////////// 00597 bool ConnectionReader:: 00598 process_incoming_tcp_data(SocketInfo *sinfo) { 00599 Socket_TCP *socket; 00600 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00601 00602 // Read only the header bytes to start with. 00603 char buffer[read_buffer_size]; 00604 int header_bytes_read = 0; 00605 00606 // First, we have to read the first _tcp_header_size bytes. 00607 while (header_bytes_read < _tcp_header_size) { 00608 int bytes_read = 00609 socket->RecvData(buffer + header_bytes_read, 00610 _tcp_header_size - header_bytes_read); 00611 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00612 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 00613 socket->Active()) { 00614 Thread::force_yield(); 00615 bytes_read = socket->RecvData(buffer + header_bytes_read, 00616 _tcp_header_size - header_bytes_read); 00617 } 00618 #endif // SIMPLE_THREADS 00619 00620 if (bytes_read <= 0) { 00621 // The socket was closed. Report that and return. 00622 if (_manager != (ConnectionManager *)NULL) { 00623 _manager->connection_reset(sinfo->_connection, 0); 00624 } 00625 finish_socket(sinfo); 00626 return false; 00627 } 00628 00629 header_bytes_read += bytes_read; 00630 Thread::consider_yield(); 00631 } 00632 00633 // Now we must decode the header to determine how big the datagram 00634 // is. This means we must have read at least a full header. 00635 if (header_bytes_read != _tcp_header_size) { 00636 // This should actually be impossible, by the read-loop logic 00637 // above. 00638 net_cat.error() 00639 << "Did not read entire header, discarding TCP datagram.\n"; 00640 finish_socket(sinfo); 00641 return true; 00642 } 00643 00644 DatagramTCPHeader header(buffer, _tcp_header_size); 00645 int size = header.get_datagram_size(_tcp_header_size); 00646 00647 // We have to loop until the entire datagram is read. 00648 NetDatagram datagram; 00649 00650 while (!_shutdown && (int)datagram.get_length() < size) { 00651 int bytes_read; 00652 00653 int read_bytes = read_buffer_size; 00654 #ifdef SIMPLE_THREADS 00655 // In the SIMPLE_THREADS case, we want to limit the number of 00656 // bytes we read in a single epoch, to minimize the impact on the 00657 // other threads. 00658 read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch); 00659 #endif 00660 00661 bytes_read = 00662 socket->RecvData(buffer, min(read_bytes, 00663 (int)(size - datagram.get_length()))); 00664 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00665 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 00666 socket->Active()) { 00667 Thread::force_yield(); 00668 bytes_read = 00669 socket->RecvData(buffer, min(read_bytes, 00670 (int)(size - datagram.get_length()))); 00671 } 00672 #endif // SIMPLE_THREADS 00673 00674 char *dp = buffer; 00675 00676 if (bytes_read <= 0) { 00677 // The socket was closed. Report that and return. 00678 if (_manager != (ConnectionManager *)NULL) { 00679 _manager->connection_reset(sinfo->_connection, 0); 00680 } 00681 finish_socket(sinfo); 00682 return false; 00683 } 00684 00685 int datagram_bytes = 00686 min(bytes_read, (int)(size - datagram.get_length())); 00687 datagram.append_data(dp, datagram_bytes); 00688 00689 if (bytes_read > datagram_bytes) { 00690 // There were some extra bytes at the end of the datagram. Maybe 00691 // the beginning of the next datagram? Huh. 00692 net_cat.error() 00693 << "Discarding " << bytes_read - datagram_bytes 00694 << " bytes following TCP datagram.\n"; 00695 } 00696 Thread::consider_yield(); 00697 } 00698 00699 // Now that we've read all the data, it's time to finish the socket 00700 // so another thread can read the next datagram. 00701 finish_socket(sinfo); 00702 00703 if (_shutdown) { 00704 return false; 00705 } 00706 00707 // And now do whatever we need to do to process the datagram. 00708 if (!header.verify_datagram(datagram, _tcp_header_size)) { 00709 net_cat.error() 00710 << "Ignoring invalid TCP datagram.\n"; 00711 } else { 00712 datagram.set_connection(sinfo->_connection); 00713 datagram.set_address(NetAddress(socket->GetPeerName())); 00714 00715 if (net_cat.is_spam()) { 00716 net_cat.spam() 00717 << "Received TCP datagram with " 00718 << _tcp_header_size + datagram.get_length() 00719 << " bytes on " << (void *)datagram.get_connection() 00720 << " from " << datagram.get_address() << "\n"; 00721 } 00722 00723 receive_datagram(datagram); 00724 } 00725 00726 return true; 00727 } 00728 00729 //////////////////////////////////////////////////////////////////// 00730 // Function: ConnectionReader::process_raw_incoming_udp_data 00731 // Access: Protected 00732 // Description: 00733 //////////////////////////////////////////////////////////////////// 00734 bool ConnectionReader:: 00735 process_raw_incoming_udp_data(SocketInfo *sinfo) { 00736 Socket_UDP *socket; 00737 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00738 Socket_Address addr; 00739 00740 // Read as many bytes as we can. 00741 char buffer[read_buffer_size]; 00742 int bytes_read = read_buffer_size; 00743 00744 bool okflag = socket->GetPacket(buffer, &bytes_read, addr); 00745 00746 if (!okflag) { 00747 finish_socket(sinfo); 00748 return false; 00749 00750 } else if (bytes_read == 0) { 00751 // The socket was closed (!). This shouldn't happen with a UDP 00752 // connection. Oh well. Report that and return. 00753 if (_manager != (ConnectionManager *)NULL) { 00754 _manager->connection_reset(sinfo->_connection, 0); 00755 } 00756 finish_socket(sinfo); 00757 return false; 00758 } 00759 00760 // In raw mode, we simply extract all the bytes and make that a 00761 // datagram. 00762 NetDatagram datagram(buffer, bytes_read); 00763 00764 // Now that we've read all the data, it's time to finish the socket 00765 // so another thread can read the next datagram. 00766 finish_socket(sinfo); 00767 00768 if (_shutdown) { 00769 return false; 00770 } 00771 00772 datagram.set_connection(sinfo->_connection); 00773 datagram.set_address(NetAddress(addr)); 00774 00775 if (net_cat.is_spam()) { 00776 net_cat.spam() 00777 << "Received raw UDP datagram with " << datagram.get_length() 00778 << " bytes on " << (void *)datagram.get_connection() 00779 << " from " << datagram.get_address() << "\n"; 00780 } 00781 00782 receive_datagram(datagram); 00783 00784 return true; 00785 } 00786 00787 //////////////////////////////////////////////////////////////////// 00788 // Function: ConnectionReader::process_raw_incoming_tcp_data 00789 // Access: Protected 00790 // Description: 00791 //////////////////////////////////////////////////////////////////// 00792 bool ConnectionReader:: 00793 process_raw_incoming_tcp_data(SocketInfo *sinfo) { 00794 Socket_TCP *socket; 00795 DCAST_INTO_R(socket, sinfo->get_socket(), false); 00796 00797 // Read as many bytes as we can. 00798 char buffer[read_buffer_size]; 00799 int bytes_read = socket->RecvData(buffer, read_buffer_size); 00800 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00801 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 00802 socket->Active()) { 00803 Thread::force_yield(); 00804 bytes_read = socket->RecvData(buffer, read_buffer_size); 00805 } 00806 #endif // SIMPLE_THREADS 00807 00808 if (bytes_read <= 0) { 00809 // The socket was closed. Report that and return. 00810 if (_manager != (ConnectionManager *)NULL) { 00811 _manager->connection_reset(sinfo->_connection, 0); 00812 } 00813 finish_socket(sinfo); 00814 return false; 00815 } 00816 00817 // In raw mode, we simply extract all the bytes and make that a 00818 // datagram. 00819 NetDatagram datagram(buffer, bytes_read); 00820 00821 // Now that we've read all the data, it's time to finish the socket 00822 // so another thread can read the next datagram. 00823 finish_socket(sinfo); 00824 00825 if (_shutdown) { 00826 return false; 00827 } 00828 00829 datagram.set_connection(sinfo->_connection); 00830 datagram.set_address(NetAddress(socket->GetPeerName())); 00831 00832 if (net_cat.is_spam()) { 00833 net_cat.spam() 00834 << "Received raw TCP datagram with " << datagram.get_length() 00835 << " bytes on " << (void *)datagram.get_connection() 00836 << " from " << datagram.get_address() << "\n"; 00837 } 00838 00839 receive_datagram(datagram); 00840 00841 return true; 00842 } 00843 00844 //////////////////////////////////////////////////////////////////// 00845 // Function: ConnectionReader::thread_run 00846 // Access: Private 00847 // Description: This is the actual executing function for each 00848 // thread. 00849 //////////////////////////////////////////////////////////////////// 00850 void ConnectionReader:: 00851 thread_run(int thread_index) { 00852 nassertv(!_polling); 00853 nassertv(_threads[thread_index] == Thread::get_current_thread()); 00854 00855 while (!_shutdown) { 00856 SocketInfo *sinfo = 00857 get_next_available_socket(true, thread_index); 00858 if (sinfo != (SocketInfo *)NULL) { 00859 process_incoming_data(sinfo); 00860 Thread::consider_yield(); 00861 } else { 00862 Thread::force_yield(); 00863 } 00864 } 00865 } 00866 00867 00868 //////////////////////////////////////////////////////////////////// 00869 // Function: ConnectionReader::get_next_available_socket 00870 // Access: Private 00871 // Description: Polls the known connections for activity and returns 00872 // the next one known to have activity, or NULL if no 00873 // activity is detected within the timeout interval. 00874 // 00875 // This function may block indefinitely if it is being 00876 // called by multiple threads; if there are no other 00877 // threads, it may block only if allow_block is true. 00878 //////////////////////////////////////////////////////////////////// 00879 ConnectionReader::SocketInfo *ConnectionReader:: 00880 get_next_available_socket(bool allow_block, int current_thread_index) { 00881 // Go to sleep on the select() mutex. This guarantees that only one 00882 // thread is in this function at a time. 00883 MutexHolder holder(_select_mutex); 00884 00885 do { 00886 // First, check the result from the previous select call. If 00887 // there are any sockets remaining there, process them first. 00888 while (!_shutdown && _num_results > 0) { 00889 nassertr(_next_index < (int)_selecting_sockets.size(), NULL); 00890 int i = _next_index; 00891 _next_index++; 00892 00893 if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) { 00894 _num_results--; 00895 SocketInfo *sinfo = _selecting_sockets[i]; 00896 00897 // Some noise on this socket. 00898 sinfo->_busy = true; 00899 return sinfo; 00900 } 00901 } 00902 00903 bool interrupted; 00904 do { 00905 interrupted = false; 00906 00907 // Ok, no results from previous select calls. Prepare to set up 00908 // for a new select. 00909 00910 // First, report to anyone else who cares that we're the thread 00911 // about to do the poll. That way, if any new sockets come 00912 // available while we're polling, we can service them. 00913 AtomicAdjust::set(_currently_polling_thread, current_thread_index); 00914 00915 rebuild_select_list(); 00916 00917 // Now we can execute the select. 00918 _num_results = 0; 00919 _next_index = 0; 00920 00921 if (!_shutdown) { 00922 PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0); 00923 if (!allow_block) { 00924 timeout = 0; 00925 } 00926 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00927 // In the presence of SIMPLE_THREADS, we never wait at all, 00928 // but rather we yield the thread if we come up empty (so that 00929 // we won't block the entire process). 00930 timeout = 0; 00931 #endif 00932 00933 _num_results = _fdset.WaitForRead(false, timeout); 00934 } 00935 00936 if (_num_results == 0 && allow_block) { 00937 // If we reached net_max_block, go back and reconsider. (We 00938 // never timeout indefinitely, so we can check the shutdown 00939 // flag every once in a while.) 00940 interrupted = true; 00941 Thread::force_yield(); 00942 00943 } else if (_num_results < 0) { 00944 // If we had an error, just return. But yield the timeslice 00945 // first. 00946 Thread::force_yield(); 00947 return (SocketInfo *)NULL; 00948 } 00949 } while (!_shutdown && interrupted); 00950 00951 AtomicAdjust::set(_currently_polling_thread, current_thread_index); 00952 00953 // Repeat the above until we (a) find a socket with actual noise 00954 // on it, or (b) return from PR_Poll() with no sockets available. 00955 } while (!_shutdown && _num_results > 0); 00956 00957 return (SocketInfo *)NULL; 00958 } 00959 00960 00961 //////////////////////////////////////////////////////////////////// 00962 // Function: ConnectionReader::rebuild_select_list 00963 // Access: Private 00964 // Description: Rebuilds the _fdset and _selecting_sockets arrays 00965 // based on the sockets that are currently available for 00966 // selecting. 00967 //////////////////////////////////////////////////////////////////// 00968 void ConnectionReader:: 00969 rebuild_select_list() { 00970 _fdset.clear(); 00971 _selecting_sockets.clear(); 00972 00973 LightMutexHolder holder(_sockets_mutex); 00974 Sockets::const_iterator si; 00975 for (si = _sockets.begin(); si != _sockets.end(); ++si) { 00976 SocketInfo *sinfo = (*si); 00977 if (!sinfo->_busy && !sinfo->_error) { 00978 _fdset.setForSocket(*sinfo->get_socket()); 00979 _selecting_sockets.push_back(sinfo); 00980 } 00981 } 00982 00983 // This is also a fine time to delete the contents of the 00984 // _removed_sockets list. 00985 if (!_removed_sockets.empty()) { 00986 Sockets still_busy_sockets; 00987 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) { 00988 SocketInfo *sinfo = (*si); 00989 if (sinfo->_busy) { 00990 still_busy_sockets.push_back(sinfo); 00991 } else { 00992 delete sinfo; 00993 } 00994 } 00995 _removed_sockets.swap(still_busy_sockets); 00996 } 00997 } 00998 00999 //////////////////////////////////////////////////////////////////// 01000 // Function: ConnectionReader::accumulate_fdset 01001 // Access: Private 01002 // Description: Adds the sockets from this ConnectionReader (or 01003 // ConnectionListener) to the indicated fdset. This is 01004 // used by ConnectionManager::block() to build an fdset 01005 // of all attached readers. 01006 //////////////////////////////////////////////////////////////////// 01007 void ConnectionReader:: 01008 accumulate_fdset(Socket_fdset &fdset) { 01009 LightMutexHolder holder(_sockets_mutex); 01010 Sockets::const_iterator si; 01011 for (si = _sockets.begin(); si != _sockets.end(); ++si) { 01012 SocketInfo *sinfo = (*si); 01013 if (!sinfo->_busy && !sinfo->_error) { 01014 fdset.setForSocket(*sinfo->get_socket()); 01015 } 01016 } 01017 }