Panda3D
|
00001 // Filename: connection.cxx 00002 // Created by: jns (07Feb00) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "connection.h" 00016 #include "connectionManager.h" 00017 #include "netDatagram.h" 00018 #include "datagramTCPHeader.h" 00019 #include "datagramUDPHeader.h" 00020 #include "config_net.h" 00021 #include "config_express.h" // for collect_tcp 00022 #include "trueClock.h" 00023 #include "pnotify.h" 00024 #include "lightReMutexHolder.h" 00025 #include "socket_ip.h" 00026 #include "socket_tcp.h" 00027 #include "socket_udp.h" 00028 #include "dcast.h" 00029 00030 00031 //////////////////////////////////////////////////////////////////// 00032 // Function: Connection::Constructor 00033 // Access: Published 00034 // Description: Creates a connection. Normally this constructor 00035 // should not be used directly by user code; use one of 00036 // the methods in ConnectionManager to make a new 00037 // connection. 00038 //////////////////////////////////////////////////////////////////// 00039 Connection:: 00040 Connection(ConnectionManager *manager, Socket_IP *socket) : 00041 _manager(manager), 00042 _socket(socket) 00043 { 00044 _collect_tcp = collect_tcp; 00045 _collect_tcp_interval = collect_tcp_interval; 00046 _queued_data_start = 0.0; 00047 _queued_count = 0; 00048 00049 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00050 // In the presence of SIMPLE_THREADS, we use non-blocking I/O. We 00051 // simulate blocking by yielding the thread. 00052 if (_socket->SetNonBlocking() != ALL_OK) { 00053 net_cat.warning() 00054 << "Unable to set non-blocking status on socket\n"; 00055 } 00056 #endif 00057 } 00058 00059 //////////////////////////////////////////////////////////////////// 00060 // Function: Connection::Destructor 00061 // Access: Published 00062 // Description: Closes a connection. 00063 //////////////////////////////////////////////////////////////////// 00064 Connection:: 00065 ~Connection() { 00066 net_cat.info() 00067 << "Deleting connection " << (void *)this << "\n"; 00068 00069 if (_socket != (Socket_IP *)NULL) { 00070 flush(); 00071 00072 _socket->Close(); 00073 delete _socket; 00074 } 00075 } 00076 00077 //////////////////////////////////////////////////////////////////// 00078 // Function: Connection::get_address 00079 // Access: Published 00080 // Description: Returns the address bound to this connection, if it 00081 // is a TCP connection. 00082 //////////////////////////////////////////////////////////////////// 00083 NetAddress Connection:: 00084 get_address() const { 00085 Socket_Address addr = _socket->GetPeerName(); 00086 return NetAddress(addr); 00087 } 00088 00089 //////////////////////////////////////////////////////////////////// 00090 // Function: Connection::get_manager 00091 // Access: Published 00092 // Description: Returns a pointer to the ConnectionManager object 00093 // that serves this connection. 00094 //////////////////////////////////////////////////////////////////// 00095 ConnectionManager *Connection:: 00096 get_manager() const { 00097 return _manager; 00098 } 00099 00100 //////////////////////////////////////////////////////////////////// 00101 // Function: Connection::get_socket 00102 // Access: Published 00103 // Description: Returns the internal Socket_IP that defines the 00104 // connection. 00105 //////////////////////////////////////////////////////////////////// 00106 Socket_IP *Connection:: 00107 get_socket() const { 00108 return _socket; 00109 } 00110 00111 //////////////////////////////////////////////////////////////////// 00112 // Function: Connection::set_collect_tcp 00113 // Access: Published 00114 // Description: Enables or disables "collect-tcp" mode. In this 00115 // mode, individual TCP packets are not sent 00116 // immediately, but rather they are collected together 00117 // and accumulated to be sent periodically as one larger 00118 // TCP packet. This cuts down on overhead from the 00119 // TCP/IP protocol, especially if many small packets 00120 // need to be sent on the same connection, but it 00121 // introduces additional latency (since packets must be 00122 // held before they can be sent). 00123 // 00124 // See set_collect_tcp_interval() to specify the 00125 // interval of time for which to hold packets before 00126 // sending them. 00127 // 00128 // If you enable this mode, you may also need to 00129 // periodically call consider_flush() to flush the queue 00130 // if no packets have been sent recently. 00131 //////////////////////////////////////////////////////////////////// 00132 void Connection:: 00133 set_collect_tcp(bool collect_tcp) { 00134 _collect_tcp = collect_tcp; 00135 } 00136 00137 //////////////////////////////////////////////////////////////////// 00138 // Function: Connection::get_collect_tcp 00139 // Access: Published 00140 // Description: Returns the current setting of "collect-tcp" mode. 00141 // See set_collect_tcp(). 00142 //////////////////////////////////////////////////////////////////// 00143 bool Connection:: 00144 get_collect_tcp() const { 00145 return _collect_tcp; 00146 } 00147 00148 //////////////////////////////////////////////////////////////////// 00149 // Function: Connection::set_collect_tcp_interval 00150 // Access: Published 00151 // Description: Specifies the interval in time, in seconds, for which 00152 // to hold TCP packets before sending all of the 00153 // recently received packets at once. This only has 00154 // meaning if "collect-tcp" mode is enabled; see 00155 // set_collect_tcp(). 00156 //////////////////////////////////////////////////////////////////// 00157 void Connection:: 00158 set_collect_tcp_interval(double interval) { 00159 _collect_tcp_interval = interval; 00160 } 00161 00162 //////////////////////////////////////////////////////////////////// 00163 // Function: Connection::get_collect_tcp_interval 00164 // Access: Published 00165 // Description: Returns the interval in time, in seconds, for which 00166 // to hold TCP packets before sending all of the 00167 // recently received packets at once. This only has 00168 // meaning if "collect-tcp" mode is enabled; see 00169 // set_collect_tcp(). 00170 //////////////////////////////////////////////////////////////////// 00171 double Connection:: 00172 get_collect_tcp_interval() const { 00173 return _collect_tcp_interval; 00174 } 00175 00176 //////////////////////////////////////////////////////////////////// 00177 // Function: Connection::consider_flush 00178 // Access: Published 00179 // Description: Sends the most recently queued TCP datagram(s) if 00180 // enough time has elapsed. This only has meaning if 00181 // set_collect_tcp() has been set to true. 00182 //////////////////////////////////////////////////////////////////// 00183 bool Connection:: 00184 consider_flush() { 00185 LightReMutexHolder holder(_write_mutex); 00186 00187 if (!_collect_tcp) { 00188 return do_flush(); 00189 00190 } else { 00191 double elapsed = 00192 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start; 00193 // If the elapsed time is negative, someone must have reset the 00194 // clock back, so just go ahead and flush. 00195 if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) { 00196 return do_flush(); 00197 } 00198 } 00199 00200 return true; 00201 } 00202 00203 //////////////////////////////////////////////////////////////////// 00204 // Function: Connection::flush 00205 // Access: Published 00206 // Description: Sends the most recently queued TCP datagram(s) now. 00207 // This only has meaning if set_collect_tcp() has been 00208 // set to true. 00209 //////////////////////////////////////////////////////////////////// 00210 bool Connection:: 00211 flush() { 00212 LightReMutexHolder holder(_write_mutex); 00213 return do_flush(); 00214 } 00215 00216 /* 00217 This method is disabled. We don't provide enough interface to use 00218 non-blocking I/O effectively at this level, so we shouldn't provide 00219 this call. Specifically, we don't provide a way to query whether an 00220 operation failed because it would have blocked or not. 00221 00222 //////////////////////////////////////////////////////////////////// 00223 // Function: Connection::set_nonblock 00224 // Access: Published 00225 // Description: Sets whether nonblocking I/O should be in effect. 00226 //////////////////////////////////////////////////////////////////// 00227 void Connection:: 00228 set_nonblock(bool flag) { 00229 if (flag) { 00230 _socket->SetNonBlocking(); 00231 } else { 00232 _socket->SetBlocking(); 00233 } 00234 } 00235 */ 00236 00237 //////////////////////////////////////////////////////////////////// 00238 // Function: Connection::set_linger 00239 // Access: Published 00240 // Description: Sets the time to linger on close if data is present. 00241 // If flag is false, when you close a socket with data 00242 // available the system attempts to deliver the data to 00243 // the peer (the default behavior). If flag is false 00244 // but time is zero, the system discards any undelivered 00245 // data when you close the socket. If flag is false but 00246 // time is nonzero, the system waits up to time seconds 00247 // to deliver the data. 00248 //////////////////////////////////////////////////////////////////// 00249 void Connection:: 00250 set_linger(bool flag, double time) { 00251 Socket_TCP *tcp; 00252 DCAST_INTO_V(tcp, _socket); 00253 00254 if (flag) { 00255 tcp->SetLinger((int)time); 00256 } else { 00257 tcp->DontLinger(); 00258 } 00259 } 00260 00261 //////////////////////////////////////////////////////////////////// 00262 // Function: Connection::set_reuse_addr 00263 // Access: Published 00264 // Description: Sets whether local address reuse is allowed. 00265 //////////////////////////////////////////////////////////////////// 00266 void Connection:: 00267 set_reuse_addr(bool flag) { 00268 _socket->SetReuseAddress(flag); 00269 } 00270 00271 //////////////////////////////////////////////////////////////////// 00272 // Function: Connection::set_keep_alive 00273 // Access: Published 00274 // Description: Sets whether the connection is periodically tested to 00275 // see if it is still alive. 00276 //////////////////////////////////////////////////////////////////// 00277 void Connection:: 00278 set_keep_alive(bool flag) { 00279 // TODO. 00280 } 00281 00282 //////////////////////////////////////////////////////////////////// 00283 // Function: Connection::set_recv_buffer_size 00284 // Access: Published 00285 // Description: Sets the size of the receive buffer, in bytes. 00286 //////////////////////////////////////////////////////////////////// 00287 void Connection:: 00288 set_recv_buffer_size(int size) { 00289 _socket->SetRecvBufferSize(size); 00290 } 00291 00292 //////////////////////////////////////////////////////////////////// 00293 // Function: Connection::set_send_buffer_size 00294 // Access: Published 00295 // Description: Sets the size of the send buffer, in bytes. 00296 //////////////////////////////////////////////////////////////////// 00297 void Connection:: 00298 set_send_buffer_size(int size) { 00299 Socket_TCP *tcp; 00300 DCAST_INTO_V(tcp, _socket); 00301 00302 tcp->SetSendBufferSize(size); 00303 } 00304 00305 //////////////////////////////////////////////////////////////////// 00306 // Function: Connection::set_ip_time_to_live 00307 // Access: Published 00308 // Description: Sets IP time-to-live. 00309 //////////////////////////////////////////////////////////////////// 00310 void Connection:: 00311 set_ip_time_to_live(int ttl) { 00312 // TODO. 00313 } 00314 00315 //////////////////////////////////////////////////////////////////// 00316 // Function: Connection::set_ip_type_of_service 00317 // Access: Published 00318 // Description: Sets IP type-of-service and precedence. 00319 //////////////////////////////////////////////////////////////////// 00320 void Connection:: 00321 set_ip_type_of_service(int tos) { 00322 // TODO. 00323 } 00324 00325 //////////////////////////////////////////////////////////////////// 00326 // Function: Connection::set_no_delay 00327 // Access: Published 00328 // Description: If flag is true, this disables the Nagle algorithm, 00329 // and prevents delaying of send to coalesce packets. 00330 //////////////////////////////////////////////////////////////////// 00331 void Connection:: 00332 set_no_delay(bool flag) { 00333 Socket_TCP *tcp; 00334 DCAST_INTO_V(tcp, _socket); 00335 00336 tcp->SetNoDelay(flag); 00337 } 00338 00339 //////////////////////////////////////////////////////////////////// 00340 // Function: Connection::set_max_segment 00341 // Access: Published 00342 // Description: Sets the maximum segment size. 00343 //////////////////////////////////////////////////////////////////// 00344 void Connection:: 00345 set_max_segment(int size) { 00346 // TODO. 00347 } 00348 00349 //////////////////////////////////////////////////////////////////// 00350 // Function: Connection::send_datagram 00351 // Access: Private 00352 // Description: This method is intended only to be called by 00353 // ConnectionWriter. It atomically writes the given 00354 // datagram to the socket, returning true on success, 00355 // false on failure. If the socket seems to be closed, 00356 // it notifies the ConnectionManager. 00357 //////////////////////////////////////////////////////////////////// 00358 bool Connection:: 00359 send_datagram(const NetDatagram &datagram, int tcp_header_size) { 00360 nassertr(_socket != (Socket_IP *)NULL, false); 00361 00362 if (_socket->is_exact_type(Socket_UDP::get_class_type())) { 00363 // We have to send UDP right away. 00364 Socket_UDP *udp; 00365 DCAST_INTO_R(udp, _socket, false); 00366 00367 LightReMutexHolder holder(_write_mutex); 00368 DatagramUDPHeader header(datagram); 00369 string data; 00370 data += header.get_header(); 00371 data += datagram.get_message(); 00372 00373 if (net_cat.is_debug()) { 00374 header.verify_datagram(datagram); 00375 } 00376 00377 int bytes_to_send = data.length(); 00378 Socket_Address addr = datagram.get_address().get_addr(); 00379 00380 bool okflag = udp->SendTo(data, addr); 00381 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00382 while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) { 00383 Thread::force_yield(); 00384 okflag = udp->SendTo(data, addr); 00385 } 00386 #endif // SIMPLE_THREADS 00387 00388 if (net_cat.is_spam()) { 00389 net_cat.spam() 00390 << "Sent UDP datagram with " 00391 << bytes_to_send << " bytes to " << (void *)this 00392 << ", ok = " << okflag << "\n"; 00393 } 00394 00395 return check_send_error(okflag); 00396 } 00397 00398 // We might queue up TCP packets for later sending. 00399 if (tcp_header_size == 2 && datagram.get_length() >= 0x10000) { 00400 net_cat.error() 00401 << "Attempt to send TCP datagram of " << datagram.get_length() 00402 << " bytes--too long!\n"; 00403 nassert_raise("Datagram too long"); 00404 return false; 00405 } 00406 00407 DatagramTCPHeader header(datagram, tcp_header_size); 00408 00409 LightReMutexHolder holder(_write_mutex); 00410 _queued_data += header.get_header(); 00411 _queued_data += datagram.get_message(); 00412 _queued_count++; 00413 00414 if (net_cat.is_debug()) { 00415 header.verify_datagram(datagram, tcp_header_size); 00416 } 00417 00418 if (!_collect_tcp || 00419 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) { 00420 return do_flush(); 00421 } 00422 00423 return true; 00424 } 00425 00426 //////////////////////////////////////////////////////////////////// 00427 // Function: Connection::send_raw_datagram 00428 // Access: Private 00429 // Description: This method is intended only to be called by 00430 // ConnectionWriter. It atomically writes the given 00431 // datagram to the socket, without the Datagram header. 00432 //////////////////////////////////////////////////////////////////// 00433 bool Connection:: 00434 send_raw_datagram(const NetDatagram &datagram) { 00435 nassertr(_socket != (Socket_IP *)NULL, false); 00436 00437 if (_socket->is_exact_type(Socket_UDP::get_class_type())) { 00438 // We have to send UDP right away. 00439 Socket_UDP *udp; 00440 DCAST_INTO_R(udp, _socket, false); 00441 00442 string data = datagram.get_message(); 00443 00444 LightReMutexHolder holder(_write_mutex); 00445 Socket_Address addr = datagram.get_address().get_addr(); 00446 bool okflag = udp->SendTo(data, addr); 00447 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00448 while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) { 00449 Thread::force_yield(); 00450 okflag = udp->SendTo(data, addr); 00451 } 00452 #endif // SIMPLE_THREADS 00453 00454 if (net_cat.is_spam()) { 00455 net_cat.spam() 00456 << "Sent UDP datagram with " 00457 << data.size() << " bytes to " << (void *)this 00458 << ", ok = " << okflag << "\n"; 00459 } 00460 00461 return check_send_error(okflag); 00462 } 00463 00464 // We might queue up TCP packets for later sending. 00465 LightReMutexHolder holder(_write_mutex); 00466 _queued_data += datagram.get_message(); 00467 _queued_count++; 00468 00469 if (!_collect_tcp || 00470 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) { 00471 return do_flush(); 00472 } 00473 00474 return true; 00475 } 00476 00477 //////////////////////////////////////////////////////////////////// 00478 // Function: Connection::do_flush 00479 // Access: Private 00480 // Description: The private implementation of flush(), this assumes 00481 // the _write_mutex is already held. 00482 //////////////////////////////////////////////////////////////////// 00483 bool Connection:: 00484 do_flush() { 00485 if (_queued_data.empty()) { 00486 _queued_count = 0; 00487 _queued_data_start = TrueClock::get_global_ptr()->get_short_time(); 00488 return true; 00489 } 00490 00491 if (net_cat.is_spam()) { 00492 net_cat.spam() 00493 << "Sending " << _queued_count << " TCP datagram(s) with " 00494 << _queued_data.length() << " total bytes to " << (void *)this << "\n"; 00495 } 00496 00497 Socket_TCP *tcp; 00498 DCAST_INTO_R(tcp, _socket, false); 00499 00500 string sending_data; 00501 _queued_data.swap(sending_data); 00502 00503 _queued_count = 0; 00504 _queued_data_start = TrueClock::get_global_ptr()->get_short_time(); 00505 00506 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 00507 int max_send = net_max_write_per_epoch; 00508 int data_sent = tcp->SendData(sending_data.data(), min((size_t)max_send, sending_data.size())); 00509 bool okflag = (data_sent == (int)sending_data.size()); 00510 if (!okflag) { 00511 int total_sent = 0; 00512 if (data_sent > 0) { 00513 total_sent += data_sent; 00514 } 00515 double last_report = 0; 00516 while (!okflag && tcp->Active() && 00517 (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) { 00518 if (data_sent == 0) { 00519 Thread::force_yield(); 00520 } else { 00521 Thread::consider_yield(); 00522 } 00523 data_sent = tcp->SendData(sending_data.data() + total_sent, min((size_t)max_send, sending_data.size() - total_sent)); 00524 if (data_sent > 0) { 00525 total_sent += data_sent; 00526 } 00527 okflag = (total_sent == (int)sending_data.size()); 00528 } 00529 } 00530 00531 #else // SIMPLE_THREADS 00532 int data_sent = tcp->SendData(sending_data); 00533 bool okflag = (data_sent == (int)sending_data.size()); 00534 00535 #endif // SIMPLE_THREADS 00536 00537 return check_send_error(okflag); 00538 } 00539 00540 //////////////////////////////////////////////////////////////////// 00541 // Function: Connection::check_send_error 00542 // Access: Private 00543 // Description: Checks the return value of a Send() or SendTo() 00544 // call. 00545 //////////////////////////////////////////////////////////////////// 00546 bool Connection:: 00547 check_send_error(bool okflag) { 00548 if (!okflag) { 00549 static ConfigVariableBool abort_send_error("abort-send-error", false); 00550 if (abort_send_error) { 00551 nassertr(false, false); 00552 } 00553 00554 // Assume any error means the connection has been reset; tell 00555 // our manager about it and ignore it. 00556 if (_manager != (ConnectionManager *)NULL) { 00557 _manager->flush_read_connection(this); 00558 _manager->connection_reset(this, okflag); 00559 } 00560 return false; 00561 } 00562 00563 return true; 00564 }