15 #include "connectionReader.h"
17 #include "connectionManager.h"
18 #include "netDatagram.h"
19 #include "datagramTCPHeader.h"
20 #include "datagramUDPHeader.h"
21 #include "config_net.h"
22 #include "trueClock.h"
23 #include "socket_udp.h"
24 #include "socket_tcp.h"
25 #include "mutexHolder.h"
26 #include "lightMutexHolder.h"
28 #include "atomicAdjust.h"
29 #include "config_downloader.h"
31 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
38 ConnectionReader::SocketInfo::
40 _connection(connection)
51 bool ConnectionReader::SocketInfo::
53 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
63 return _connection->get_socket();
71 ConnectionReader::ReaderThread::
74 Thread(make_thread_name(thread_name, thread_index),
75 make_thread_name(thread_name, thread_index)),
77 _thread_index(thread_index)
86 void ConnectionReader::ReaderThread::
88 _reader->thread_run(_thread_index);
102 const string &thread_name) :
107 if (num_threads != 0) {
108 if (net_cat.is_debug()) {
110 <<
"Threading support is not available.\n";
118 _tcp_header_size = tcp_header_size;
119 _polling = (num_threads <= 0);
126 _currently_polling_thread = -1;
128 string reader_thread_name = thread_name;
129 if (thread_name.empty()) {
130 reader_thread_name =
"ReaderThread";
133 for (i = 0; i < num_threads; i++) {
134 PT(ReaderThread) thread =
new ReaderThread(
this, reader_thread_name, i);
135 _threads.push_back(thread);
137 for (i = 0; i < num_threads; i++) {
138 _threads[i]->start(net_thread_priority,
true);
141 _manager->add_reader(
this);
// Destructor (this listing shows only selected lines of the body).
// Unregisters this reader from its ConnectionManager, then iterates over
// both the _sockets list and the _removed_sockets list.
150 ~ConnectionReader() {
152 _manager->remove_reader(
this);
158 Sockets::iterator si;
159 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
162 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
163 SocketInfo *sinfo = (*si);
// Diagnostic about reentrant deletion; the condition that guards it and the
// stream it is written to are on lines not shown in this listing.
168 <<
"Reentrant deletion of ConnectionReader--don't delete these\n"
169 <<
"in response to connection_reset().\n";
// Clear the SocketInfo's _connection member.
172 sinfo->_connection.clear();
196 nassertr(connection != (
Connection *)NULL,
false);
201 Sockets::const_iterator si;
202 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
203 if ((*si)->_connection == connection) {
209 _sockets.push_back(
new SocketInfo(connection));
230 Sockets::iterator si;
231 si = _sockets.begin();
232 while (si != _sockets.end() && (*si)->_connection != connection) {
235 if (si == _sockets.end()) {
239 _removed_sockets.push_back(*si);
262 Sockets::iterator si;
263 si = _sockets.begin();
264 while (si != _sockets.end() && (*si)->_connection != connection) {
267 if (si == _sockets.end()) {
272 SocketInfo *sinfo = (*si);
273 bool is_ok = !sinfo->_error;
296 SocketInfo *sinfo = get_next_available_socket(
false, -2);
297 if (sinfo != (SocketInfo *)NULL) {
298 double max_poll_cycle = get_net_max_poll_cycle();
299 if (max_poll_cycle < 0.0) {
301 while (sinfo != (SocketInfo *)NULL) {
302 process_incoming_data(sinfo);
303 sinfo = get_next_available_socket(
false, -2);
309 double stop = global_clock->get_short_time() + max_poll_cycle;
311 while (sinfo != (SocketInfo *)NULL) {
312 process_incoming_data(sinfo);
313 if (global_clock->get_short_time() >= stop) {
316 sinfo = get_next_available_socket(
false, -2);
341 return _threads.size();
382 _tcp_header_size = tcp_header_size;
393 return _tcp_header_size;
414 Threads::iterator ti;
415 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
// Processes whatever data is currently readable on a single connection.
// The connection is wrapped in a local SocketInfo; the socket is then put
// into fdset and WaitForRead() is called with a second argument of 0 (the
// same argument position that receives the timeout in
// get_next_available_socket()).  While the wait reports a nonzero result,
// the socket is handed to process_incoming_data() and polled again.
429 void ConnectionReader::
430 flush_read_connection(
Connection *connection) {
432 SocketInfo sinfo(connection);
447 fdset.setForSocket(*(sinfo.get_socket()));
448 int num_results = fdset.WaitForRead(
true, 0);
449 while (num_results != 0) {
// A false return from process_incoming_data() is handled specially; the
// handling itself is on a line not shown here (presumably it stops the
// loop -- TODO confirm).
451 if (!process_incoming_data(&sinfo)) {
// Re-arm the fdset for this socket and poll once more.
454 fdset.setForSocket(*(sinfo.get_socket()));
455 num_results = fdset.WaitForRead(
true, 0);
469 void ConnectionReader::
// Releases a SocketInfo after its data has been handled: asserts (nassertv)
// that the socket was flagged busy, then clears the _busy flag.
// rebuild_select_list() only selects sockets whose _busy and _error flags
// are both false, so clearing _busy makes the socket selectable again.
480 void ConnectionReader::
481 finish_socket(SocketInfo *sinfo) {
482 nassertv(sinfo->_busy);
486 sinfo->_busy =
false;
// Dispatches a readable socket to the matching handler and returns that
// handler's result.  Within each pair below, sinfo->is_udp() picks the UDP
// handler and the TCP handler is used otherwise.
// NOTE(review): the condition that selects the "raw" pair over the regular
// pair is on a line not shown in this listing -- presumably the raw-mode
// flag; confirm against the full source.
498 bool ConnectionReader::
499 process_incoming_data(SocketInfo *sinfo) {
// Raw handlers.
501 if (sinfo->is_udp()) {
502 return process_raw_incoming_udp_data(sinfo);
504 return process_raw_incoming_tcp_data(sinfo);
// Regular (header-carrying) handlers.
507 if (sinfo->is_udp()) {
508 return process_incoming_udp_data(sinfo);
510 return process_incoming_tcp_data(sinfo);
// Reads one UDP packet from the socket, checks that it is at least as long
// as the UDP datagram header, verifies it against that header, and passes
// the resulting datagram to receive_datagram().  finish_socket() is called
// on each of the visible exit paths to release the socket.
520 bool ConnectionReader::
521 process_incoming_udp_data(SocketInfo *sinfo) {
// Downcast of the generic socket (presumably returns false if the cast
// fails -- confirm against the DCAST_INTO_R macro).
523 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
// bytes_read starts as the buffer capacity and is passed by pointer to
// GetPacket(), which also receives the address object addr.
527 char buffer[read_buffer_size];
528 int bytes_read = read_buffer_size;
530 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
533 finish_socket(sinfo);
536 }
else if (bytes_read == 0) {
// A zero-length read is reported to the manager as a connection reset.
540 _manager->connection_reset(sinfo->_connection, 0);
542 finish_socket(sinfo);
// Anything shorter than the UDP header cannot be a valid datagram.
549 if (bytes_read < datagram_udp_header_size) {
551 <<
"Did not read entire header, discarding UDP datagram.\n";
552 finish_socket(sinfo);
// Step past the header: dp points at the payload and bytes_read becomes the
// payload length.
558 char *dp = buffer + datagram_udp_header_size;
559 bytes_read -= datagram_udp_header_size;
565 finish_socket(sinfo);
// Check the payload against the header before accepting it.
572 if (!header.verify_datagram(datagram)) {
574 <<
"Ignoring invalid UDP datagram.\n";
576 datagram.set_connection(sinfo->_connection);
579 if (net_cat.is_spam()) {
581 <<
"Received UDP datagram with "
582 << datagram_udp_header_size + datagram.get_length()
583 <<
" bytes on " << (
void *)datagram.get_connection()
584 <<
" from " << datagram.get_address() <<
"\n";
// Hand the datagram to the receive_datagram() hook.
587 receive_datagram(datagram);
// Reads one TCP datagram in two phases: first a header of _tcp_header_size
// bytes, then a payload whose size is taken from that header.  The finished
// datagram is verified against the header and passed to receive_datagram().
// In both phases a read result <= 0 is reported to the manager through
// connection_reset() and the socket is released with finish_socket().
598 bool ConnectionReader::
599 process_incoming_tcp_data(SocketInfo *sinfo) {
// Downcast of the generic socket (presumably returns false if the cast
// fails -- confirm against the DCAST_INTO_R macro).
601 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
604 char buffer[read_buffer_size];
605 int header_bytes_read = 0;
// Phase 1: accumulate header bytes until the whole header has arrived.
608 while (header_bytes_read < _tcp_header_size) {
610 socket->
RecvData(buffer + header_bytes_read,
611 _tcp_header_size - header_bytes_read);
// Only in builds with both HAVE_THREADS and SIMPLE_THREADS: a negative
// result with LOCAL_BLOCKING_ERROR leads to another RecvData() attempt (the
// rest of the loop condition is on a line not shown here).
612 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
613 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
616 bytes_read = socket->
RecvData(buffer + header_bytes_read,
617 _tcp_header_size - header_bytes_read);
619 #endif // SIMPLE_THREADS
621 if (bytes_read <= 0) {
624 _manager->connection_reset(sinfo->_connection, 0);
626 finish_socket(sinfo);
630 header_bytes_read += bytes_read;
// An incomplete header means the datagram is dropped.
636 if (header_bytes_read != _tcp_header_size) {
640 <<
"Did not read entire header, discarding TCP datagram.\n";
641 finish_socket(sinfo);
// Phase 2: the header gives the payload size; keep reading until the
// datagram holds that many bytes or the reader is shutting down.
646 int size = header.get_datagram_size(_tcp_header_size);
651 while (!_shutdown && (
int)datagram.
get_length() < size) {
654 int read_bytes = read_buffer_size;
// Under SIMPLE_THREADS each read is additionally capped by
// net_max_read_per_epoch.
655 #ifdef SIMPLE_THREADS
659 read_bytes = min(read_buffer_size, (
int)net_max_read_per_epoch);
663 socket->
RecvData(buffer, min(read_bytes,
665 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
666 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
670 socket->
RecvData(buffer, min(read_bytes,
673 #endif // SIMPLE_THREADS
677 if (bytes_read <= 0) {
680 _manager->connection_reset(sinfo->_connection, 0);
682 finish_socket(sinfo);
687 min(bytes_read, (
int)(size - datagram.
get_length()));
// Bytes received beyond the datagram's remaining size are reported as
// discarded.
690 if (bytes_read > datagram_bytes) {
694 <<
"Discarding " << bytes_read - datagram_bytes
695 <<
" bytes following TCP datagram.\n";
702 finish_socket(sinfo);
// Check the payload against the header before accepting it.
709 if (!header.verify_datagram(datagram, _tcp_header_size)) {
711 <<
"Ignoring invalid TCP datagram.\n";
716 if (net_cat.is_spam()) {
718 <<
"Received TCP datagram with "
720 <<
" bytes on " << (
void *)datagram.get_connection()
// Hand the datagram to the receive_datagram() hook.
724 receive_datagram(datagram);
// Raw counterpart of process_incoming_udp_data(): reads one UDP packet and
// passes it to receive_datagram().  None of the visible lines perform the
// header-length check or verify_datagram() step of the non-raw handler.
735 bool ConnectionReader::
736 process_raw_incoming_udp_data(SocketInfo *sinfo) {
// Downcast of the generic socket (presumably returns false if the cast
// fails -- confirm against the DCAST_INTO_R macro).
738 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
// bytes_read starts as the buffer capacity and is passed by pointer to
// GetPacket(), which also receives the address object addr.
742 char buffer[read_buffer_size];
743 int bytes_read = read_buffer_size;
745 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
748 finish_socket(sinfo);
751 }
else if (bytes_read == 0) {
// A zero-length read is reported to the manager as a connection reset.
755 _manager->connection_reset(sinfo->_connection, 0);
757 finish_socket(sinfo);
767 finish_socket(sinfo);
776 if (net_cat.is_spam()) {
778 <<
"Received raw UDP datagram with " << datagram.
get_length()
779 <<
" bytes on " << (
void *)datagram.get_connection()
// Hand the datagram to the receive_datagram() hook.
783 receive_datagram(datagram);
// Raw counterpart of process_incoming_tcp_data(): performs a single
// RecvData() of up to read_buffer_size bytes and passes what was read to
// receive_datagram().  None of the visible lines parse or verify a header.
793 bool ConnectionReader::
794 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
// Downcast of the generic socket (presumably returns false if the cast
// fails -- confirm against the DCAST_INTO_R macro).
796 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
799 char buffer[read_buffer_size];
800 int bytes_read = socket->
RecvData(buffer, read_buffer_size);
// Only in builds with both HAVE_THREADS and SIMPLE_THREADS: a negative
// result with LOCAL_BLOCKING_ERROR leads to another RecvData() attempt (the
// rest of the loop condition is on a line not shown here).
801 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
802 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
805 bytes_read = socket->
RecvData(buffer, read_buffer_size);
807 #endif // SIMPLE_THREADS
// A result <= 0 is reported to the manager as a connection reset.
809 if (bytes_read <= 0) {
812 _manager->connection_reset(sinfo->_connection, 0);
814 finish_socket(sinfo);
824 finish_socket(sinfo);
833 if (net_cat.is_spam()) {
835 <<
"Received raw TCP datagram with " << datagram.
get_length()
836 <<
" bytes on " << (
void *)datagram.get_connection()
// Hand the datagram to the receive_datagram() hook.
840 receive_datagram(datagram);
// Per-thread worker body; ReaderThread invokes it as
// _reader->thread_run(_thread_index).  It asks for the next readable socket
// with allow_block = true and, when one is returned, processes its data.
// NOTE(review): the enclosing loop and its exit condition are on lines not
// shown in this listing.
851 void ConnectionReader::
852 thread_run(
int thread_index) {
858 get_next_available_socket(
true, thread_index);
// NULL means no socket was handed out on this pass.
859 if (sinfo != (SocketInfo *)NULL) {
860 process_incoming_data(sinfo);
// Returns the next SocketInfo with data waiting to be read, or NULL.
//   allow_block          - consulted when a wait reports zero results (the
//                          action taken is on lines not shown here).
//   current_thread_index - index of the calling thread; poll() passes -2.
// The outer do/while first consumes the results of the previous wait and
// then, once none remain, rebuilds the select list and waits again.
880 ConnectionReader::SocketInfo *ConnectionReader::
881 get_next_available_socket(
bool allow_block,
int current_thread_index) {
// Consume outstanding results: look through _selecting_sockets for a socket
// that _fdset reports as set.
889 while (!_shutdown && _num_results > 0) {
890 nassertr(_next_index < (
int)_selecting_sockets.size(), NULL);
894 if (_fdset.
IsSetFor(*_selecting_sockets[i]->get_socket())) {
896 SocketInfo *sinfo = _selecting_sockets[i];
// No results left: refresh the set of sockets to wait on.
916 rebuild_select_list();
// get_net_max_block() scaled by 1000 (presumably seconds to milliseconds --
// TODO confirm the unit WaitForRead() expects).
923 PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0);
927 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
934 _num_results = _fdset.WaitForRead(
false, timeout);
937 if (_num_results == 0 && allow_block) {
944 }
else if (_num_results < 0) {
// A negative wait result gives up and returns no socket.
948 return (SocketInfo *)NULL;
950 }
while (!_shutdown && interrupted);
956 }
while (!_shutdown && _num_results > 0);
// Reached when the loops end without handing out a socket.
958 return (SocketInfo *)NULL;
// Rebuilds _selecting_sockets and the matching entries in _fdset from
// _sockets, then rebuilds the _removed_sockets list.
969 void ConnectionReader::
970 rebuild_select_list() {
972 _selecting_sockets.clear();
// Select only sockets that are neither busy nor flagged with an error.
975 Sockets::const_iterator si;
976 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
977 SocketInfo *sinfo = (*si);
978 if (!sinfo->_busy && !sinfo->_error) {
979 _fdset.setForSocket(*sinfo->get_socket());
980 _selecting_sockets.push_back(sinfo);
// Rebuild the removed list: entries pushed onto still_busy_sockets are kept
// and the lists are swapped at the end.
// NOTE(review): the test deciding which entries are kept, and what happens
// to the rest, is on lines not shown here -- presumably the _busy flag.
986 if (!_removed_sockets.empty()) {
987 Sockets still_busy_sockets;
988 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
989 SocketInfo *sinfo = (*si);
991 still_busy_sockets.push_back(sinfo);
996 _removed_sockets.swap(still_busy_sockets);
1008 void ConnectionReader::
1011 Sockets::const_iterator si;
1012 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
1013 SocketInfo *sinfo = (*si);
1014 if (!sinfo->_busy && !sinfo->_error) {
1015 fdset.setForSocket(*sinfo->get_socket());
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Base functionality for a TCP connected socket. This class is pretty useless by itself but it does hide...
A specific kind of Datagram, especially for sending across or receiving from a network.
Base functionality for an INET domain socket. This class should be the starting point for all other unix...
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
std::string RecvData(int max_len)
Read the data from the connection.
ConnectionReader(ConnectionManager *manager, int num_threads, const string &thread_name=string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
The primary interface to the low-level networking layer in this package.
void shutdown()
Terminates all threads cleanly.
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
size_t get_length() const
Returns the number of bytes in the datagram.
bool Active()
Ask if the socket is open (allocated)
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Socket_Address GetPeerName(void) const
Wrapper on Berkeley getpeername...
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
void set_connection(const PT(Connection)&connection)
Specifies the socket to which the datagram should be written.
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
static bool is_threading_supported()
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
bool IsSetFor(const Socket_IP &incon) const
Checks whether a socket object has been marked for reading.
Base functionality for a combination UDP Reader and Writer.
Similar to MutexHolder, but for a light mutex.
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
An interface to whatever real-time clock we might have available in the current environment.
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
A simple place to store and manipulate TCP and port address for the communication layer.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
A thread; that is, a lightweight process.
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
void clear()
Marks the content as empty.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
Represents a single TCP or UDP socket for input or output.
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent...
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound...