00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "connectionReader.h"
00016 #include "connectionManager.h"
00017 #include "netDatagram.h"
00018 #include "datagramTCPHeader.h"
00019 #include "datagramUDPHeader.h"
00020 #include "config_net.h"
00021 #include "trueClock.h"
00022 #include "socket_udp.h"
00023 #include "socket_tcp.h"
00024 #include "mutexHolder.h"
00025 #include "lightMutexHolder.h"
00026 #include "pnotify.h"
00027 #include "atomicAdjust.h"
00028 #include "config_downloader.h"
00029
00030 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
00031
00032
00033
00034
00035
00036
00037 ConnectionReader::SocketInfo::
00038 SocketInfo(const PT(Connection) &connection) :
00039 _connection(connection)
00040 {
00041 _busy = false;
00042 _error = false;
00043 }
00044
00045
00046
00047
00048
00049
00050 bool ConnectionReader::SocketInfo::
00051 is_udp() const {
00052 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
00053 }
00054
00055
00056
00057
00058
00059
00060 Socket_IP *ConnectionReader::SocketInfo::
00061 get_socket() const {
00062 return _connection->get_socket();
00063 }
00064
00065
00066
00067
00068
00069
00070 ConnectionReader::ReaderThread::
00071 ReaderThread(ConnectionReader *reader, const string &thread_name,
00072 int thread_index) :
00073 Thread(make_thread_name(thread_name, thread_index),
00074 make_thread_name(thread_name, thread_index)),
00075 _reader(reader),
00076 _thread_index(thread_index)
00077 {
00078 }
00079
00080
00081
00082
00083
00084
00085 void ConnectionReader::ReaderThread::
00086 thread_main() {
00087 _reader->thread_run(_thread_index);
00088 }
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099 ConnectionReader::
00100 ConnectionReader(ConnectionManager *manager, int num_threads,
00101 const string &thread_name) :
00102 _manager(manager)
00103 {
00104 if (!Thread::is_threading_supported()) {
00105 #ifndef NDEBUG
00106 if (num_threads != 0) {
00107 if (net_cat.is_debug()) {
00108 net_cat.debug()
00109 << "Threading support is not available.\n";
00110 }
00111 }
00112 #endif // NDEBUG
00113 num_threads = 0;
00114 }
00115
00116 _raw_mode = false;
00117 _tcp_header_size = tcp_header_size;
00118 _polling = (num_threads <= 0);
00119
00120 _shutdown = false;
00121
00122 _next_index = 0;
00123 _num_results = 0;
00124
00125 _currently_polling_thread = -1;
00126
00127 string reader_thread_name = thread_name;
00128 if (thread_name.empty()) {
00129 reader_thread_name = "ReaderThread";
00130 }
00131 int i;
00132 for (i = 0; i < num_threads; i++) {
00133 PT(ReaderThread) thread = new ReaderThread(this, reader_thread_name, i);
00134 _threads.push_back(thread);
00135 }
00136 for (i = 0; i < num_threads; i++) {
00137 _threads[i]->start(net_thread_priority, true);
00138 }
00139
00140 _manager->add_reader(this);
00141 }
00142
00143
00144
00145
00146
00147
00148 ConnectionReader::
00149 ~ConnectionReader() {
00150 if (_manager != (ConnectionManager *)NULL) {
00151 _manager->remove_reader(this);
00152 }
00153
00154 shutdown();
00155
00156
00157 Sockets::iterator si;
00158 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00159 delete (*si);
00160 }
00161 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00162 SocketInfo *sinfo = (*si);
00163 if (!sinfo->_busy) {
00164 delete sinfo;
00165 } else {
00166 net_cat.error()
00167 << "Reentrant deletion of ConnectionReader--don't delete these\n"
00168 << "in response to connection_reset().\n";
00169
00170
00171 sinfo->_connection.clear();
00172 }
00173 }
00174 }
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193 bool ConnectionReader::
00194 add_connection(Connection *connection) {
00195 nassertr(connection != (Connection *)NULL, false);
00196
00197 LightMutexHolder holder(_sockets_mutex);
00198
00199
00200 Sockets::const_iterator si;
00201 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00202 if ((*si)->_connection == connection) {
00203
00204 return false;
00205 }
00206 }
00207
00208 _sockets.push_back(new SocketInfo(connection));
00209
00210 return true;
00211 }
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 bool ConnectionReader::
00225 remove_connection(Connection *connection) {
00226 LightMutexHolder holder(_sockets_mutex);
00227
00228
00229 Sockets::iterator si;
00230 si = _sockets.begin();
00231 while (si != _sockets.end() && (*si)->_connection != connection) {
00232 ++si;
00233 }
00234 if (si == _sockets.end()) {
00235 return false;
00236 }
00237
00238 _removed_sockets.push_back(*si);
00239 _sockets.erase(si);
00240
00241 return true;
00242 }
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255 bool ConnectionReader::
00256 is_connection_ok(Connection *connection) {
00257 LightMutexHolder holder(_sockets_mutex);
00258
00259
00260
00261 Sockets::iterator si;
00262 si = _sockets.begin();
00263 while (si != _sockets.end() && (*si)->_connection != connection) {
00264 ++si;
00265 }
00266 if (si == _sockets.end()) {
00267
00268 return false;
00269 }
00270
00271 SocketInfo *sinfo = (*si);
00272 bool is_ok = !sinfo->_error;
00273
00274 return is_ok;
00275 }
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289 void ConnectionReader::
00290 poll() {
00291 if (!_polling) {
00292 return;
00293 }
00294
00295 SocketInfo *sinfo = get_next_available_socket(false, -2);
00296 if (sinfo != (SocketInfo *)NULL) {
00297 double max_poll_cycle = get_net_max_poll_cycle();
00298 if (max_poll_cycle < 0.0) {
00299
00300 while (sinfo != (SocketInfo *)NULL) {
00301 process_incoming_data(sinfo);
00302 sinfo = get_next_available_socket(false, -2);
00303 }
00304
00305 } else {
00306
00307 TrueClock *global_clock = TrueClock::get_global_ptr();
00308 double stop = global_clock->get_short_time() + max_poll_cycle;
00309
00310 while (sinfo != (SocketInfo *)NULL) {
00311 process_incoming_data(sinfo);
00312 if (global_clock->get_short_time() >= stop) {
00313 return;
00314 }
00315 sinfo = get_next_available_socket(false, -2);
00316 }
00317 }
00318 }
00319 }
00320
00321
00322
00323
00324
00325
00326
00327 ConnectionManager *ConnectionReader::
00328 get_manager() const {
00329 return _manager;
00330 }
00331
00332
00333
00334
00335
00336
00337
00338 int ConnectionReader::
00339 get_num_threads() const {
00340 return _threads.size();
00341 }
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354 void ConnectionReader::
00355 set_raw_mode(bool mode) {
00356 _raw_mode = mode;
00357 }
00358
00359
00360
00361
00362
00363
00364
00365 bool ConnectionReader::
00366 get_raw_mode() const {
00367 return _raw_mode;
00368 }
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379 void ConnectionReader::
00380 set_tcp_header_size(int tcp_header_size) {
00381 _tcp_header_size = tcp_header_size;
00382 }
00383
00384
00385
00386
00387
00388
00389
00390 int ConnectionReader::
00391 get_tcp_header_size() const {
00392 return _tcp_header_size;
00393 }
00394
00395
00396
00397
00398
00399
00400
00401
00402 void ConnectionReader::
00403 shutdown() {
00404 if (_shutdown) {
00405 return;
00406 }
00407
00408
00409
00410 _shutdown = true;
00411
00412
00413 Threads::iterator ti;
00414 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
00415 (*ti)->join();
00416 }
00417 }
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428 void ConnectionReader::
00429 flush_read_connection(Connection *connection) {
00430
00431 SocketInfo sinfo(connection);
00432
00433 if (!remove_connection(connection)) {
00434
00435 return;
00436 }
00437
00438
00439
00440
00441
00442
00443
00444 Socket_fdset fdset;
00445 fdset.clear();
00446 fdset.setForSocket(*(sinfo.get_socket()));
00447 int num_results = fdset.WaitForRead(true, 0);
00448 while (num_results != 0) {
00449 sinfo._busy = true;
00450 if (!process_incoming_data(&sinfo)) {
00451 break;
00452 }
00453 fdset.setForSocket(*(sinfo.get_socket()));
00454 num_results = fdset.WaitForRead(true, 0);
00455 }
00456 }
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468 void ConnectionReader::
00469 clear_manager() {
00470 _manager = (ConnectionManager *)NULL;
00471 }
00472
00473
00474
00475
00476
00477
00478
00479 void ConnectionReader::
00480 finish_socket(SocketInfo *sinfo) {
00481 nassertv(sinfo->_busy);
00482
00483
00484
00485 sinfo->_busy = false;
00486 }
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497 bool ConnectionReader::
00498 process_incoming_data(SocketInfo *sinfo) {
00499 if (_raw_mode) {
00500 if (sinfo->is_udp()) {
00501 return process_raw_incoming_udp_data(sinfo);
00502 } else {
00503 return process_raw_incoming_tcp_data(sinfo);
00504 }
00505 } else {
00506 if (sinfo->is_udp()) {
00507 return process_incoming_udp_data(sinfo);
00508 } else {
00509 return process_incoming_tcp_data(sinfo);
00510 }
00511 }
00512 }
00513
00514
00515
00516
00517
00518
00519 bool ConnectionReader::
00520 process_incoming_udp_data(SocketInfo *sinfo) {
00521 Socket_UDP *socket;
00522 DCAST_INTO_R(socket, sinfo->get_socket(), false);
00523 Socket_Address addr;
00524
00525
00526 char buffer[read_buffer_size];
00527 int bytes_read = read_buffer_size;
00528
00529 bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
00530
00531 if (!okflag) {
00532 finish_socket(sinfo);
00533 return false;
00534
00535 } else if (bytes_read == 0) {
00536
00537
00538 if (_manager != (ConnectionManager *)NULL) {
00539 _manager->connection_reset(sinfo->_connection, 0);
00540 }
00541 finish_socket(sinfo);
00542 return false;
00543 }
00544
00545
00546
00547
00548 if (bytes_read < datagram_udp_header_size) {
00549 net_cat.error()
00550 << "Did not read entire header, discarding UDP datagram.\n";
00551 finish_socket(sinfo);
00552 return true;
00553 }
00554
00555 DatagramUDPHeader header(buffer);
00556
00557 char *dp = buffer + datagram_udp_header_size;
00558 bytes_read -= datagram_udp_header_size;
00559
00560 NetDatagram datagram(dp, bytes_read);
00561
00562
00563
00564 finish_socket(sinfo);
00565
00566 if (_shutdown) {
00567 return false;
00568 }
00569
00570
00571 if (!header.verify_datagram(datagram)) {
00572 net_cat.error()
00573 << "Ignoring invalid UDP datagram.\n";
00574 } else {
00575 datagram.set_connection(sinfo->_connection);
00576 datagram.set_address(NetAddress(addr));
00577
00578 if (net_cat.is_spam()) {
00579 net_cat.spam()
00580 << "Received UDP datagram with "
00581 << datagram_udp_header_size + datagram.get_length()
00582 << " bytes on " << (void *)datagram.get_connection()
00583 << " from " << datagram.get_address() << "\n";
00584 }
00585
00586 receive_datagram(datagram);
00587 }
00588
00589 return true;
00590 }
00591
00592
00593
00594
00595
00596
00597 bool ConnectionReader::
00598 process_incoming_tcp_data(SocketInfo *sinfo) {
00599 Socket_TCP *socket;
00600 DCAST_INTO_R(socket, sinfo->get_socket(), false);
00601
00602
00603 char buffer[read_buffer_size];
00604 int header_bytes_read = 0;
00605
00606
00607 while (header_bytes_read < _tcp_header_size) {
00608 int bytes_read =
00609 socket->RecvData(buffer + header_bytes_read,
00610 _tcp_header_size - header_bytes_read);
00611 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00612 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
00613 socket->Active()) {
00614 Thread::force_yield();
00615 bytes_read = socket->RecvData(buffer + header_bytes_read,
00616 _tcp_header_size - header_bytes_read);
00617 }
00618 #endif // SIMPLE_THREADS
00619
00620 if (bytes_read <= 0) {
00621
00622 if (_manager != (ConnectionManager *)NULL) {
00623 _manager->connection_reset(sinfo->_connection, 0);
00624 }
00625 finish_socket(sinfo);
00626 return false;
00627 }
00628
00629 header_bytes_read += bytes_read;
00630 Thread::consider_yield();
00631 }
00632
00633
00634
00635 if (header_bytes_read != _tcp_header_size) {
00636
00637
00638 net_cat.error()
00639 << "Did not read entire header, discarding TCP datagram.\n";
00640 finish_socket(sinfo);
00641 return true;
00642 }
00643
00644 DatagramTCPHeader header(buffer, _tcp_header_size);
00645 int size = header.get_datagram_size(_tcp_header_size);
00646
00647
00648 NetDatagram datagram;
00649
00650 while (!_shutdown && (int)datagram.get_length() < size) {
00651 int bytes_read;
00652
00653 int read_bytes = read_buffer_size;
00654 #ifdef SIMPLE_THREADS
00655
00656
00657
00658 read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
00659 #endif
00660
00661 bytes_read =
00662 socket->RecvData(buffer, min(read_bytes,
00663 (int)(size - datagram.get_length())));
00664 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00665 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
00666 socket->Active()) {
00667 Thread::force_yield();
00668 bytes_read =
00669 socket->RecvData(buffer, min(read_bytes,
00670 (int)(size - datagram.get_length())));
00671 }
00672 #endif // SIMPLE_THREADS
00673
00674 char *dp = buffer;
00675
00676 if (bytes_read <= 0) {
00677
00678 if (_manager != (ConnectionManager *)NULL) {
00679 _manager->connection_reset(sinfo->_connection, 0);
00680 }
00681 finish_socket(sinfo);
00682 return false;
00683 }
00684
00685 int datagram_bytes =
00686 min(bytes_read, (int)(size - datagram.get_length()));
00687 datagram.append_data(dp, datagram_bytes);
00688
00689 if (bytes_read > datagram_bytes) {
00690
00691
00692 net_cat.error()
00693 << "Discarding " << bytes_read - datagram_bytes
00694 << " bytes following TCP datagram.\n";
00695 }
00696 Thread::consider_yield();
00697 }
00698
00699
00700
00701 finish_socket(sinfo);
00702
00703 if (_shutdown) {
00704 return false;
00705 }
00706
00707
00708 if (!header.verify_datagram(datagram, _tcp_header_size)) {
00709 net_cat.error()
00710 << "Ignoring invalid TCP datagram.\n";
00711 } else {
00712 datagram.set_connection(sinfo->_connection);
00713 datagram.set_address(NetAddress(socket->GetPeerName()));
00714
00715 if (net_cat.is_spam()) {
00716 net_cat.spam()
00717 << "Received TCP datagram with "
00718 << _tcp_header_size + datagram.get_length()
00719 << " bytes on " << (void *)datagram.get_connection()
00720 << " from " << datagram.get_address() << "\n";
00721 }
00722
00723 receive_datagram(datagram);
00724 }
00725
00726 return true;
00727 }
00728
00729
00730
00731
00732
00733
00734 bool ConnectionReader::
00735 process_raw_incoming_udp_data(SocketInfo *sinfo) {
00736 Socket_UDP *socket;
00737 DCAST_INTO_R(socket, sinfo->get_socket(), false);
00738 Socket_Address addr;
00739
00740
00741 char buffer[read_buffer_size];
00742 int bytes_read = read_buffer_size;
00743
00744 bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
00745
00746 if (!okflag) {
00747 finish_socket(sinfo);
00748 return false;
00749
00750 } else if (bytes_read == 0) {
00751
00752
00753 if (_manager != (ConnectionManager *)NULL) {
00754 _manager->connection_reset(sinfo->_connection, 0);
00755 }
00756 finish_socket(sinfo);
00757 return false;
00758 }
00759
00760
00761
00762 NetDatagram datagram(buffer, bytes_read);
00763
00764
00765
00766 finish_socket(sinfo);
00767
00768 if (_shutdown) {
00769 return false;
00770 }
00771
00772 datagram.set_connection(sinfo->_connection);
00773 datagram.set_address(NetAddress(addr));
00774
00775 if (net_cat.is_spam()) {
00776 net_cat.spam()
00777 << "Received raw UDP datagram with " << datagram.get_length()
00778 << " bytes on " << (void *)datagram.get_connection()
00779 << " from " << datagram.get_address() << "\n";
00780 }
00781
00782 receive_datagram(datagram);
00783
00784 return true;
00785 }
00786
00787
00788
00789
00790
00791
00792 bool ConnectionReader::
00793 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
00794 Socket_TCP *socket;
00795 DCAST_INTO_R(socket, sinfo->get_socket(), false);
00796
00797
00798 char buffer[read_buffer_size];
00799 int bytes_read = socket->RecvData(buffer, read_buffer_size);
00800 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00801 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
00802 socket->Active()) {
00803 Thread::force_yield();
00804 bytes_read = socket->RecvData(buffer, read_buffer_size);
00805 }
00806 #endif // SIMPLE_THREADS
00807
00808 if (bytes_read <= 0) {
00809
00810 if (_manager != (ConnectionManager *)NULL) {
00811 _manager->connection_reset(sinfo->_connection, 0);
00812 }
00813 finish_socket(sinfo);
00814 return false;
00815 }
00816
00817
00818
00819 NetDatagram datagram(buffer, bytes_read);
00820
00821
00822
00823 finish_socket(sinfo);
00824
00825 if (_shutdown) {
00826 return false;
00827 }
00828
00829 datagram.set_connection(sinfo->_connection);
00830 datagram.set_address(NetAddress(socket->GetPeerName()));
00831
00832 if (net_cat.is_spam()) {
00833 net_cat.spam()
00834 << "Received raw TCP datagram with " << datagram.get_length()
00835 << " bytes on " << (void *)datagram.get_connection()
00836 << " from " << datagram.get_address() << "\n";
00837 }
00838
00839 receive_datagram(datagram);
00840
00841 return true;
00842 }
00843
00844
00845
00846
00847
00848
00849
00850 void ConnectionReader::
00851 thread_run(int thread_index) {
00852 nassertv(!_polling);
00853 nassertv(_threads[thread_index] == Thread::get_current_thread());
00854
00855 while (!_shutdown) {
00856 SocketInfo *sinfo =
00857 get_next_available_socket(true, thread_index);
00858 if (sinfo != (SocketInfo *)NULL) {
00859 process_incoming_data(sinfo);
00860 Thread::consider_yield();
00861 } else {
00862 Thread::force_yield();
00863 }
00864 }
00865 }
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879 ConnectionReader::SocketInfo *ConnectionReader::
00880 get_next_available_socket(bool allow_block, int current_thread_index) {
00881
00882
00883 MutexHolder holder(_select_mutex);
00884
00885 do {
00886
00887
00888 while (!_shutdown && _num_results > 0) {
00889 nassertr(_next_index < (int)_selecting_sockets.size(), NULL);
00890 int i = _next_index;
00891 _next_index++;
00892
00893 if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
00894 _num_results--;
00895 SocketInfo *sinfo = _selecting_sockets[i];
00896
00897
00898 sinfo->_busy = true;
00899 return sinfo;
00900 }
00901 }
00902
00903 bool interrupted;
00904 do {
00905 interrupted = false;
00906
00907
00908
00909
00910
00911
00912
00913 AtomicAdjust::set(_currently_polling_thread, current_thread_index);
00914
00915 rebuild_select_list();
00916
00917
00918 _num_results = 0;
00919 _next_index = 0;
00920
00921 if (!_shutdown) {
00922 PN_uint32 timeout = (PN_uint32)(get_net_max_block() * 1000.0);
00923 if (!allow_block) {
00924 timeout = 0;
00925 }
00926 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00927
00928
00929
00930 timeout = 0;
00931 #endif
00932
00933 _num_results = _fdset.WaitForRead(false, timeout);
00934 }
00935
00936 if (_num_results == 0 && allow_block) {
00937
00938
00939
00940 interrupted = true;
00941 Thread::force_yield();
00942
00943 } else if (_num_results < 0) {
00944
00945
00946 Thread::force_yield();
00947 return (SocketInfo *)NULL;
00948 }
00949 } while (!_shutdown && interrupted);
00950
00951 AtomicAdjust::set(_currently_polling_thread, current_thread_index);
00952
00953
00954
00955 } while (!_shutdown && _num_results > 0);
00956
00957 return (SocketInfo *)NULL;
00958 }
00959
00960
00961
00962
00963
00964
00965
00966
00967
00968 void ConnectionReader::
00969 rebuild_select_list() {
00970 _fdset.clear();
00971 _selecting_sockets.clear();
00972
00973 LightMutexHolder holder(_sockets_mutex);
00974 Sockets::const_iterator si;
00975 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00976 SocketInfo *sinfo = (*si);
00977 if (!sinfo->_busy && !sinfo->_error) {
00978 _fdset.setForSocket(*sinfo->get_socket());
00979 _selecting_sockets.push_back(sinfo);
00980 }
00981 }
00982
00983
00984
00985 if (!_removed_sockets.empty()) {
00986 Sockets still_busy_sockets;
00987 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00988 SocketInfo *sinfo = (*si);
00989 if (sinfo->_busy) {
00990 still_busy_sockets.push_back(sinfo);
00991 } else {
00992 delete sinfo;
00993 }
00994 }
00995 _removed_sockets.swap(still_busy_sockets);
00996 }
00997 }
00998
00999
01000
01001
01002
01003
01004
01005
01006
01007 void ConnectionReader::
01008 accumulate_fdset(Socket_fdset &fdset) {
01009 LightMutexHolder holder(_sockets_mutex);
01010 Sockets::const_iterator si;
01011 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
01012 SocketInfo *sinfo = (*si);
01013 if (!sinfo->_busy && !sinfo->_error) {
01014 fdset.setForSocket(*sinfo->get_socket());
01015 }
01016 }
01017 }