23 #include "socket_tcp.h"
32 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
37 ConnectionReader::SocketInfo::
39 _connection(connection)
48 bool ConnectionReader::SocketInfo::
50 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
58 return _connection->get_socket();
64 ConnectionReader::ReaderThread::
67 Thread(make_thread_name(thread_name, thread_index),
68 make_thread_name(thread_name, thread_index)),
70 _thread_index(thread_index)
77 void ConnectionReader::ReaderThread::
79 _reader->thread_run(_thread_index);
90 const std::string &thread_name) :
95 if (num_threads != 0) {
96 if (net_cat.is_debug()) {
98 <<
"Threading support is not available.\n";
106 _tcp_header_size = tcp_header_size;
107 _polling = (num_threads <= 0);
114 _currently_polling_thread = -1;
116 std::string reader_thread_name = thread_name;
117 if (thread_name.empty()) {
118 reader_thread_name =
"ReaderThread";
121 for (i = 0; i < num_threads; i++) {
122 PT(ReaderThread) thread =
new ReaderThread(
this, reader_thread_name, i);
123 _threads.push_back(thread);
125 for (i = 0; i < num_threads; i++) {
126 _threads[i]->start(net_thread_priority,
true);
129 _manager->add_reader(
this);
136 ~ConnectionReader() {
137 if (_manager !=
nullptr) {
138 _manager->remove_reader(
this);
144 Sockets::iterator si;
145 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
148 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
149 SocketInfo *sinfo = (*si);
154 <<
"Reentrant deletion of ConnectionReader--don't delete these\n"
155 <<
"in response to connection_reset().\n";
158 sinfo->_connection.clear();
177 nassertr(connection !=
nullptr,
false);
182 Sockets::const_iterator si;
183 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
184 if ((*si)->_connection == connection) {
190 _sockets.push_back(
new SocketInfo(connection));
208 Sockets::iterator si;
209 si = _sockets.begin();
210 while (si != _sockets.end() && (*si)->_connection != connection) {
213 if (si == _sockets.end()) {
217 _removed_sockets.push_back(*si);
235 Sockets::iterator si;
236 si = _sockets.begin();
237 while (si != _sockets.end() && (*si)->_connection != connection) {
240 if (si == _sockets.end()) {
245 SocketInfo *sinfo = (*si);
246 bool is_ok = !sinfo->_error;
265 SocketInfo *sinfo = get_next_available_socket(
false, -2);
266 if (sinfo !=
nullptr) {
267 double max_poll_cycle = get_net_max_poll_cycle();
268 if (max_poll_cycle < 0.0) {
270 while (sinfo !=
nullptr) {
271 process_incoming_data(sinfo);
272 sinfo = get_next_available_socket(
false, -2);
278 double stop = global_clock->get_short_time() + max_poll_cycle;
280 while (sinfo !=
nullptr) {
281 process_incoming_data(sinfo);
282 if (global_clock->get_short_time() >= stop) {
285 sinfo = get_next_available_socket(
false, -2);
305 return _threads.size();
337 _tcp_header_size = tcp_header_size;
345 return _tcp_header_size;
363 Threads::iterator ti;
364 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
374 void ConnectionReader::
375 flush_read_connection(
Connection *connection) {
377 SocketInfo sinfo(connection);
391 fdset.setForSocket(*(sinfo.get_socket()));
392 int num_results = fdset.WaitForRead(
true, 0);
393 while (num_results != 0) {
395 if (!process_incoming_data(&sinfo)) {
398 fdset.setForSocket(*(sinfo.get_socket()));
399 num_results = fdset.WaitForRead(
true, 0);
409 void ConnectionReader::
418 void ConnectionReader::
419 finish_socket(SocketInfo *sinfo) {
420 nassertv(sinfo->_busy);
423 sinfo->_busy =
false;
431 bool ConnectionReader::
432 process_incoming_data(SocketInfo *sinfo) {
434 if (sinfo->is_udp()) {
435 return process_raw_incoming_udp_data(sinfo);
437 return process_raw_incoming_tcp_data(sinfo);
440 if (sinfo->is_udp()) {
441 return process_incoming_udp_data(sinfo);
443 return process_incoming_tcp_data(sinfo);
451 bool ConnectionReader::
452 process_incoming_udp_data(SocketInfo *sinfo) {
454 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
458 char buffer[read_buffer_size];
459 int bytes_read = read_buffer_size;
461 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
464 finish_socket(sinfo);
467 }
else if (bytes_read == 0) {
470 if (_manager !=
nullptr) {
471 _manager->connection_reset(sinfo->_connection, 0);
473 finish_socket(sinfo);
480 if (bytes_read < datagram_udp_header_size) {
482 <<
"Did not read entire header, discarding UDP datagram.\n";
483 finish_socket(sinfo);
489 char *dp = buffer + datagram_udp_header_size;
490 bytes_read -= datagram_udp_header_size;
496 finish_socket(sinfo);
503 if (!header.verify_datagram(datagram)) {
505 <<
"Ignoring invalid UDP datagram.\n";
507 datagram.set_connection(sinfo->_connection);
510 if (net_cat.is_spam()) {
512 <<
"Received UDP datagram with "
513 << datagram_udp_header_size + datagram.get_length()
514 <<
" bytes on " << (
void *)datagram.get_connection()
515 <<
" from " << datagram.get_address() <<
"\n";
518 receive_datagram(datagram);
527 bool ConnectionReader::
528 process_incoming_tcp_data(SocketInfo *sinfo) {
530 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
533 char buffer[read_buffer_size];
534 int header_bytes_read = 0;
537 while (header_bytes_read < _tcp_header_size) {
539 socket->
RecvData(buffer + header_bytes_read,
540 _tcp_header_size - header_bytes_read);
541 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
542 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
545 bytes_read = socket->
RecvData(buffer + header_bytes_read,
546 _tcp_header_size - header_bytes_read);
550 if (bytes_read <= 0) {
552 if (_manager !=
nullptr) {
553 _manager->connection_reset(sinfo->_connection, 0);
555 finish_socket(sinfo);
559 header_bytes_read += bytes_read;
565 if (header_bytes_read != _tcp_header_size) {
568 <<
"Did not read entire header, discarding TCP datagram.\n";
569 finish_socket(sinfo);
574 int size = header.get_datagram_size(_tcp_header_size);
579 while (!_shutdown && (
int)datagram.
get_length() < size) {
582 int read_bytes = read_buffer_size;
583 #ifdef SIMPLE_THREADS
586 read_bytes = min(read_buffer_size, (
int)net_max_read_per_epoch);
590 socket->
RecvData(buffer, min(read_bytes,
592 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
593 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
597 socket->
RecvData(buffer, min(read_bytes,
604 if (bytes_read <= 0) {
606 if (_manager !=
nullptr) {
607 _manager->connection_reset(sinfo->_connection, 0);
609 finish_socket(sinfo);
614 min(bytes_read, (
int)(size - datagram.
get_length()));
617 if (bytes_read > datagram_bytes) {
621 <<
"Discarding " << bytes_read - datagram_bytes
622 <<
" bytes following TCP datagram.\n";
629 finish_socket(sinfo);
636 if (!header.verify_datagram(datagram, _tcp_header_size)) {
638 <<
"Ignoring invalid TCP datagram.\n";
643 if (net_cat.is_spam()) {
645 <<
"Received TCP datagram with "
647 <<
" bytes on " << (
void *)datagram.get_connection()
651 receive_datagram(datagram);
660 bool ConnectionReader::
661 process_raw_incoming_udp_data(SocketInfo *sinfo) {
663 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
667 char buffer[read_buffer_size];
668 int bytes_read = read_buffer_size;
670 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
673 finish_socket(sinfo);
676 }
else if (bytes_read == 0) {
679 if (_manager !=
nullptr) {
680 _manager->connection_reset(sinfo->_connection, 0);
682 finish_socket(sinfo);
691 finish_socket(sinfo);
700 if (net_cat.is_spam()) {
702 <<
"Received raw UDP datagram with " << datagram.
get_length()
703 <<
" bytes on " << (
void *)datagram.get_connection()
707 receive_datagram(datagram);
715 bool ConnectionReader::
716 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
718 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
721 char buffer[read_buffer_size];
722 int bytes_read = socket->
RecvData(buffer, read_buffer_size);
723 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
724 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
727 bytes_read = socket->
RecvData(buffer, read_buffer_size);
731 if (bytes_read <= 0) {
733 if (_manager !=
nullptr) {
734 _manager->connection_reset(sinfo->_connection, 0);
736 finish_socket(sinfo);
745 finish_socket(sinfo);
754 if (net_cat.is_spam()) {
756 <<
"Received raw TCP datagram with " << datagram.
get_length()
757 <<
" bytes on " << (
void *)datagram.get_connection()
761 receive_datagram(datagram);
769 void ConnectionReader::
770 thread_run(
int thread_index) {
776 get_next_available_socket(
true, thread_index);
777 if (sinfo !=
nullptr) {
778 process_incoming_data(sinfo);
796 ConnectionReader::SocketInfo *ConnectionReader::
797 get_next_available_socket(
bool allow_block,
int current_thread_index) {
805 while (!_shutdown && _num_results > 0) {
806 nassertr(_next_index < (
int)_selecting_sockets.size(),
nullptr);
810 if (_fdset.
IsSetFor(*_selecting_sockets[i]->get_socket())) {
812 SocketInfo *sinfo = _selecting_sockets[i];
832 rebuild_select_list();
839 uint32_t timeout = (uint32_t)(get_net_max_block() * 1000.0);
843 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
850 _num_results = _fdset.WaitForRead(
false, timeout);
853 if (_num_results == 0 && allow_block) {
860 }
else if (_num_results < 0) {
865 }
while (!_shutdown && interrupted);
871 }
while (!_shutdown && _num_results > 0);
881 void ConnectionReader::
882 rebuild_select_list() {
884 _selecting_sockets.clear();
887 Sockets::const_iterator si;
888 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
889 SocketInfo *sinfo = (*si);
890 if (!sinfo->_busy && !sinfo->_error) {
891 _fdset.setForSocket(*sinfo->get_socket());
892 _selecting_sockets.push_back(sinfo);
898 if (!_removed_sockets.empty()) {
899 Sockets still_busy_sockets;
900 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
901 SocketInfo *sinfo = (*si);
903 still_busy_sockets.push_back(sinfo);
908 _removed_sockets.swap(still_busy_sockets);
917 void ConnectionReader::
920 Sockets::const_iterator si;
921 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
922 SocketInfo *sinfo = (*si);
923 if (!sinfo->_busy && !sinfo->_error) {
924 fdset.setForSocket(*sinfo->get_socket());
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
The primary interface to the low-level networking layer in this package.
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
ConnectionReader(ConnectionManager *manager, int num_threads, const std::string &thread_name=std::string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
void shutdown()
Terminates all threads cleanly.
Represents a single TCP or UDP socket for input or output.
size_t get_length() const
Returns the number of bytes in the datagram.
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
Similar to MutexHolder, but for a light mutex.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
A specific kind of Datagram, especially for sending across or receiving from a network.
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
A simple place to store and manipulate the TCP and port address for the communication layer.
Base functionality for an INET domain socket. This class should be the starting point for all other unix...
bool Active()
Ask if the socket is open (allocated)
Socket_Address GetPeerName(void) const
Wrapper on Berkeley getpeername...
Base functionality for a TCP connected socket. This class is pretty useless by itself but it does hide...
std::string RecvData(int max_len)
Read the data from the connection - if error 0 if socket closed for read or length is 0 + bytes read ...
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
Base functionality for a combination UDP Reader and Writer.
void clear()
Marks the content as empty.
bool IsSetFor(const Socket_IP &incon) const
Checks to see if a socket object has been marked for reading.
A thread; that is, a lightweight process.
is_threading_supported
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
get_current_thread
Returns a pointer to the currently-executing Thread object.
static void force_yield()
Suspends the current thread for the rest of the current epoch.
An interface to whatever real-time clock we might have available in the current environment.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.