Panda3D
 All Classes Functions Variables Enumerations
connectionReader.cxx
1 // Filename: connectionReader.cxx
2 // Created by: drose (08Feb00)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "connectionReader.h"
16 #include "dcast.h"
17 #include "connectionManager.h"
18 #include "netDatagram.h"
19 #include "datagramTCPHeader.h"
20 #include "datagramUDPHeader.h"
21 #include "config_net.h"
22 #include "trueClock.h"
23 #include "socket_udp.h"
24 #include "socket_tcp.h"
25 #include "mutexHolder.h"
26 #include "lightMutexHolder.h"
27 #include "pnotify.h"
28 #include "atomicAdjust.h"
29 #include "config_downloader.h"
30 
31 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
32 
33 ////////////////////////////////////////////////////////////////////
34 // Function: ConnectionReader::SocketInfo::Constructor
35 // Access: Public
36 // Description:
37 ////////////////////////////////////////////////////////////////////
38 ConnectionReader::SocketInfo::
39 SocketInfo(const PT(Connection) &connection) :
40  _connection(connection)
41 {
42  _busy = false;
43  _error = false;
44 }
45 
46 ////////////////////////////////////////////////////////////////////
47 // Function: ConnectionReader::SocketInfo::is_udp
48 // Access: Public
49 // Description:
50 ////////////////////////////////////////////////////////////////////
51 bool ConnectionReader::SocketInfo::
52 is_udp() const {
53  return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
54 }
55 
56 ////////////////////////////////////////////////////////////////////
57 // Function: ConnectionReader::SocketInfo::get_socket
58 // Access: Public
59 // Description:
60 ////////////////////////////////////////////////////////////////////
61 Socket_IP *ConnectionReader::SocketInfo::
62 get_socket() const {
63  return _connection->get_socket();
64 }
65 
66 ////////////////////////////////////////////////////////////////////
67 // Function: ConnectionReader::ReaderThread::Constructor
68 // Access: Public
69 // Description:
70 ////////////////////////////////////////////////////////////////////
71 ConnectionReader::ReaderThread::
72 ReaderThread(ConnectionReader *reader, const string &thread_name,
73  int thread_index) :
74  Thread(make_thread_name(thread_name, thread_index),
75  make_thread_name(thread_name, thread_index)),
76  _reader(reader),
77  _thread_index(thread_index)
78 {
79 }
80 
81 ////////////////////////////////////////////////////////////////////
82 // Function: ConnectionReader::ReaderThread::thread_main
83 // Access: Public, Virtual
84 // Description:
85 ////////////////////////////////////////////////////////////////////
86 void ConnectionReader::ReaderThread::
87 thread_main() {
88  _reader->thread_run(_thread_index);
89 }
90 
91 ////////////////////////////////////////////////////////////////////
92 // Function: ConnectionReader::Constructor
93 // Access: Published
94 // Description: Creates a new ConnectionReader with the indicated
95 // number of threads to handle requests. If num_threads
96 // is 0, the sockets will only be read by polling,
97 // during an explicit poll() call.
98 // (QueuedConnectionReader will do this automatically.)
99 ////////////////////////////////////////////////////////////////////
101 ConnectionReader(ConnectionManager *manager, int num_threads,
102  const string &thread_name) :
103  _manager(manager)
104 {
106 #ifndef NDEBUG
107  if (num_threads != 0) {
108  if (net_cat.is_debug()) {
109  net_cat.debug()
110  << "Threading support is not available.\n";
111  }
112  }
113 #endif // NDEBUG
114  num_threads = 0;
115  }
116 
117  _raw_mode = false;
118  _tcp_header_size = tcp_header_size;
119  _polling = (num_threads <= 0);
120 
121  _shutdown = false;
122 
123  _next_index = 0;
124  _num_results = 0;
125 
126  _currently_polling_thread = -1;
127 
128  string reader_thread_name = thread_name;
129  if (thread_name.empty()) {
130  reader_thread_name = "ReaderThread";
131  }
132  int i;
133  for (i = 0; i < num_threads; i++) {
134  PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
135  _threads.push_back(thread);
136  }
137  for (i = 0; i < num_threads; i++) {
138  _threads[i]->start(net_thread_priority, true);
139  }
140 
141  _manager->add_reader(this);
142 }
143 
144 ////////////////////////////////////////////////////////////////////
145 // Function: ConnectionReader::Destructor
146 // Access: Published, Virtual
147 // Description:
148 ////////////////////////////////////////////////////////////////////
149 ConnectionReader::
150 ~ConnectionReader() {
151  if (_manager != (ConnectionManager *)NULL) {
152  _manager->remove_reader(this);
153  }
154 
155  shutdown();
156 
157  // Delete all of our old sockets.
158  Sockets::iterator si;
159  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
160  delete (*si);
161  }
162  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
163  SocketInfo *sinfo = (*si);
164  if (!sinfo->_busy) {
165  delete sinfo;
166  } else {
167  net_cat.error()
168  << "Reentrant deletion of ConnectionReader--don't delete these\n"
169  << "in response to connection_reset().\n";
170 
171  // We'll have to do the best we can to recover.
172  sinfo->_connection.clear();
173  }
174  }
175 }
176 
177 ////////////////////////////////////////////////////////////////////
178 // Function: ConnectionReader::add_connection
179 // Access: Published
180 // Description: Adds a new socket to the list of sockets the
181 // ConnectionReader will monitor. A datagram that comes
182 // in on any of the monitored sockets will be reported.
183 // In the case of a ConnectionListener, this adds a new
184 // rendezvous socket; any activity on any of the
185 // monitored sockets will cause a connection to be
186 // accepted.
187 //
188 // The return value is true if the connection was added,
189 // false if it was already there.
190 //
191 // add_connection() is thread-safe, and may be called at
192 // will by any thread.
193 ////////////////////////////////////////////////////////////////////
196  nassertr(connection != (Connection *)NULL, false);
197 
198  LightMutexHolder holder(_sockets_mutex);
199 
200  // Make sure it's not already on the _sockets list.
201  Sockets::const_iterator si;
202  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
203  if ((*si)->_connection == connection) {
204  // Whoops, already there.
205  return false;
206  }
207  }
208 
209  _sockets.push_back(new SocketInfo(connection));
210 
211  return true;
212 }
213 
214 ////////////////////////////////////////////////////////////////////
215 // Function: ConnectionReader::remove_connection
216 // Access: Published
217 // Description: Removes a socket from the list of sockets being
218 // monitored. Returns true if the socket was correctly
219 // removed, false if it was not on the list in the first
220 // place.
221 //
222 // remove_connection() is thread-safe, and may be called
223 // at will by any thread.
224 ////////////////////////////////////////////////////////////////////
227  LightMutexHolder holder(_sockets_mutex);
228 
229  // Walk through the list of sockets to find the one we're removing.
230  Sockets::iterator si;
231  si = _sockets.begin();
232  while (si != _sockets.end() && (*si)->_connection != connection) {
233  ++si;
234  }
235  if (si == _sockets.end()) {
236  return false;
237  }
238 
239  _removed_sockets.push_back(*si);
240  _sockets.erase(si);
241 
242  return true;
243 }
244 
245 ////////////////////////////////////////////////////////////////////
246 // Function: ConnectionReader::is_connection_ok
247 // Access: Published
248 // Description: Returns true if the indicated connection has been
249 // added to the ConnectionReader and is being monitored
250 // properly, false if it is not known, or if there was
251 // some error condition detected on the connection. (If
252 // there was an error condition, normally the
253 // ConnectionManager would have been informed and closed
254 // the connection.)
255 ////////////////////////////////////////////////////////////////////
258  LightMutexHolder holder(_sockets_mutex);
259 
260  // Walk through the list of sockets to find the one we're asking
261  // about.
262  Sockets::iterator si;
263  si = _sockets.begin();
264  while (si != _sockets.end() && (*si)->_connection != connection) {
265  ++si;
266  }
267  if (si == _sockets.end()) {
268  // Don't know that connection.
269  return false;
270  }
271 
272  SocketInfo *sinfo = (*si);
273  bool is_ok = !sinfo->_error;
274 
275  return is_ok;
276 }
277 
278 ////////////////////////////////////////////////////////////////////
279 // Function: ConnectionReader::poll
280 // Access: Published
281 // Description: Explicitly polls the available sockets to see if any
282 // of them have any noise. This function does nothing
283 // unless this is a polling-type ConnectionReader,
284 // i.e. it was created with zero threads (and
285 // is_polling() will return true).
286 //
287 // It is not necessary to call this explicitly for a
288 // QueuedConnectionReader.
289 ////////////////////////////////////////////////////////////////////
291 poll() {
292  if (!_polling) {
293  return;
294  }
295 
296  SocketInfo *sinfo = get_next_available_socket(false, -2);
297  if (sinfo != (SocketInfo *)NULL) {
298  double max_poll_cycle = get_net_max_poll_cycle();
299  if (max_poll_cycle < 0.0) {
300  // Continue to read all data.
301  while (sinfo != (SocketInfo *)NULL) {
302  process_incoming_data(sinfo);
303  sinfo = get_next_available_socket(false, -2);
304  }
305 
306  } else {
307  // Read only until a certain amount of time has elapsed.
308  TrueClock *global_clock = TrueClock::get_global_ptr();
309  double stop = global_clock->get_short_time() + max_poll_cycle;
310 
311  while (sinfo != (SocketInfo *)NULL) {
312  process_incoming_data(sinfo);
313  if (global_clock->get_short_time() >= stop) {
314  return;
315  }
316  sinfo = get_next_available_socket(false, -2);
317  }
318  }
319  }
320 }
321 
322 ////////////////////////////////////////////////////////////////////
323 // Function: ConnectionReader::get_manager
324 // Access: Published
325 // Description: Returns a pointer to the ConnectionManager object
326 // that serves this ConnectionReader.
327 ////////////////////////////////////////////////////////////////////
329 get_manager() const {
330  return _manager;
331 }
332 
333 ////////////////////////////////////////////////////////////////////
334 // Function: ConnectionReader::get_num_threads
335 // Access: Published
336 // Description: Returns the number of threads the ConnectionReader
337 // has been created with.
338 ////////////////////////////////////////////////////////////////////
341  return _threads.size();
342 }
343 
344 ////////////////////////////////////////////////////////////////////
345 // Function: ConnectionReader::set_raw_mode
346 // Access: Published
347 // Description: Sets the ConnectionReader into raw mode (or turns off
348 // raw mode). In raw mode, datagram headers are not
349 // expected; instead, all the data available on the pipe
350 // is treated as a single datagram.
351 //
352 // This is similar to set_tcp_header_size(0), except that it
353 // also turns off headers for UDP packets.
354 ////////////////////////////////////////////////////////////////////
356 set_raw_mode(bool mode) {
357  _raw_mode = mode;
358 }
359 
360 ////////////////////////////////////////////////////////////////////
361 // Function: ConnectionReader::get_raw_mode
362 // Access: Published
363 // Description: Returns the current setting of the raw mode flag.
364 // See set_raw_mode().
365 ////////////////////////////////////////////////////////////////////
367 get_raw_mode() const {
368  return _raw_mode;
369 }
370 
371 ////////////////////////////////////////////////////////////////////
372 // Function: ConnectionReader::set_tcp_header_size
373 // Access: Published
374 // Description: Sets the header size of TCP packets. At the present,
375 // legal values for this are 0, 2, or 4; this specifies
376 // the number of bytes to use encode the datagram length
377 // at the start of each TCP datagram. Sender and
378 // receiver must independently agree on this.
379 ////////////////////////////////////////////////////////////////////
381 set_tcp_header_size(int tcp_header_size) {
382  _tcp_header_size = tcp_header_size;
383 }
384 
385 ////////////////////////////////////////////////////////////////////
386 // Function: ConnectionReader::get_tcp_header_size
387 // Access: Published
388 // Description: Returns the current setting of TCP header size.
389 // See set_tcp_header_size().
390 ////////////////////////////////////////////////////////////////////
393  return _tcp_header_size;
394 }
395 
396 ////////////////////////////////////////////////////////////////////
397 // Function: ConnectionReader::shutdown
398 // Access: Published
399 // Description: Terminates all threads cleanly. Normally this is
400 // only called by the destructor, but it may be called
401 // explicitly before destruction.
402 ////////////////////////////////////////////////////////////////////
405  if (_shutdown) {
406  return;
407  }
408 
409  // First, begin the shutdown. This will tell our threads we want
410  // them to quit.
411  _shutdown = true;
412 
413  // Now wait for all of our threads to terminate.
414  Threads::iterator ti;
415  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
416  (*ti)->join();
417  }
418 }
419 
420 ////////////////////////////////////////////////////////////////////
421 // Function: ConnectionReader::flush_read_connection
422 // Access: Protected, Virtual
423 // Description: Attempts to read all the possible data from the
424 // indicated connection, which has just delivered a
425 // write error (and has therefore already been closed).
426 // If the connection is not monitered by this reader,
427 // does nothing.
428 ////////////////////////////////////////////////////////////////////
429 void ConnectionReader::
430 flush_read_connection(Connection *connection) {
431  // Ensure it doesn't get deleted.
432  SocketInfo sinfo(connection);
433 
434  if (!remove_connection(connection)) {
435  // Not already in the reader.
436  return;
437  }
438 
439  // The connection was previously in the reader, but has now been
440  // removed. Now we can flush it completely. We check if there is
441  // any read data available on just this one socket; we can do this
442  // right here in this thread, since we've already removed this
443  // connection from the reader.
444 
445  Socket_fdset fdset;
446  fdset.clear();
447  fdset.setForSocket(*(sinfo.get_socket()));
448  int num_results = fdset.WaitForRead(true, 0);
449  while (num_results != 0) {
450  sinfo._busy = true;
451  if (!process_incoming_data(&sinfo)) {
452  break;
453  }
454  fdset.setForSocket(*(sinfo.get_socket()));
455  num_results = fdset.WaitForRead(true, 0);
456  }
457 }
458 
459 ////////////////////////////////////////////////////////////////////
460 // Function: ConnectionReader::clear_manager
461 // Access: Protected
462 // Description: This should normally only be called when the
463 // associated ConnectionManager destructs. It resets
464 // the ConnectionManager pointer to NULL so we don't
465 // have a floating pointer. This makes the
466 // ConnectionReader invalid; presumably it also will be
467 // destructed momentarily.
468 ////////////////////////////////////////////////////////////////////
469 void ConnectionReader::
470 clear_manager() {
471  _manager = (ConnectionManager *)NULL;
472 }
473 
474 ////////////////////////////////////////////////////////////////////
475 // Function: ConnectionReader::finish_socket
476 // Access: Protected
477 // Description: To be called when a socket has been fully read and is
478 // ready for polling for additional data.
479 ////////////////////////////////////////////////////////////////////
480 void ConnectionReader::
481 finish_socket(SocketInfo *sinfo) {
482  nassertv(sinfo->_busy);
483 
484  // By marking the SocketInfo nonbusy, we make it available for
485  // future polls.
486  sinfo->_busy = false;
487 }
488 
489 ////////////////////////////////////////////////////////////////////
490 // Function: ConnectionReader::process_incoming_data
491 // Access: Protected, Virtual
492 // Description: This is run within a thread when the call to
493 // select() indicates there is data available on a
494 // socket. Returns true if the data is read
495 // successfully, false on failure (for instance, because
496 // the connection is closed).
497 ////////////////////////////////////////////////////////////////////
498 bool ConnectionReader::
499 process_incoming_data(SocketInfo *sinfo) {
500  if (_raw_mode) {
501  if (sinfo->is_udp()) {
502  return process_raw_incoming_udp_data(sinfo);
503  } else {
504  return process_raw_incoming_tcp_data(sinfo);
505  }
506  } else {
507  if (sinfo->is_udp()) {
508  return process_incoming_udp_data(sinfo);
509  } else {
510  return process_incoming_tcp_data(sinfo);
511  }
512  }
513 }
514 
515 ////////////////////////////////////////////////////////////////////
516 // Function: ConnectionReader::process_incoming_udp_data
517 // Access: Protected
518 // Description:
519 ////////////////////////////////////////////////////////////////////
520 bool ConnectionReader::
521 process_incoming_udp_data(SocketInfo *sinfo) {
522  Socket_UDP *socket;
523  DCAST_INTO_R(socket, sinfo->get_socket(), false);
524  Socket_Address addr;
525 
526  // Read as many bytes as we can.
527  char buffer[read_buffer_size];
528  int bytes_read = read_buffer_size;
529 
530  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
531 
532  if (!okflag) {
533  finish_socket(sinfo);
534  return false;
535 
536  } else if (bytes_read == 0) {
537  // The socket was closed (!). This shouldn't happen with a UDP
538  // connection. Oh well. Report that and return.
539  if (_manager != (ConnectionManager *)NULL) {
540  _manager->connection_reset(sinfo->_connection, 0);
541  }
542  finish_socket(sinfo);
543  return false;
544  }
545 
546  // Since we are not running in raw mode, we decode the header to
547  // determine how big the datagram is. This means we must have read
548  // at least a full header.
549  if (bytes_read < datagram_udp_header_size) {
550  net_cat.error()
551  << "Did not read entire header, discarding UDP datagram.\n";
552  finish_socket(sinfo);
553  return true;
554  }
555 
556  DatagramUDPHeader header(buffer);
557 
558  char *dp = buffer + datagram_udp_header_size;
559  bytes_read -= datagram_udp_header_size;
560 
561  NetDatagram datagram(dp, bytes_read);
562 
563  // Now that we've read all the data, it's time to finish the socket
564  // so another thread can read the next datagram.
565  finish_socket(sinfo);
566 
567  if (_shutdown) {
568  return false;
569  }
570 
571  // And now do whatever we need to do to process the datagram.
572  if (!header.verify_datagram(datagram)) {
573  net_cat.error()
574  << "Ignoring invalid UDP datagram.\n";
575  } else {
576  datagram.set_connection(sinfo->_connection);
577  datagram.set_address(NetAddress(addr));
578 
579  if (net_cat.is_spam()) {
580  net_cat.spam()
581  << "Received UDP datagram with "
582  << datagram_udp_header_size + datagram.get_length()
583  << " bytes on " << (void *)datagram.get_connection()
584  << " from " << datagram.get_address() << "\n";
585  }
586 
587  receive_datagram(datagram);
588  }
589 
590  return true;
591 }
592 
593 ////////////////////////////////////////////////////////////////////
594 // Function: ConnectionReader::process_incoming_tcp_data
595 // Access: Protected
596 // Description:
597 ////////////////////////////////////////////////////////////////////
598 bool ConnectionReader::
599 process_incoming_tcp_data(SocketInfo *sinfo) {
600  Socket_TCP *socket;
601  DCAST_INTO_R(socket, sinfo->get_socket(), false);
602 
603  // Read only the header bytes to start with.
604  char buffer[read_buffer_size];
605  int header_bytes_read = 0;
606 
607  // First, we have to read the first _tcp_header_size bytes.
608  while (header_bytes_read < _tcp_header_size) {
609  int bytes_read =
610  socket->RecvData(buffer + header_bytes_read,
611  _tcp_header_size - header_bytes_read);
612 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
613  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
614  socket->Active()) {
616  bytes_read = socket->RecvData(buffer + header_bytes_read,
617  _tcp_header_size - header_bytes_read);
618  }
619 #endif // SIMPLE_THREADS
620 
621  if (bytes_read <= 0) {
622  // The socket was closed. Report that and return.
623  if (_manager != (ConnectionManager *)NULL) {
624  _manager->connection_reset(sinfo->_connection, 0);
625  }
626  finish_socket(sinfo);
627  return false;
628  }
629 
630  header_bytes_read += bytes_read;
632  }
633 
634  // Now we must decode the header to determine how big the datagram
635  // is. This means we must have read at least a full header.
636  if (header_bytes_read != _tcp_header_size) {
637  // This should actually be impossible, by the read-loop logic
638  // above.
639  net_cat.error()
640  << "Did not read entire header, discarding TCP datagram.\n";
641  finish_socket(sinfo);
642  return true;
643  }
644 
645  DatagramTCPHeader header(buffer, _tcp_header_size);
646  int size = header.get_datagram_size(_tcp_header_size);
647 
648  // We have to loop until the entire datagram is read.
649  NetDatagram datagram;
650 
651  while (!_shutdown && (int)datagram.get_length() < size) {
652  int bytes_read;
653 
654  int read_bytes = read_buffer_size;
655 #ifdef SIMPLE_THREADS
656  // In the SIMPLE_THREADS case, we want to limit the number of
657  // bytes we read in a single epoch, to minimize the impact on the
658  // other threads.
659  read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
660 #endif
661 
662  bytes_read =
663  socket->RecvData(buffer, min(read_bytes,
664  (int)(size - datagram.get_length())));
665 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
666  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
667  socket->Active()) {
669  bytes_read =
670  socket->RecvData(buffer, min(read_bytes,
671  (int)(size - datagram.get_length())));
672  }
673 #endif // SIMPLE_THREADS
674 
675  char *dp = buffer;
676 
677  if (bytes_read <= 0) {
678  // The socket was closed. Report that and return.
679  if (_manager != (ConnectionManager *)NULL) {
680  _manager->connection_reset(sinfo->_connection, 0);
681  }
682  finish_socket(sinfo);
683  return false;
684  }
685 
686  int datagram_bytes =
687  min(bytes_read, (int)(size - datagram.get_length()));
688  datagram.append_data(dp, datagram_bytes);
689 
690  if (bytes_read > datagram_bytes) {
691  // There were some extra bytes at the end of the datagram. Maybe
692  // the beginning of the next datagram? Huh.
693  net_cat.error()
694  << "Discarding " << bytes_read - datagram_bytes
695  << " bytes following TCP datagram.\n";
696  }
698  }
699 
700  // Now that we've read all the data, it's time to finish the socket
701  // so another thread can read the next datagram.
702  finish_socket(sinfo);
703 
704  if (_shutdown) {
705  return false;
706  }
707 
708  // And now do whatever we need to do to process the datagram.
709  if (!header.verify_datagram(datagram, _tcp_header_size)) {
710  net_cat.error()
711  << "Ignoring invalid TCP datagram.\n";
712  } else {
713  datagram.set_connection(sinfo->_connection);
714  datagram.set_address(NetAddress(socket->GetPeerName()));
715 
716  if (net_cat.is_spam()) {
717  net_cat.spam()
718  << "Received TCP datagram with "
719  << _tcp_header_size + datagram.get_length()
720  << " bytes on " << (void *)datagram.get_connection()
721  << " from " << datagram.get_address() << "\n";
722  }
723 
724  receive_datagram(datagram);
725  }
726 
727  return true;
728 }
729 
730 ////////////////////////////////////////////////////////////////////
731 // Function: ConnectionReader::process_raw_incoming_udp_data
732 // Access: Protected
733 // Description:
734 ////////////////////////////////////////////////////////////////////
735 bool ConnectionReader::
736 process_raw_incoming_udp_data(SocketInfo *sinfo) {
737  Socket_UDP *socket;
738  DCAST_INTO_R(socket, sinfo->get_socket(), false);
739  Socket_Address addr;
740 
741  // Read as many bytes as we can.
742  char buffer[read_buffer_size];
743  int bytes_read = read_buffer_size;
744 
745  bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
746 
747  if (!okflag) {
748  finish_socket(sinfo);
749  return false;
750 
751  } else if (bytes_read == 0) {
752  // The socket was closed (!). This shouldn't happen with a UDP
753  // connection. Oh well. Report that and return.
754  if (_manager != (ConnectionManager *)NULL) {
755  _manager->connection_reset(sinfo->_connection, 0);
756  }
757  finish_socket(sinfo);
758  return false;
759  }
760 
761  // In raw mode, we simply extract all the bytes and make that a
762  // datagram.
763  NetDatagram datagram(buffer, bytes_read);
764 
765  // Now that we've read all the data, it's time to finish the socket
766  // so another thread can read the next datagram.
767  finish_socket(sinfo);
768 
769  if (_shutdown) {
770  return false;
771  }
772 
773  datagram.set_connection(sinfo->_connection);
774  datagram.set_address(NetAddress(addr));
775 
776  if (net_cat.is_spam()) {
777  net_cat.spam()
778  << "Received raw UDP datagram with " << datagram.get_length()
779  << " bytes on " << (void *)datagram.get_connection()
780  << " from " << datagram.get_address() << "\n";
781  }
782 
783  receive_datagram(datagram);
784 
785  return true;
786 }
787 
788 ////////////////////////////////////////////////////////////////////
789 // Function: ConnectionReader::process_raw_incoming_tcp_data
790 // Access: Protected
791 // Description:
792 ////////////////////////////////////////////////////////////////////
793 bool ConnectionReader::
794 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
795  Socket_TCP *socket;
796  DCAST_INTO_R(socket, sinfo->get_socket(), false);
797 
798  // Read as many bytes as we can.
799  char buffer[read_buffer_size];
800  int bytes_read = socket->RecvData(buffer, read_buffer_size);
801 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
802  while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
803  socket->Active()) {
805  bytes_read = socket->RecvData(buffer, read_buffer_size);
806  }
807 #endif // SIMPLE_THREADS
808 
809  if (bytes_read <= 0) {
810  // The socket was closed. Report that and return.
811  if (_manager != (ConnectionManager *)NULL) {
812  _manager->connection_reset(sinfo->_connection, 0);
813  }
814  finish_socket(sinfo);
815  return false;
816  }
817 
818  // In raw mode, we simply extract all the bytes and make that a
819  // datagram.
820  NetDatagram datagram(buffer, bytes_read);
821 
822  // Now that we've read all the data, it's time to finish the socket
823  // so another thread can read the next datagram.
824  finish_socket(sinfo);
825 
826  if (_shutdown) {
827  return false;
828  }
829 
830  datagram.set_connection(sinfo->_connection);
831  datagram.set_address(NetAddress(socket->GetPeerName()));
832 
833  if (net_cat.is_spam()) {
834  net_cat.spam()
835  << "Received raw TCP datagram with " << datagram.get_length()
836  << " bytes on " << (void *)datagram.get_connection()
837  << " from " << datagram.get_address() << "\n";
838  }
839 
840  receive_datagram(datagram);
841 
842  return true;
843 }
844 
845 ////////////////////////////////////////////////////////////////////
846 // Function: ConnectionReader::thread_run
847 // Access: Private
848 // Description: This is the actual executing function for each
849 // thread.
850 ////////////////////////////////////////////////////////////////////
851 void ConnectionReader::
852 thread_run(int thread_index) {
853  nassertv(!_polling);
854  nassertv(_threads[thread_index] == Thread::get_current_thread());
855 
856  while (!_shutdown) {
857  SocketInfo *sinfo =
858  get_next_available_socket(true, thread_index);
859  if (sinfo != (SocketInfo *)NULL) {
860  process_incoming_data(sinfo);
862  } else {
864  }
865  }
866 }
867 
868 
869 ////////////////////////////////////////////////////////////////////
870 // Function: ConnectionReader::get_next_available_socket
871 // Access: Private
872 // Description: Polls the known connections for activity and returns
873 // the next one known to have activity, or NULL if no
874 // activity is detected within the timeout interval.
875 //
876 // This function may block indefinitely if it is being
877 // called by multiple threads; if there are no other
878 // threads, it may block only if allow_block is true.
879 ////////////////////////////////////////////////////////////////////
880 ConnectionReader::SocketInfo *ConnectionReader::
881 get_next_available_socket(bool allow_block, int current_thread_index) {
882  // Go to sleep on the select() mutex. This guarantees that only one
883  // thread is in this function at a time.
884  MutexHolder holder(_select_mutex);
885 
886  do {
887  // First, check the result from the previous select call. If
888  // there are any sockets remaining there, process them first.
889  while (!_shutdown && _num_results > 0) {
890  nassertr(_next_index < (int)_selecting_sockets.size(), NULL);
891  int i = _next_index;
892  _next_index++;
893 
894  if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
895  _num_results--;
896  SocketInfo *sinfo = _selecting_sockets[i];
897 
898  // Some noise on this socket.
899  sinfo->_busy = true;
900  return sinfo;
901  }
902  }
903 
904  bool interrupted;
905  do {
906  interrupted = false;
907 
908  // Ok, no results from previous select calls. Prepare to set up
909  // for a new select.
910 
911  // First, report to anyone else who cares that we're the thread
912  // about to do the poll. That way, if any new sockets come
913  // available while we're polling, we can service them.
914  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
915 
916  rebuild_select_list();
917 
918  // Now we can execute the select.
919  _num_results = 0;
920  _next_index = 0;
921 
922  if (!_shutdown) {
923  PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0);
924  if (!allow_block) {
925  timeout = 0;
926  }
927 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
928  // In the presence of SIMPLE_THREADS, we never wait at all,
929  // but rather we yield the thread if we come up empty (so that
930  // we won't block the entire process).
931  timeout = 0;
932 #endif
933 
934  _num_results = _fdset.WaitForRead(false, timeout);
935  }
936 
937  if (_num_results == 0 && allow_block) {
938  // If we reached net_max_block, go back and reconsider. (We
939  // never timeout indefinitely, so we can check the shutdown
940  // flag every once in a while.)
941  interrupted = true;
943 
944  } else if (_num_results < 0) {
945  // If we had an error, just return. But yield the timeslice
946  // first.
948  return (SocketInfo *)NULL;
949  }
950  } while (!_shutdown && interrupted);
951 
952  AtomicAdjust::set(_currently_polling_thread, current_thread_index);
953 
954  // Repeat the above until we (a) find a socket with actual noise
955  // on it, or (b) return from PR_Poll() with no sockets available.
956  } while (!_shutdown && _num_results > 0);
957 
958  return (SocketInfo *)NULL;
959 }
960 
961 
962 ////////////////////////////////////////////////////////////////////
963 // Function: ConnectionReader::rebuild_select_list
964 // Access: Private
965 // Description: Rebuilds the _fdset and _selecting_sockets arrays
966 // based on the sockets that are currently available for
967 // selecting.
968 ////////////////////////////////////////////////////////////////////
969 void ConnectionReader::
970 rebuild_select_list() {
971  _fdset.clear();
972  _selecting_sockets.clear();
973 
974  LightMutexHolder holder(_sockets_mutex);
975  Sockets::const_iterator si;
976  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
977  SocketInfo *sinfo = (*si);
978  if (!sinfo->_busy && !sinfo->_error) {
979  _fdset.setForSocket(*sinfo->get_socket());
980  _selecting_sockets.push_back(sinfo);
981  }
982  }
983 
984  // This is also a fine time to delete the contents of the
985  // _removed_sockets list.
986  if (!_removed_sockets.empty()) {
987  Sockets still_busy_sockets;
988  for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
989  SocketInfo *sinfo = (*si);
990  if (sinfo->_busy) {
991  still_busy_sockets.push_back(sinfo);
992  } else {
993  delete sinfo;
994  }
995  }
996  _removed_sockets.swap(still_busy_sockets);
997  }
998 }
999 
1000 ////////////////////////////////////////////////////////////////////
1001 // Function: ConnectionReader::accumulate_fdset
1002 // Access: Private
1003 // Description: Adds the sockets from this ConnectionReader (or
1004 // ConnectionListener) to the indicated fdset. This is
1005 // used by ConnectionManager::block() to build an fdset
1006 // of all attached readers.
1007 ////////////////////////////////////////////////////////////////////
1008 void ConnectionReader::
1009 accumulate_fdset(Socket_fdset &fdset) {
1010  LightMutexHolder holder(_sockets_mutex);
1011  Sockets::const_iterator si;
1012  for (si = _sockets.begin(); si != _sockets.end(); ++si) {
1013  SocketInfo *sinfo = (*si);
1014  if (!sinfo->_busy && !sinfo->_error) {
1015  fdset.setForSocket(*sinfo->get_socket());
1016  }
1017  }
1018 }
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:81
Base functionality for a TCP connected socket. This class is pretty useless by itself, but it does hide...
Definition: socket_tcp.h:15
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:43
Base functionality for an INET domain socket. This call should be the starting point for all other unix...
Definition: socket_ip.h:34
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
Definition: datagram.cxx:144
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
std::string RecvData(int max_len)
Read the data from the connection.
Definition: socket_tcp.h:210
ConnectionReader(ConnectionManager *manager, int num_threads, const string &thread_name=string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
The primary interface to the low-level networking layer in this package.
void shutdown()
Terminates all threads cleanly.
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:29
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:263
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:457
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:105
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:248
Socket_Address GetPeerName(void) const
Wrapper on Berkeley getpeername...
Definition: socket_ip.h:257
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
Definition: thread.I:145
int get_tcp_header_size() const
Returns the current setting of TCP header size.
void set_connection(const PT(Connection)&connection)
Specifies the socket to which the datagram should be written.
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
static bool is_threading_supported()
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
Definition: thread.I:185
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
bool IsSetFor(const Socket_IP &incon) const
check to see if a socket object has been marked for reading
Definition: socket_fdset.h:89
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:28
Similar to MutexHolder, but for a light mutex.
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
An interface to whatever real-time clock we might have available in the current environment.
Definition: trueClock.h:38
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
A simple place to store and manipulate the TCP address and port for the communication layer.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
A thread; that is, a lightweight process.
Definition: thread.h:51
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
void clear()
Marks the content as empty.
Definition: socket_fdset.h:138
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:32
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent...
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound...
Definition: netAddress.h:27