Panda3D
 All Classes Functions Variables Enumerations
connectionReader.cxx
00001 // Filename: connectionReader.cxx
00002 // Created by:  drose (08Feb00)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "connectionReader.h"
00016 #include "connectionManager.h"
00017 #include "netDatagram.h"
00018 #include "datagramTCPHeader.h"
00019 #include "datagramUDPHeader.h"
00020 #include "config_net.h"
00021 #include "trueClock.h"
00022 #include "socket_udp.h"
00023 #include "socket_tcp.h"
00024 #include "mutexHolder.h"
00025 #include "lightMutexHolder.h"
00026 #include "pnotify.h"
00027 #include "atomicAdjust.h"
00028 #include "config_downloader.h"
00029 
00030 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
00031 
00032 ////////////////////////////////////////////////////////////////////
00033 //     Function: ConnectionReader::SocketInfo::Constructor
00034 //       Access: Public
00035 //  Description:
00036 ////////////////////////////////////////////////////////////////////
00037 ConnectionReader::SocketInfo::
00038 SocketInfo(const PT(Connection) &connection) :
00039   _connection(connection)
00040 {
00041   _busy = false;
00042   _error = false;
00043 }
00044 
00045 ////////////////////////////////////////////////////////////////////
00046 //     Function: ConnectionReader::SocketInfo::is_udp
00047 //       Access: Public
00048 //  Description:
00049 ////////////////////////////////////////////////////////////////////
00050 bool ConnectionReader::SocketInfo::
00051 is_udp() const {
00052   return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
00053 }
00054 
00055 ////////////////////////////////////////////////////////////////////
00056 //     Function: ConnectionReader::SocketInfo::get_socket
00057 //       Access: Public
00058 //  Description:
00059 ////////////////////////////////////////////////////////////////////
00060 Socket_IP *ConnectionReader::SocketInfo::
00061 get_socket() const {
00062   return _connection->get_socket();
00063 }
00064 
00065 ////////////////////////////////////////////////////////////////////
00066 //     Function: ConnectionReader::ReaderThread::Constructor
00067 //       Access: Public
00068 //  Description:
00069 ////////////////////////////////////////////////////////////////////
00070 ConnectionReader::ReaderThread::
00071 ReaderThread(ConnectionReader *reader, const string &thread_name, 
00072              int thread_index) :
00073   Thread(make_thread_name(thread_name, thread_index), 
00074          make_thread_name(thread_name, thread_index)),
00075   _reader(reader),
00076   _thread_index(thread_index)
00077 {
00078 }
00079 
00080 ////////////////////////////////////////////////////////////////////
00081 //     Function: ConnectionReader::ReaderThread::thread_main
00082 //       Access: Public, Virtual
00083 //  Description:
00084 ////////////////////////////////////////////////////////////////////
00085 void ConnectionReader::ReaderThread::
00086 thread_main() {
00087   _reader->thread_run(_thread_index);
00088 }
00089 
00090 ////////////////////////////////////////////////////////////////////
00091 //     Function: ConnectionReader::Constructor
00092 //       Access: Published
00093 //  Description: Creates a new ConnectionReader with the indicated
00094 //               number of threads to handle requests.  If num_threads
00095 //               is 0, the sockets will only be read by polling,
00096 //               during an explicit poll() call.
00097 //               (QueuedConnectionReader will do this automatically.)
00098 ////////////////////////////////////////////////////////////////////
00099 ConnectionReader::
00100 ConnectionReader(ConnectionManager *manager, int num_threads,
00101                  const string &thread_name) :
00102   _manager(manager)
00103 {
00104   if (!Thread::is_threading_supported()) {
00105 #ifndef NDEBUG
00106     if (num_threads != 0) {
00107       if (net_cat.is_debug()) {
00108         net_cat.debug()
00109           << "Threading support is not available.\n";
00110       }
00111     }
00112 #endif  // NDEBUG
00113     num_threads = 0;
00114   }
00115 
00116   _raw_mode = false;
00117   _tcp_header_size = tcp_header_size;
00118   _polling = (num_threads <= 0);
00119 
00120   _shutdown = false;
00121 
00122   _next_index = 0;
00123   _num_results = 0;
00124 
00125   _currently_polling_thread = -1;
00126 
00127   string reader_thread_name = thread_name;
00128   if (thread_name.empty()) {
00129     reader_thread_name = "ReaderThread";
00130   }
00131   int i;
00132   for (i = 0; i < num_threads; i++) {
00133     PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
00134     _threads.push_back(thread);
00135   }
00136   for (i = 0; i < num_threads; i++) {
00137     _threads[i]->start(net_thread_priority, true);
00138   }
00139 
00140   _manager->add_reader(this);
00141 }
00142 
00143 ////////////////////////////////////////////////////////////////////
00144 //     Function: ConnectionReader::Destructor
00145 //       Access: Published, Virtual
00146 //  Description:
00147 ////////////////////////////////////////////////////////////////////
00148 ConnectionReader::
00149 ~ConnectionReader() {
00150   if (_manager != (ConnectionManager *)NULL) {
00151     _manager->remove_reader(this);
00152   }
00153 
00154   shutdown();
00155 
00156   // Delete all of our old sockets.
00157   Sockets::iterator si;
00158   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00159     delete (*si);
00160   }
00161   for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00162     SocketInfo *sinfo = (*si);
00163     if (!sinfo->_busy) {
00164       delete sinfo;
00165     } else {
00166       net_cat.error()
00167         << "Reentrant deletion of ConnectionReader--don't delete these\n"
00168         << "in response to connection_reset().\n";
00169 
00170       // We'll have to do the best we can to recover.
00171       sinfo->_connection.clear();
00172     }
00173   }
00174 }
00175 
00176 ////////////////////////////////////////////////////////////////////
00177 //     Function: ConnectionReader::add_connection
00178 //       Access: Published
00179 //  Description: Adds a new socket to the list of sockets the
00180 //               ConnectionReader will monitor.  A datagram that comes
00181 //               in on any of the monitored sockets will be reported.
00182 //               In the case of a ConnectionListener, this adds a new
00183 //               rendezvous socket; any activity on any of the
00184 //               monitored sockets will cause a connection to be
00185 //               accepted.
00186 //
00187 //               The return value is true if the connection was added,
00188 //               false if it was already there.
00189 //
00190 //               add_connection() is thread-safe, and may be called at
00191 //               will by any thread.
00192 ////////////////////////////////////////////////////////////////////
00193 bool ConnectionReader::
00194 add_connection(Connection *connection) {
00195   nassertr(connection != (Connection *)NULL, false);
00196 
00197   LightMutexHolder holder(_sockets_mutex);
00198 
00199   // Make sure it's not already on the _sockets list.
00200   Sockets::const_iterator si;
00201   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00202     if ((*si)->_connection == connection) {
00203       // Whoops, already there.
00204       return false;
00205     }
00206   }
00207 
00208   _sockets.push_back(new SocketInfo(connection));
00209 
00210   return true;
00211 }
00212 
00213 ////////////////////////////////////////////////////////////////////
00214 //     Function: ConnectionReader::remove_connection
00215 //       Access: Published
00216 //  Description: Removes a socket from the list of sockets being
00217 //               monitored.  Returns true if the socket was correctly
00218 //               removed, false if it was not on the list in the first
00219 //               place.
00220 //
00221 //               remove_connection() is thread-safe, and may be called
00222 //               at will by any thread.
00223 ////////////////////////////////////////////////////////////////////
00224 bool ConnectionReader::
00225 remove_connection(Connection *connection) {
00226   LightMutexHolder holder(_sockets_mutex);
00227 
00228   // Walk through the list of sockets to find the one we're removing.
00229   Sockets::iterator si;
00230   si = _sockets.begin();
00231   while (si != _sockets.end() && (*si)->_connection != connection) {
00232     ++si;
00233   }
00234   if (si == _sockets.end()) {
00235     return false;
00236   }
00237 
00238   _removed_sockets.push_back(*si);
00239   _sockets.erase(si);
00240 
00241   return true;
00242 }
00243 
00244 ////////////////////////////////////////////////////////////////////
00245 //     Function: ConnectionReader::is_connection_ok
00246 //       Access: Published
00247 //  Description: Returns true if the indicated connection has been
00248 //               added to the ConnectionReader and is being monitored
00249 //               properly, false if it is not known, or if there was
00250 //               some error condition detected on the connection.  (If
00251 //               there was an error condition, normally the
00252 //               ConnectionManager would have been informed and closed
00253 //               the connection.)
00254 ////////////////////////////////////////////////////////////////////
00255 bool ConnectionReader::
00256 is_connection_ok(Connection *connection) {
00257   LightMutexHolder holder(_sockets_mutex);
00258 
00259   // Walk through the list of sockets to find the one we're asking
00260   // about.
00261   Sockets::iterator si;
00262   si = _sockets.begin();
00263   while (si != _sockets.end() && (*si)->_connection != connection) {
00264     ++si;
00265   }
00266   if (si == _sockets.end()) {
00267     // Don't know that connection.
00268     return false;
00269   }
00270 
00271   SocketInfo *sinfo = (*si);
00272   bool is_ok = !sinfo->_error;
00273 
00274   return is_ok;
00275 }
00276 
00277 ////////////////////////////////////////////////////////////////////
00278 //     Function: ConnectionReader::poll
00279 //       Access: Published
00280 //  Description: Explicitly polls the available sockets to see if any
00281 //               of them have any noise.  This function does nothing
00282 //               unless this is a polling-type ConnectionReader,
00283 //               i.e. it was created with zero threads (and
00284 //               is_polling() will return true).
00285 //
00286 //               It is not necessary to call this explicitly for a
00287 //               QueuedConnectionReader.
00288 ////////////////////////////////////////////////////////////////////
00289 void ConnectionReader::
00290 poll() {
00291   if (!_polling) {
00292     return;
00293   }
00294 
00295   SocketInfo *sinfo = get_next_available_socket(false, -2);
00296   if (sinfo != (SocketInfo *)NULL) {
00297     double max_poll_cycle = get_net_max_poll_cycle();
00298     if (max_poll_cycle < 0.0) {
00299       // Continue to read all data.
00300       while (sinfo != (SocketInfo *)NULL) {
00301         process_incoming_data(sinfo);
00302         sinfo = get_next_available_socket(false, -2);
00303       }
00304 
00305     } else {
00306       // Read only until a certain amount of time has elapsed.
00307       TrueClock *global_clock = TrueClock::get_global_ptr();
00308       double stop = global_clock->get_short_time() + max_poll_cycle;
00309 
00310       while (sinfo != (SocketInfo *)NULL) {
00311         process_incoming_data(sinfo);
00312         if (global_clock->get_short_time() >= stop) {
00313           return;
00314         }
00315         sinfo = get_next_available_socket(false, -2);
00316       }
00317     }
00318   }
00319 }
00320 
00321 ////////////////////////////////////////////////////////////////////
00322 //     Function: ConnectionReader::get_manager
00323 //       Access: Published
00324 //  Description: Returns a pointer to the ConnectionManager object
00325 //               that serves this ConnectionReader.
00326 ////////////////////////////////////////////////////////////////////
00327 ConnectionManager *ConnectionReader::
00328 get_manager() const {
00329   return _manager;
00330 }
00331 
00332 ////////////////////////////////////////////////////////////////////
00333 //     Function: ConnectionReader::get_num_threads
00334 //       Access: Published
00335 //  Description: Returns the number of threads the ConnectionReader
00336 //               has been created with.
00337 ////////////////////////////////////////////////////////////////////
00338 int ConnectionReader::
00339 get_num_threads() const {
00340   return _threads.size();
00341 }
00342 
00343 ////////////////////////////////////////////////////////////////////
00344 //     Function: ConnectionReader::set_raw_mode
00345 //       Access: Published
00346 //  Description: Sets the ConnectionReader into raw mode (or turns off
00347 //               raw mode).  In raw mode, datagram headers are not
00348 //               expected; instead, all the data available on the pipe
00349 //               is treated as a single datagram.
00350 //
00351 //               This is similar to set_tcp_header_size(0), except that it
00352 //               also turns off headers for UDP packets.
00353 ////////////////////////////////////////////////////////////////////
00354 void ConnectionReader::
00355 set_raw_mode(bool mode) {
00356   _raw_mode = mode;
00357 }
00358 
00359 ////////////////////////////////////////////////////////////////////
00360 //     Function: ConnectionReader::get_raw_mode
00361 //       Access: Published
00362 //  Description: Returns the current setting of the raw mode flag.
00363 //               See set_raw_mode().
00364 ////////////////////////////////////////////////////////////////////
00365 bool ConnectionReader::
00366 get_raw_mode() const {
00367   return _raw_mode;
00368 }
00369 
00370 ////////////////////////////////////////////////////////////////////
00371 //     Function: ConnectionReader::set_tcp_header_size
00372 //       Access: Published
00373 //  Description: Sets the header size of TCP packets.  At the present,
00374 //               legal values for this are 0, 2, or 4; this specifies
00375 //               the number of bytes to use encode the datagram length
00376 //               at the start of each TCP datagram.  Sender and
00377 //               receiver must independently agree on this.
00378 ////////////////////////////////////////////////////////////////////
00379 void ConnectionReader::
00380 set_tcp_header_size(int tcp_header_size) {
00381   _tcp_header_size = tcp_header_size;
00382 }
00383 
00384 ////////////////////////////////////////////////////////////////////
00385 //     Function: ConnectionReader::get_tcp_header_size
00386 //       Access: Published
00387 //  Description: Returns the current setting of TCP header size.
00388 //               See set_tcp_header_size().
00389 ////////////////////////////////////////////////////////////////////
00390 int ConnectionReader::
00391 get_tcp_header_size() const {
00392   return _tcp_header_size;
00393 }
00394 
00395 ////////////////////////////////////////////////////////////////////
00396 //     Function: ConnectionReader::shutdown
00397 //       Access: Published
00398 //  Description: Terminates all threads cleanly.  Normally this is
00399 //               only called by the destructor, but it may be called
00400 //               explicitly before destruction.
00401 ////////////////////////////////////////////////////////////////////
00402 void ConnectionReader::
00403 shutdown() {
00404   if (_shutdown) {
00405     return;
00406   }
00407 
00408   // First, begin the shutdown.  This will tell our threads we want
00409   // them to quit.
00410   _shutdown = true;
00411 
00412   // Now wait for all of our threads to terminate.
00413   Threads::iterator ti;
00414   for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
00415     (*ti)->join();
00416   }
00417 }
00418 
00419 ////////////////////////////////////////////////////////////////////
00420 //     Function: ConnectionReader::flush_read_connection
00421 //       Access: Protected, Virtual
00422 //  Description: Attempts to read all the possible data from the
00423 //               indicated connection, which has just delivered a
00424 //               write error (and has therefore already been closed).
00425 //               If the connection is not monitered by this reader,
00426 //               does nothing.
00427 ////////////////////////////////////////////////////////////////////
00428 void ConnectionReader::
00429 flush_read_connection(Connection *connection) {
00430   // Ensure it doesn't get deleted.
00431   SocketInfo sinfo(connection);
00432 
00433   if (!remove_connection(connection)) {
00434     // Not already in the reader.
00435     return;
00436   }
00437 
00438   // The connection was previously in the reader, but has now been
00439   // removed.  Now we can flush it completely.  We check if there is
00440   // any read data available on just this one socket; we can do this
00441   // right here in this thread, since we've already removed this
00442   // connection from the reader.
00443 
00444   Socket_fdset fdset;
00445   fdset.clear();
00446   fdset.setForSocket(*(sinfo.get_socket()));
00447   int num_results = fdset.WaitForRead(true, 0);
00448   while (num_results != 0) {
00449     sinfo._busy = true;
00450     if (!process_incoming_data(&sinfo)) {
00451       break;
00452     }
00453     fdset.setForSocket(*(sinfo.get_socket()));
00454     num_results = fdset.WaitForRead(true, 0);
00455   }
00456 }
00457 
00458 ////////////////////////////////////////////////////////////////////
00459 //     Function: ConnectionReader::clear_manager
00460 //       Access: Protected
00461 //  Description: This should normally only be called when the
00462 //               associated ConnectionManager destructs.  It resets
00463 //               the ConnectionManager pointer to NULL so we don't
00464 //               have a floating pointer.  This makes the
00465 //               ConnectionReader invalid; presumably it also will be
00466 //               destructed momentarily.
00467 ////////////////////////////////////////////////////////////////////
00468 void ConnectionReader::
00469 clear_manager() {
00470   _manager = (ConnectionManager *)NULL;
00471 }
00472 
00473 ////////////////////////////////////////////////////////////////////
00474 //     Function: ConnectionReader::finish_socket
00475 //       Access: Protected
00476 //  Description: To be called when a socket has been fully read and is
00477 //               ready for polling for additional data.
00478 ////////////////////////////////////////////////////////////////////
00479 void ConnectionReader::
00480 finish_socket(SocketInfo *sinfo) {
00481   nassertv(sinfo->_busy);
00482 
00483   // By marking the SocketInfo nonbusy, we make it available for
00484   // future polls.
00485   sinfo->_busy = false;
00486 }
00487 
00488 ////////////////////////////////////////////////////////////////////
00489 //     Function: ConnectionReader::process_incoming_data
00490 //       Access: Protected, Virtual
00491 //  Description: This is run within a thread when the call to
00492 //               select() indicates there is data available on a
00493 //               socket.  Returns true if the data is read
00494 //               successfully, false on failure (for instance, because
00495 //               the connection is closed).
00496 ////////////////////////////////////////////////////////////////////
00497 bool ConnectionReader::
00498 process_incoming_data(SocketInfo *sinfo) {
00499   if (_raw_mode) {
00500     if (sinfo->is_udp()) {
00501       return process_raw_incoming_udp_data(sinfo);
00502     } else {
00503       return process_raw_incoming_tcp_data(sinfo);
00504     }
00505   } else {
00506     if (sinfo->is_udp()) {
00507       return process_incoming_udp_data(sinfo);
00508     } else {
00509       return process_incoming_tcp_data(sinfo);
00510     }
00511   }
00512 }
00513 
00514 ////////////////////////////////////////////////////////////////////
00515 //     Function: ConnectionReader::process_incoming_udp_data
00516 //       Access: Protected
00517 //  Description:
00518 ////////////////////////////////////////////////////////////////////
00519 bool ConnectionReader::
00520 process_incoming_udp_data(SocketInfo *sinfo) {
00521   Socket_UDP *socket;
00522   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00523   Socket_Address addr;
00524 
00525   // Read as many bytes as we can.
00526   char buffer[read_buffer_size];
00527   int bytes_read = read_buffer_size;
00528 
00529   bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
00530 
00531   if (!okflag) {
00532     finish_socket(sinfo);
00533     return false;
00534 
00535   } else if (bytes_read == 0) {
00536     // The socket was closed (!).  This shouldn't happen with a UDP
00537     // connection.  Oh well.  Report that and return.
00538     if (_manager != (ConnectionManager *)NULL) {
00539       _manager->connection_reset(sinfo->_connection, 0);
00540     }
00541     finish_socket(sinfo);
00542     return false;
00543   }
00544 
00545   // Since we are not running in raw mode, we decode the header to
00546   // determine how big the datagram is.  This means we must have read
00547   // at least a full header.
00548   if (bytes_read < datagram_udp_header_size) {
00549     net_cat.error()
00550       << "Did not read entire header, discarding UDP datagram.\n";
00551     finish_socket(sinfo);
00552     return true;
00553   }
00554   
00555   DatagramUDPHeader header(buffer);
00556   
00557   char *dp = buffer + datagram_udp_header_size;
00558   bytes_read -= datagram_udp_header_size;
00559   
00560   NetDatagram datagram(dp, bytes_read);
00561   
00562   // Now that we've read all the data, it's time to finish the socket
00563   // so another thread can read the next datagram.
00564   finish_socket(sinfo);
00565   
00566   if (_shutdown) {
00567     return false;
00568   }
00569   
00570   // And now do whatever we need to do to process the datagram.
00571   if (!header.verify_datagram(datagram)) {
00572     net_cat.error()
00573       << "Ignoring invalid UDP datagram.\n";
00574   } else {
00575     datagram.set_connection(sinfo->_connection);
00576     datagram.set_address(NetAddress(addr));
00577 
00578     if (net_cat.is_spam()) {
00579       net_cat.spam()
00580         << "Received UDP datagram with " 
00581         << datagram_udp_header_size + datagram.get_length() 
00582         << " bytes on " << (void *)datagram.get_connection()
00583         << " from " << datagram.get_address() << "\n";
00584     }
00585 
00586     receive_datagram(datagram);
00587   }
00588 
00589   return true;
00590 }
00591 
00592 ////////////////////////////////////////////////////////////////////
00593 //     Function: ConnectionReader::process_incoming_tcp_data
00594 //       Access: Protected
00595 //  Description:
00596 ////////////////////////////////////////////////////////////////////
00597 bool ConnectionReader::
00598 process_incoming_tcp_data(SocketInfo *sinfo) {
00599   Socket_TCP *socket;
00600   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00601 
00602   // Read only the header bytes to start with.
00603   char buffer[read_buffer_size];
00604   int header_bytes_read = 0;
00605 
00606   // First, we have to read the first _tcp_header_size bytes.
00607   while (header_bytes_read < _tcp_header_size) {
00608     int bytes_read =
00609       socket->RecvData(buffer + header_bytes_read,
00610                        _tcp_header_size - header_bytes_read);
00611 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00612     while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
00613            socket->Active()) {
00614       Thread::force_yield();
00615       bytes_read = socket->RecvData(buffer + header_bytes_read,
00616                                     _tcp_header_size - header_bytes_read);
00617     }
00618 #endif  // SIMPLE_THREADS
00619 
00620     if (bytes_read <= 0) {
00621       // The socket was closed.  Report that and return.
00622       if (_manager != (ConnectionManager *)NULL) {
00623         _manager->connection_reset(sinfo->_connection, 0);
00624       }
00625       finish_socket(sinfo);
00626       return false;
00627     }
00628 
00629     header_bytes_read += bytes_read;
00630     Thread::consider_yield();
00631   }
00632 
00633   // Now we must decode the header to determine how big the datagram
00634   // is.  This means we must have read at least a full header.
00635   if (header_bytes_read != _tcp_header_size) {
00636     // This should actually be impossible, by the read-loop logic
00637     // above.
00638     net_cat.error()
00639       << "Did not read entire header, discarding TCP datagram.\n";
00640     finish_socket(sinfo);
00641     return true;
00642   }
00643 
00644   DatagramTCPHeader header(buffer, _tcp_header_size);
00645   int size = header.get_datagram_size(_tcp_header_size);
00646 
00647   // We have to loop until the entire datagram is read.
00648   NetDatagram datagram;
00649 
00650   while (!_shutdown && (int)datagram.get_length() < size) {
00651     int bytes_read;
00652 
00653     int read_bytes = read_buffer_size;
00654 #ifdef SIMPLE_THREADS
00655     // In the SIMPLE_THREADS case, we want to limit the number of
00656     // bytes we read in a single epoch, to minimize the impact on the
00657     // other threads.
00658     read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
00659 #endif
00660 
00661     bytes_read =
00662       socket->RecvData(buffer, min(read_bytes,
00663                                    (int)(size - datagram.get_length())));
00664 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00665     while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
00666            socket->Active()) {
00667       Thread::force_yield();
00668       bytes_read =
00669         socket->RecvData(buffer, min(read_bytes,
00670                                      (int)(size - datagram.get_length())));
00671     }
00672 #endif  // SIMPLE_THREADS
00673 
00674     char *dp = buffer;
00675 
00676     if (bytes_read <= 0) {
00677       // The socket was closed.  Report that and return.
00678       if (_manager != (ConnectionManager *)NULL) {
00679         _manager->connection_reset(sinfo->_connection, 0);
00680       }
00681       finish_socket(sinfo);
00682       return false;
00683     }
00684 
00685     int datagram_bytes =
00686       min(bytes_read, (int)(size - datagram.get_length()));
00687     datagram.append_data(dp, datagram_bytes);
00688 
00689     if (bytes_read > datagram_bytes) {
00690       // There were some extra bytes at the end of the datagram.  Maybe
00691       // the beginning of the next datagram?  Huh.
00692       net_cat.error()
00693         << "Discarding " << bytes_read - datagram_bytes
00694         << " bytes following TCP datagram.\n";
00695     }
00696     Thread::consider_yield();
00697   }
00698 
00699   // Now that we've read all the data, it's time to finish the socket
00700   // so another thread can read the next datagram.
00701   finish_socket(sinfo);
00702 
00703   if (_shutdown) {
00704     return false;
00705   }
00706 
00707   // And now do whatever we need to do to process the datagram.
00708   if (!header.verify_datagram(datagram, _tcp_header_size)) {
00709     net_cat.error()
00710       << "Ignoring invalid TCP datagram.\n";
00711   } else {
00712     datagram.set_connection(sinfo->_connection);
00713     datagram.set_address(NetAddress(socket->GetPeerName()));
00714 
00715     if (net_cat.is_spam()) {
00716       net_cat.spam()
00717         << "Received TCP datagram with " 
00718         << _tcp_header_size + datagram.get_length() 
00719         << " bytes on " << (void *)datagram.get_connection()
00720         << " from " << datagram.get_address() << "\n";
00721     }
00722     
00723     receive_datagram(datagram);
00724   }
00725 
00726   return true;
00727 }
00728 
00729 ////////////////////////////////////////////////////////////////////
00730 //     Function: ConnectionReader::process_raw_incoming_udp_data
00731 //       Access: Protected
00732 //  Description:
00733 ////////////////////////////////////////////////////////////////////
00734 bool ConnectionReader::
00735 process_raw_incoming_udp_data(SocketInfo *sinfo) {
00736   Socket_UDP *socket;
00737   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00738   Socket_Address addr;
00739 
00740   // Read as many bytes as we can.
00741   char buffer[read_buffer_size];
00742   int bytes_read = read_buffer_size;
00743 
00744   bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
00745 
00746   if (!okflag) {
00747     finish_socket(sinfo);
00748     return false;
00749 
00750   } else if (bytes_read == 0) {
00751     // The socket was closed (!).  This shouldn't happen with a UDP
00752     // connection.  Oh well.  Report that and return.
00753     if (_manager != (ConnectionManager *)NULL) {
00754       _manager->connection_reset(sinfo->_connection, 0);
00755     }
00756     finish_socket(sinfo);
00757     return false;
00758   }
00759 
00760   // In raw mode, we simply extract all the bytes and make that a
00761   // datagram.
00762   NetDatagram datagram(buffer, bytes_read);
00763   
00764   // Now that we've read all the data, it's time to finish the socket
00765   // so another thread can read the next datagram.
00766   finish_socket(sinfo);
00767   
00768   if (_shutdown) {
00769     return false;
00770   }
00771   
00772   datagram.set_connection(sinfo->_connection);
00773   datagram.set_address(NetAddress(addr));
00774 
00775   if (net_cat.is_spam()) {
00776     net_cat.spam()
00777       << "Received raw UDP datagram with " << datagram.get_length() 
00778       << " bytes on " << (void *)datagram.get_connection()
00779       << " from " << datagram.get_address() << "\n";
00780   }
00781 
00782   receive_datagram(datagram);
00783 
00784   return true;
00785 }
00786 
00787 ////////////////////////////////////////////////////////////////////
00788 //     Function: ConnectionReader::process_raw_incoming_tcp_data
00789 //       Access: Protected
00790 //  Description:
00791 ////////////////////////////////////////////////////////////////////
00792 bool ConnectionReader::
00793 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
00794   Socket_TCP *socket;
00795   DCAST_INTO_R(socket, sinfo->get_socket(), false);
00796 
00797   // Read as many bytes as we can.
00798   char buffer[read_buffer_size];
00799   int bytes_read = socket->RecvData(buffer, read_buffer_size);
00800 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00801   while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && 
00802          socket->Active()) {
00803     Thread::force_yield();
00804     bytes_read = socket->RecvData(buffer, read_buffer_size);
00805   }
00806 #endif  // SIMPLE_THREADS
00807 
00808   if (bytes_read <= 0) {
00809     // The socket was closed.  Report that and return.
00810     if (_manager != (ConnectionManager *)NULL) {
00811       _manager->connection_reset(sinfo->_connection, 0);
00812     }
00813     finish_socket(sinfo);
00814     return false;
00815   }
00816 
00817   // In raw mode, we simply extract all the bytes and make that a
00818   // datagram.
00819   NetDatagram datagram(buffer, bytes_read);
00820   
00821   // Now that we've read all the data, it's time to finish the socket
00822   // so another thread can read the next datagram.
00823   finish_socket(sinfo);
00824   
00825   if (_shutdown) {
00826     return false;
00827   }
00828   
00829   datagram.set_connection(sinfo->_connection);
00830   datagram.set_address(NetAddress(socket->GetPeerName()));
00831 
00832   if (net_cat.is_spam()) {
00833     net_cat.spam()
00834       << "Received raw TCP datagram with " << datagram.get_length() 
00835       << " bytes on " << (void *)datagram.get_connection()
00836       << " from " << datagram.get_address() << "\n";
00837   }
00838 
00839   receive_datagram(datagram);
00840 
00841   return true;
00842 }
00843 
00844 ////////////////////////////////////////////////////////////////////
00845 //     Function: ConnectionReader::thread_run
00846 //       Access: Private
00847 //  Description: This is the actual executing function for each
00848 //               thread.
00849 ////////////////////////////////////////////////////////////////////
00850 void ConnectionReader::
00851 thread_run(int thread_index) {
00852   nassertv(!_polling);
00853   nassertv(_threads[thread_index] == Thread::get_current_thread());
00854 
00855   while (!_shutdown) {
00856     SocketInfo *sinfo =
00857       get_next_available_socket(true, thread_index);
00858     if (sinfo != (SocketInfo *)NULL) {
00859       process_incoming_data(sinfo);
00860       Thread::consider_yield();
00861     } else {
00862       Thread::force_yield();
00863     }
00864   }
00865 }
00866 
00867 
00868 ////////////////////////////////////////////////////////////////////
00869 //     Function: ConnectionReader::get_next_available_socket
00870 //       Access: Private
00871 //  Description: Polls the known connections for activity and returns
00872 //               the next one known to have activity, or NULL if no
00873 //               activity is detected within the timeout interval.
00874 //
00875 //               This function may block indefinitely if it is being
00876 //               called by multiple threads; if there are no other
00877 //               threads, it may block only if allow_block is true.
00878 ////////////////////////////////////////////////////////////////////
00879 ConnectionReader::SocketInfo *ConnectionReader::
00880 get_next_available_socket(bool allow_block, int current_thread_index) {
00881   // Go to sleep on the select() mutex.  This guarantees that only one
00882   // thread is in this function at a time.
00883   MutexHolder holder(_select_mutex);
00884 
00885   do {
00886     // First, check the result from the previous select call.  If
00887     // there are any sockets remaining there, process them first.
00888     while (!_shutdown && _num_results > 0) {
00889       nassertr(_next_index < (int)_selecting_sockets.size(), NULL);
00890       int i = _next_index;
00891       _next_index++;
00892 
00893       if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
00894         _num_results--;
00895         SocketInfo *sinfo = _selecting_sockets[i];
00896 
00897         // Some noise on this socket.
00898         sinfo->_busy = true;
00899         return sinfo;
00900       }
00901     }
00902 
00903     bool interrupted;
00904     do {
00905       interrupted = false;
00906 
00907       // Ok, no results from previous select calls.  Prepare to set up
00908       // for a new select.
00909 
00910       // First, report to anyone else who cares that we're the thread
00911       // about to do the poll.  That way, if any new sockets come
00912       // available while we're polling, we can service them.
00913       AtomicAdjust::set(_currently_polling_thread, current_thread_index);
00914       
00915       rebuild_select_list();
00916 
00917       // Now we can execute the select.
00918       _num_results = 0;
00919       _next_index = 0;
00920 
00921       if (!_shutdown) {
00922         PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0);
00923         if (!allow_block) {
00924           timeout = 0;
00925         }
00926 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00927         // In the presence of SIMPLE_THREADS, we never wait at all,
00928         // but rather we yield the thread if we come up empty (so that
00929         // we won't block the entire process).
00930         timeout = 0;
00931 #endif
00932 
00933         _num_results = _fdset.WaitForRead(false, timeout);
00934       }
00935 
00936       if (_num_results == 0 && allow_block) {
00937         // If we reached net_max_block, go back and reconsider.  (We
00938         // never timeout indefinitely, so we can check the shutdown
00939         // flag every once in a while.)
00940         interrupted = true;
00941         Thread::force_yield();
00942 
00943       } else if (_num_results < 0) {
00944         // If we had an error, just return.  But yield the timeslice
00945         // first.
00946         Thread::force_yield();
00947         return (SocketInfo *)NULL;
00948       }
00949     } while (!_shutdown && interrupted);
00950 
00951     AtomicAdjust::set(_currently_polling_thread, current_thread_index);
00952 
00953     // Repeat the above until we (a) find a socket with actual noise
00954     // on it, or (b) return from PR_Poll() with no sockets available.
00955   } while (!_shutdown && _num_results > 0);
00956 
00957   return (SocketInfo *)NULL;
00958 }
00959 
00960 
00961 ////////////////////////////////////////////////////////////////////
00962 //     Function: ConnectionReader::rebuild_select_list
00963 //       Access: Private
00964 //  Description: Rebuilds the _fdset and _selecting_sockets arrays
00965 //               based on the sockets that are currently available for
00966 //               selecting.
00967 ////////////////////////////////////////////////////////////////////
00968 void ConnectionReader::
00969 rebuild_select_list() {
00970   _fdset.clear();
00971   _selecting_sockets.clear();
00972 
00973   LightMutexHolder holder(_sockets_mutex);
00974   Sockets::const_iterator si;
00975   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00976     SocketInfo *sinfo = (*si);
00977     if (!sinfo->_busy && !sinfo->_error) {
00978       _fdset.setForSocket(*sinfo->get_socket());
00979       _selecting_sockets.push_back(sinfo);
00980     }
00981   }
00982 
00983   // This is also a fine time to delete the contents of the
00984   // _removed_sockets list.
00985   if (!_removed_sockets.empty()) {
00986     Sockets still_busy_sockets;
00987     for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00988       SocketInfo *sinfo = (*si);
00989       if (sinfo->_busy) {
00990         still_busy_sockets.push_back(sinfo);
00991       } else {
00992         delete sinfo;
00993       }
00994     }
00995     _removed_sockets.swap(still_busy_sockets);
00996   }
00997 }
00998 
00999 ////////////////////////////////////////////////////////////////////
01000 //     Function: ConnectionReader::accumulate_fdset
01001 //       Access: Private
01002 //  Description: Adds the sockets from this ConnectionReader (or
01003 //               ConnectionListener) to the indicated fdset.  This is
01004 //               used by ConnectionManager::block() to build an fdset
01005 //               of all attached readers.
01006 ////////////////////////////////////////////////////////////////////
01007 void ConnectionReader::
01008 accumulate_fdset(Socket_fdset &fdset) {
01009   LightMutexHolder holder(_sockets_mutex);
01010   Sockets::const_iterator si;
01011   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
01012     SocketInfo *sinfo = (*si);
01013     if (!sinfo->_busy && !sinfo->_error) {
01014       fdset.setForSocket(*sinfo->get_socket());
01015     }
01016   }
01017 }
 All Classes Functions Variables Enumerations