00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "connection.h"
00016 #include "connectionManager.h"
00017 #include "netDatagram.h"
00018 #include "datagramTCPHeader.h"
00019 #include "datagramUDPHeader.h"
00020 #include "config_net.h"
00021 #include "config_express.h"
00022 #include "trueClock.h"
00023 #include "pnotify.h"
00024 #include "lightReMutexHolder.h"
00025 #include "socket_ip.h"
00026 #include "socket_tcp.h"
00027 #include "socket_udp.h"
00028 #include "dcast.h"
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 Connection::
00040 Connection(ConnectionManager *manager, Socket_IP *socket) :
00041 _manager(manager),
00042 _socket(socket)
00043 {
00044 _collect_tcp = collect_tcp;
00045 _collect_tcp_interval = collect_tcp_interval;
00046 _queued_data_start = 0.0;
00047 _queued_count = 0;
00048
00049 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00050
00051
00052 if (_socket->SetNonBlocking() != ALL_OK) {
00053 net_cat.warning()
00054 << "Unable to set non-blocking status on socket\n";
00055 }
00056 #endif
00057 }
00058
00059
00060
00061
00062
00063
00064 Connection::
00065 ~Connection() {
00066 net_cat.info()
00067 << "Deleting connection " << (void *)this << "\n";
00068
00069 if (_socket != (Socket_IP *)NULL) {
00070 flush();
00071
00072 _socket->Close();
00073 delete _socket;
00074 }
00075 }
00076
00077
00078
00079
00080
00081
00082
00083 NetAddress Connection::
00084 get_address() const {
00085 Socket_Address addr = _socket->GetPeerName();
00086 return NetAddress(addr);
00087 }
00088
00089
00090
00091
00092
00093
00094
00095 ConnectionManager *Connection::
00096 get_manager() const {
00097 return _manager;
00098 }
00099
00100
00101
00102
00103
00104
00105
00106 Socket_IP *Connection::
00107 get_socket() const {
00108 return _socket;
00109 }
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 void Connection::
00133 set_collect_tcp(bool collect_tcp) {
00134 _collect_tcp = collect_tcp;
00135 }
00136
00137
00138
00139
00140
00141
00142
00143 bool Connection::
00144 get_collect_tcp() const {
00145 return _collect_tcp;
00146 }
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157 void Connection::
00158 set_collect_tcp_interval(double interval) {
00159 _collect_tcp_interval = interval;
00160 }
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 double Connection::
00172 get_collect_tcp_interval() const {
00173 return _collect_tcp_interval;
00174 }
00175
00176
00177
00178
00179
00180
00181
00182
00183 bool Connection::
00184 consider_flush() {
00185 LightReMutexHolder holder(_write_mutex);
00186
00187 if (!_collect_tcp) {
00188 return do_flush();
00189
00190 } else {
00191 double elapsed =
00192 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start;
00193
00194
00195 if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) {
00196 return do_flush();
00197 }
00198 }
00199
00200 return true;
00201 }
00202
00203
00204
00205
00206
00207
00208
00209
00210 bool Connection::
00211 flush() {
00212 LightReMutexHolder holder(_write_mutex);
00213 return do_flush();
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
/*
// Function: Connection::set_nonblock
// Access: Published
// Description: Sets whether nonblocking I/O should be in effect.

void Connection::
set_nonblock(bool flag) {
  if (flag) {
    _socket->SetNonBlocking();
  } else {
    _socket->SetBlocking();
  }
}
*/
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249 void Connection::
00250 set_linger(bool flag, double time) {
00251 Socket_TCP *tcp;
00252 DCAST_INTO_V(tcp, _socket);
00253
00254 if (flag) {
00255 tcp->SetLinger((int)time);
00256 } else {
00257 tcp->DontLinger();
00258 }
00259 }
00260
00261
00262
00263
00264
00265
00266 void Connection::
00267 set_reuse_addr(bool flag) {
00268 _socket->SetReuseAddress(flag);
00269 }
00270
00271
00272
00273
00274
00275
00276
00277 void Connection::
00278 set_keep_alive(bool flag) {
00279
00280 }
00281
00282
00283
00284
00285
00286
00287 void Connection::
00288 set_recv_buffer_size(int size) {
00289 _socket->SetRecvBufferSize(size);
00290 }
00291
00292
00293
00294
00295
00296
00297 void Connection::
00298 set_send_buffer_size(int size) {
00299 Socket_TCP *tcp;
00300 DCAST_INTO_V(tcp, _socket);
00301
00302 tcp->SetSendBufferSize(size);
00303 }
00304
00305
00306
00307
00308
00309
00310 void Connection::
00311 set_ip_time_to_live(int ttl) {
00312
00313 }
00314
00315
00316
00317
00318
00319
00320 void Connection::
00321 set_ip_type_of_service(int tos) {
00322
00323 }
00324
00325
00326
00327
00328
00329
00330
00331 void Connection::
00332 set_no_delay(bool flag) {
00333 Socket_TCP *tcp;
00334 DCAST_INTO_V(tcp, _socket);
00335
00336 tcp->SetNoDelay(flag);
00337 }
00338
00339
00340
00341
00342
00343
00344 void Connection::
00345 set_max_segment(int size) {
00346
00347 }
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358 bool Connection::
00359 send_datagram(const NetDatagram &datagram, int tcp_header_size) {
00360 nassertr(_socket != (Socket_IP *)NULL, false);
00361
00362 if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
00363
00364 Socket_UDP *udp;
00365 DCAST_INTO_R(udp, _socket, false);
00366
00367 LightReMutexHolder holder(_write_mutex);
00368 DatagramUDPHeader header(datagram);
00369 string data;
00370 data += header.get_header();
00371 data += datagram.get_message();
00372
00373 if (net_cat.is_debug()) {
00374 header.verify_datagram(datagram);
00375 }
00376
00377 int bytes_to_send = data.length();
00378 Socket_Address addr = datagram.get_address().get_addr();
00379
00380 bool okflag = udp->SendTo(data, addr);
00381 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00382 while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
00383 Thread::force_yield();
00384 okflag = udp->SendTo(data, addr);
00385 }
00386 #endif // SIMPLE_THREADS
00387
00388 if (net_cat.is_spam()) {
00389 net_cat.spam()
00390 << "Sent UDP datagram with "
00391 << bytes_to_send << " bytes to " << (void *)this
00392 << ", ok = " << okflag << "\n";
00393 }
00394
00395 return check_send_error(okflag);
00396 }
00397
00398
00399 if (tcp_header_size == 2 && datagram.get_length() >= 0x10000) {
00400 net_cat.error()
00401 << "Attempt to send TCP datagram of " << datagram.get_length()
00402 << " bytes--too long!\n";
00403 nassert_raise("Datagram too long");
00404 return false;
00405 }
00406
00407 DatagramTCPHeader header(datagram, tcp_header_size);
00408
00409 LightReMutexHolder holder(_write_mutex);
00410 _queued_data += header.get_header();
00411 _queued_data += datagram.get_message();
00412 _queued_count++;
00413
00414 if (net_cat.is_debug()) {
00415 header.verify_datagram(datagram, tcp_header_size);
00416 }
00417
00418 if (!_collect_tcp ||
00419 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
00420 return do_flush();
00421 }
00422
00423 return true;
00424 }
00425
00426
00427
00428
00429
00430
00431
00432
00433 bool Connection::
00434 send_raw_datagram(const NetDatagram &datagram) {
00435 nassertr(_socket != (Socket_IP *)NULL, false);
00436
00437 if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
00438
00439 Socket_UDP *udp;
00440 DCAST_INTO_R(udp, _socket, false);
00441
00442 string data = datagram.get_message();
00443
00444 LightReMutexHolder holder(_write_mutex);
00445 Socket_Address addr = datagram.get_address().get_addr();
00446 bool okflag = udp->SendTo(data, addr);
00447 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00448 while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
00449 Thread::force_yield();
00450 okflag = udp->SendTo(data, addr);
00451 }
00452 #endif // SIMPLE_THREADS
00453
00454 if (net_cat.is_spam()) {
00455 net_cat.spam()
00456 << "Sent UDP datagram with "
00457 << data.size() << " bytes to " << (void *)this
00458 << ", ok = " << okflag << "\n";
00459 }
00460
00461 return check_send_error(okflag);
00462 }
00463
00464
00465 LightReMutexHolder holder(_write_mutex);
00466 _queued_data += datagram.get_message();
00467 _queued_count++;
00468
00469 if (!_collect_tcp ||
00470 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
00471 return do_flush();
00472 }
00473
00474 return true;
00475 }
00476
00477
00478
00479
00480
00481
00482
00483 bool Connection::
00484 do_flush() {
00485 if (_queued_data.empty()) {
00486 _queued_count = 0;
00487 _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
00488 return true;
00489 }
00490
00491 if (net_cat.is_spam()) {
00492 net_cat.spam()
00493 << "Sending " << _queued_count << " TCP datagram(s) with "
00494 << _queued_data.length() << " total bytes to " << (void *)this << "\n";
00495 }
00496
00497 Socket_TCP *tcp;
00498 DCAST_INTO_R(tcp, _socket, false);
00499
00500 string sending_data;
00501 _queued_data.swap(sending_data);
00502
00503 _queued_count = 0;
00504 _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
00505
00506 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00507 int max_send = net_max_write_per_epoch;
00508 int data_sent = tcp->SendData(sending_data.data(), min((size_t)max_send, sending_data.size()));
00509 bool okflag = (data_sent == (int)sending_data.size());
00510 if (!okflag) {
00511 int total_sent = 0;
00512 if (data_sent > 0) {
00513 total_sent += data_sent;
00514 }
00515 double last_report = 0;
00516 while (!okflag && tcp->Active() &&
00517 (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
00518 if (data_sent == 0) {
00519 Thread::force_yield();
00520 } else {
00521 Thread::consider_yield();
00522 }
00523 data_sent = tcp->SendData(sending_data.data() + total_sent, min((size_t)max_send, sending_data.size() - total_sent));
00524 if (data_sent > 0) {
00525 total_sent += data_sent;
00526 }
00527 okflag = (total_sent == (int)sending_data.size());
00528 }
00529 }
00530
00531 #else // SIMPLE_THREADS
00532 int data_sent = tcp->SendData(sending_data);
00533 bool okflag = (data_sent == (int)sending_data.size());
00534
00535 #endif // SIMPLE_THREADS
00536
00537 return check_send_error(okflag);
00538 }
00539
00540
00541
00542
00543
00544
00545
00546 bool Connection::
00547 check_send_error(bool okflag) {
00548 if (!okflag) {
00549 static ConfigVariableBool abort_send_error("abort-send-error", false);
00550 if (abort_send_error) {
00551 nassertr(false, false);
00552 }
00553
00554
00555
00556 if (_manager != (ConnectionManager *)NULL) {
00557 _manager->flush_read_connection(this);
00558 _manager->connection_reset(this, okflag);
00559 }
00560 return false;
00561 }
00562
00563 return true;
00564 }