Panda3D
connectionReader.cxx
1 // Filename: connectionReader.cxx
2 // Created by: drose (08Feb00)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "connectionReader.h"
16 #include "dcast.h"
17 #include "connectionManager.h"
18 #include "netDatagram.h"
19 #include "datagramTCPHeader.h"
20 #include "datagramUDPHeader.h"
21 #include "config_net.h"
22 #include "trueClock.h"
23 #include "socket_udp.h"
24 #include "socket_tcp.h"
25 #include "mutexHolder.h"
26 #include "lightMutexHolder.h"
27 #include "pnotify.h"
28 #include "atomicAdjust.h"
29 #include "config_downloader.h"
30 
31 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
32 
33 ////////////////////////////////////////////////////////////////////
34 // Function: ConnectionReader::SocketInfo::Constructor
35 // Access: Public
36 // Description:
37 ////////////////////////////////////////////////////////////////////
38 ConnectionReader::SocketInfo::
39 SocketInfo(const PT(Connection) &connection) :
40  _connection(connection)
41 {
42  _busy = false;
43  _error = false;
44 }
45 
46 ////////////////////////////////////////////////////////////////////
47 // Function: ConnectionReader::SocketInfo::is_udp
48 // Access: Public
49 // Description:
50 ////////////////////////////////////////////////////////////////////
51 bool ConnectionReader::SocketInfo::
52 is_udp() const {
53  return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
54 }
55 
56 ////////////////////////////////////////////////////////////////////
57 // Function: ConnectionReader::SocketInfo::get_socket
58 // Access: Public
59 // Description:
60 ////////////////////////////////////////////////////////////////////
61 Socket_IP *ConnectionReader::SocketInfo::
62 get_socket() const {
63  return _connection->get_socket();
64 }
65 
66 ////////////////////////////////////////////////////////////////////
67 // Function: ConnectionReader::ReaderThread::Constructor
68 // Access: Public
69 // Description:
70 ////////////////////////////////////////////////////////////////////
71 ConnectionReader::ReaderThread::
72 ReaderThread(ConnectionReader *reader, const string &thread_name,
73  int thread_index) :
74  Thread(make_thread_name(thread_name, thread_index),
75  make_thread_name(thread_name, thread_index)),
76  _reader(reader),
77  _thread_index(thread_index)
78 {
79 }
80 
81 ////////////////////////////////////////////////////////////////////
82 // Function: ConnectionReader::ReaderThread::thread_main
83 // Access: Public, Virtual
84 // Description:
85 ////////////////////////////////////////////////////////////////////
86 void ConnectionReader::ReaderThread::
87 thread_main() {
88  _reader->thread_run(_thread_index);
89 }
90 
91 ////////////////////////////////////////////////////////////////////
92 // Function: ConnectionReader::Constructor
93 // Access: Published
94 // Description: Creates a new ConnectionReader with the indicated
95 // number of threads to handle requests. If num_threads
96 // is 0, the sockets will only be read by polling,
97 // during an explicit poll() call.
98 // (QueuedConnectionReader will do this automatically.)
99 ////////////////////////////////////////////////////////////////////
101 ConnectionReader(ConnectionManager *manager, int num_threads,
102  const string &thread_name) :
103  _manager(manager)
104 {
106 #ifndef NDEBUG
107  if (num_threads != 0) {
108  if (net_cat.is_debug()) {
109  net_cat.debug()
110  << "Threading support is not available.\n";
111  }
112  }
113 #endif // NDEBUG
114  num_threads = 0;
115  }
116 
117  _raw_mode = false;
118  _tcp_header_size = tcp_header_size;
119  _polling = (num_threads <= 0);
120 
121  _shutdown = false;
122 
123  _next_index = 0;
124  _num_results = 0;
125 
126  _currently_polling_thread = -1;
127 
128  string reader_thread_name = thread_name;
129  if (thread_name.empty()) {
130  reader_thread_name = "ReaderThread";
131  }
132  int i;
133  for (i = 0; i < num_threads; i++) {
134  PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
135  _threads.push_back(thread);
136  }
137  for (i = 0; i < num_threads; i++) {
138  _threads[i]->start(net_thread_priority, true);
139  }
140 
141  _manager->add_reader(this);
142 }
143 
144 ////////////////////////////////////////////////////////////////////
145 // Function: ConnectionReader::Destructor
146 // Access: Published, Virtual
147 // Description:
148 ////////////////////////////////////////////////////////////////////
149 ConnectionReader::
150 ~ConnectionReader() {
151  if (_manager != (ConnectionManager *)NULL) {
152  _manager->remove_reader(this);
153  }
154 
155  shutdown();
156 
157  // Delete all of our old sockets.
158  Sockets::iterator si;
159  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
160  delete (*si);
161  }
162  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
163  SocketInfo *sinfo = (*si);
164  if (!sinfo->_busy) {
165  delete sinfo;
166  } else {
167  net_cat.error()
168  << "Reentrant deletion of ConnectionReader--don't delete these\n"
169  << "in response to connection_reset().\n";
170 
171  // We'll have to do the best we can to recover.
172  sinfo->_connection.clear();
173  }
174  }
175 }
176 
177 ////////////////////////////////////////////////////////////////////
178 // Function: ConnectionReader::add_connection
179 // Access: Published
180 // Description: Adds a new socket to the list of sockets the
181 // ConnectionReader will monitor. A datagram that comes
182 // in on any of the monitored sockets will be reported.
183 // In the case of a ConnectionListener, this adds a new
184 // rendezvous socket; any activity on any of the
185 // monitored sockets will cause a connection to be
186 // accepted.
187 //
188 // The return value is true if the connection was added,
189 // false if it was already there.
190 //
191 // add_connection() is thread-safe, and may be called at
192 // will by any thread.
193 ////////////////////////////////////////////////////////////////////
196  nassertr(connection != (Connection *)NULL, false);
197 
198  LightMutexHolder holder(_sockets_mutex);
199 
200  // Make sure it's not already on the _sockets list.
201  Sockets::const_iterator si;
202  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
203  if ((*si)->_connection == connection) {
204  // Whoops, already there.
205  return false;
206  }
207  }
208 
209  _sockets.push_back(new SocketInfo(connection));
210 
211  return true;
212 }
213 
214 ////////////////////////////////////////////////////////////////////
215 // Function: ConnectionReader::remove_connection
216 // Access: Published
217 // Description: Removes a socket from the list of sockets being
218 // monitored. Returns true if the socket was correctly
219 // removed, false if it was not on the list in the first
220 // place.
221 //
222 // remove_connection() is thread-safe, and may be called
223 // at will by any thread.
224 ////////////////////////////////////////////////////////////////////
227  LightMutexHolder holder(_sockets_mutex);
228 
229  // Walk through the list of sockets to find the one we're removing.
230  Sockets::iterator si;
231  si = _sockets.begin();
232  while (si != _sockets.end() && (*si)->_connection != connection) {
233  ++si;
234  }
235  if (si == _sockets.end()) {
236  return false;
237  }
238 
239  _removed_sockets.push_back(*si);
240  _sockets.erase(si);
241 
242  return true;
243 }
244 
245 ////////////////////////////////////////////////////////////////////
246 // Function: ConnectionReader::is_connection_ok
247 // Access: Published
248 // Description: Returns true if the indicated connection has been
249 // added to the ConnectionReader and is being monitored
250 // properly, false if it is not known, or if there was
251 // some error condition detected on the connection. (If
252 // there was an error condition, normally the
253 // ConnectionManager would have been informed and closed
254 // the connection.)
255 ////////////////////////////////////////////////////////////////////
258  LightMutexHolder holder(_sockets_mutex);
259 
260  // Walk through the list of sockets to find the one we're asking
261  // about.
262  Sockets::iterator si;
263  si = _sockets.begin();
264  while (si != _sockets.end() && (*si)->_connection != connection) {
265  ++si;
266  }
267  if (si == _sockets.end()) {
268  // Don't know that connection.
269  return false;
270  }
271 
272  SocketInfo *sinfo = (*si);
273  bool is_ok = !sinfo->_error;
274 
275  return is_ok;
276 }
277 
278 ////////////////////////////////////////////////////////////////////
279 // Function: ConnectionReader::poll
280 // Access: Published
281 // Description: Explicitly polls the available sockets to see if any
282 // of them have any noise. This function does nothing
283 // unless this is a polling-type ConnectionReader,
284 // i.e. it was created with zero threads (and
285 // is_polling() will return true).
286 //
287 // It is not necessary to call this explicitly for a
288 // QueuedConnectionReader.
289 ////////////////////////////////////////////////////////////////////
291 poll() {
292  if (!_polling) {
293  return;
294  }
295 
296  SocketInfo *sinfo = get_next_available_socket(false, -2);
297  if (sinfo != (SocketInfo *)NULL) {
298  double max_poll_cycle = get_net_max_poll_cycle();
299  if (max_poll_cycle < 0.0) {
300  // Continue to read all data.
301  while (sinfo != (SocketInfo *)NULL) {
302  process_incoming_data(sinfo);
303  sinfo = get_next_available_socket(false, -2);
304  }
305 
306  } else {
307  // Read only until a certain amount of time has elapsed.
308  TrueClock *global_clock = TrueClock::get_global_ptr();
309  double stop = global_clock->get_short_time() + max_poll_cycle;
310 
311  while (sinfo != (SocketInfo *)NULL) {
312  process_incoming_data(sinfo);
313  if (global_clock->get_short_time() >= stop) {
314  return;
315  }
316  sinfo = get_next_available_socket(false, -2);
317  }
318  }
319  }
320 }
321 
322 ////////////////////////////////////////////////////////////////////
323 // Function: ConnectionReader::get_manager
324 // Access: Published
325 // Description: Returns a pointer to the ConnectionManager object
326 // that serves this ConnectionReader.
327 ////////////////////////////////////////////////////////////////////
329 get_manager() const {
330  return _manager;
331 }
332 
333 ////////////////////////////////////////////////////////////////////
334 // Function: ConnectionReader::get_num_threads
335 // Access: Published
336 // Description: Returns the number of threads the ConnectionReader
337 // has been created with.
338 ////////////////////////////////////////////////////////////////////
341  return _threads.size();
342 }
343 
344 ////////////////////////////////////////////////////////////////////
345 // Function: ConnectionReader::set_raw_mode
346 // Access: Published
347 // Description: Sets the ConnectionReader into raw mode (or turns off
348 // raw mode). In raw mode, datagram headers are not
349 // expected; instead, all the data available on the pipe
350 // is treated as a single datagram.
351 //
352 // This is similar to set_tcp_header_size(0), except that it
353 // also turns off headers for UDP packets.
354 ////////////////////////////////////////////////////////////////////
356 set_raw_mode(bool mode) {
357  _raw_mode = mode;
358 }
359 
360 ////////////////////////////////////////////////////////////////////
361 // Function: ConnectionReader::get_raw_mode
362 // Access: Published
363 // Description: Returns the current setting of the raw mode flag.
364 // See set_raw_mode().
365 ////////////////////////////////////////////////////////////////////
367 get_raw_mode() const {
368  return _raw_mode;
369 }
370 
371 ////////////////////////////////////////////////////////////////////
372 // Function: ConnectionReader::set_tcp_header_size
373 // Access: Published
374 // Description: Sets the header size of TCP packets. At the present,
375 // legal values for this are 0, 2, or 4; this specifies
376 // the number of bytes to use encode the datagram length
377 // at the start of each TCP datagram. Sender and
378 // receiver must independently agree on this.
379 ////////////////////////////////////////////////////////////////////
381 set_tcp_header_size(int tcp_header_size) {
382  _tcp_header_size = tcp_header_size;
383 }
384 
385 ////////////////////////////////////////////////////////////////////
386 // Function: ConnectionReader::get_tcp_header_size
387 // Access: Published
388 // Description: Returns the current setting of TCP header size.
389 // See set_tcp_header_size().
390 ////////////////////////////////////////////////////////////////////
393  return _tcp_header_size;
394 }
395 
396 ////////////////////////////////////////////////////////////////////
397 // Function: ConnectionReader::shutdown
398 // Access: Published
399 // Description: Terminates all threads cleanly. Normally this is
400 // only called by the destructor, but it may be called
401 // explicitly before destruction.
402 ////////////////////////////////////////////////////////////////////
405  if (_shutdown) {
406  return;
407  }
408 
409  // First, begin the shutdown. This will tell our threads we want
410  // them to quit.
411  _shutdown = true;
412 
413  // Now wait for all of our threads to terminate.
414  Threads::iterator ti;
415  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
416  (*ti)->join();
417  }
418 }
419 
420 ////////////////////////////////////////////////////////////////////
421 // Function: ConnectionReader::flush_read_connection
422 // Access: Protected, Virtual
423 // Description: Attempts to read all the possible data from the
424 // indicated connection, which has just delivered a
425 // write error (and has therefore already been closed).
426 // If the connection is not monitered by this reader,
427 // does nothing.
428 ////////////////////////////////////////////////////////////////////
429 void ConnectionReader::
430 flush_read_connection(Connection *connection) {
431  // Ensure it doesn't get deleted.
432  SocketInfo sinfo(connection);
433 
434  if (!remove_connection(connection)) {
435  // Not already in the reader.
436  return;
437  }
438 
439  // The connection was previously in the reader, but has now been
440  // removed. Now we can flush it completely. We check if there is
441  // any read data available on just this one socket; we can do this
442  // right here in this thread, since we've already removed this
443  // connection from the reader.
444 
445  Socket_fdset fdset;
446  fdset.clear();
447  fdset.setForSocket(*(sinfo.get_socket()));
448  int num_results = fdset.WaitForRead(true, 0);
449  while (num_results != 0) {
450  sinfo._busy = true;
451  if (!process_incoming_data(&sinfo)) {
452  break;
453  }
454  fdset.setForSocket(*(sinfo.get_socket()));
455  num_results = fdset.WaitForRead(true, 0);
456  }
457 }
458 
459 ////////////////////////////////////////////////////////////////////
460 // Function: ConnectionReader::clear_manager
461 // Access: Protected
462 // Description: This should normally only be called when the
463 // associated ConnectionManager destructs. It resets
464 // the ConnectionManager pointer to NULL so we don't
465 // have a floating pointer. This makes the
466 // ConnectionReader invalid; presumably it also will be
467 // destructed momentarily.
468 ////////////////////////////////////////////////////////////////////
469 void ConnectionReader::
470 clear_manager() {
471  _manager = (ConnectionManager *)NULL;
472 }
473 
474 ////////////////////////////////////////////////////////////////////
475 // Function: ConnectionReader::finish_socket
476 // Access: Protected
477 // Description: To be called when a socket has been fully read and is
478 // ready for polling for additional data.
479 ////////////////////////////////////////////////////////////////////
480 void ConnectionReader::
481 finish_socket(SocketInfo *sinfo) {
482  nassertv(sinfo->_busy);
483 
484  // By marking the SocketInfo nonbusy, we make it available for
485  // future polls.
486  sinfo->_busy = false;
487 }
488 
489 ////////////////////////////////////////////////////////////////////
490 // Function: ConnectionReader::process_incoming_data
491 // Access: Protected, Virtual
492 // Description: This is run within a thread when the call to
493 // select() indicates there is data available on a
494 // socket. Returns true if the data is read
495 // successfully, false on failure (for instance, because
496 // the connection is closed).
497 ////////////////////////////////////////////////////////////////////
498 bool ConnectionReader::
499 process_incoming_data(SocketInfo *sinfo) {
500  if (_raw_mode) {
501  if (sinfo->is_udp()) {
502  return process_raw_incoming_udp_data(sinfo);
503  } else {
504  return process_raw_incoming_tcp_data(sinfo);
505  }
506  } else {
507  if (sinfo->is_udp()) {
508  return process_incoming_udp_data(sinfo);
509  } else {
510  return process_incoming_tcp_data(sinfo);
511  }
512  }
513 }
514 
515 ////////////////////////////////////////////////////////////////////
516 // Function: ConnectionReader::process_incoming_udp_data
517 // Access: Protected
518 // Description:
519 ////////////////////////////////////////////////////////////////////
520 bool ConnectionReader::
521 process_incoming_udp_data(SocketInfo *sinfo) {
522  Socket_UDP *socket;
523  DCAST_INTO_R(socket, sinfo->get_socket(), false);
524  Socket_Address addr;
525 
526  // Read as many bytes as we can.
527  char buffer[read_buffer_size];
528  int bytes_read = read_buffer_size;
529 
530  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
531 
532  if (!okflag) {
533  finish_socket(sinfo);
534  return false;
535 
536  } else if (bytes_read == 0) {
537  // The socket was closed (!). This shouldn't happen with a UDP
538  // connection. Oh well. Report that and return.
539  if (_manager != (ConnectionManager *)NULL) {
540  _manager->connection_reset(sinfo->_connection, 0);
541  }
542  finish_socket(sinfo);
543  return false;
544  }
545 
546  // Since we are not running in raw mode, we decode the header to
547  // determine how big the datagram is. This means we must have read
548  // at least a full header.
549  if (bytes_read < datagram_udp_header_size) {
550  net_cat.error()
551  << "Did not read entire header, discarding UDP datagram.\n";
552  finish_socket(sinfo);
553  return true;
554  }
555 
556  DatagramUDPHeader header(buffer);
557 
558  char *dp = buffer + datagram_udp_header_size;
559  bytes_read -= datagram_udp_header_size;
560 
561  NetDatagram datagram(dp, bytes_read);
562 
563  // Now that we've read all the data, it's time to finish the socket
564  // so another thread can read the next datagram.
565  finish_socket(sinfo);
566 
567  if (_shutdown) {
568  return false;
569  }
570 
571  // And now do whatever we need to do to process the datagram.
572  if (!header.verify_datagram(datagram)) {
573  net_cat.error()
574  << "Ignoring invalid UDP datagram.\n";
575  } else {
576  datagram.set_connection(sinfo->_connection);
577  datagram.set_address(NetAddress(addr));
578 
579  if (net_cat.is_spam()) {
580  net_cat.spam()
581  << "Received UDP datagram with "
582  << datagram_udp_header_size + datagram.get_length()
583  << " bytes on " << (void *)datagram.get_connection()
584  << " from " << datagram.get_address() << "\n";
585  }
586 
587  receive_datagram(datagram);
588  }
589 
590  return true;
591 }
592 
593 ////////////////////////////////////////////////////////////////////
594 // Function: ConnectionReader::process_incoming_tcp_data
595 // Access: Protected
596 // Description:
597 ////////////////////////////////////////////////////////////////////
598 bool ConnectionReader::
599 process_incoming_tcp_data(SocketInfo *sinfo) {
600  Socket_TCP *socket;
601  DCAST_INTO_R(socket, sinfo->get_socket(), false);
602 
603  // Read only the header bytes to start with.
604  char buffer[read_buffer_size];
605  int header_bytes_read = 0;
606 
607  // First, we have to read the first _tcp_header_size bytes.
608  while (header_bytes_read < _tcp_header_size) {
609  int bytes_read =
610  socket->RecvData(buffer + header_bytes_read,
611  _tcp_header_size - header_bytes_read);
612 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
613  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
614  socket->Active()) {
616  bytes_read = socket->RecvData(buffer + header_bytes_read,
617  _tcp_header_size - header_bytes_read);
618  }
619 #endif // SIMPLE_THREADS
620 
621  if (bytes_read <= 0) {
622  // The socket was closed. Report that and return.
623  if (_manager != (ConnectionManager *)NULL) {
624  _manager->connection_reset(sinfo->_connection, 0);
625  }
626  finish_socket(sinfo);
627  return false;
628  }
629 
630  header_bytes_read += bytes_read;
632  }
633 
634  // Now we must decode the header to determine how big the datagram
635  // is. This means we must have read at least a full header.
636  if (header_bytes_read != _tcp_header_size) {
637  // This should actually be impossible, by the read-loop logic
638  // above.
639  net_cat.error()
640  << "Did not read entire header, discarding TCP datagram.\n";
641  finish_socket(sinfo);
642  return true;
643  }
644 
645  DatagramTCPHeader header(buffer, _tcp_header_size);
646  int size = header.get_datagram_size(_tcp_header_size);
647 
648  // We have to loop until the entire datagram is read.
649  NetDatagram datagram;
650 
651  while (!_shutdown && (int)datagram.get_length() < size) {
652  int bytes_read;
653 
654  int read_bytes = read_buffer_size;
655 #ifdef SIMPLE_THREADS
656  // In the SIMPLE_THREADS case, we want to limit the number of
657  // bytes we read in a single epoch, to minimize the impact on the
658  // other threads.
659  read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
660 #endif
661 
662  bytes_read =
663  socket->RecvData(buffer, min(read_bytes,
664  (int)(size - datagram.get_length())));
665 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
666  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
667  socket->Active()) {
669  bytes_read =
670  socket->RecvData(buffer, min(read_bytes,
671  (int)(size - datagram.get_length())));
672  }
673 #endif // SIMPLE_THREADS
674 
675  char *dp = buffer;
676 
677  if (bytes_read <= 0) {
678  // The socket was closed. Report that and return.
679  if (_manager != (ConnectionManager *)NULL) {
680  _manager->connection_reset(sinfo->_connection, 0);
681  }
682  finish_socket(sinfo);
683  return false;
684  }
685 
686  int datagram_bytes =
687  min(bytes_read, (int)(size - datagram.get_length()));
688  datagram.append_data(dp, datagram_bytes);
689 
690  if (bytes_read > datagram_bytes) {
691  // There were some extra bytes at the end of the datagram. Maybe
692  // the beginning of the next datagram? Huh.
693  net_cat.error()
694  << "Discarding " << bytes_read - datagram_bytes
695  << " bytes following TCP datagram.\n";
696  }
698  }
699 
700  // Now that we've read all the data, it's time to finish the socket
701  // so another thread can read the next datagram.
702  finish_socket(sinfo);
703 
704  if (_shutdown) {
705  return false;
706  }
707 
708  // And now do whatever we need to do to process the datagram.
709  if (!header.verify_datagram(datagram, _tcp_header_size)) {
710  net_cat.error()
711  << "Ignoring invalid TCP datagram.\n";
712  } else {
713  datagram.set_connection(sinfo->_connection);
714  datagram.set_address(NetAddress(socket->GetPeerName()));
715 
716  if (net_cat.is_spam()) {
717  net_cat.spam()
718  << "Received TCP datagram with "
719  << _tcp_header_size + datagram.get_length()
720  << " bytes on " << (void *)datagram.get_connection()
721  << " from " << datagram.get_address() << "\n";
722  }
723 
724  receive_datagram(datagram);
725  }
726 
727  return true;
728 }
729 
730 ////////////////////////////////////////////////////////////////////
731 // Function: ConnectionReader::process_raw_incoming_udp_data
732 // Access: Protected
733 // Description:
734 ////////////////////////////////////////////////////////////////////
735 bool ConnectionReader::
736 process_raw_incoming_udp_data(SocketInfo *sinfo) {
737  Socket_UDP *socket;
738  DCAST_INTO_R(socket, sinfo->get_socket(), false);
739  Socket_Address addr;
740 
741  // Read as many bytes as we can.
742  char buffer[read_buffer_size];
743  int bytes_read = read_buffer_size;
744 
745  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
746 
747  if (!okflag) {
748  finish_socket(sinfo);
749  return false;
750 
751  } else if (bytes_read == 0) {
752  // The socket was closed (!). This shouldn't happen with a UDP
753  // connection. Oh well. Report that and return.
754  if (_manager != (ConnectionManager *)NULL) {
755  _manager->connection_reset(sinfo->_connection, 0);
756  }
757  finish_socket(sinfo);
758  return false;
759  }
760 
761  // In raw mode, we simply extract all the bytes and make that a
762  // datagram.
763  NetDatagram datagram(buffer, bytes_read);
764 
765  // Now that we've read all the data, it's time to finish the socket
766  // so another thread can read the next datagram.
767  finish_socket(sinfo);
768 
769  if (_shutdown) {
770  return false;
771  }
772 
773  datagram.set_connection(sinfo->_connection);
774  datagram.set_address(NetAddress(addr));
775 
776  if (net_cat.is_spam()) {
777  net_cat.spam()
778  << "Received raw UDP datagram with " << datagram.get_length()
779  << " bytes on " << (void *)datagram.get_connection()
780  << " from " << datagram.get_address() << "\n";
781  }
782 
783  receive_datagram(datagram);
784 
785  return true;
786 }
787 
788 ////////////////////////////////////////////////////////////////////
789 // Function: ConnectionReader::process_raw_incoming_tcp_data
790 // Access: Protected
791 // Description:
792 ////////////////////////////////////////////////////////////////////
793 bool ConnectionReader::
794 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
795  Socket_TCP *socket;
796  DCAST_INTO_R(socket, sinfo->get_socket(), false);
797 
798  // Read as many bytes as we can.
799  char buffer[read_buffer_size];
800  int bytes_read = socket->RecvData(buffer, read_buffer_size);
801 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
802  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
803  socket->Active()) {
805  bytes_read = socket->RecvData(buffer, read_buffer_size);
806  }
807 #endif // SIMPLE_THREADS
808 
809  if (bytes_read <= 0) {
810  // The socket was closed. Report that and return.
811  if (_manager != (ConnectionManager *)NULL) {
812  _manager->connection_reset(sinfo->_connection, 0);
813  }
814  finish_socket(sinfo);
815  return false;
816  }
817 
818  // In raw mode, we simply extract all the bytes and make that a
819  // datagram.
820  NetDatagram datagram(buffer, bytes_read);
821 
822  // Now that we've read all the data, it's time to finish the socket
823  // so another thread can read the next datagram.
824  finish_socket(sinfo);
825 
826  if (_shutdown) {
827  return false;
828  }
829 
830  datagram.set_connection(sinfo->_connection);
831  datagram.set_address(NetAddress(socket->GetPeerName()));
832 
833  if (net_cat.is_spam()) {
834  net_cat.spam()
835  << "Received raw TCP datagram with " << datagram.get_length()
836  << " bytes on " << (void *)datagram.get_connection()
837  << " from " << datagram.get_address() << "\n";
838  }
839 
840  receive_datagram(datagram);
841 
842  return true;
843 }
844 
845 ////////////////////////////////////////////////////////////////////
846 // Function: ConnectionReader::thread_run
847 // Access: Private
848 // Description: This is the actual executing function for each
849 // thread.
850 ////////////////////////////////////////////////////////////////////
851 void ConnectionReader::
852 thread_run(int thread_index) {
853  nassertv(!_polling);
854  nassertv(_threads[thread_index] == Thread::get_current_thread());
855 
856  while (!_shutdown) {
857  SocketInfo *sinfo =
858  get_next_available_socket(true, thread_index);
859  if (sinfo != (SocketInfo *)NULL) {
860  process_incoming_data(sinfo);
862  } else {
864  }
865  }
866 }
867 
868 
869 ////////////////////////////////////////////////////////////////////
870 // Function: ConnectionReader::get_next_available_socket
871 // Access: Private
872 // Description: Polls the known connections for activity and returns
873 // the next one known to have activity, or NULL if no
874 // activity is detected within the timeout interval.
875 //
876 // This function may block indefinitely if it is being
877 // called by multiple threads; if there are no other
878 // threads, it may block only if allow_block is true.
879 ////////////////////////////////////////////////////////////////////
880 ConnectionReader::SocketInfo *ConnectionReader::
881 get_next_available_socket(bool allow_block, int current_thread_index) {
882  // Go to sleep on the select() mutex. This guarantees that only one
883  // thread is in this function at a time.
884  MutexHolder holder(_select_mutex);
885 
886  do {
887  // First, check the result from the previous select call. If
888  // there are any sockets remaining there, process them first.
889  while (!_shutdown && _num_results > 0) {
890  nassertr(_next_index < (int)_selecting_sockets.size(), NULL);
891  int i = _next_index;
892  _next_index++;
893 
894  if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
895  _num_results--;
896  SocketInfo *sinfo = _selecting_sockets[i];
897 
898  // Some noise on this socket.
899  sinfo->_busy = true;
900  return sinfo;
901  }
902  }
903 
904  bool interrupted;
905  do {
906  interrupted = false;
907 
908  // Ok, no results from previous select calls. Prepare to set up
909  // for a new select.
910 
911  // First, report to anyone else who cares that we're the thread
912  // about to do the poll. That way, if any new sockets come
913  // available while we're polling, we can service them.
914  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
915 
916  rebuild_select_list();
917 
918  // Now we can execute the select.
919  _num_results = 0;
920  _next_index = 0;
921 
922  if (!_shutdown) {
923  PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0);
924  if (!allow_block) {
925  timeout = 0;
926  }
927 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
928  // In the presence of SIMPLE_THREADS, we never wait at all,
929  // but rather we yield the thread if we come up empty (so that
930  // we won't block the entire process).
931  timeout = 0;
932 #endif
933 
934  _num_results = _fdset.WaitForRead(false, timeout);
935  }
936 
937  if (_num_results == 0 && allow_block) {
938  // If we reached net_max_block, go back and reconsider. (We
939  // never timeout indefinitely, so we can check the shutdown
940  // flag every once in a while.)
941  interrupted = true;
943 
944  } else if (_num_results < 0) {
945  // If we had an error, just return. But yield the timeslice
946  // first.
948  return (SocketInfo *)NULL;
949  }
950  } while (!_shutdown && interrupted);
951 
952  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
953 
954  // Repeat the above until we (a) find a socket with actual noise
955  // on it, or (b) return from PR_Poll() with no sockets available.
956  } while (!_shutdown && _num_results > 0);
957 
958  return (SocketInfo *)NULL;
959 }
960 
961 
962 ////////////////////////////////////////////////////////////////////
963 // Function: ConnectionReader::rebuild_select_list
964 // Access: Private
965 // Description: Rebuilds the _fdset and _selecting_sockets arrays
966 // based on the sockets that are currently available for
967 // selecting.
968 ////////////////////////////////////////////////////////////////////
969 void ConnectionReader::
970 rebuild_select_list() {
971  _fdset.clear();
972  _selecting_sockets.clear();
973 
974  LightMutexHolder holder(_sockets_mutex);
975  Sockets::const_iterator si;
976  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
977  SocketInfo *sinfo = (*si);
978  if (!sinfo->_busy && !sinfo->_error) {
979  _fdset.setForSocket(*sinfo->get_socket());
980  _selecting_sockets.push_back(sinfo);
981  }
982  }
983 
984  // This is also a fine time to delete the contents of the
985  // _removed_sockets list.
986  if (!_removed_sockets.empty()) {
987  Sockets still_busy_sockets;
988  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
989  SocketInfo *sinfo = (*si);
990  if (sinfo->_busy) {
991  still_busy_sockets.push_back(sinfo);
992  } else {
993  delete sinfo;
994  }
995  }
996  _removed_sockets.swap(still_busy_sockets);
997  }
998 }
999 
1000 ////////////////////////////////////////////////////////////////////
1001 // Function: ConnectionReader::accumulate_fdset
1002 // Access: Private
1003 // Description: Adds the sockets from this ConnectionReader (or
1004 // ConnectionListener) to the indicated fdset. This is
1005 // used by ConnectionManager::block() to build an fdset
1006 // of all attached readers.
1007 ////////////////////////////////////////////////////////////////////
1008 void ConnectionReader::
1009 accumulate_fdset(Socket_fdset &fdset) {
1010  LightMutexHolder holder(_sockets_mutex);
1011  Sockets::const_iterator si;
1012  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
1013  SocketInfo *sinfo = (*si);
1014  if (!sinfo->_busy && !sinfo->_error) {
1015  fdset.setForSocket(*sinfo->get_socket());
1016  }
1017  }
1018 }
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:81
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition: socket_tcp.h:15
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:43
bool verify_datagram(const NetDatagram &datagram) const
Verifies that the indicated datagram has the appropriate length and checksum.
Base functionality for a INET domain Socket this call should be the starting point for all other unix...
Definition: socket_ip.h:34
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
Definition: datagram.cxx:144
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
std::string RecvData(int max_len)
Read the data from the connection.
Definition: socket_tcp.h:210
ConnectionReader(ConnectionManager *manager, int num_threads, const string &thread_name=string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
The primary interface to the low-level networking layer in this package.
void shutdown()
Terminates all threads cleanly.
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:29
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:263
Socket_Address GetPeerName(void) const
Wrapper on berkly getpeername...
Definition: socket_ip.h:257
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:105
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:248
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
Definition: thread.I:145
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
This is our own Panda specialization on the default STL vector.
Definition: pvector.h:39
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent...
static bool is_threading_supported()
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
Definition: thread.I:185
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:28
Similar to MutexHolder, but for a light mutex.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
int get_datagram_size(int header_size) const
Returns the number of bytes in the associated datagram.
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
An interface to whatever real-time clock we might have available in the current environment.
Definition: trueClock.h:38
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
A simple place to store and munipulate tcp and port address for communication layer.
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
A thread; that is, a lightweight process.
Definition: thread.h:51
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
bool IsSetFor(const Socket_IP &incon) const
check to see if a socket object has been marked for reading
Definition: socket_fdset.h:89
void clear()
Marks the content as empty.
Definition: socket_fdset.h:138
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:32
bool verify_datagram(const NetDatagram &datagram, int header_size) const
Verifies that the indicated datagram has the appropriate length.
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:457
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound...
Definition: netAddress.h:27