Panda3D
connectionReader.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file connectionReader.cxx
10  * @author drose
11  * @date 2000-02-08
12  */
13 
14 #include "connectionReader.h"
15 #include "dcast.h"
16 #include "connectionManager.h"
17 #include "netDatagram.h"
18 #include "datagramTCPHeader.h"
19 #include "datagramUDPHeader.h"
20 #include "config_net.h"
21 #include "trueClock.h"
22 #include "socket_udp.h"
23 #include "socket_tcp.h"
24 #include "mutexHolder.h"
25 #include "lightMutexHolder.h"
26 #include "pnotify.h"
27 #include "atomicAdjust.h"
28 #include "config_downloader.h"
29 
30 using std::min;
31 
32 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
33 
34 /**
35  *
36  */
37 ConnectionReader::SocketInfo::
38 SocketInfo(const PT(Connection) &connection) :
39  _connection(connection)
40 {
41  _busy = false;
42  _error = false;
43 }
44 
45 /**
46  *
47  */
48 bool ConnectionReader::SocketInfo::
49 is_udp() const {
50  return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
51 }
52 
53 /**
54  *
55  */
56 Socket_IP *ConnectionReader::SocketInfo::
57 get_socket() const {
58  return _connection->get_socket();
59 }
60 
61 /**
62  *
63  */
64 ConnectionReader::ReaderThread::
65 ReaderThread(ConnectionReader *reader, const std::string &thread_name,
66  int thread_index) :
67  Thread(make_thread_name(thread_name, thread_index),
68  make_thread_name(thread_name, thread_index)),
69  _reader(reader),
70  _thread_index(thread_index)
71 {
72 }
73 
74 /**
75  *
76  */
77 void ConnectionReader::ReaderThread::
78 thread_main() {
79  _reader->thread_run(_thread_index);
80 }
81 
82 /**
83  * Creates a new ConnectionReader with the indicated number of threads to
84  * handle requests. If num_threads is 0, the sockets will only be read by
85  * polling, during an explicit poll() call. (QueuedConnectionReader will do
86  * this automatically.)
87  */
89 ConnectionReader(ConnectionManager *manager, int num_threads,
90  const std::string &thread_name) :
91  _manager(manager)
92 {
94 #ifndef NDEBUG
95  if (num_threads != 0) {
96  if (net_cat.is_debug()) {
97  net_cat.debug()
98  << "Threading support is not available.\n";
99  }
100  }
101 #endif // NDEBUG
102  num_threads = 0;
103  }
104 
105  _raw_mode = false;
106  _tcp_header_size = tcp_header_size;
107  _polling = (num_threads <= 0);
108 
109  _shutdown = false;
110 
111  _next_index = 0;
112  _num_results = 0;
113 
114  _currently_polling_thread = -1;
115 
116  std::string reader_thread_name = thread_name;
117  if (thread_name.empty()) {
118  reader_thread_name = "ReaderThread";
119  }
120  int i;
121  for (i = 0; i < num_threads; i++) {
122  PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
123  _threads.push_back(thread);
124  }
125  for (i = 0; i < num_threads; i++) {
126  _threads[i]->start(net_thread_priority, true);
127  }
128 
129  _manager->add_reader(this);
130 }
131 
132 /**
133  *
134  */
135 ConnectionReader::
136 ~ConnectionReader() {
137  if (_manager != nullptr) {
138  _manager->remove_reader(this);
139  }
140 
141  shutdown();
142 
143  // Delete all of our old sockets.
144  Sockets::iterator si;
145  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
146  delete (*si);
147  }
148  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
149  SocketInfo *sinfo = (*si);
150  if (!sinfo->_busy) {
151  delete sinfo;
152  } else {
153  net_cat.error()
154  << "Reentrant deletion of ConnectionReader--don't delete these\n"
155  << "in response to connection_reset().\n";
156 
157  // We'll have to do the best we can to recover.
158  sinfo->_connection.clear();
159  }
160  }
161 }
162 
163 /**
164  * Adds a new socket to the list of sockets the ConnectionReader will monitor.
165  * A datagram that comes in on any of the monitored sockets will be reported.
166  * In the case of a ConnectionListener, this adds a new rendezvous socket; any
167  * activity on any of the monitored sockets will cause a connection to be
168  * accepted.
169  *
170  * The return value is true if the connection was added, false if it was
171  * already there.
172  *
173  * add_connection() is thread-safe, and may be called at will by any thread.
174  */
177  nassertr(connection != nullptr, false);
178 
179  LightMutexHolder holder(_sockets_mutex);
180 
181  // Make sure it's not already on the _sockets list.
182  Sockets::const_iterator si;
183  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
184  if ((*si)->_connection == connection) {
185  // Whoops, already there.
186  return false;
187  }
188  }
189 
190  _sockets.push_back(new SocketInfo(connection));
191 
192  return true;
193 }
194 
195 /**
196  * Removes a socket from the list of sockets being monitored. Returns true if
197  * the socket was correctly removed, false if it was not on the list in the
198  * first place.
199  *
200  * remove_connection() is thread-safe, and may be called at will by any
201  * thread.
202  */
205  LightMutexHolder holder(_sockets_mutex);
206 
207  // Walk through the list of sockets to find the one we're removing.
208  Sockets::iterator si;
209  si = _sockets.begin();
210  while (si != _sockets.end() && (*si)->_connection != connection) {
211  ++si;
212  }
213  if (si == _sockets.end()) {
214  return false;
215  }
216 
217  _removed_sockets.push_back(*si);
218  _sockets.erase(si);
219 
220  return true;
221 }
222 
223 /**
224  * Returns true if the indicated connection has been added to the
225  * ConnectionReader and is being monitored properly, false if it is not known,
226  * or if there was some error condition detected on the connection. (If there
227  * was an error condition, normally the ConnectionManager would have been
228  * informed and closed the connection.)
229  */
232  LightMutexHolder holder(_sockets_mutex);
233 
234  // Walk through the list of sockets to find the one we're asking about.
235  Sockets::iterator si;
236  si = _sockets.begin();
237  while (si != _sockets.end() && (*si)->_connection != connection) {
238  ++si;
239  }
240  if (si == _sockets.end()) {
241  // Don't know that connection.
242  return false;
243  }
244 
245  SocketInfo *sinfo = (*si);
246  bool is_ok = !sinfo->_error;
247 
248  return is_ok;
249 }
250 
251 /**
252  * Explicitly polls the available sockets to see if any of them have any
253  * noise. This function does nothing unless this is a polling-type
254  * ConnectionReader, i.e. it was created with zero threads (and is_polling()
255  * will return true).
256  *
257  * It is not necessary to call this explicitly for a QueuedConnectionReader.
258  */
260 poll() {
261  if (!_polling) {
262  return;
263  }
264 
265  SocketInfo *sinfo = get_next_available_socket(false, -2);
266  if (sinfo != nullptr) {
267  double max_poll_cycle = get_net_max_poll_cycle();
268  if (max_poll_cycle < 0.0) {
269  // Continue to read all data.
270  while (sinfo != nullptr) {
271  process_incoming_data(sinfo);
272  sinfo = get_next_available_socket(false, -2);
273  }
274 
275  } else {
276  // Read only until a certain amount of time has elapsed.
277  TrueClock *global_clock = TrueClock::get_global_ptr();
278  double stop = global_clock->get_short_time() + max_poll_cycle;
279 
280  while (sinfo != nullptr) {
281  process_incoming_data(sinfo);
282  if (global_clock->get_short_time() >= stop) {
283  return;
284  }
285  sinfo = get_next_available_socket(false, -2);
286  }
287  }
288  }
289 }
290 
291 /**
292  * Returns a pointer to the ConnectionManager object that serves this
293  * ConnectionReader.
294  */
296 get_manager() const {
297  return _manager;
298 }
299 
300 /**
301  * Returns the number of threads the ConnectionReader has been created with.
302  */
305  return _threads.size();
306 }
307 
308 /**
309  * Sets the ConnectionReader into raw mode (or turns off raw mode). In raw
310  * mode, datagram headers are not expected; instead, all the data available on
311  * the pipe is treated as a single datagram.
312  *
313  * This is similar to set_tcp_header_size(0), except that it also turns off
314  * headers for UDP packets.
315  */
317 set_raw_mode(bool mode) {
318  _raw_mode = mode;
319 }
320 
321 /**
322  * Returns the current setting of the raw mode flag. See set_raw_mode().
323  */
325 get_raw_mode() const {
326  return _raw_mode;
327 }
328 
329 /**
330  * Sets the header size of TCP packets. At the present, legal values for this
331  * are 0, 2, or 4; this specifies the number of bytes to use encode the
332  * datagram length at the start of each TCP datagram. Sender and receiver
333  * must independently agree on this.
334  */
336 set_tcp_header_size(int tcp_header_size) {
337  _tcp_header_size = tcp_header_size;
338 }
339 
340 /**
341  * Returns the current setting of TCP header size. See set_tcp_header_size().
342  */
345  return _tcp_header_size;
346 }
347 
348 /**
349  * Terminates all threads cleanly. Normally this is only called by the
350  * destructor, but it may be called explicitly before destruction.
351  */
354  if (_shutdown) {
355  return;
356  }
357 
358  // First, begin the shutdown. This will tell our threads we want them to
359  // quit.
360  _shutdown = true;
361 
362  // Now wait for all of our threads to terminate.
363  Threads::iterator ti;
364  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
365  (*ti)->join();
366  }
367 }
368 
369 /**
370  * Attempts to read all the possible data from the indicated connection, which
371  * has just delivered a write error (and has therefore already been closed).
372  * If the connection is not monitered by this reader, does nothing.
373  */
374 void ConnectionReader::
375 flush_read_connection(Connection *connection) {
376  // Ensure it doesn't get deleted.
377  SocketInfo sinfo(connection);
378 
379  if (!remove_connection(connection)) {
380  // Not already in the reader.
381  return;
382  }
383 
384  // The connection was previously in the reader, but has now been removed.
385  // Now we can flush it completely. We check if there is any read data
386  // available on just this one socket; we can do this right here in this
387  // thread, since we've already removed this connection from the reader.
388 
389  Socket_fdset fdset;
390  fdset.clear();
391  fdset.setForSocket(*(sinfo.get_socket()));
392  int num_results = fdset.WaitForRead(true, 0);
393  while (num_results != 0) {
394  sinfo._busy = true;
395  if (!process_incoming_data(&sinfo)) {
396  break;
397  }
398  fdset.setForSocket(*(sinfo.get_socket()));
399  num_results = fdset.WaitForRead(true, 0);
400  }
401 }
402 
403 /**
404  * This should normally only be called when the associated ConnectionManager
405  * destructs. It resets the ConnectionManager pointer to NULL so we don't
406  * have a floating pointer. This makes the ConnectionReader invalid;
407  * presumably it also will be destructed momentarily.
408  */
409 void ConnectionReader::
410 clear_manager() {
411  _manager = nullptr;
412 }
413 
414 /**
415  * To be called when a socket has been fully read and is ready for polling for
416  * additional data.
417  */
418 void ConnectionReader::
419 finish_socket(SocketInfo *sinfo) {
420  nassertv(sinfo->_busy);
421 
422  // By marking the SocketInfo nonbusy, we make it available for future polls.
423  sinfo->_busy = false;
424 }
425 
426 /**
427  * This is run within a thread when the call to select() indicates there is
428  * data available on a socket. Returns true if the data is read successfully,
429  * false on failure (for instance, because the connection is closed).
430  */
431 bool ConnectionReader::
432 process_incoming_data(SocketInfo *sinfo) {
433  if (_raw_mode) {
434  if (sinfo->is_udp()) {
435  return process_raw_incoming_udp_data(sinfo);
436  } else {
437  return process_raw_incoming_tcp_data(sinfo);
438  }
439  } else {
440  if (sinfo->is_udp()) {
441  return process_incoming_udp_data(sinfo);
442  } else {
443  return process_incoming_tcp_data(sinfo);
444  }
445  }
446 }
447 
448 /**
449  *
450  */
451 bool ConnectionReader::
452 process_incoming_udp_data(SocketInfo *sinfo) {
453  Socket_UDP *socket;
454  DCAST_INTO_R(socket, sinfo->get_socket(), false);
455  Socket_Address addr;
456 
457  // Read as many bytes as we can.
458  char buffer[read_buffer_size];
459  int bytes_read = read_buffer_size;
460 
461  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
462 
463  if (!okflag) {
464  finish_socket(sinfo);
465  return false;
466 
467  } else if (bytes_read == 0) {
468  // The socket was closed (!). This shouldn't happen with a UDP
469  // connection. Oh well. Report that and return.
470  if (_manager != nullptr) {
471  _manager->connection_reset(sinfo->_connection, 0);
472  }
473  finish_socket(sinfo);
474  return false;
475  }
476 
477  // Since we are not running in raw mode, we decode the header to determine
478  // how big the datagram is. This means we must have read at least a full
479  // header.
480  if (bytes_read < datagram_udp_header_size) {
481  net_cat.error()
482  << "Did not read entire header, discarding UDP datagram.\n";
483  finish_socket(sinfo);
484  return true;
485  }
486 
487  DatagramUDPHeader header(buffer);
488 
489  char *dp = buffer + datagram_udp_header_size;
490  bytes_read -= datagram_udp_header_size;
491 
492  NetDatagram datagram(dp, bytes_read);
493 
494  // Now that we've read all the data, it's time to finish the socket so
495  // another thread can read the next datagram.
496  finish_socket(sinfo);
497 
498  if (_shutdown) {
499  return false;
500  }
501 
502  // And now do whatever we need to do to process the datagram.
503  if (!header.verify_datagram(datagram)) {
504  net_cat.error()
505  << "Ignoring invalid UDP datagram.\n";
506  } else {
507  datagram.set_connection(sinfo->_connection);
508  datagram.set_address(NetAddress(addr));
509 
510  if (net_cat.is_spam()) {
511  net_cat.spam()
512  << "Received UDP datagram with "
513  << datagram_udp_header_size + datagram.get_length()
514  << " bytes on " << (void *)datagram.get_connection()
515  << " from " << datagram.get_address() << "\n";
516  }
517 
518  receive_datagram(datagram);
519  }
520 
521  return true;
522 }
523 
524 /**
525  *
526  */
527 bool ConnectionReader::
528 process_incoming_tcp_data(SocketInfo *sinfo) {
529  Socket_TCP *socket;
530  DCAST_INTO_R(socket, sinfo->get_socket(), false);
531 
532  // Read only the header bytes to start with.
533  char buffer[read_buffer_size];
534  int header_bytes_read = 0;
535 
536  // First, we have to read the first _tcp_header_size bytes.
537  while (header_bytes_read < _tcp_header_size) {
538  int bytes_read =
539  socket->RecvData(buffer + header_bytes_read,
540  _tcp_header_size - header_bytes_read);
541 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
542  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
543  socket->Active()) {
545  bytes_read = socket->RecvData(buffer + header_bytes_read,
546  _tcp_header_size - header_bytes_read);
547  }
548 #endif // SIMPLE_THREADS
549 
550  if (bytes_read <= 0) {
551  // The socket was closed. Report that and return.
552  if (_manager != nullptr) {
553  _manager->connection_reset(sinfo->_connection, 0);
554  }
555  finish_socket(sinfo);
556  return false;
557  }
558 
559  header_bytes_read += bytes_read;
561  }
562 
563  // Now we must decode the header to determine how big the datagram is. This
564  // means we must have read at least a full header.
565  if (header_bytes_read != _tcp_header_size) {
566  // This should actually be impossible, by the read-loop logic above.
567  net_cat.error()
568  << "Did not read entire header, discarding TCP datagram.\n";
569  finish_socket(sinfo);
570  return true;
571  }
572 
573  DatagramTCPHeader header(buffer, _tcp_header_size);
574  int size = header.get_datagram_size(_tcp_header_size);
575 
576  // We have to loop until the entire datagram is read.
577  NetDatagram datagram;
578 
579  while (!_shutdown && (int)datagram.get_length() < size) {
580  int bytes_read;
581 
582  int read_bytes = read_buffer_size;
583 #ifdef SIMPLE_THREADS
584  // In the SIMPLE_THREADS case, we want to limit the number of bytes we
585  // read in a single epoch, to minimize the impact on the other threads.
586  read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
587 #endif
588 
589  bytes_read =
590  socket->RecvData(buffer, min(read_bytes,
591  (int)(size - datagram.get_length())));
592 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
593  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
594  socket->Active()) {
596  bytes_read =
597  socket->RecvData(buffer, min(read_bytes,
598  (int)(size - datagram.get_length())));
599  }
600 #endif // SIMPLE_THREADS
601 
602  char *dp = buffer;
603 
604  if (bytes_read <= 0) {
605  // The socket was closed. Report that and return.
606  if (_manager != nullptr) {
607  _manager->connection_reset(sinfo->_connection, 0);
608  }
609  finish_socket(sinfo);
610  return false;
611  }
612 
613  int datagram_bytes =
614  min(bytes_read, (int)(size - datagram.get_length()));
615  datagram.append_data(dp, datagram_bytes);
616 
617  if (bytes_read > datagram_bytes) {
618  // There were some extra bytes at the end of the datagram. Maybe the
619  // beginning of the next datagram? Huh.
620  net_cat.error()
621  << "Discarding " << bytes_read - datagram_bytes
622  << " bytes following TCP datagram.\n";
623  }
625  }
626 
627  // Now that we've read all the data, it's time to finish the socket so
628  // another thread can read the next datagram.
629  finish_socket(sinfo);
630 
631  if (_shutdown) {
632  return false;
633  }
634 
635  // And now do whatever we need to do to process the datagram.
636  if (!header.verify_datagram(datagram, _tcp_header_size)) {
637  net_cat.error()
638  << "Ignoring invalid TCP datagram.\n";
639  } else {
640  datagram.set_connection(sinfo->_connection);
641  datagram.set_address(NetAddress(socket->GetPeerName()));
642 
643  if (net_cat.is_spam()) {
644  net_cat.spam()
645  << "Received TCP datagram with "
646  << _tcp_header_size + datagram.get_length()
647  << " bytes on " << (void *)datagram.get_connection()
648  << " from " << datagram.get_address() << "\n";
649  }
650 
651  receive_datagram(datagram);
652  }
653 
654  return true;
655 }
656 
657 /**
658  *
659  */
660 bool ConnectionReader::
661 process_raw_incoming_udp_data(SocketInfo *sinfo) {
662  Socket_UDP *socket;
663  DCAST_INTO_R(socket, sinfo->get_socket(), false);
664  Socket_Address addr;
665 
666  // Read as many bytes as we can.
667  char buffer[read_buffer_size];
668  int bytes_read = read_buffer_size;
669 
670  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
671 
672  if (!okflag) {
673  finish_socket(sinfo);
674  return false;
675 
676  } else if (bytes_read == 0) {
677  // The socket was closed (!). This shouldn't happen with a UDP
678  // connection. Oh well. Report that and return.
679  if (_manager != nullptr) {
680  _manager->connection_reset(sinfo->_connection, 0);
681  }
682  finish_socket(sinfo);
683  return false;
684  }
685 
686  // In raw mode, we simply extract all the bytes and make that a datagram.
687  NetDatagram datagram(buffer, bytes_read);
688 
689  // Now that we've read all the data, it's time to finish the socket so
690  // another thread can read the next datagram.
691  finish_socket(sinfo);
692 
693  if (_shutdown) {
694  return false;
695  }
696 
697  datagram.set_connection(sinfo->_connection);
698  datagram.set_address(NetAddress(addr));
699 
700  if (net_cat.is_spam()) {
701  net_cat.spam()
702  << "Received raw UDP datagram with " << datagram.get_length()
703  << " bytes on " << (void *)datagram.get_connection()
704  << " from " << datagram.get_address() << "\n";
705  }
706 
707  receive_datagram(datagram);
708 
709  return true;
710 }
711 
712 /**
713  *
714  */
715 bool ConnectionReader::
716 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
717  Socket_TCP *socket;
718  DCAST_INTO_R(socket, sinfo->get_socket(), false);
719 
720  // Read as many bytes as we can.
721  char buffer[read_buffer_size];
722  int bytes_read = socket->RecvData(buffer, read_buffer_size);
723 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
724  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
725  socket->Active()) {
727  bytes_read = socket->RecvData(buffer, read_buffer_size);
728  }
729 #endif // SIMPLE_THREADS
730 
731  if (bytes_read <= 0) {
732  // The socket was closed. Report that and return.
733  if (_manager != nullptr) {
734  _manager->connection_reset(sinfo->_connection, 0);
735  }
736  finish_socket(sinfo);
737  return false;
738  }
739 
740  // In raw mode, we simply extract all the bytes and make that a datagram.
741  NetDatagram datagram(buffer, bytes_read);
742 
743  // Now that we've read all the data, it's time to finish the socket so
744  // another thread can read the next datagram.
745  finish_socket(sinfo);
746 
747  if (_shutdown) {
748  return false;
749  }
750 
751  datagram.set_connection(sinfo->_connection);
752  datagram.set_address(NetAddress(socket->GetPeerName()));
753 
754  if (net_cat.is_spam()) {
755  net_cat.spam()
756  << "Received raw TCP datagram with " << datagram.get_length()
757  << " bytes on " << (void *)datagram.get_connection()
758  << " from " << datagram.get_address() << "\n";
759  }
760 
761  receive_datagram(datagram);
762 
763  return true;
764 }
765 
766 /**
767  * This is the actual executing function for each thread.
768  */
769 void ConnectionReader::
770 thread_run(int thread_index) {
771  nassertv(!_polling);
772  nassertv(_threads[thread_index] == Thread::get_current_thread());
773 
774  while (!_shutdown) {
775  SocketInfo *sinfo =
776  get_next_available_socket(true, thread_index);
777  if (sinfo != nullptr) {
778  process_incoming_data(sinfo);
780  } else {
782  }
783  }
784 }
785 
786 
787 /**
788  * Polls the known connections for activity and returns the next one known to
789  * have activity, or NULL if no activity is detected within the timeout
790  * interval.
791  *
792  * This function may block indefinitely if it is being called by multiple
793  * threads; if there are no other threads, it may block only if allow_block is
794  * true.
795  */
796 ConnectionReader::SocketInfo *ConnectionReader::
797 get_next_available_socket(bool allow_block, int current_thread_index) {
798  // Go to sleep on the select() mutex. This guarantees that only one thread
799  // is in this function at a time.
800  MutexHolder holder(_select_mutex);
801 
802  do {
803  // First, check the result from the previous select call. If there are
804  // any sockets remaining there, process them first.
805  while (!_shutdown && _num_results > 0) {
806  nassertr(_next_index < (int)_selecting_sockets.size(), nullptr);
807  int i = _next_index;
808  _next_index++;
809 
810  if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
811  _num_results--;
812  SocketInfo *sinfo = _selecting_sockets[i];
813 
814  // Some noise on this socket.
815  sinfo->_busy = true;
816  return sinfo;
817  }
818  }
819 
820  bool interrupted;
821  do {
822  interrupted = false;
823 
824  // Ok, no results from previous select calls. Prepare to set up for a
825  // new select.
826 
827  // First, report to anyone else who cares that we're the thread about to
828  // do the poll. That way, if any new sockets come available while we're
829  // polling, we can service them.
830  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
831 
832  rebuild_select_list();
833 
834  // Now we can execute the select.
835  _num_results = 0;
836  _next_index = 0;
837 
838  if (!_shutdown) {
839  uint32_t timeout = (uint32_t)(get_net_max_block() * 1000.0);
840  if (!allow_block) {
841  timeout = 0;
842  }
843 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
844  // In the presence of SIMPLE_THREADS, we never wait at all, but rather
845  // we yield the thread if we come up empty (so that we won't block the
846  // entire process).
847  timeout = 0;
848 #endif
849 
850  _num_results = _fdset.WaitForRead(false, timeout);
851  }
852 
853  if (_num_results == 0 && allow_block) {
854  // If we reached net_max_block, go back and reconsider. (We never
855  // timeout indefinitely, so we can check the shutdown flag every once
856  // in a while.)
857  interrupted = true;
859 
860  } else if (_num_results < 0) {
861  // If we had an error, just return. But yield the timeslice first.
863  return nullptr;
864  }
865  } while (!_shutdown && interrupted);
866 
867  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
868 
869  // Repeat the above until we (a) find a socket with actual noise on it, or
870  // (b) return from PR_Poll() with no sockets available.
871  } while (!_shutdown && _num_results > 0);
872 
873  return nullptr;
874 }
875 
876 
877 /**
878  * Rebuilds the _fdset and _selecting_sockets arrays based on the sockets that
879  * are currently available for selecting.
880  */
881 void ConnectionReader::
882 rebuild_select_list() {
883  _fdset.clear();
884  _selecting_sockets.clear();
885 
886  LightMutexHolder holder(_sockets_mutex);
887  Sockets::const_iterator si;
888  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
889  SocketInfo *sinfo = (*si);
890  if (!sinfo->_busy && !sinfo->_error) {
891  _fdset.setForSocket(*sinfo->get_socket());
892  _selecting_sockets.push_back(sinfo);
893  }
894  }
895 
896  // This is also a fine time to delete the contents of the _removed_sockets
897  // list.
898  if (!_removed_sockets.empty()) {
899  Sockets still_busy_sockets;
900  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
901  SocketInfo *sinfo = (*si);
902  if (sinfo->_busy) {
903  still_busy_sockets.push_back(sinfo);
904  } else {
905  delete sinfo;
906  }
907  }
908  _removed_sockets.swap(still_busy_sockets);
909  }
910 }
911 
912 /**
913  * Adds the sockets from this ConnectionReader (or ConnectionListener) to the
914  * indicated fdset. This is used by ConnectionManager::block() to build an
915  * fdset of all attached readers.
916  */
917 void ConnectionReader::
918 accumulate_fdset(Socket_fdset &fdset) {
919  LightMutexHolder holder(_sockets_mutex);
920  Sockets::const_iterator si;
921  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
922  SocketInfo *sinfo = (*si);
923  if (!sinfo->_busy && !sinfo->_error) {
924  fdset.setForSocket(*sinfo->get_socket());
925  }
926  }
927 }
MutexHolder
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
Socket_IP
Base functionality for a INET domain Socket This call should be the starting point for all other unix...
Definition: socket_ip.h:27
LightMutexHolder
Similar to MutexHolder, but for a light mutex.
Definition: lightMutexHolder.h:25
ConnectionReader::shutdown
void shutdown()
Terminates all threads cleanly.
Definition: connectionReader.cxx:353
netDatagram.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
datagramTCPHeader.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ConnectionReader::is_connection_ok
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
Definition: connectionReader.cxx:231
Socket_fdset::IsSetFor
bool IsSetFor(const Socket_IP &incon) const
check to see if a socket object has been marked for reading
Definition: socket_fdset.h:81
Socket_TCP
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition: socket_tcp.h:12
Socket_UDP_Incoming::GetPacket
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
Definition: socket_udp_incoming.h:166
trueClock.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Socket_IP::GetPeerName
Socket_Address GetPeerName(void) const
Wrapper on berkly getpeername...
Definition: socket_ip.h:253
dcast.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Thread::consider_yield
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:212
ConnectionReader::get_raw_mode
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
Definition: connectionReader.cxx:325
NetDatagram::get_address
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
Definition: netDatagram.cxx:114
ConnectionReader::get_tcp_header_size
int get_tcp_header_size() const
Returns the current setting of TCP header size.
Definition: connectionReader.cxx:344
Connection
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:29
ConnectionReader::remove_connection
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
Definition: connectionReader.cxx:204
DatagramUDPHeader
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
Definition: datagramUDPHeader.h:34
ConnectionReader::add_connection
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
Definition: connectionReader.cxx:176
pnotify.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ConnectionReader::get_manager
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
Definition: connectionReader.cxx:296
ConnectionReader::get_num_threads
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
Definition: connectionReader.cxx:304
Thread::get_current_thread
get_current_thread
Returns a pointer to the currently-executing Thread object.
Definition: thread.h:109
Socket_IP::Active
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:96
socket_udp.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
config_downloader.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
lightMutexHolder.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Socket_fdset
Definition: socket_fdset.h:16
NetDatagram::set_connection
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
Definition: netDatagram.cxx:88
NetAddress
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition: netAddress.h:25
ConnectionReader::ConnectionReader
ConnectionReader(ConnectionManager *manager, int num_threads, const std::string &thread_name=std::string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
Definition: connectionReader.cxx:89
Socket_UDP
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:24
Thread::is_threading_supported
is_threading_supported
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
Definition: thread.h:112
config_net.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Datagram::get_length
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335
AtomicAdjustDummyImpl::set
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
Definition: atomicAdjustDummyImpl.I:46
atomicAdjust.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ConnectionManager
The primary interface to the low-level networking layer in this package.
Definition: connectionManager.h:41
Socket_fdset::clear
void clear()
Marks the content as empty.
Definition: socket_fdset.h:124
mutexHolder.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ConnectionReader::set_tcp_header_size
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
Definition: connectionReader.cxx:336
TrueClock::get_global_ptr
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:68
ConnectionReader::poll
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
Definition: connectionReader.cxx:260
TrueClock
An interface to whatever real-time clock we might have available in the current environment.
Definition: trueClock.h:33
NetDatagram
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
Socket_Address
A simple place to store and manipulate tcp and port address for communication layer.
Definition: socket_address.h:25
connectionManager.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
connectionReader.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ConnectionReader::set_raw_mode
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
Definition: connectionReader.cxx:317
datagramUDPHeader.h
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Thread
A thread; that is, a lightweight process.
Definition: thread.h:46
DatagramTCPHeader
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
Definition: datagramTCPHeader.h:35
Datagram::append_data
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
Definition: datagram.cxx:129
Thread::force_yield
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
ConnectionReader
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
Definition: connectionReader.h:50
Socket_TCP::RecvData
std::string RecvData(int max_len)
Read the data from the connection - if error 0 if socket closed for read or length is 0 + bytes read ...
Definition: socket_tcp.h:185