Panda3D

connectionReader.cxx

00001 // Filename: connectionReader.cxx
00002 // Created by:  drose (08Feb00)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "connectionReader.h"
00016 #include "connectionManager.h"
00017 #include "netDatagram.h"
00018 #include "datagramTCPHeader.h"
00019 #include "datagramUDPHeader.h"
00020 #include "config_net.h"
00021 #include "trueClock.h"
00022 #include "socket_udp.h"
00023 #include "socket_tcp.h"
00024 #include "mutexHolder.h"
00025 #include "lightMutexHolder.h"
00026 #include "pnotify.h"
00027 #include "atomicAdjust.h"
00028 #include "config_downloader.h"
00029 
00030 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
00031 
00032 static const int max_timeout_ms = 100;
00033 
00034 ////////////////////////////////////////////////////////////////////
00035 //     Function: ConnectionReader::SocketInfo::Constructor
00036 //       Access: Public
00037 //  Description:
00038 ////////////////////////////////////////////////////////////////////
00039 ConnectionReader::SocketInfo::
00040 SocketInfo(const PT(Connection) &connection) :
00041   _connection(connection)
00042 {
00043   _busy = false;
00044   _error = false;
00045 }
00046 
00047 ////////////////////////////////////////////////////////////////////
00048 //     Function: ConnectionReader::SocketInfo::is_udp
00049 //       Access: Public
00050 //  Description:
00051 ////////////////////////////////////////////////////////////////////
00052 bool ConnectionReader::SocketInfo::
00053 is_udp() const {
00054   return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
00055 }
00056 
00057 ////////////////////////////////////////////////////////////////////
00058 //     Function: ConnectionReader::SocketInfo::get_socket
00059 //       Access: Public
00060 //  Description:
00061 ////////////////////////////////////////////////////////////////////
00062 Socket_IP *ConnectionReader::SocketInfo::
00063 get_socket() const {
00064   return _connection->get_socket();
00065 }
00066 
00067 ////////////////////////////////////////////////////////////////////
00068 //     Function: ConnectionReader::ReaderThread::Constructor
00069 //       Access: Public
00070 //  Description:
00071 ////////////////////////////////////////////////////////////////////
00072 ConnectionReader::ReaderThread::
00073 ReaderThread(ConnectionReader *reader, const string &thread_name, 
00074              int thread_index) :
00075   Thread(make_thread_name(thread_name, thread_index), 
00076          make_thread_name(thread_name, thread_index)),
00077   _reader(reader),
00078   _thread_index(thread_index)
00079 {
00080 }
00081 
00082 ////////////////////////////////////////////////////////////////////
00083 //     Function: ConnectionReader::ReaderThread::thread_main
00084 //       Access: Public, Virtual
00085 //  Description:
00086 ////////////////////////////////////////////////////////////////////
00087 void ConnectionReader::ReaderThread::
00088 thread_main() {
00089   _reader->thread_run(_thread_index);
00090 }
00091 
00092 ////////////////////////////////////////////////////////////////////
00093 //     Function: ConnectionReader::Constructor
00094 //       Access: Published
00095 //  Description: Creates a new ConnectionReader with the indicated
00096 //               number of threads to handle requests.  If num_threads
00097 //               is 0, the sockets will only be read by polling,
00098 //               during an explicit poll() call.
00099 //               (QueuedConnectionReader will do this automatically.)
00100 ////////////////////////////////////////////////////////////////////
00101 ConnectionReader::
00102 ConnectionReader(ConnectionManager *manager, int num_threads,
00103                  const string &thread_name) :
00104   _manager(manager)
00105 {
00106   if (!Thread::is_threading_supported()) {
00107 #ifndef NDEBUG
00108     if (num_threads != 0) {
00109       if (net_cat.is_debug()) {
00110         net_cat.debug()
00111           << "Threading support is not available.\n";
00112       }
00113     }
00114 #endif  // NDEBUG
00115     num_threads = 0;
00116   }
00117 
00118   _raw_mode = false;
00119   _tcp_header_size = tcp_header_size;
00120   _polling = (num_threads <= 0);
00121 
00122   _shutdown = false;
00123 
00124   _next_index = 0;
00125   _num_results = 0;
00126 
00127   _currently_polling_thread = -1;
00128 
00129   string reader_thread_name = thread_name;
00130   if (thread_name.empty()) {
00131     reader_thread_name = "ReaderThread";
00132   }
00133   int i;
00134   for (i = 0; i < num_threads; i++) {
00135     PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
00136     _threads.push_back(thread);
00137   }
00138   for (i = 0; i < num_threads; i++) {
00139     _threads[i]->start(net_thread_priority, true);
00140   }
00141 
00142   _manager->add_reader(this);
00143 }
00144 
00145 ////////////////////////////////////////////////////////////////////
00146 //     Function: ConnectionReader::Destructor
00147 //       Access: Published, Virtual
00148 //  Description:
00149 ////////////////////////////////////////////////////////////////////
00150 ConnectionReader::
00151 ~ConnectionReader() {
00152   if (_manager != (ConnectionManager *)NULL) {
00153     _manager->remove_reader(this);
00154   }
00155 
00156   shutdown();
00157 
00158   // Delete all of our old sockets.
00159   Sockets::iterator si;
00160   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00161     delete (*si);
00162   }
00163   for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00164     SocketInfo *sinfo = (*si);
00165     if (!sinfo->_busy) {
00166       delete sinfo;
00167     } else {
00168       net_cat.error()
00169         << "Reentrant deletion of ConnectionReader--don't delete these\n"
00170         << "in response to connection_reset().\n";
00171 
00172       // We'll have to do the best we can to recover.
00173       sinfo->_connection.clear();
00174     }
00175   }
00176 }
00177 
00178 ////////////////////////////////////////////////////////////////////
00179 //     Function: ConnectionReader::add_connection
00180 //       Access: Published
00181 //  Description: Adds a new socket to the list of sockets the
00182 //               ConnectionReader will monitor.  A datagram that comes
00183 //               in on any of the monitored sockets will be reported.
00184 //               In the case of a ConnectionListener, this adds a new
00185 //               rendezvous socket; any activity on any of the
00186 //               monitored sockets will cause a connection to be
00187 //               accepted.
00188 //
00189 //               The return value is true if the connection was added,
00190 //               false if it was already there.
00191 //
00192 //               add_connection() is thread-safe, and may be called at
00193 //               will by any thread.
00194 ////////////////////////////////////////////////////////////////////
00195 bool ConnectionReader::
00196 add_connection(Connection *connection) {
00197   nassertr(connection != (Connection *)NULL, false);
00198 
00199   LightMutexHolder holder(_sockets_mutex);
00200 
00201   // Make sure it's not already on the _sockets list.
00202   Sockets::const_iterator si;
00203   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00204     if ((*si)->_connection == connection) {
00205       // Whoops, already there.
00206       return false;
00207     }
00208   }
00209 
00210   _sockets.push_back(new SocketInfo(connection));
00211 
00212   return true;
00213 }
00214 
00215 ////////////////////////////////////////////////////////////////////
00216 //     Function: ConnectionReader::remove_connection
00217 //       Access: Published
00218 //  Description: Removes a socket from the list of sockets being
00219 //               monitored.  Returns true if the socket was correctly
00220 //               removed, false if it was not on the list in the first
00221 //               place.
00222 //
00223 //               remove_connection() is thread-safe, and may be called
00224 //               at will by any thread.
00225 ////////////////////////////////////////////////////////////////////
00226 bool ConnectionReader::
00227 remove_connection(Connection *connection) {
00228   LightMutexHolder holder(_sockets_mutex);
00229 
00230   // Walk through the list of sockets to find the one we're removing.
00231   Sockets::iterator si;
00232   si = _sockets.begin();
00233   while (si != _sockets.end() && (*si)->_connection != connection) {
00234     ++si;
00235   }
00236   if (si == _sockets.end()) {
00237     return false;
00238   }
00239 
00240   _removed_sockets.push_back(*si);
00241   _sockets.erase(si);
00242 
00243   return true;
00244 }
00245 
00246 ////////////////////////////////////////////////////////////////////
00247 //     Function: ConnectionReader::is_connection_ok
00248 //       Access: Published
00249 //  Description: Returns true if the indicated connection has been
00250 //               added to the ConnectionReader and is being monitored
00251 //               properly, false if it is not known, or if there was
00252 //               some error condition detected on the connection.  (If
00253 //               there was an error condition, normally the
00254 //               ConnectionManager would have been informed and closed
00255 //               the connection.)
00256 ////////////////////////////////////////////////////////////////////
00257 bool ConnectionReader::
00258 is_connection_ok(Connection *connection) {
00259   LightMutexHolder holder(_sockets_mutex);
00260 
00261   // Walk through the list of sockets to find the one we're asking
00262   // about.
00263   Sockets::iterator si;
00264   si = _sockets.begin();
00265   while (si != _sockets.end() && (*si)->_connection != connection) {
00266     ++si;
00267   }
00268   if (si == _sockets.end()) {
00269     // Don't know that connection.
00270     return false;
00271   }
00272 
00273   SocketInfo *sinfo = (*si);
00274   bool is_ok = !sinfo->_error;
00275 
00276   return is_ok;
00277 }
00278 
00279 ////////////////////////////////////////////////////////////////////
00280 //     Function: ConnectionReader::poll
00281 //       Access: Published
00282 //  Description: Explicitly polls the available sockets to see if any
00283 //               of them have any noise.  This function does nothing
00284 //               unless this is a polling-type ConnectionReader,
00285 //               i.e. it was created with zero threads (and
00286 //               is_polling() will return true).
00287 //
00288 //               It is not necessary to call this explicitly for a
00289 //               QueuedConnectionReader.
00290 ////////////////////////////////////////////////////////////////////
00291 void ConnectionReader::
00292 poll() {
00293   if (!_polling) {
00294     return;
00295   }
00296 
00297   SocketInfo *sinfo = get_next_available_socket(false, -2);
00298   if (sinfo != (SocketInfo *)NULL) {
00299     double max_poll_cycle = get_max_poll_cycle();
00300     if (max_poll_cycle < 0.0) {
00301       // Continue to read all data.
00302       while (sinfo != (SocketInfo *)NULL) {
00303         process_incoming_data(sinfo);
00304         sinfo = get_next_available_socket(false, -2);
00305       }
00306 
00307     } else {
00308       // Read only until a certain amount of time has elapsed.
00309       TrueClock *global_clock = TrueClock::get_global_ptr();
00310       double stop = global_clock->get_short_time() + max_poll_cycle;
00311 
00312       while (sinfo != (SocketInfo *)NULL) {
00313         process_incoming_data(sinfo);
00314         if (global_clock->get_short_time() >= stop) {
00315           return;
00316         }
00317         sinfo = get_next_available_socket(false, -2);
00318       }
00319     }
00320   }
00321 }
00322 
00323 ////////////////////////////////////////////////////////////////////
00324 //     Function: ConnectionReader::get_manager
00325 //       Access: Published
00326 //  Description: Returns a pointer to the ConnectionManager object
00327 //               that serves this ConnectionReader.
00328 ////////////////////////////////////////////////////////////////////
00329 ConnectionManager *ConnectionReader::
00330 get_manager() const {
00331   return _manager;
00332 }
00333 
00334 ////////////////////////////////////////////////////////////////////
00335 //     Function: ConnectionReader::get_num_threads
00336 //       Access: Published
00337 //  Description: Returns the number of threads the ConnectionReader
00338 //               has been created with.
00339 ////////////////////////////////////////////////////////////////////
00340 int ConnectionReader::
00341 get_num_threads() const {
00342   return _threads.size();
00343 }
00344 
00345 ////////////////////////////////////////////////////////////////////
00346 //     Function: ConnectionReader::set_raw_mode
00347 //       Access: Published
00348 //  Description: Sets the ConnectionReader into raw mode (or turns off
00349 //               raw mode).  In raw mode, datagram headers are not
00350 //               expected; instead, all the data available on the pipe
00351 //               is treated as a single datagram.
00352 //
00353 //               This is similar to set_tcp_header_size(0), except that it
00354 //               also turns off headers for UDP packets.
00355 ////////////////////////////////////////////////////////////////////
00356 void ConnectionReader::
00357 set_raw_mode(bool mode) {
00358   _raw_mode = mode;
00359 }
00360 
00361 ////////////////////////////////////////////////////////////////////
00362 //     Function: ConnectionReader::get_raw_mode
00363 //       Access: Published
00364 //  Description: Returns the current setting of the raw mode flag.
00365 //               See set_raw_mode().
00366 ////////////////////////////////////////////////////////////////////
00367 bool ConnectionReader::
00368 get_raw_mode() const {
00369   return _raw_mode;
00370 }
00371 
00372 ////////////////////////////////////////////////////////////////////
00373 //     Function: ConnectionReader::set_tcp_header_size
00374 //       Access: Published
00375 //  Description: Sets the header size of TCP packets.  At the present,
00376 //               legal values for this are 0, 2, or 4; this specifies
00377 //               the number of bytes to use encode the datagram length
00378 //               at the start of each TCP datagram.  Sender and
00379 //               receiver must independently agree on this.
00380 ////////////////////////////////////////////////////////////////////
00381 void ConnectionReader::
00382 set_tcp_header_size(int tcp_header_size) {
00383   _tcp_header_size = tcp_header_size;
00384 }
00385 
00386 ////////////////////////////////////////////////////////////////////
00387 //     Function: ConnectionReader::get_tcp_header_size
00388 //       Access: Published
00389 //  Description: Returns the current setting of TCP header size.
00390 //               See set_tcp_header_size().
00391 ////////////////////////////////////////////////////////////////////
00392 int ConnectionReader::
00393 get_tcp_header_size() const {
00394   return _tcp_header_size;
00395 }
00396 
00397 ////////////////////////////////////////////////////////////////////
00398 //     Function: ConnectionReader::shutdown
00399 //       Access: Published
00400 //  Description: Terminates all threads cleanly.  Normally this is
00401 //               only called by the destructor, but it may be called
00402 //               explicitly before destruction.
00403 ////////////////////////////////////////////////////////////////////
00404 void ConnectionReader::
00405 shutdown() {
00406   if (_shutdown) {
00407     return;
00408   }
00409 
00410   // First, begin the shutdown.  This will tell our threads we want
00411   // them to quit.
00412   _shutdown = true;
00413 
00414   // Now wait for all of our threads to terminate.
00415   Threads::iterator ti;
00416   for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
00417     (*ti)->join();
00418   }
00419 }
00420 
00421 ////////////////////////////////////////////////////////////////////
00422 //     Function: ConnectionReader::flush_read_connection
00423 //       Access: Protected, Virtual
00424 //  Description: Attempts to read all the possible data from the
00425 //               indicated connection, which has just delivered a
00426 //               write error (and has therefore already been closed).
00427 //               If the connection is not monitered by this reader,
00428 //               does nothing.
00429 ////////////////////////////////////////////////////////////////////
00430 void ConnectionReader::
00431 flush_read_connection(Connection *connection) {
00432   // Ensure it doesn't get deleted.
00433   SocketInfo sinfo(connection);
00434 
00435   if (!remove_connection(connection)) {
00436     // Not already in the reader.
00437     return;
00438   }
00439 
00440   // The connection was previously in the reader, but has now been
00441   // removed.  Now we can flush it completely.  We check if there is
00442   // any read data available on just this one socket; we can do this
00443   // right here in this thread, since we've already removed this
00444   // connection from the reader.
00445 
00446   Socket_fdset fdset;
00447   fdset.clear();
00448   fdset.setForSocket(*(sinfo.get_socket()));
00449   int num_results = fdset.WaitForRead(true, 0);
00450   while (num_results != 0) {
00451     sinfo._busy = true;
00452     if (!process_incoming_data(&sinfo)) {
00453       break;
00454     }
00455     fdset.setForSocket(*(sinfo.get_socket()));
00456     num_results = fdset.WaitForRead(true, 0);
00457   }
00458 }
00459 
00460 ////////////////////////////////////////////////////////////////////
00461 //     Function: ConnectionReader::clear_manager
00462 //       Access: Protected
00463 //  Description: This should normally only be called when the
00464 //               associated ConnectionManager destructs.  It resets
00465 //               the ConnectionManager pointer to NULL so we don't
00466 //               have a floating pointer.  This makes the
00467 //               ConnectionReader invalid; presumably it also will be
00468 //               destructed momentarily.
00469 ////////////////////////////////////////////////////////////////////
00470 void ConnectionReader::
00471 clear_manager() {
00472   _manager = (ConnectionManager *)NULL;
00473 }
00474 
00475 ////////////////////////////////////////////////////////////////////
00476 //     Function: ConnectionReader::finish_socket
00477 //       Access: Protected
00478 //  Description: To be called when a socket has been fully read and is
00479 //               ready for polling for additional data.
00480 ////////////////////////////////////////////////////////////////////
00481 void ConnectionReader::
00482 finish_socket(SocketInfo *sinfo) {
00483   nassertv(sinfo->_busy);
00484 
00485   // By marking the SocketInfo nonbusy, we make it available for
00486   // future polls.
00487   sinfo->_busy = false;
00488 }
00489 
00490 ////////////////////////////////////////////////////////////////////
00491 //     Function: ConnectionReader::process_incoming_data
00492 //       Access: Protected, Virtual
00493 //  Description: This is run within a thread when the call to
00494 //               select() indicates there is data available on a
00495 //               socket.  Returns true if the data is read
00496 //               successfully, false on failure (for instance, because
00497 //               the connection is closed).
00498 ////////////////////////////////////////////////////////////////////
00499 bool ConnectionReader::
00500 process_incoming_data(SocketInfo *sinfo) {
00501   if (_raw_mode) {
00502     if (sinfo->is_udp()) {
00503       return process_raw_incoming_udp_data(sinfo);
00504     } else {
00505       return process_raw_incoming_tcp_data(sinfo);
00506     }
00507   } else {
00508     if (sinfo->is_udp()) {
00509       return process_incoming_udp_data(sinfo);
00510     } else {
00511       return process_incoming_tcp_data(sinfo);
00512     }
00513   }
00514 }
00515 
00516 ////////////////////////////////////////////////////////////////////
00517 //     Function: ConnectionReader::process_incoming_udp_data
00518 //       Access: Protected
00519 //  Description:
00520 ////////////////////////////////////////////////////////////////////
00521 bool ConnectionReader::
00522 process_incoming_udp_data(SocketInfo *sinfo) {
00523   Socket_UDP *socket;
00524   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00525   Socket_Address addr;
00526 
00527   // Read as many bytes as we can.
00528   char buffer[read_buffer_size];
00529   int bytes_read = read_buffer_size;
00530 
00531   bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
00532 
00533   if (!okflag) {
00534     finish_socket(sinfo);
00535     return false;
00536 
00537   } else if (bytes_read == 0) {
00538     // The socket was closed (!).  This shouldn't happen with a UDP
00539     // connection.  Oh well.  Report that and return.
00540     if (_manager != (ConnectionManager *)NULL) {
00541       _manager->connection_reset(sinfo->_connection, 0);
00542     }
00543     finish_socket(sinfo);
00544     return false;
00545   }
00546 
00547   // Since we are not running in raw mode, we decode the header to
00548   // determine how big the datagram is.  This means we must have read
00549   // at least a full header.
00550   if (bytes_read < datagram_udp_header_size) {
00551     net_cat.error()
00552       << "Did not read entire header, discarding UDP datagram.\n";
00553     finish_socket(sinfo);
00554     return true;
00555   }
00556   
00557   DatagramUDPHeader header(buffer);
00558   
00559   char *dp = buffer + datagram_udp_header_size;
00560   bytes_read -= datagram_udp_header_size;
00561   
00562   NetDatagram datagram(dp, bytes_read);
00563   
00564   // Now that we've read all the data, it's time to finish the socket
00565   // so another thread can read the next datagram.
00566   finish_socket(sinfo);
00567   
00568   if (_shutdown) {
00569     return false;
00570   }
00571   
00572   // And now do whatever we need to do to process the datagram.
00573   if (!header.verify_datagram(datagram)) {
00574     net_cat.error()
00575       << "Ignoring invalid UDP datagram.\n";
00576   } else {
00577     datagram.set_connection(sinfo->_connection);
00578     datagram.set_address(NetAddress(addr));
00579 
00580     if (net_cat.is_spam()) {
00581       net_cat.spam()
00582         << "Received UDP datagram with " 
00583         << datagram_udp_header_size + datagram.get_length() 
00584         << " bytes on " << (void *)datagram.get_connection()
00585         << " from " << datagram.get_address() << "\n";
00586     }
00587 
00588     receive_datagram(datagram);
00589   }
00590 
00591   return true;
00592 }
00593 
00594 ////////////////////////////////////////////////////////////////////
00595 //     Function: ConnectionReader::process_incoming_tcp_data
00596 //       Access: Protected
00597 //  Description:
00598 ////////////////////////////////////////////////////////////////////
00599 bool ConnectionReader::
00600 process_incoming_tcp_data(SocketInfo *sinfo) {
00601   Socket_TCP *socket;
00602   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00603 
00604   // Read only the header bytes to start with.
00605   char buffer[read_buffer_size];
00606   int header_bytes_read = 0;
00607 
00608   // First, we have to read the first _tcp_header_size bytes.
00609   while (header_bytes_read < _tcp_header_size) {
00610     int bytes_read =
00611       socket->RecvData(buffer + header_bytes_read,
00612                        _tcp_header_size - header_bytes_read);
00613 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00614     while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
00615            socket->Active()) {
00616       Thread::force_yield();
00617       bytes_read = socket->RecvData(buffer + header_bytes_read,
00618                                     _tcp_header_size - header_bytes_read);
00619     }
00620 #endif  // SIMPLE_THREADS
00621 
00622     if (bytes_read <= 0) {
00623       // The socket was closed.  Report that and return.
00624       if (_manager != (ConnectionManager *)NULL) {
00625         _manager->connection_reset(sinfo->_connection, 0);
00626       }
00627       finish_socket(sinfo);
00628       return false;
00629     }
00630 
00631     header_bytes_read += bytes_read;
00632     Thread::consider_yield();
00633   }
00634 
00635   // Now we must decode the header to determine how big the datagram
00636   // is.  This means we must have read at least a full header.
00637   if (header_bytes_read != _tcp_header_size) {
00638     // This should actually be impossible, by the read-loop logic
00639     // above.
00640     net_cat.error()
00641       << "Did not read entire header, discarding TCP datagram.\n";
00642     finish_socket(sinfo);
00643     return true;
00644   }
00645 
00646   DatagramTCPHeader header(buffer, _tcp_header_size);
00647   int size = header.get_datagram_size(_tcp_header_size);
00648 
00649   // We have to loop until the entire datagram is read.
00650   NetDatagram datagram;
00651 
00652   while (!_shutdown && (int)datagram.get_length() < size) {
00653     int bytes_read;
00654 
00655     int read_bytes = read_buffer_size;
00656 #ifdef SIMPLE_THREADS
00657     // In the SIMPLE_THREADS case, we want to limit the number of
00658     // bytes we read in a single epoch, to minimize the impact on the
00659     // other threads.
00660     read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
00661 #endif
00662 
00663     bytes_read =
00664       socket->RecvData(buffer, min(read_bytes,
00665                                    (int)(size - datagram.get_length())));
00666 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00667     while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
00668            socket->Active()) {
00669       Thread::force_yield();
00670       bytes_read =
00671         socket->RecvData(buffer, min(read_bytes,
00672                                      (int)(size - datagram.get_length())));
00673     }
00674 #endif  // SIMPLE_THREADS
00675 
00676     char *dp = buffer;
00677 
00678     if (bytes_read <= 0) {
00679       // The socket was closed.  Report that and return.
00680       if (_manager != (ConnectionManager *)NULL) {
00681         _manager->connection_reset(sinfo->_connection, 0);
00682       }
00683       finish_socket(sinfo);
00684       return false;
00685     }
00686 
00687     int datagram_bytes =
00688       min(bytes_read, (int)(size - datagram.get_length()));
00689     datagram.append_data(dp, datagram_bytes);
00690 
00691     if (bytes_read > datagram_bytes) {
00692       // There were some extra bytes at the end of the datagram.  Maybe
00693       // the beginning of the next datagram?  Huh.
00694       net_cat.error()
00695         << "Discarding " << bytes_read - datagram_bytes
00696         << " bytes following TCP datagram.\n";
00697     }
00698     Thread::consider_yield();
00699   }
00700 
00701   // Now that we've read all the data, it's time to finish the socket
00702   // so another thread can read the next datagram.
00703   finish_socket(sinfo);
00704 
00705   if (_shutdown) {
00706     return false;
00707   }
00708 
00709   // And now do whatever we need to do to process the datagram.
00710   if (!header.verify_datagram(datagram, _tcp_header_size)) {
00711     net_cat.error()
00712       << "Ignoring invalid TCP datagram.\n";
00713   } else {
00714     datagram.set_connection(sinfo->_connection);
00715     datagram.set_address(NetAddress(socket->GetPeerName()));
00716 
00717     if (net_cat.is_spam()) {
00718       net_cat.spam()
00719         << "Received TCP datagram with " 
00720         << _tcp_header_size + datagram.get_length() 
00721         << " bytes on " << (void *)datagram.get_connection()
00722         << " from " << datagram.get_address() << "\n";
00723     }
00724     
00725     receive_datagram(datagram);
00726   }
00727 
00728   return true;
00729 }
00730 
00731 ////////////////////////////////////////////////////////////////////
00732 //     Function: ConnectionReader::process_raw_incoming_udp_data
00733 //       Access: Protected
00734 //  Description:
00735 ////////////////////////////////////////////////////////////////////
00736 bool ConnectionReader::
00737 process_raw_incoming_udp_data(SocketInfo *sinfo) {
00738   Socket_UDP *socket;
00739   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00740   Socket_Address addr;
00741 
00742   // Read as many bytes as we can.
00743   char buffer[read_buffer_size];
00744   int bytes_read = read_buffer_size;
00745 
00746   bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
00747 
00748   if (!okflag) {
00749     finish_socket(sinfo);
00750     return false;
00751 
00752   } else if (bytes_read == 0) {
00753     // The socket was closed (!).  This shouldn't happen with a UDP
00754     // connection.  Oh well.  Report that and return.
00755     if (_manager != (ConnectionManager *)NULL) {
00756       _manager->connection_reset(sinfo->_connection, 0);
00757     }
00758     finish_socket(sinfo);
00759     return false;
00760   }
00761 
00762   // In raw mode, we simply extract all the bytes and make that a
00763   // datagram.
00764   NetDatagram datagram(buffer, bytes_read);
00765   
00766   // Now that we've read all the data, it's time to finish the socket
00767   // so another thread can read the next datagram.
00768   finish_socket(sinfo);
00769   
00770   if (_shutdown) {
00771     return false;
00772   }
00773   
00774   datagram.set_connection(sinfo->_connection);
00775   datagram.set_address(NetAddress(addr));
00776 
00777   if (net_cat.is_spam()) {
00778     net_cat.spam()
00779       << "Received raw UDP datagram with " << datagram.get_length() 
00780       << " bytes on " << (void *)datagram.get_connection()
00781       << " from " << datagram.get_address() << "\n";
00782   }
00783 
00784   receive_datagram(datagram);
00785 
00786   return true;
00787 }
00788 
00789 ////////////////////////////////////////////////////////////////////
00790 //     Function: ConnectionReader::process_raw_incoming_tcp_data
00791 //       Access: Protected
00792 //  Description:
00793 ////////////////////////////////////////////////////////////////////
00794 bool ConnectionReader::
00795 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
00796   Socket_TCP *socket;
00797   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00798 
00799   // Read as many bytes as we can.
00800   char buffer[read_buffer_size];
00801   int bytes_read = socket->RecvData(buffer, read_buffer_size);
00802 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00803   while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 
00804          socket->Active()) {
00805     Thread::force_yield();
00806     bytes_read = socket->RecvData(buffer, read_buffer_size);
00807   }
00808 #endif  // SIMPLE_THREADS
00809 
00810   if (bytes_read <= 0) {
00811     // The socket was closed.  Report that and return.
00812     if (_manager != (ConnectionManager *)NULL) {
00813       _manager->connection_reset(sinfo->_connection, 0);
00814     }
00815     finish_socket(sinfo);
00816     return false;
00817   }
00818 
00819   // In raw mode, we simply extract all the bytes and make that a
00820   // datagram.
00821   NetDatagram datagram(buffer, bytes_read);
00822   
00823   // Now that we've read all the data, it's time to finish the socket
00824   // so another thread can read the next datagram.
00825   finish_socket(sinfo);
00826   
00827   if (_shutdown) {
00828     return false;
00829   }
00830   
00831   datagram.set_connection(sinfo->_connection);
00832   datagram.set_address(NetAddress(socket->GetPeerName()));
00833 
00834   if (net_cat.is_spam()) {
00835     net_cat.spam()
00836       << "Received raw TCP datagram with " << datagram.get_length() 
00837       << " bytes on " << (void *)datagram.get_connection()
00838       << " from " << datagram.get_address() << "\n";
00839   }
00840 
00841   receive_datagram(datagram);
00842 
00843   return true;
00844 }
00845 
00846 ////////////////////////////////////////////////////////////////////
00847 //     Function: ConnectionReader::thread_run
00848 //       Access: Private
00849 //  Description: This is the actual executing function for each
00850 //               thread.
00851 ////////////////////////////////////////////////////////////////////
00852 void ConnectionReader::
00853 thread_run(int thread_index) {
00854   nassertv(!_polling);
00855   nassertv(_threads[thread_index] == Thread::get_current_thread());
00856 
00857   while (!_shutdown) {
00858     SocketInfo *sinfo =
00859       get_next_available_socket(true, thread_index);
00860     if (sinfo != (SocketInfo *)NULL) {
00861       process_incoming_data(sinfo);
00862       Thread::consider_yield();
00863     } else {
00864       Thread::force_yield();
00865     }
00866   }
00867 }
00868 
00869 
00870 ////////////////////////////////////////////////////////////////////
00871 //     Function: ConnectionReader::get_next_available_socket
00872 //       Access: Private
00873 //  Description: Polls the known connections for activity and returns
00874 //               the next one known to have activity, or NULL if no
00875 //               activity is detected within the timeout interval.
00876 //
00877 //               This function may block indefinitely if it is being
00878 //               called by multiple threads; if there are no other
00879 //               threads, it may block only if allow_block is true.
00880 ////////////////////////////////////////////////////////////////////
00881 ConnectionReader::SocketInfo *ConnectionReader::
00882 get_next_available_socket(bool allow_block, int current_thread_index) {
00883   // Go to sleep on the select() mutex.  This guarantees that only one
00884   // thread is in this function at a time.
00885   MutexHolder holder(_select_mutex);
00886 
00887   do {
00888     // First, check the result from the previous select call.  If
00889     // there are any sockets remaining there, process them first.
00890     while (!_shutdown && _num_results > 0) {
00891       nassertr(_next_index < (int)_selecting_sockets.size(), NULL);
00892       int i = _next_index;
00893       _next_index++;
00894 
00895       if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
00896         _num_results--;
00897         SocketInfo *sinfo = _selecting_sockets[i];
00898 
00899         // Some noise on this socket.
00900         sinfo->_busy = true;
00901         return sinfo;
00902       }
00903     }
00904 
00905     bool interrupted;
00906     do {
00907       interrupted = false;
00908 
00909       // Ok, no results from previous select calls.  Prepare to set up
00910       // for a new select.
00911 
00912       // First, report to anyone else who cares that we're the thread
00913       // about to do the poll.  That way, if any new sockets come
00914       // available while we're polling, we can service them.
00915       AtomicAdjust::set(_currently_polling_thread, current_thread_index);
00916       
00917       rebuild_select_list();
00918 
00919       // Now we can execute the select.
00920       _num_results = 0;
00921       _next_index = 0;
00922 
00923       if (!_shutdown) {
00924         PN_uint32 timeout = max_timeout_ms;
00925         if (!allow_block) {
00926           timeout = 0;
00927         }
00928 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00929         // In the presence of SIMPLE_THREADS, we never wait at all,
00930         // but rather we yield the thread if we come up empty (so that
00931         // we won't block the entire process).
00932         timeout = 0;
00933 #endif
00934 
00935         _num_results = _fdset.WaitForRead(false, timeout);
00936       }
00937 
00938       if (_num_results == 0 && allow_block) {
00939         // If we reached max_timeout_ms, go back and reconsider.  (We
00940         // never timeout indefinitely, so we can check the shutdown
00941         // flag every once in a while.)
00942         interrupted = true;
00943         Thread::force_yield();
00944 
00945       } else if (_num_results < 0) {
00946         // If we had an error, just return.
00947         return (SocketInfo *)NULL;
00948       }
00949     } while (!_shutdown && interrupted);
00950 
00951     AtomicAdjust::set(_currently_polling_thread, current_thread_index);
00952 
00953     // Repeat the above until we (a) find a socket with actual noise
00954     // on it, or (b) return from PR_Poll() with no sockets available.
00955   } while (!_shutdown && _num_results > 0);
00956 
00957   return (SocketInfo *)NULL;
00958 }
00959 
00960 
00961 ////////////////////////////////////////////////////////////////////
00962 //     Function: ConnectionReader::rebuild_select_list
00963 //       Access: Private
00964 //  Description: Rebuilds the _fdset and _selecting_sockets arrays
00965 //               based on the sockets that are currently available for
00966 //               selecting.
00967 ////////////////////////////////////////////////////////////////////
00968 void ConnectionReader::
00969 rebuild_select_list() {
00970   _fdset.clear();
00971   _selecting_sockets.clear();
00972 
00973   LightMutexHolder holder(_sockets_mutex);
00974   Sockets::const_iterator si;
00975   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00976     SocketInfo *sinfo = (*si);
00977     if (!sinfo->_busy && !sinfo->_error) {
00978       _fdset.setForSocket(*sinfo->get_socket());
00979       _selecting_sockets.push_back(sinfo);
00980     }
00981   }
00982 
00983   // This is also a fine time to delete the contents of the
00984   // _removed_sockets list.
00985   if (!_removed_sockets.empty()) {
00986     Sockets still_busy_sockets;
00987     for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00988       SocketInfo *sinfo = (*si);
00989       if (sinfo->_busy) {
00990         still_busy_sockets.push_back(sinfo);
00991       } else {
00992         delete sinfo;
00993       }
00994     }
00995     _removed_sockets.swap(still_busy_sockets);
00996   }
00997 }
 All Classes Functions Variables Enumerations