15 #include "connectionReader.h" 17 #include "connectionManager.h" 18 #include "netDatagram.h" 19 #include "datagramTCPHeader.h" 20 #include "datagramUDPHeader.h" 21 #include "config_net.h" 22 #include "trueClock.h" 23 #include "socket_udp.h" 24 #include "socket_tcp.h" 25 #include "mutexHolder.h" 26 #include "lightMutexHolder.h" 28 #include "atomicAdjust.h" 29 #include "config_downloader.h" 31 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
38 ConnectionReader::SocketInfo::
40 _connection(connection)
51 bool ConnectionReader::SocketInfo::
53 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
63 return _connection->get_socket();
71 ConnectionReader::ReaderThread::
74 Thread(make_thread_name(thread_name, thread_index),
75 make_thread_name(thread_name, thread_index)),
77 _thread_index(thread_index)
86 void ConnectionReader::ReaderThread::
88 _reader->thread_run(_thread_index);
102 const string &thread_name) :
107 if (num_threads != 0) {
108 if (net_cat.is_debug()) {
110 <<
"Threading support is not available.\n";
118 _tcp_header_size = tcp_header_size;
119 _polling = (num_threads <= 0);
126 _currently_polling_thread = -1;
128 string reader_thread_name = thread_name;
129 if (thread_name.empty()) {
130 reader_thread_name =
"ReaderThread";
133 for (i = 0; i < num_threads; i++) {
134 PT(ReaderThread) thread =
new ReaderThread(
this, reader_thread_name, i);
135 _threads.push_back(thread);
137 for (i = 0; i < num_threads; i++) {
138 _threads[i]->start(net_thread_priority,
true);
141 _manager->add_reader(
this);
150 ~ConnectionReader() {
152 _manager->remove_reader(
this);
158 Sockets::iterator si;
159 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
162 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
163 SocketInfo *sinfo = (*si);
168 <<
"Reentrant deletion of ConnectionReader--don't delete these\n" 169 <<
"in response to connection_reset().\n";
172 sinfo->_connection.clear();
196 nassertr(connection != (
Connection *)NULL,
false);
201 Sockets::const_iterator si;
202 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
203 if ((*si)->_connection == connection) {
209 _sockets.push_back(
new SocketInfo(connection));
230 Sockets::iterator si;
231 si = _sockets.begin();
232 while (si != _sockets.end() && (*si)->_connection != connection) {
235 if (si == _sockets.end()) {
239 _removed_sockets.push_back(*si);
262 Sockets::iterator si;
263 si = _sockets.begin();
264 while (si != _sockets.end() && (*si)->_connection != connection) {
267 if (si == _sockets.end()) {
272 SocketInfo *sinfo = (*si);
273 bool is_ok = !sinfo->_error;
296 SocketInfo *sinfo = get_next_available_socket(
false, -2);
297 if (sinfo != (SocketInfo *)NULL) {
298 double max_poll_cycle = get_net_max_poll_cycle();
299 if (max_poll_cycle < 0.0) {
301 while (sinfo != (SocketInfo *)NULL) {
302 process_incoming_data(sinfo);
303 sinfo = get_next_available_socket(
false, -2);
309 double stop = global_clock->get_short_time() + max_poll_cycle;
311 while (sinfo != (SocketInfo *)NULL) {
312 process_incoming_data(sinfo);
313 if (global_clock->get_short_time() >= stop) {
316 sinfo = get_next_available_socket(
false, -2);
341 return _threads.size();
382 _tcp_header_size = tcp_header_size;
393 return _tcp_header_size;
414 Threads::iterator ti;
415 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
429 void ConnectionReader::
430 flush_read_connection(
Connection *connection) {
432 SocketInfo sinfo(connection);
447 fdset.setForSocket(*(sinfo.get_socket()));
448 int num_results = fdset.WaitForRead(
true, 0);
449 while (num_results != 0) {
451 if (!process_incoming_data(&sinfo)) {
454 fdset.setForSocket(*(sinfo.get_socket()));
455 num_results = fdset.WaitForRead(
true, 0);
469 void ConnectionReader::
480 void ConnectionReader::
481 finish_socket(SocketInfo *sinfo) {
482 nassertv(sinfo->_busy);
486 sinfo->_busy =
false;
498 bool ConnectionReader::
499 process_incoming_data(SocketInfo *sinfo) {
501 if (sinfo->is_udp()) {
502 return process_raw_incoming_udp_data(sinfo);
504 return process_raw_incoming_tcp_data(sinfo);
507 if (sinfo->is_udp()) {
508 return process_incoming_udp_data(sinfo);
510 return process_incoming_tcp_data(sinfo);
520 bool ConnectionReader::
521 process_incoming_udp_data(SocketInfo *sinfo) {
523 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
527 char buffer[read_buffer_size];
528 int bytes_read = read_buffer_size;
530 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
533 finish_socket(sinfo);
536 }
else if (bytes_read == 0) {
540 _manager->connection_reset(sinfo->_connection, 0);
542 finish_socket(sinfo);
549 if (bytes_read < datagram_udp_header_size) {
551 <<
"Did not read entire header, discarding UDP datagram.\n";
552 finish_socket(sinfo);
558 char *dp = buffer + datagram_udp_header_size;
559 bytes_read -= datagram_udp_header_size;
565 finish_socket(sinfo);
574 <<
"Ignoring invalid UDP datagram.\n";
579 if (net_cat.is_spam()) {
581 <<
"Received UDP datagram with " 582 << datagram_udp_header_size + datagram.
get_length()
583 <<
" bytes on " << (
void *)datagram.get_connection()
587 receive_datagram(datagram);
598 bool ConnectionReader::
599 process_incoming_tcp_data(SocketInfo *sinfo) {
601 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
604 char buffer[read_buffer_size];
605 int header_bytes_read = 0;
608 while (header_bytes_read < _tcp_header_size) {
610 socket->
RecvData(buffer + header_bytes_read,
611 _tcp_header_size - header_bytes_read);
612 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 613 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
616 bytes_read = socket->
RecvData(buffer + header_bytes_read,
617 _tcp_header_size - header_bytes_read);
619 #endif // SIMPLE_THREADS 621 if (bytes_read <= 0) {
624 _manager->connection_reset(sinfo->_connection, 0);
626 finish_socket(sinfo);
630 header_bytes_read += bytes_read;
636 if (header_bytes_read != _tcp_header_size) {
640 <<
"Did not read entire header, discarding TCP datagram.\n";
641 finish_socket(sinfo);
651 while (!_shutdown && (
int)datagram.
get_length() < size) {
654 int read_bytes = read_buffer_size;
655 #ifdef SIMPLE_THREADS 659 read_bytes = min(read_buffer_size, (
int)net_max_read_per_epoch);
663 socket->
RecvData(buffer, min(read_bytes,
665 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 666 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
670 socket->
RecvData(buffer, min(read_bytes,
673 #endif // SIMPLE_THREADS 677 if (bytes_read <= 0) {
680 _manager->connection_reset(sinfo->_connection, 0);
682 finish_socket(sinfo);
687 min(bytes_read, (
int)(size - datagram.
get_length()));
690 if (bytes_read > datagram_bytes) {
694 <<
"Discarding " << bytes_read - datagram_bytes
695 <<
" bytes following TCP datagram.\n";
702 finish_socket(sinfo);
711 <<
"Ignoring invalid TCP datagram.\n";
716 if (net_cat.is_spam()) {
718 <<
"Received TCP datagram with " 720 <<
" bytes on " << (
void *)datagram.get_connection()
724 receive_datagram(datagram);
735 bool ConnectionReader::
736 process_raw_incoming_udp_data(SocketInfo *sinfo) {
738 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
742 char buffer[read_buffer_size];
743 int bytes_read = read_buffer_size;
745 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
748 finish_socket(sinfo);
751 }
else if (bytes_read == 0) {
755 _manager->connection_reset(sinfo->_connection, 0);
757 finish_socket(sinfo);
767 finish_socket(sinfo);
776 if (net_cat.is_spam()) {
778 <<
"Received raw UDP datagram with " << datagram.
get_length()
779 <<
" bytes on " << (
void *)datagram.get_connection()
783 receive_datagram(datagram);
793 bool ConnectionReader::
794 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
796 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
799 char buffer[read_buffer_size];
800 int bytes_read = socket->
RecvData(buffer, read_buffer_size);
801 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 802 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
805 bytes_read = socket->
RecvData(buffer, read_buffer_size);
807 #endif // SIMPLE_THREADS 809 if (bytes_read <= 0) {
812 _manager->connection_reset(sinfo->_connection, 0);
814 finish_socket(sinfo);
824 finish_socket(sinfo);
833 if (net_cat.is_spam()) {
835 <<
"Received raw TCP datagram with " << datagram.
get_length()
836 <<
" bytes on " << (
void *)datagram.get_connection()
840 receive_datagram(datagram);
851 void ConnectionReader::
852 thread_run(
int thread_index) {
858 get_next_available_socket(
true, thread_index);
859 if (sinfo != (SocketInfo *)NULL) {
860 process_incoming_data(sinfo);
880 ConnectionReader::SocketInfo *ConnectionReader::
881 get_next_available_socket(
bool allow_block,
int current_thread_index) {
889 while (!_shutdown && _num_results > 0) {
890 nassertr(_next_index < (
int)_selecting_sockets.size(), NULL);
894 if (_fdset.
IsSetFor(*_selecting_sockets[i]->get_socket())) {
896 SocketInfo *sinfo = _selecting_sockets[i];
916 rebuild_select_list();
923 PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0);
927 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 934 _num_results = _fdset.WaitForRead(
false, timeout);
937 if (_num_results == 0 && allow_block) {
944 }
else if (_num_results < 0) {
948 return (SocketInfo *)NULL;
950 }
while (!_shutdown && interrupted);
956 }
while (!_shutdown && _num_results > 0);
958 return (SocketInfo *)NULL;
969 void ConnectionReader::
970 rebuild_select_list() {
972 _selecting_sockets.clear();
975 Sockets::const_iterator si;
976 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
977 SocketInfo *sinfo = (*si);
978 if (!sinfo->_busy && !sinfo->_error) {
979 _fdset.setForSocket(*sinfo->get_socket());
980 _selecting_sockets.push_back(sinfo);
986 if (!_removed_sockets.empty()) {
988 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
989 SocketInfo *sinfo = (*si);
991 still_busy_sockets.push_back(sinfo);
996 _removed_sockets.swap(still_busy_sockets);
1008 void ConnectionReader::
1011 Sockets::const_iterator si;
1012 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
1013 SocketInfo *sinfo = (*si);
1014 if (!sinfo->_busy && !sinfo->_error) {
1015 fdset.setForSocket(*sinfo->get_socket());
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
Base functionality for a TCP connected socket. This class is pretty useless by itself but it does hide...
A specific kind of Datagram, especially for sending across or receiving from a network.
Base functionality for an INET domain socket. This class should be the starting point for all other unix...
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
std::string RecvData(int max_len)
Read the data from the connection.
ConnectionReader(ConnectionManager *manager, int num_threads, const string &thread_name=string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
The primary interface to the low-level networking layer in this package.
void shutdown()
Terminates all threads cleanly.
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Socket_Address GetPeerName(void) const
Wrapper on Berkeley getpeername...
bool Active()
Ask if the socket is open (allocated)
static void force_yield()
Suspends the current thread for the rest of the current epoch.
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
This is our own Panda specialization on the default STL vector.
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent...
static bool is_threading_supported()
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
Base functionality for a combination UDP Reader and Writer.
Similar to MutexHolder, but for a light mutex.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
An interface to whatever real-time clock we might have available in the current environment.
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
A simple place to store and manipulate the TCP and port address for the communication layer.
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
A thread; that is, a lightweight process.
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
bool IsSetFor(const Socket_IP &incon) const
Check to see if a socket object has been marked for reading.
void clear()
Marks the content as empty.
Represents a single TCP or UDP socket for input or output.
size_t get_length() const
Returns the number of bytes in the datagram.
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound...