Panda3D
connectionReader.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file connectionReader.cxx
10  * @author drose
11  * @date 2000-02-08
12  */
13 
14 #include "connectionReader.h"
15 #include "dcast.h"
16 #include "connectionManager.h"
17 #include "netDatagram.h"
18 #include "datagramTCPHeader.h"
19 #include "datagramUDPHeader.h"
20 #include "config_net.h"
21 #include "trueClock.h"
22 #include "socket_udp.h"
23 #include "socket_tcp.h"
24 #include "mutexHolder.h"
25 #include "lightMutexHolder.h"
26 #include "pnotify.h"
27 #include "atomicAdjust.h"
28 #include "config_downloader.h"
29 
30 using std::min;
31 
32 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
33 
34 /**
35  *
36  */
37 ConnectionReader::SocketInfo::
38 SocketInfo(const PT(Connection) &connection) :
39  _connection(connection)
40 {
41  _busy = false;
42  _error = false;
43 }
44 
45 /**
46  *
47  */
48 bool ConnectionReader::SocketInfo::
49 is_udp() const {
50  return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
51 }
52 
53 /**
54  *
55  */
56 Socket_IP *ConnectionReader::SocketInfo::
57 get_socket() const {
58  return _connection->get_socket();
59 }
60 
61 /**
62  *
63  */
64 ConnectionReader::ReaderThread::
65 ReaderThread(ConnectionReader *reader, const std::string &thread_name,
66  int thread_index) :
67  Thread(make_thread_name(thread_name, thread_index),
68  make_thread_name(thread_name, thread_index)),
69  _reader(reader),
70  _thread_index(thread_index)
71 {
72 }
73 
74 /**
75  *
76  */
77 void ConnectionReader::ReaderThread::
78 thread_main() {
79  _reader->thread_run(_thread_index);
80 }
81 
82 /**
83  * Creates a new ConnectionReader with the indicated number of threads to
84  * handle requests. If num_threads is 0, the sockets will only be read by
85  * polling, during an explicit poll() call. (QueuedConnectionReader will do
86  * this automatically.)
87  */
89 ConnectionReader(ConnectionManager *manager, int num_threads,
90  const std::string &thread_name) :
91  _manager(manager)
92 {
93  if (!Thread::is_threading_supported()) {
94 #ifndef NDEBUG
95  if (num_threads != 0) {
96  if (net_cat.is_debug()) {
97  net_cat.debug()
98  << "Threading support is not available.\n";
99  }
100  }
101 #endif // NDEBUG
102  num_threads = 0;
103  }
104 
105  _raw_mode = false;
106  _tcp_header_size = tcp_header_size;
107  _polling = (num_threads <= 0);
108 
109  _shutdown = false;
110 
111  _next_index = 0;
112  _num_results = 0;
113 
114  _currently_polling_thread = -1;
115 
116  std::string reader_thread_name = thread_name;
117  if (thread_name.empty()) {
118  reader_thread_name = "ReaderThread";
119  }
120  int i;
121  for (i = 0; i < num_threads; i++) {
122  PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
123  _threads.push_back(thread);
124  }
125  for (i = 0; i < num_threads; i++) {
126  _threads[i]->start(net_thread_priority, true);
127  }
128 
129  _manager->add_reader(this);
130 }
131 
132 /**
133  *
134  */
135 ConnectionReader::
136 ~ConnectionReader() {
137  if (_manager != nullptr) {
138  _manager->remove_reader(this);
139  }
140 
141  shutdown();
142 
143  // Delete all of our old sockets.
144  Sockets::iterator si;
145  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
146  delete (*si);
147  }
148  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
149  SocketInfo *sinfo = (*si);
150  if (!sinfo->_busy) {
151  delete sinfo;
152  } else {
153  net_cat.error()
154  << "Reentrant deletion of ConnectionReader--don't delete these\n"
155  << "in response to connection_reset().\n";
156 
157  // We'll have to do the best we can to recover.
158  sinfo->_connection.clear();
159  }
160  }
161 }
162 
163 /**
164  * Adds a new socket to the list of sockets the ConnectionReader will monitor.
165  * A datagram that comes in on any of the monitored sockets will be reported.
166  * In the case of a ConnectionListener, this adds a new rendezvous socket; any
167  * activity on any of the monitored sockets will cause a connection to be
168  * accepted.
169  *
170  * The return value is true if the connection was added, false if it was
171  * already there.
172  *
173  * add_connection() is thread-safe, and may be called at will by any thread.
174  */
177  nassertr(connection != nullptr, false);
178 
179  LightMutexHolder holder(_sockets_mutex);
180 
181  // Make sure it's not already on the _sockets list.
182  Sockets::const_iterator si;
183  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
184  if ((*si)->_connection == connection) {
185  // Whoops, already there.
186  return false;
187  }
188  }
189 
190  _sockets.push_back(new SocketInfo(connection));
191 
192  return true;
193 }
194 
195 /**
196  * Removes a socket from the list of sockets being monitored. Returns true if
197  * the socket was correctly removed, false if it was not on the list in the
198  * first place.
199  *
200  * remove_connection() is thread-safe, and may be called at will by any
201  * thread.
202  */
205  LightMutexHolder holder(_sockets_mutex);
206 
207  // Walk through the list of sockets to find the one we're removing.
208  Sockets::iterator si;
209  si = _sockets.begin();
210  while (si != _sockets.end() && (*si)->_connection != connection) {
211  ++si;
212  }
213  if (si == _sockets.end()) {
214  return false;
215  }
216 
217  _removed_sockets.push_back(*si);
218  _sockets.erase(si);
219 
220  return true;
221 }
222 
223 /**
224  * Returns true if the indicated connection has been added to the
225  * ConnectionReader and is being monitored properly, false if it is not known,
226  * or if there was some error condition detected on the connection. (If there
227  * was an error condition, normally the ConnectionManager would have been
228  * informed and closed the connection.)
229  */
232  LightMutexHolder holder(_sockets_mutex);
233 
234  // Walk through the list of sockets to find the one we're asking about.
235  Sockets::iterator si;
236  si = _sockets.begin();
237  while (si != _sockets.end() && (*si)->_connection != connection) {
238  ++si;
239  }
240  if (si == _sockets.end()) {
241  // Don't know that connection.
242  return false;
243  }
244 
245  SocketInfo *sinfo = (*si);
246  bool is_ok = !sinfo->_error;
247 
248  return is_ok;
249 }
250 
251 /**
252  * Explicitly polls the available sockets to see if any of them have any
253  * noise. This function does nothing unless this is a polling-type
254  * ConnectionReader, i.e. it was created with zero threads (and is_polling()
255  * will return true).
256  *
257  * It is not necessary to call this explicitly for a QueuedConnectionReader.
258  */
260 poll() {
261  if (!_polling) {
262  return;
263  }
264 
265  SocketInfo *sinfo = get_next_available_socket(false, -2);
266  if (sinfo != nullptr) {
267  double max_poll_cycle = get_net_max_poll_cycle();
268  if (max_poll_cycle < 0.0) {
269  // Continue to read all data.
270  while (sinfo != nullptr) {
271  process_incoming_data(sinfo);
272  sinfo = get_next_available_socket(false, -2);
273  }
274 
275  } else {
276  // Read only until a certain amount of time has elapsed.
277  TrueClock *global_clock = TrueClock::get_global_ptr();
278  double stop = global_clock->get_short_time() + max_poll_cycle;
279 
280  while (sinfo != nullptr) {
281  process_incoming_data(sinfo);
282  if (global_clock->get_short_time() >= stop) {
283  return;
284  }
285  sinfo = get_next_available_socket(false, -2);
286  }
287  }
288  }
289 }
290 
291 /**
292  * Returns a pointer to the ConnectionManager object that serves this
293  * ConnectionReader.
294  */
296 get_manager() const {
297  return _manager;
298 }
299 
300 /**
301  * Returns the number of threads the ConnectionReader has been created with.
302  */
305  return _threads.size();
306 }
307 
308 /**
309  * Sets the ConnectionReader into raw mode (or turns off raw mode). In raw
310  * mode, datagram headers are not expected; instead, all the data available on
311  * the pipe is treated as a single datagram.
312  *
313  * This is similar to set_tcp_header_size(0), except that it also turns off
314  * headers for UDP packets.
315  */
317 set_raw_mode(bool mode) {
318  _raw_mode = mode;
319 }
320 
321 /**
322  * Returns the current setting of the raw mode flag. See set_raw_mode().
323  */
325 get_raw_mode() const {
326  return _raw_mode;
327 }
328 
329 /**
330  * Sets the header size of TCP packets. At the present, legal values for this
331  * are 0, 2, or 4; this specifies the number of bytes to use encode the
332  * datagram length at the start of each TCP datagram. Sender and receiver
333  * must independently agree on this.
334  */
336 set_tcp_header_size(int tcp_header_size) {
337  _tcp_header_size = tcp_header_size;
338 }
339 
340 /**
341  * Returns the current setting of TCP header size. See set_tcp_header_size().
342  */
345  return _tcp_header_size;
346 }
347 
348 /**
349  * Terminates all threads cleanly. Normally this is only called by the
350  * destructor, but it may be called explicitly before destruction.
351  */
354  if (_shutdown) {
355  return;
356  }
357 
358  // First, begin the shutdown. This will tell our threads we want them to
359  // quit.
360  _shutdown = true;
361 
362  // Now wait for all of our threads to terminate.
363  Threads::iterator ti;
364  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
365  (*ti)->join();
366  }
367 }
368 
369 /**
370  * Attempts to read all the possible data from the indicated connection, which
371  * has just delivered a write error (and has therefore already been closed).
372  * If the connection is not monitered by this reader, does nothing.
373  */
374 void ConnectionReader::
375 flush_read_connection(Connection *connection) {
376  // Ensure it doesn't get deleted.
377  SocketInfo sinfo(connection);
378 
379  if (!remove_connection(connection)) {
380  // Not already in the reader.
381  return;
382  }
383 
384  // The connection was previously in the reader, but has now been removed.
385  // Now we can flush it completely. We check if there is any read data
386  // available on just this one socket; we can do this right here in this
387  // thread, since we've already removed this connection from the reader.
388 
389  Socket_fdset fdset;
390  fdset.clear();
391  fdset.setForSocket(*(sinfo.get_socket()));
392  int num_results = fdset.WaitForRead(true, 0);
393  while (num_results != 0) {
394  sinfo._busy = true;
395  if (!process_incoming_data(&sinfo)) {
396  break;
397  }
398  fdset.setForSocket(*(sinfo.get_socket()));
399  num_results = fdset.WaitForRead(true, 0);
400  }
401 }
402 
403 /**
404  * This should normally only be called when the associated ConnectionManager
405  * destructs. It resets the ConnectionManager pointer to NULL so we don't
406  * have a floating pointer. This makes the ConnectionReader invalid;
407  * presumably it also will be destructed momentarily.
408  */
409 void ConnectionReader::
410 clear_manager() {
411  _manager = nullptr;
412 }
413 
414 /**
415  * To be called when a socket has been fully read and is ready for polling for
416  * additional data.
417  */
418 void ConnectionReader::
419 finish_socket(SocketInfo *sinfo) {
420  nassertv(sinfo->_busy);
421 
422  // By marking the SocketInfo nonbusy, we make it available for future polls.
423  sinfo->_busy = false;
424 }
425 
426 /**
427  * This is run within a thread when the call to select() indicates there is
428  * data available on a socket. Returns true if the data is read successfully,
429  * false on failure (for instance, because the connection is closed).
430  */
431 bool ConnectionReader::
432 process_incoming_data(SocketInfo *sinfo) {
433  if (_raw_mode) {
434  if (sinfo->is_udp()) {
435  return process_raw_incoming_udp_data(sinfo);
436  } else {
437  return process_raw_incoming_tcp_data(sinfo);
438  }
439  } else {
440  if (sinfo->is_udp()) {
441  return process_incoming_udp_data(sinfo);
442  } else {
443  return process_incoming_tcp_data(sinfo);
444  }
445  }
446 }
447 
448 /**
449  *
450  */
451 bool ConnectionReader::
452 process_incoming_udp_data(SocketInfo *sinfo) {
453  Socket_UDP *socket;
454  DCAST_INTO_R(socket, sinfo->get_socket(), false);
455  Socket_Address addr;
456 
457  // Read as many bytes as we can.
458  char buffer[read_buffer_size];
459  int bytes_read = read_buffer_size;
460 
461  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
462 
463  if (!okflag) {
464  finish_socket(sinfo);
465  return false;
466 
467  } else if (bytes_read == 0) {
468  // The socket was closed (!). This shouldn't happen with a UDP
469  // connection. Oh well. Report that and return.
470  if (_manager != nullptr) {
471  _manager->connection_reset(sinfo->_connection, 0);
472  }
473  finish_socket(sinfo);
474  return false;
475  }
476 
477  // Since we are not running in raw mode, we decode the header to determine
478  // how big the datagram is. This means we must have read at least a full
479  // header.
480  if (bytes_read < datagram_udp_header_size) {
481  net_cat.error()
482  << "Did not read entire header, discarding UDP datagram.\n";
483  finish_socket(sinfo);
484  return true;
485  }
486 
487  DatagramUDPHeader header(buffer);
488 
489  char *dp = buffer + datagram_udp_header_size;
490  bytes_read -= datagram_udp_header_size;
491 
492  NetDatagram datagram(dp, bytes_read);
493 
494  // Now that we've read all the data, it's time to finish the socket so
495  // another thread can read the next datagram.
496  finish_socket(sinfo);
497 
498  if (_shutdown) {
499  return false;
500  }
501 
502  // And now do whatever we need to do to process the datagram.
503  if (!header.verify_datagram(datagram)) {
504  net_cat.error()
505  << "Ignoring invalid UDP datagram.\n";
506  } else {
507  datagram.set_connection(sinfo->_connection);
508  datagram.set_address(NetAddress(addr));
509 
510  if (net_cat.is_spam()) {
511  net_cat.spam()
512  << "Received UDP datagram with "
513  << datagram_udp_header_size + datagram.get_length()
514  << " bytes on " << (void *)datagram.get_connection()
515  << " from " << datagram.get_address() << "\n";
516  }
517 
518  receive_datagram(datagram);
519  }
520 
521  return true;
522 }
523 
524 /**
525  *
526  */
527 bool ConnectionReader::
528 process_incoming_tcp_data(SocketInfo *sinfo) {
529  Socket_TCP *socket;
530  DCAST_INTO_R(socket, sinfo->get_socket(), false);
531 
532  // Read only the header bytes to start with.
533  char buffer[read_buffer_size];
534  int header_bytes_read = 0;
535 
536  // First, we have to read the first _tcp_header_size bytes.
537  while (header_bytes_read < _tcp_header_size) {
538  int bytes_read =
539  socket->RecvData(buffer + header_bytes_read,
540  _tcp_header_size - header_bytes_read);
541 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
542  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
543  socket->Active()) {
545  bytes_read = socket->RecvData(buffer + header_bytes_read,
546  _tcp_header_size - header_bytes_read);
547  }
548 #endif // SIMPLE_THREADS
549 
550  if (bytes_read <= 0) {
551  // The socket was closed. Report that and return.
552  if (_manager != nullptr) {
553  _manager->connection_reset(sinfo->_connection, 0);
554  }
555  finish_socket(sinfo);
556  return false;
557  }
558 
559  header_bytes_read += bytes_read;
561  }
562 
563  // Now we must decode the header to determine how big the datagram is. This
564  // means we must have read at least a full header.
565  if (header_bytes_read != _tcp_header_size) {
566  // This should actually be impossible, by the read-loop logic above.
567  net_cat.error()
568  << "Did not read entire header, discarding TCP datagram.\n";
569  finish_socket(sinfo);
570  return true;
571  }
572 
573  DatagramTCPHeader header(buffer, _tcp_header_size);
574  int size = header.get_datagram_size(_tcp_header_size);
575 
576  // We have to loop until the entire datagram is read.
577  NetDatagram datagram;
578 
579  while (!_shutdown && (int)datagram.get_length() < size) {
580  int bytes_read;
581 
582  int read_bytes = read_buffer_size;
583 #ifdef SIMPLE_THREADS
584  // In the SIMPLE_THREADS case, we want to limit the number of bytes we
585  // read in a single epoch, to minimize the impact on the other threads.
586  read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
587 #endif
588 
589  bytes_read =
590  socket->RecvData(buffer, min(read_bytes,
591  (int)(size - datagram.get_length())));
592 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
593  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
594  socket->Active()) {
596  bytes_read =
597  socket->RecvData(buffer, min(read_bytes,
598  (int)(size - datagram.get_length())));
599  }
600 #endif // SIMPLE_THREADS
601 
602  char *dp = buffer;
603 
604  if (bytes_read <= 0) {
605  // The socket was closed. Report that and return.
606  if (_manager != nullptr) {
607  _manager->connection_reset(sinfo->_connection, 0);
608  }
609  finish_socket(sinfo);
610  return false;
611  }
612 
613  int datagram_bytes =
614  min(bytes_read, (int)(size - datagram.get_length()));
615  datagram.append_data(dp, datagram_bytes);
616 
617  if (bytes_read > datagram_bytes) {
618  // There were some extra bytes at the end of the datagram. Maybe the
619  // beginning of the next datagram? Huh.
620  net_cat.error()
621  << "Discarding " << bytes_read - datagram_bytes
622  << " bytes following TCP datagram.\n";
623  }
625  }
626 
627  // Now that we've read all the data, it's time to finish the socket so
628  // another thread can read the next datagram.
629  finish_socket(sinfo);
630 
631  if (_shutdown) {
632  return false;
633  }
634 
635  // And now do whatever we need to do to process the datagram.
636  if (!header.verify_datagram(datagram, _tcp_header_size)) {
637  net_cat.error()
638  << "Ignoring invalid TCP datagram.\n";
639  } else {
640  datagram.set_connection(sinfo->_connection);
641  datagram.set_address(NetAddress(socket->GetPeerName()));
642 
643  if (net_cat.is_spam()) {
644  net_cat.spam()
645  << "Received TCP datagram with "
646  << _tcp_header_size + datagram.get_length()
647  << " bytes on " << (void *)datagram.get_connection()
648  << " from " << datagram.get_address() << "\n";
649  }
650 
651  receive_datagram(datagram);
652  }
653 
654  return true;
655 }
656 
657 /**
658  *
659  */
660 bool ConnectionReader::
661 process_raw_incoming_udp_data(SocketInfo *sinfo) {
662  Socket_UDP *socket;
663  DCAST_INTO_R(socket, sinfo->get_socket(), false);
664  Socket_Address addr;
665 
666  // Read as many bytes as we can.
667  char buffer[read_buffer_size];
668  int bytes_read = read_buffer_size;
669 
670  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
671 
672  if (!okflag) {
673  finish_socket(sinfo);
674  return false;
675 
676  } else if (bytes_read == 0) {
677  // The socket was closed (!). This shouldn't happen with a UDP
678  // connection. Oh well. Report that and return.
679  if (_manager != nullptr) {
680  _manager->connection_reset(sinfo->_connection, 0);
681  }
682  finish_socket(sinfo);
683  return false;
684  }
685 
686  // In raw mode, we simply extract all the bytes and make that a datagram.
687  NetDatagram datagram(buffer, bytes_read);
688 
689  // Now that we've read all the data, it's time to finish the socket so
690  // another thread can read the next datagram.
691  finish_socket(sinfo);
692 
693  if (_shutdown) {
694  return false;
695  }
696 
697  datagram.set_connection(sinfo->_connection);
698  datagram.set_address(NetAddress(addr));
699 
700  if (net_cat.is_spam()) {
701  net_cat.spam()
702  << "Received raw UDP datagram with " << datagram.get_length()
703  << " bytes on " << (void *)datagram.get_connection()
704  << " from " << datagram.get_address() << "\n";
705  }
706 
707  receive_datagram(datagram);
708 
709  return true;
710 }
711 
712 /**
713  *
714  */
715 bool ConnectionReader::
716 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
717  Socket_TCP *socket;
718  DCAST_INTO_R(socket, sinfo->get_socket(), false);
719 
720  // Read as many bytes as we can.
721  char buffer[read_buffer_size];
722  int bytes_read = socket->RecvData(buffer, read_buffer_size);
723 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
724  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
725  socket->Active()) {
727  bytes_read = socket->RecvData(buffer, read_buffer_size);
728  }
729 #endif // SIMPLE_THREADS
730 
731  if (bytes_read <= 0) {
732  // The socket was closed. Report that and return.
733  if (_manager != nullptr) {
734  _manager->connection_reset(sinfo->_connection, 0);
735  }
736  finish_socket(sinfo);
737  return false;
738  }
739 
740  // In raw mode, we simply extract all the bytes and make that a datagram.
741  NetDatagram datagram(buffer, bytes_read);
742 
743  // Now that we've read all the data, it's time to finish the socket so
744  // another thread can read the next datagram.
745  finish_socket(sinfo);
746 
747  if (_shutdown) {
748  return false;
749  }
750 
751  datagram.set_connection(sinfo->_connection);
752  datagram.set_address(NetAddress(socket->GetPeerName()));
753 
754  if (net_cat.is_spam()) {
755  net_cat.spam()
756  << "Received raw TCP datagram with " << datagram.get_length()
757  << " bytes on " << (void *)datagram.get_connection()
758  << " from " << datagram.get_address() << "\n";
759  }
760 
761  receive_datagram(datagram);
762 
763  return true;
764 }
765 
766 /**
767  * This is the actual executing function for each thread.
768  */
769 void ConnectionReader::
770 thread_run(int thread_index) {
771  nassertv(!_polling);
772  nassertv(_threads[thread_index] == Thread::get_current_thread());
773 
774  while (!_shutdown) {
775  SocketInfo *sinfo =
776  get_next_available_socket(true, thread_index);
777  if (sinfo != nullptr) {
778  process_incoming_data(sinfo);
780  } else {
782  }
783  }
784 }
785 
786 
787 /**
788  * Polls the known connections for activity and returns the next one known to
789  * have activity, or NULL if no activity is detected within the timeout
790  * interval.
791  *
792  * This function may block indefinitely if it is being called by multiple
793  * threads; if there are no other threads, it may block only if allow_block is
794  * true.
795  */
796 ConnectionReader::SocketInfo *ConnectionReader::
797 get_next_available_socket(bool allow_block, int current_thread_index) {
798  // Go to sleep on the select() mutex. This guarantees that only one thread
799  // is in this function at a time.
800  MutexHolder holder(_select_mutex);
801 
802  do {
803  // First, check the result from the previous select call. If there are
804  // any sockets remaining there, process them first.
805  while (!_shutdown && _num_results > 0) {
806  nassertr(_next_index < (int)_selecting_sockets.size(), nullptr);
807  int i = _next_index;
808  _next_index++;
809 
810  if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
811  _num_results--;
812  SocketInfo *sinfo = _selecting_sockets[i];
813 
814  // Some noise on this socket.
815  sinfo->_busy = true;
816  return sinfo;
817  }
818  }
819 
820  bool interrupted;
821  do {
822  interrupted = false;
823 
824  // Ok, no results from previous select calls. Prepare to set up for a
825  // new select.
826 
827  // First, report to anyone else who cares that we're the thread about to
828  // do the poll. That way, if any new sockets come available while we're
829  // polling, we can service them.
830  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
831 
832  rebuild_select_list();
833 
834  // Now we can execute the select.
835  _num_results = 0;
836  _next_index = 0;
837 
838  if (!_shutdown) {
839  uint32_t timeout = (uint32_t)(get_net_max_block() * 1000.0);
840  if (!allow_block) {
841  timeout = 0;
842  }
843 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
844  // In the presence of SIMPLE_THREADS, we never wait at all, but rather
845  // we yield the thread if we come up empty (so that we won't block the
846  // entire process).
847  timeout = 0;
848 #endif
849 
850  _num_results = _fdset.WaitForRead(false, timeout);
851  }
852 
853  if (_num_results == 0 && allow_block) {
854  // If we reached net_max_block, go back and reconsider. (We never
855  // timeout indefinitely, so we can check the shutdown flag every once
856  // in a while.)
857  interrupted = true;
859 
860  } else if (_num_results < 0) {
861  // If we had an error, just return. But yield the timeslice first.
863  return nullptr;
864  }
865  } while (!_shutdown && interrupted);
866 
867  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
868 
869  // Repeat the above until we (a) find a socket with actual noise on it, or
870  // (b) return from PR_Poll() with no sockets available.
871  } while (!_shutdown && _num_results > 0);
872 
873  return nullptr;
874 }
875 
876 
877 /**
878  * Rebuilds the _fdset and _selecting_sockets arrays based on the sockets that
879  * are currently available for selecting.
880  */
881 void ConnectionReader::
882 rebuild_select_list() {
883  _fdset.clear();
884  _selecting_sockets.clear();
885 
886  LightMutexHolder holder(_sockets_mutex);
887  Sockets::const_iterator si;
888  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
889  SocketInfo *sinfo = (*si);
890  if (!sinfo->_busy && !sinfo->_error) {
891  _fdset.setForSocket(*sinfo->get_socket());
892  _selecting_sockets.push_back(sinfo);
893  }
894  }
895 
896  // This is also a fine time to delete the contents of the _removed_sockets
897  // list.
898  if (!_removed_sockets.empty()) {
899  Sockets still_busy_sockets;
900  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
901  SocketInfo *sinfo = (*si);
902  if (sinfo->_busy) {
903  still_busy_sockets.push_back(sinfo);
904  } else {
905  delete sinfo;
906  }
907  }
908  _removed_sockets.swap(still_busy_sockets);
909  }
910 }
911 
912 /**
913  * Adds the sockets from this ConnectionReader (or ConnectionListener) to the
914  * indicated fdset. This is used by ConnectionManager::block() to build an
915  * fdset of all attached readers.
916  */
917 void ConnectionReader::
918 accumulate_fdset(Socket_fdset &fdset) {
919  LightMutexHolder holder(_sockets_mutex);
920  Sockets::const_iterator si;
921  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
922  SocketInfo *sinfo = (*si);
923  if (!sinfo->_busy && !sinfo->_error) {
924  fdset.setForSocket(*sinfo->get_socket());
925  }
926  }
927 }
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:68
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition: socket_tcp.h:12
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
Base functionality for a INET domain Socket This call should be the starting point for all other unix...
Definition: socket_ip.h:27
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
Definition: datagram.cxx:129
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
std::string RecvData(int max_len)
Read the data from the connection - if error 0 if socket closed for read or length is 0 + bytes read ...
Definition: socket_tcp.h:185
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
The primary interface to the low-level networking layer in this package.
void shutdown()
Terminates all threads cleanly.
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:212
Socket_Address GetPeerName(void) const
Wrapper on berkly getpeername...
Definition: socket_ip.h:253
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:96
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:24
Similar to MutexHolder, but for a light mutex.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
An interface to whatever real-time clock we might have available in the current environment.
Definition: trueClock.h:33
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A simple place to store and manipulate tcp and port address for communication layer.
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
A thread; that is, a lightweight process.
Definition: thread.h:46
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
Definition: netDatagram.cxx:88
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
bool IsSetFor(const Socket_IP &incon) const
check to see if a socket object has been marked for reading
Definition: socket_fdset.h:81
void clear()
Marks the content as empty.
Definition: socket_fdset.h:124
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:29
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ConnectionReader(ConnectionManager *manager, int num_threads, const std::string &thread_name=std::string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition: netAddress.h:25
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.