Panda3D
Loading...
Searching...
No Matches
connectionReader.cxx
Go to the documentation of this file.
1/**
2 * PANDA 3D SOFTWARE
3 * Copyright (c) Carnegie Mellon University. All rights reserved.
4 *
5 * All use of this software is subject to the terms of the revised BSD
6 * license. You should have received a copy of this license along
7 * with this source code in a file named "LICENSE."
8 *
9 * @file connectionReader.cxx
10 * @author drose
11 * @date 2000-02-08
12 */
13
14#include "connectionReader.h"
15#include "dcast.h"
16#include "connectionManager.h"
17#include "netDatagram.h"
18#include "datagramTCPHeader.h"
19#include "datagramUDPHeader.h"
20#include "config_net.h"
21#include "trueClock.h"
22#include "socket_udp.h"
23#include "socket_tcp.h"
24#include "mutexHolder.h"
25#include "lightMutexHolder.h"
26#include "pnotify.h"
27#include "atomicAdjust.h"
28#include "config_downloader.h"
29
30using std::min;
31
32static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
33
34/**
35 *
36 */
37ConnectionReader::SocketInfo::
38SocketInfo(const PT(Connection) &connection) :
39 _connection(connection)
40{
41 _busy = false;
42 _error = false;
43}
44
45/**
46 *
47 */
48bool ConnectionReader::SocketInfo::
49is_udp() const {
50 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
51}
52
53/**
54 *
55 */
56Socket_IP *ConnectionReader::SocketInfo::
57get_socket() const {
58 return _connection->get_socket();
59}
60
61/**
62 *
63 */
64ConnectionReader::ReaderThread::
65ReaderThread(ConnectionReader *reader, const std::string &thread_name,
66 int thread_index) :
67 Thread(make_thread_name(thread_name, thread_index),
68 make_thread_name(thread_name, thread_index)),
69 _reader(reader),
70 _thread_index(thread_index)
71{
72}
73
74/**
75 *
76 */
77void ConnectionReader::ReaderThread::
78thread_main() {
79 _reader->thread_run(_thread_index);
80}
81
82/**
83 * Creates a new ConnectionReader with the indicated number of threads to
84 * handle requests. If num_threads is 0, the sockets will only be read by
85 * polling, during an explicit poll() call. (QueuedConnectionReader will do
86 * this automatically.)
87 */
89ConnectionReader(ConnectionManager *manager, int num_threads,
90 const std::string &thread_name) :
91 _manager(manager)
92{
94#ifndef NDEBUG
95 if (num_threads != 0) {
96 if (net_cat.is_debug()) {
97 net_cat.debug()
98 << "Threading support is not available.\n";
99 }
100 }
101#endif // NDEBUG
102 num_threads = 0;
103 }
104
105 _raw_mode = false;
106 _tcp_header_size = tcp_header_size;
107 _polling = (num_threads <= 0);
108
109 _shutdown = false;
110
111 _next_index = 0;
112 _num_results = 0;
113
114 _currently_polling_thread = -1;
115
116 std::string reader_thread_name = thread_name;
117 if (thread_name.empty()) {
118 reader_thread_name = "ReaderThread";
119 }
120 int i;
121 for (i = 0; i < num_threads; i++) {
122 PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
123 _threads.push_back(thread);
124 }
125 for (i = 0; i < num_threads; i++) {
126 _threads[i]->start(net_thread_priority, true);
127 }
128
129 _manager->add_reader(this);
130}
131
132/**
133 *
134 */
135ConnectionReader::
136~ConnectionReader() {
137 if (_manager != nullptr) {
138 _manager->remove_reader(this);
139 }
140
141 shutdown();
142
143 // Delete all of our old sockets.
144 Sockets::iterator si;
145 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
146 delete (*si);
147 }
148 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
149 SocketInfo *sinfo = (*si);
150 if (!sinfo->_busy) {
151 delete sinfo;
152 } else {
153 net_cat.error()
154 << "Reentrant deletion of ConnectionReader--don't delete these\n"
155 << "in response to connection_reset().\n";
156
157 // We'll have to do the best we can to recover.
158 sinfo->_connection.clear();
159 }
160 }
161}
162
163/**
164 * Adds a new socket to the list of sockets the ConnectionReader will monitor.
165 * A datagram that comes in on any of the monitored sockets will be reported.
166 * In the case of a ConnectionListener, this adds a new rendezvous socket; any
167 * activity on any of the monitored sockets will cause a connection to be
168 * accepted.
169 *
170 * The return value is true if the connection was added, false if it was
171 * already there.
172 *
173 * add_connection() is thread-safe, and may be called at will by any thread.
174 */
176add_connection(Connection *connection) {
177 nassertr(connection != nullptr, false);
178
179 LightMutexHolder holder(_sockets_mutex);
180
181 // Make sure it's not already on the _sockets list.
182 Sockets::const_iterator si;
183 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
184 if ((*si)->_connection == connection) {
185 // Whoops, already there.
186 return false;
187 }
188 }
189
190 _sockets.push_back(new SocketInfo(connection));
191
192 return true;
193}
194
195/**
196 * Removes a socket from the list of sockets being monitored. Returns true if
197 * the socket was correctly removed, false if it was not on the list in the
198 * first place.
199 *
200 * remove_connection() is thread-safe, and may be called at will by any
201 * thread.
202 */
204remove_connection(Connection *connection) {
205 LightMutexHolder holder(_sockets_mutex);
206
207 // Walk through the list of sockets to find the one we're removing.
208 Sockets::iterator si;
209 si = _sockets.begin();
210 while (si != _sockets.end() && (*si)->_connection != connection) {
211 ++si;
212 }
213 if (si == _sockets.end()) {
214 return false;
215 }
216
217 _removed_sockets.push_back(*si);
218 _sockets.erase(si);
219
220 return true;
221}
222
223/**
224 * Returns true if the indicated connection has been added to the
225 * ConnectionReader and is being monitored properly, false if it is not known,
226 * or if there was some error condition detected on the connection. (If there
227 * was an error condition, normally the ConnectionManager would have been
228 * informed and closed the connection.)
229 */
231is_connection_ok(Connection *connection) {
232 LightMutexHolder holder(_sockets_mutex);
233
234 // Walk through the list of sockets to find the one we're asking about.
235 Sockets::iterator si;
236 si = _sockets.begin();
237 while (si != _sockets.end() && (*si)->_connection != connection) {
238 ++si;
239 }
240 if (si == _sockets.end()) {
241 // Don't know that connection.
242 return false;
243 }
244
245 SocketInfo *sinfo = (*si);
246 bool is_ok = !sinfo->_error;
247
248 return is_ok;
249}
250
251/**
252 * Explicitly polls the available sockets to see if any of them have any
253 * noise. This function does nothing unless this is a polling-type
254 * ConnectionReader, i.e. it was created with zero threads (and is_polling()
255 * will return true).
256 *
257 * It is not necessary to call this explicitly for a QueuedConnectionReader.
258 */
260poll() {
261 if (!_polling) {
262 return;
263 }
264
265 SocketInfo *sinfo = get_next_available_socket(false, -2);
266 if (sinfo != nullptr) {
267 double max_poll_cycle = get_net_max_poll_cycle();
268 if (max_poll_cycle < 0.0) {
269 // Continue to read all data.
270 while (sinfo != nullptr) {
271 process_incoming_data(sinfo);
272 sinfo = get_next_available_socket(false, -2);
273 }
274
275 } else {
276 // Read only until a certain amount of time has elapsed.
277 TrueClock *global_clock = TrueClock::get_global_ptr();
278 double stop = global_clock->get_short_time() + max_poll_cycle;
279
280 while (sinfo != nullptr) {
281 process_incoming_data(sinfo);
282 if (global_clock->get_short_time() >= stop) {
283 return;
284 }
285 sinfo = get_next_available_socket(false, -2);
286 }
287 }
288 }
289}
290
291/**
292 * Returns a pointer to the ConnectionManager object that serves this
293 * ConnectionReader.
294 */
296get_manager() const {
297 return _manager;
298}
299
300/**
301 * Returns the number of threads the ConnectionReader has been created with.
302 */
304get_num_threads() const {
305 return _threads.size();
306}
307
308/**
309 * Sets the ConnectionReader into raw mode (or turns off raw mode). In raw
310 * mode, datagram headers are not expected; instead, all the data available on
311 * the pipe is treated as a single datagram.
312 *
313 * This is similar to set_tcp_header_size(0), except that it also turns off
314 * headers for UDP packets.
315 */
317set_raw_mode(bool mode) {
318 _raw_mode = mode;
319}
320
321/**
322 * Returns the current setting of the raw mode flag. See set_raw_mode().
323 */
325get_raw_mode() const {
326 return _raw_mode;
327}
328
329/**
330 * Sets the header size of TCP packets. At the present, legal values for this
331 * are 0, 2, or 4; this specifies the number of bytes to use encode the
332 * datagram length at the start of each TCP datagram. Sender and receiver
333 * must independently agree on this.
334 */
336set_tcp_header_size(int tcp_header_size) {
337 _tcp_header_size = tcp_header_size;
338}
339
340/**
341 * Returns the current setting of TCP header size. See set_tcp_header_size().
342 */
344get_tcp_header_size() const {
345 return _tcp_header_size;
346}
347
348/**
349 * Terminates all threads cleanly. Normally this is only called by the
350 * destructor, but it may be called explicitly before destruction.
351 */
353shutdown() {
354 if (_shutdown) {
355 return;
356 }
357
358 // First, begin the shutdown. This will tell our threads we want them to
359 // quit.
360 _shutdown = true;
361
362 // Now wait for all of our threads to terminate.
363 Threads::iterator ti;
364 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
365 (*ti)->join();
366 }
367}
368
369/**
370 * Attempts to read all the possible data from the indicated connection, which
371 * has just delivered a write error (and has therefore already been closed).
372 * If the connection is not monitered by this reader, does nothing.
373 */
374void ConnectionReader::
375flush_read_connection(Connection *connection) {
376 // Ensure it doesn't get deleted.
377 SocketInfo sinfo(connection);
378
379 if (!remove_connection(connection)) {
380 // Not already in the reader.
381 return;
382 }
383
384 // The connection was previously in the reader, but has now been removed.
385 // Now we can flush it completely. We check if there is any read data
386 // available on just this one socket; we can do this right here in this
387 // thread, since we've already removed this connection from the reader.
388
389 Socket_fdset fdset;
390 fdset.clear();
391 fdset.setForSocket(*(sinfo.get_socket()));
392 int num_results = fdset.WaitForRead(true, 0);
393 while (num_results != 0) {
394 sinfo._busy = true;
395 if (!process_incoming_data(&sinfo)) {
396 break;
397 }
398 fdset.setForSocket(*(sinfo.get_socket()));
399 num_results = fdset.WaitForRead(true, 0);
400 }
401}
402
403/**
404 * This should normally only be called when the associated ConnectionManager
405 * destructs. It resets the ConnectionManager pointer to NULL so we don't
406 * have a floating pointer. This makes the ConnectionReader invalid;
407 * presumably it also will be destructed momentarily.
408 */
409void ConnectionReader::
410clear_manager() {
411 _manager = nullptr;
412}
413
414/**
415 * To be called when a socket has been fully read and is ready for polling for
416 * additional data.
417 */
418void ConnectionReader::
419finish_socket(SocketInfo *sinfo) {
420 nassertv(sinfo->_busy);
421
422 // By marking the SocketInfo nonbusy, we make it available for future polls.
423 sinfo->_busy = false;
424}
425
426/**
427 * This is run within a thread when the call to select() indicates there is
428 * data available on a socket. Returns true if the data is read successfully,
429 * false on failure (for instance, because the connection is closed).
430 */
431bool ConnectionReader::
432process_incoming_data(SocketInfo *sinfo) {
433 if (_raw_mode) {
434 if (sinfo->is_udp()) {
435 return process_raw_incoming_udp_data(sinfo);
436 } else {
437 return process_raw_incoming_tcp_data(sinfo);
438 }
439 } else {
440 if (sinfo->is_udp()) {
441 return process_incoming_udp_data(sinfo);
442 } else {
443 return process_incoming_tcp_data(sinfo);
444 }
445 }
446}
447
448/**
449 *
450 */
451bool ConnectionReader::
452process_incoming_udp_data(SocketInfo *sinfo) {
453 Socket_UDP *socket;
454 DCAST_INTO_R(socket, sinfo->get_socket(), false);
455 Socket_Address addr;
456
457 // Read as many bytes as we can.
458 char buffer[read_buffer_size];
459 int bytes_read = read_buffer_size;
460
461 bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
462
463 if (!okflag) {
464 finish_socket(sinfo);
465 return false;
466
467 } else if (bytes_read == 0) {
468 // The socket was closed (!). This shouldn't happen with a UDP
469 // connection. Oh well. Report that and return.
470 if (_manager != nullptr) {
471 _manager->connection_reset(sinfo->_connection, 0);
472 }
473 finish_socket(sinfo);
474 return false;
475 }
476
477 // Since we are not running in raw mode, we decode the header to determine
478 // how big the datagram is. This means we must have read at least a full
479 // header.
480 if (bytes_read < datagram_udp_header_size) {
481 net_cat.error()
482 << "Did not read entire header, discarding UDP datagram.\n";
483 finish_socket(sinfo);
484 return true;
485 }
486
487 DatagramUDPHeader header(buffer);
488
489 char *dp = buffer + datagram_udp_header_size;
490 bytes_read -= datagram_udp_header_size;
491
492 NetDatagram datagram(dp, bytes_read);
493
494 // Now that we've read all the data, it's time to finish the socket so
495 // another thread can read the next datagram.
496 finish_socket(sinfo);
497
498 if (_shutdown) {
499 return false;
500 }
501
502 // And now do whatever we need to do to process the datagram.
503 if (!header.verify_datagram(datagram)) {
504 net_cat.error()
505 << "Ignoring invalid UDP datagram.\n";
506 } else {
507 datagram.set_connection(sinfo->_connection);
508 datagram.set_address(NetAddress(addr));
509
510 if (net_cat.is_spam()) {
511 net_cat.spam()
512 << "Received UDP datagram with "
513 << datagram_udp_header_size + datagram.get_length()
514 << " bytes on " << (void *)datagram.get_connection()
515 << " from " << datagram.get_address() << "\n";
516 }
517
518 receive_datagram(datagram);
519 }
520
521 return true;
522}
523
524/**
525 *
526 */
527bool ConnectionReader::
528process_incoming_tcp_data(SocketInfo *sinfo) {
529 Socket_TCP *socket;
530 DCAST_INTO_R(socket, sinfo->get_socket(), false);
531
532 // Read only the header bytes to start with.
533 char buffer[read_buffer_size];
534 int header_bytes_read = 0;
535
536 // First, we have to read the first _tcp_header_size bytes.
537 while (header_bytes_read < _tcp_header_size) {
538 int bytes_read =
539 socket->RecvData(buffer + header_bytes_read,
540 _tcp_header_size - header_bytes_read);
541#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
542 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
543 socket->Active()) {
545 bytes_read = socket->RecvData(buffer + header_bytes_read,
546 _tcp_header_size - header_bytes_read);
547 }
548#endif // SIMPLE_THREADS
549
550 if (bytes_read <= 0) {
551 // The socket was closed. Report that and return.
552 if (_manager != nullptr) {
553 _manager->connection_reset(sinfo->_connection, 0);
554 }
555 finish_socket(sinfo);
556 return false;
557 }
558
559 header_bytes_read += bytes_read;
561 }
562
563 // Now we must decode the header to determine how big the datagram is. This
564 // means we must have read at least a full header.
565 if (header_bytes_read != _tcp_header_size) {
566 // This should actually be impossible, by the read-loop logic above.
567 net_cat.error()
568 << "Did not read entire header, discarding TCP datagram.\n";
569 finish_socket(sinfo);
570 return true;
571 }
572
573 DatagramTCPHeader header(buffer, _tcp_header_size);
574 int size = header.get_datagram_size(_tcp_header_size);
575
576 // We have to loop until the entire datagram is read.
577 NetDatagram datagram;
578
579 while (!_shutdown && (int)datagram.get_length() < size) {
580 int bytes_read;
581
582 int read_bytes = read_buffer_size;
583#ifdef SIMPLE_THREADS
584 // In the SIMPLE_THREADS case, we want to limit the number of bytes we
585 // read in a single epoch, to minimize the impact on the other threads.
586 read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
587#endif
588
589 bytes_read =
590 socket->RecvData(buffer, min(read_bytes,
591 (int)(size - datagram.get_length())));
592#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
593 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
594 socket->Active()) {
596 bytes_read =
597 socket->RecvData(buffer, min(read_bytes,
598 (int)(size - datagram.get_length())));
599 }
600#endif // SIMPLE_THREADS
601
602 char *dp = buffer;
603
604 if (bytes_read <= 0) {
605 // The socket was closed. Report that and return.
606 if (_manager != nullptr) {
607 _manager->connection_reset(sinfo->_connection, 0);
608 }
609 finish_socket(sinfo);
610 return false;
611 }
612
613 int datagram_bytes =
614 min(bytes_read, (int)(size - datagram.get_length()));
615 datagram.append_data(dp, datagram_bytes);
616
617 if (bytes_read > datagram_bytes) {
618 // There were some extra bytes at the end of the datagram. Maybe the
619 // beginning of the next datagram? Huh.
620 net_cat.error()
621 << "Discarding " << bytes_read - datagram_bytes
622 << " bytes following TCP datagram.\n";
623 }
625 }
626
627 // Now that we've read all the data, it's time to finish the socket so
628 // another thread can read the next datagram.
629 finish_socket(sinfo);
630
631 if (_shutdown) {
632 return false;
633 }
634
635 // And now do whatever we need to do to process the datagram.
636 if (!header.verify_datagram(datagram, _tcp_header_size)) {
637 net_cat.error()
638 << "Ignoring invalid TCP datagram.\n";
639 } else {
640 datagram.set_connection(sinfo->_connection);
641 datagram.set_address(NetAddress(socket->GetPeerName()));
642
643 if (net_cat.is_spam()) {
644 net_cat.spam()
645 << "Received TCP datagram with "
646 << _tcp_header_size + datagram.get_length()
647 << " bytes on " << (void *)datagram.get_connection()
648 << " from " << datagram.get_address() << "\n";
649 }
650
651 receive_datagram(datagram);
652 }
653
654 return true;
655}
656
657/**
658 *
659 */
660bool ConnectionReader::
661process_raw_incoming_udp_data(SocketInfo *sinfo) {
662 Socket_UDP *socket;
663 DCAST_INTO_R(socket, sinfo->get_socket(), false);
664 Socket_Address addr;
665
666 // Read as many bytes as we can.
667 char buffer[read_buffer_size];
668 int bytes_read = read_buffer_size;
669
670 bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
671
672 if (!okflag) {
673 finish_socket(sinfo);
674 return false;
675
676 } else if (bytes_read == 0) {
677 // The socket was closed (!). This shouldn't happen with a UDP
678 // connection. Oh well. Report that and return.
679 if (_manager != nullptr) {
680 _manager->connection_reset(sinfo->_connection, 0);
681 }
682 finish_socket(sinfo);
683 return false;
684 }
685
686 // In raw mode, we simply extract all the bytes and make that a datagram.
687 NetDatagram datagram(buffer, bytes_read);
688
689 // Now that we've read all the data, it's time to finish the socket so
690 // another thread can read the next datagram.
691 finish_socket(sinfo);
692
693 if (_shutdown) {
694 return false;
695 }
696
697 datagram.set_connection(sinfo->_connection);
698 datagram.set_address(NetAddress(addr));
699
700 if (net_cat.is_spam()) {
701 net_cat.spam()
702 << "Received raw UDP datagram with " << datagram.get_length()
703 << " bytes on " << (void *)datagram.get_connection()
704 << " from " << datagram.get_address() << "\n";
705 }
706
707 receive_datagram(datagram);
708
709 return true;
710}
711
712/**
713 *
714 */
715bool ConnectionReader::
716process_raw_incoming_tcp_data(SocketInfo *sinfo) {
717 Socket_TCP *socket;
718 DCAST_INTO_R(socket, sinfo->get_socket(), false);
719
720 // Read as many bytes as we can.
721 char buffer[read_buffer_size];
722 int bytes_read = socket->RecvData(buffer, read_buffer_size);
723#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
724 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
725 socket->Active()) {
727 bytes_read = socket->RecvData(buffer, read_buffer_size);
728 }
729#endif // SIMPLE_THREADS
730
731 if (bytes_read <= 0) {
732 // The socket was closed. Report that and return.
733 if (_manager != nullptr) {
734 _manager->connection_reset(sinfo->_connection, 0);
735 }
736 finish_socket(sinfo);
737 return false;
738 }
739
740 // In raw mode, we simply extract all the bytes and make that a datagram.
741 NetDatagram datagram(buffer, bytes_read);
742
743 // Now that we've read all the data, it's time to finish the socket so
744 // another thread can read the next datagram.
745 finish_socket(sinfo);
746
747 if (_shutdown) {
748 return false;
749 }
750
751 datagram.set_connection(sinfo->_connection);
752 datagram.set_address(NetAddress(socket->GetPeerName()));
753
754 if (net_cat.is_spam()) {
755 net_cat.spam()
756 << "Received raw TCP datagram with " << datagram.get_length()
757 << " bytes on " << (void *)datagram.get_connection()
758 << " from " << datagram.get_address() << "\n";
759 }
760
761 receive_datagram(datagram);
762
763 return true;
764}
765
766/**
767 * This is the actual executing function for each thread.
768 */
769void ConnectionReader::
770thread_run(int thread_index) {
771 nassertv(!_polling);
772 nassertv(_threads[thread_index] == Thread::get_current_thread());
773
774 while (!_shutdown) {
775 SocketInfo *sinfo =
776 get_next_available_socket(true, thread_index);
777 if (sinfo != nullptr) {
778 process_incoming_data(sinfo);
780 } else {
782 }
783 }
784}
785
786
787/**
788 * Polls the known connections for activity and returns the next one known to
789 * have activity, or NULL if no activity is detected within the timeout
790 * interval.
791 *
792 * This function may block indefinitely if it is being called by multiple
793 * threads; if there are no other threads, it may block only if allow_block is
794 * true.
795 */
796ConnectionReader::SocketInfo *ConnectionReader::
797get_next_available_socket(bool allow_block, int current_thread_index) {
798 // Go to sleep on the select() mutex. This guarantees that only one thread
799 // is in this function at a time.
800 MutexHolder holder(_select_mutex);
801
802 do {
803 // First, check the result from the previous select call. If there are
804 // any sockets remaining there, process them first.
805 while (!_shutdown && _num_results > 0) {
806 nassertr(_next_index < (int)_selecting_sockets.size(), nullptr);
807 int i = _next_index;
808 _next_index++;
809
810 if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
811 _num_results--;
812 SocketInfo *sinfo = _selecting_sockets[i];
813
814 // Some noise on this socket.
815 sinfo->_busy = true;
816 return sinfo;
817 }
818 }
819
820 bool interrupted;
821 do {
822 interrupted = false;
823
824 // Ok, no results from previous select calls. Prepare to set up for a
825 // new select.
826
827 // First, report to anyone else who cares that we're the thread about to
828 // do the poll. That way, if any new sockets come available while we're
829 // polling, we can service them.
830 AtomicAdjust::set(_currently_polling_thread, current_thread_index);
831
832 rebuild_select_list();
833
834 // Now we can execute the select.
835 _num_results = 0;
836 _next_index = 0;
837
838 if (!_shutdown) {
839 uint32_t timeout = (uint32_t)(get_net_max_block() * 1000.0);
840 if (!allow_block) {
841 timeout = 0;
842 }
843#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
844 // In the presence of SIMPLE_THREADS, we never wait at all, but rather
845 // we yield the thread if we come up empty (so that we won't block the
846 // entire process).
847 timeout = 0;
848#endif
849
850 _num_results = _fdset.WaitForRead(false, timeout);
851 }
852
853 if (_num_results == 0 && allow_block) {
854 // If we reached net_max_block, go back and reconsider. (We never
855 // timeout indefinitely, so we can check the shutdown flag every once
856 // in a while.)
857 interrupted = true;
859
860 } else if (_num_results < 0) {
861 // If we had an error, just return. But yield the timeslice first.
863 return nullptr;
864 }
865 } while (!_shutdown && interrupted);
866
867 AtomicAdjust::set(_currently_polling_thread, current_thread_index);
868
869 // Repeat the above until we (a) find a socket with actual noise on it, or
870 // (b) return from PR_Poll() with no sockets available.
871 } while (!_shutdown && _num_results > 0);
872
873 return nullptr;
874}
875
876
877/**
878 * Rebuilds the _fdset and _selecting_sockets arrays based on the sockets that
879 * are currently available for selecting.
880 */
881void ConnectionReader::
882rebuild_select_list() {
883 _fdset.clear();
884 _selecting_sockets.clear();
885
886 LightMutexHolder holder(_sockets_mutex);
887 Sockets::const_iterator si;
888 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
889 SocketInfo *sinfo = (*si);
890 if (!sinfo->_busy && !sinfo->_error) {
891 _fdset.setForSocket(*sinfo->get_socket());
892 _selecting_sockets.push_back(sinfo);
893 }
894 }
895
896 // This is also a fine time to delete the contents of the _removed_sockets
897 // list.
898 if (!_removed_sockets.empty()) {
899 Sockets still_busy_sockets;
900 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
901 SocketInfo *sinfo = (*si);
902 if (sinfo->_busy) {
903 still_busy_sockets.push_back(sinfo);
904 } else {
905 delete sinfo;
906 }
907 }
908 _removed_sockets.swap(still_busy_sockets);
909 }
910}
911
912/**
913 * Adds the sockets from this ConnectionReader (or ConnectionListener) to the
914 * indicated fdset. This is used by ConnectionManager::block() to build an
915 * fdset of all attached readers.
916 */
917void ConnectionReader::
918accumulate_fdset(Socket_fdset &fdset) {
919 LightMutexHolder holder(_sockets_mutex);
920 Sockets::const_iterator si;
921 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
922 SocketInfo *sinfo = (*si);
923 if (!sinfo->_busy && !sinfo->_error) {
924 fdset.setForSocket(*sinfo->get_socket());
925 }
926 }
927}
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
The primary interface to the low-level networking layer in this package.
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
ConnectionReader(ConnectionManager *manager, int num_threads, const std::string &thread_name=std::string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
void shutdown()
Terminates all threads cleanly.
Represents a single TCP or UDP socket for input or output.
Definition connection.h:29
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
size_t get_length() const
Returns the number of bytes in the datagram.
Definition datagram.I:335
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
Definition datagram.cxx:129
Similar to MutexHolder, but for a light mutex.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition mutexHolder.h:25
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition netAddress.h:25
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition netDatagram.h:40
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
A simple place to store and manipulate tcp and port address for communication layer.
Base functionality for an INET domain Socket. This call should be the starting point for all other unix...
Definition socket_ip.h:27
bool Active()
Ask if the socket is open (allocated)
Definition socket_ip.h:96
Socket_Address GetPeerName(void) const
Wrapper on Berkeley getpeername...
Definition socket_ip.h:253
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition socket_tcp.h:12
std::string RecvData(int max_len)
Read the data from the connection - if error 0 if socket closed for read or length is 0 + bytes read ...
Definition socket_tcp.h:185
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
Base functionality for a combination UDP Reader and Writer.
Definition socket_udp.h:24
void clear()
Marks the content as empty.
bool IsSetFor(const Socket_IP &incon) const
check to see if a socket object has been marked for reading
A thread; that is, a lightweight process.
Definition thread.h:46
is_threading_supported
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
Definition thread.h:112
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition thread.I:212
get_current_thread
Returns a pointer to the currently-executing Thread object.
Definition thread.h:109
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition thread.I:201
An interface to whatever real-time clock we might have available in the current environment.
Definition trueClock.h:33
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition trueClock.I:68
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.