Panda3D

connection.cxx

00001 // Filename: connection.cxx
00002 // Created by:  jns (07Feb00)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "connection.h"
00016 #include "connectionManager.h"
00017 #include "netDatagram.h"
00018 #include "datagramTCPHeader.h"
00019 #include "datagramUDPHeader.h"
00020 #include "config_net.h"
00021 #include "config_express.h" // for collect_tcp
00022 #include "trueClock.h"
00023 #include "pnotify.h"
00024 #include "lightReMutexHolder.h"
00025 #include "socket_ip.h"
00026 #include "socket_tcp.h"
00027 #include "socket_udp.h"
00028 #include "dcast.h"
00029 
00030 
00031 ////////////////////////////////////////////////////////////////////
00032 //     Function: Connection::Constructor
00033 //       Access: Published
00034 //  Description: Creates a connection.  Normally this constructor
00035 //               should not be used directly by user code; use one of
00036 //               the methods in ConnectionManager to make a new
00037 //               connection.
00038 ////////////////////////////////////////////////////////////////////
00039 Connection::
00040 Connection(ConnectionManager *manager, Socket_IP *socket) :
00041   _manager(manager),
00042   _socket(socket)
00043 {
00044   _collect_tcp = collect_tcp;
00045   _collect_tcp_interval = collect_tcp_interval;
00046   _queued_data_start = 0.0;
00047   _queued_count = 0;
00048 
00049 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00050   // In the presence of SIMPLE_THREADS, we use non-blocking I/O.  We
00051   // simulate blocking by yielding the thread.
00052   if (_socket->SetNonBlocking() != ALL_OK) {
00053     net_cat.warning()
00054       << "Unable to set non-blocking status on socket\n";
00055   }
00056 #endif
00057 }
00058 
00059 ////////////////////////////////////////////////////////////////////
00060 //     Function: Connection::Destructor
00061 //       Access: Published
00062 //  Description: Closes a connection.
00063 ////////////////////////////////////////////////////////////////////
00064 Connection::
00065 ~Connection() {
00066   net_cat.info()
00067     << "Deleting connection " << (void *)this << "\n";
00068 
00069   if (_socket != (Socket_IP *)NULL) {
00070     flush();
00071 
00072     _socket->Close();
00073     delete _socket;
00074   }
00075 }
00076 
00077 ////////////////////////////////////////////////////////////////////
00078 //     Function: Connection::get_address
00079 //       Access: Published
00080 //  Description: Returns the address bound to this connection, if it
00081 //               is a TCP connection.
00082 ////////////////////////////////////////////////////////////////////
00083 NetAddress Connection::
00084 get_address() const {
00085   Socket_Address addr = _socket->GetPeerName();
00086   return NetAddress(addr);
00087 }
00088 
00089 ////////////////////////////////////////////////////////////////////
00090 //     Function: Connection::get_manager
00091 //       Access: Published
00092 //  Description: Returns a pointer to the ConnectionManager object
00093 //               that serves this connection.
00094 ////////////////////////////////////////////////////////////////////
00095 ConnectionManager *Connection::
00096 get_manager() const {
00097   return _manager;
00098 }
00099 
00100 ////////////////////////////////////////////////////////////////////
00101 //     Function: Connection::get_socket
00102 //       Access: Published
00103 //  Description: Returns the internal Socket_IP that defines the
00104 //               connection.
00105 ////////////////////////////////////////////////////////////////////
00106 Socket_IP *Connection::
00107 get_socket() const {
00108   return _socket;
00109 }
00110 
00111 ////////////////////////////////////////////////////////////////////
00112 //     Function: Connection::set_collect_tcp
00113 //       Access: Published
00114 //  Description: Enables or disables "collect-tcp" mode.  In this
00115 //               mode, individual TCP packets are not sent
00116 //               immediately, but rather they are collected together
00117 //               and accumulated to be sent periodically as one larger
00118 //               TCP packet.  This cuts down on overhead from the
00119 //               TCP/IP protocol, especially if many small packets
00120 //               need to be sent on the same connection, but it
00121 //               introduces additional latency (since packets must be
00122 //               held before they can be sent).
00123 //
00124 //               See set_collect_tcp_interval() to specify the
00125 //               interval of time for which to hold packets before
00126 //               sending them.
00127 //
00128 //               If you enable this mode, you may also need to
00129 //               periodically call consider_flush() to flush the queue
00130 //               if no packets have been sent recently.
00131 ////////////////////////////////////////////////////////////////////
00132 void Connection::
00133 set_collect_tcp(bool collect_tcp) {
00134   _collect_tcp = collect_tcp;
00135 }
00136 
00137 ////////////////////////////////////////////////////////////////////
00138 //     Function: Connection::get_collect_tcp
00139 //       Access: Published
00140 //  Description: Returns the current setting of "collect-tcp" mode.
00141 //               See set_collect_tcp().
00142 ////////////////////////////////////////////////////////////////////
00143 bool Connection::
00144 get_collect_tcp() const {
00145   return _collect_tcp;
00146 }
00147 
00148 ////////////////////////////////////////////////////////////////////
00149 //     Function: Connection::set_collect_tcp_interval
00150 //       Access: Published
00151 //  Description: Specifies the interval in time, in seconds, for which
00152 //               to hold TCP packets before sending all of the
00153 //               recently received packets at once.  This only has
00154 //               meaning if "collect-tcp" mode is enabled; see
00155 //               set_collect_tcp().
00156 ////////////////////////////////////////////////////////////////////
00157 void Connection::
00158 set_collect_tcp_interval(double interval) {
00159   _collect_tcp_interval = interval;
00160 }
00161 
00162 ////////////////////////////////////////////////////////////////////
00163 //     Function: Connection::get_collect_tcp_interval
00164 //       Access: Published
00165 //  Description: Returns the interval in time, in seconds, for which
00166 //               to hold TCP packets before sending all of the
00167 //               recently received packets at once.  This only has
00168 //               meaning if "collect-tcp" mode is enabled; see
00169 //               set_collect_tcp().
00170 ////////////////////////////////////////////////////////////////////
00171 double Connection::
00172 get_collect_tcp_interval() const {
00173   return _collect_tcp_interval;
00174 }
00175 
00176 ////////////////////////////////////////////////////////////////////
00177 //     Function: Connection::consider_flush
00178 //       Access: Published
00179 //  Description: Sends the most recently queued TCP datagram(s) if
00180 //               enough time has elapsed.  This only has meaning if
00181 //               set_collect_tcp() has been set to true.
00182 ////////////////////////////////////////////////////////////////////
00183 bool Connection::
00184 consider_flush() {
00185   LightReMutexHolder holder(_write_mutex);
00186 
00187   if (!_collect_tcp) {
00188     return do_flush();
00189 
00190   } else {
00191     double elapsed = 
00192       TrueClock::get_global_ptr()->get_short_time() - _queued_data_start;
00193     // If the elapsed time is negative, someone must have reset the
00194     // clock back, so just go ahead and flush.
00195     if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) {
00196       return do_flush();
00197     }
00198   }
00199 
00200   return true;
00201 }
00202 
00203 ////////////////////////////////////////////////////////////////////
00204 //     Function: Connection::flush
00205 //       Access: Published
00206 //  Description: Sends the most recently queued TCP datagram(s) now.
00207 //               This only has meaning if set_collect_tcp() has been
00208 //               set to true.
00209 ////////////////////////////////////////////////////////////////////
00210 bool Connection::
00211 flush() {
00212   LightReMutexHolder holder(_write_mutex);
00213   return do_flush();
00214 }
00215 
00216 /*
00217 This method is disabled.  We don't provide enough interface to use
00218 non-blocking I/O effectively at this level, so we shouldn't provide
00219 this call.  Specifically, we don't provide a way to query whether an
00220 operation failed because it would have blocked or not.
00221 
00222 ////////////////////////////////////////////////////////////////////
00223 //     Function: Connection::set_nonblock
00224 //       Access: Published
00225 //  Description: Sets whether nonblocking I/O should be in effect.
00226 ////////////////////////////////////////////////////////////////////
00227 void Connection::
00228 set_nonblock(bool flag) {
00229   if (flag) {
00230     _socket->SetNonBlocking();
00231   } else {
00232     _socket->SetBlocking();
00233   }
00234 }
00235 */
00236 
00237 ////////////////////////////////////////////////////////////////////
00238 //     Function: Connection::set_linger
00239 //       Access: Published
00240 //  Description: Sets the time to linger on close if data is present.
00241 //               If flag is false, when you close a socket with data
00242 //               available the system attempts to deliver the data to
00243 //               the peer (the default behavior).  If flag is false
00244 //               but time is zero, the system discards any undelivered
00245 //               data when you close the socket.  If flag is false but
00246 //               time is nonzero, the system waits up to time seconds
00247 //               to deliver the data.
00248 ////////////////////////////////////////////////////////////////////
00249 void Connection::
00250 set_linger(bool flag, double time) {
00251   Socket_TCP *tcp;
00252   DCAST_INTO_V(tcp, _socket);
00253 
00254   if (flag) {
00255     tcp->SetLinger((int)time);
00256   } else {
00257     tcp->DontLinger();
00258   }
00259 }
00260 
00261 ////////////////////////////////////////////////////////////////////
00262 //     Function: Connection::set_reuse_addr
00263 //       Access: Published
00264 //  Description: Sets whether local address reuse is allowed.
00265 ////////////////////////////////////////////////////////////////////
00266 void Connection::
00267 set_reuse_addr(bool flag) {
00268   _socket->SetReuseAddress(flag);
00269 }
00270 
00271 ////////////////////////////////////////////////////////////////////
00272 //     Function: Connection::set_keep_alive
00273 //       Access: Published
00274 //  Description: Sets whether the connection is periodically tested to
00275 //               see if it is still alive.
00276 ////////////////////////////////////////////////////////////////////
00277 void Connection::
00278 set_keep_alive(bool flag) {
00279   // TODO.
00280 }
00281 
00282 ////////////////////////////////////////////////////////////////////
00283 //     Function: Connection::set_recv_buffer_size
00284 //       Access: Published
00285 //  Description: Sets the size of the receive buffer, in bytes.
00286 ////////////////////////////////////////////////////////////////////
00287 void Connection::
00288 set_recv_buffer_size(int size) {
00289   _socket->SetRecvBufferSize(size);
00290 }
00291 
00292 ////////////////////////////////////////////////////////////////////
00293 //     Function: Connection::set_send_buffer_size
00294 //       Access: Published
00295 //  Description: Sets the size of the send buffer, in bytes.
00296 ////////////////////////////////////////////////////////////////////
00297 void Connection::
00298 set_send_buffer_size(int size) {
00299   Socket_TCP *tcp;
00300   DCAST_INTO_V(tcp, _socket);
00301 
00302   tcp->SetSendBufferSize(size);
00303 }
00304 
00305 ////////////////////////////////////////////////////////////////////
00306 //     Function: Connection::set_ip_time_to_live
00307 //       Access: Published
00308 //  Description: Sets IP time-to-live.
00309 ////////////////////////////////////////////////////////////////////
00310 void Connection::
00311 set_ip_time_to_live(int ttl) {
00312   // TODO.
00313 }
00314 
00315 ////////////////////////////////////////////////////////////////////
00316 //     Function: Connection::set_ip_type_of_service
00317 //       Access: Published
00318 //  Description: Sets IP type-of-service and precedence.
00319 ////////////////////////////////////////////////////////////////////
00320 void Connection::
00321 set_ip_type_of_service(int tos) {
00322   // TODO.
00323 }
00324 
00325 ////////////////////////////////////////////////////////////////////
00326 //     Function: Connection::set_no_delay
00327 //       Access: Published
00328 //  Description: If flag is true, this disables the Nagle algorithm,
00329 //               and prevents delaying of send to coalesce packets.
00330 ////////////////////////////////////////////////////////////////////
00331 void Connection::
00332 set_no_delay(bool flag) {
00333   Socket_TCP *tcp;
00334   DCAST_INTO_V(tcp, _socket);
00335 
00336   tcp->SetNoDelay(flag);
00337 }
00338 
00339 ////////////////////////////////////////////////////////////////////
00340 //     Function: Connection::set_max_segment
00341 //       Access: Published
00342 //  Description: Sets the maximum segment size.
00343 ////////////////////////////////////////////////////////////////////
00344 void Connection::
00345 set_max_segment(int size) {
00346   // TODO.
00347 }
00348 
00349 ////////////////////////////////////////////////////////////////////
00350 //     Function: Connection::send_datagram
00351 //       Access: Private
00352 //  Description: This method is intended only to be called by
00353 //               ConnectionWriter.  It atomically writes the given
00354 //               datagram to the socket, returning true on success,
00355 //               false on failure.  If the socket seems to be closed,
00356 //               it notifies the ConnectionManager.
00357 ////////////////////////////////////////////////////////////////////
00358 bool Connection::
00359 send_datagram(const NetDatagram &datagram, int tcp_header_size) {
00360   nassertr(_socket != (Socket_IP *)NULL, false);
00361 
00362   if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
00363     // We have to send UDP right away.
00364     Socket_UDP *udp;
00365     DCAST_INTO_R(udp, _socket, false);
00366 
00367     LightReMutexHolder holder(_write_mutex);
00368     DatagramUDPHeader header(datagram);
00369     string data;
00370     data += header.get_header();
00371     data += datagram.get_message();
00372     
00373     if (net_cat.is_debug()) {
00374       header.verify_datagram(datagram);
00375     }
00376     
00377     int bytes_to_send = data.length();
00378     Socket_Address addr = datagram.get_address().get_addr();
00379 
00380     bool okflag = udp->SendTo(data, addr);
00381 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00382     while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
00383       Thread::force_yield();
00384       okflag = udp->SendTo(data, addr);
00385     }
00386 #endif  // SIMPLE_THREADS
00387       
00388     if (net_cat.is_spam()) {
00389       net_cat.spam()
00390         << "Sent UDP datagram with " 
00391         << bytes_to_send << " bytes to " << (void *)this 
00392         << ", ok = " << okflag << "\n";
00393     }
00394       
00395     return check_send_error(okflag);
00396   }
00397 
00398   // We might queue up TCP packets for later sending.
00399   if (tcp_header_size == 2 && datagram.get_length() >= 0x10000) {
00400     net_cat.error()
00401       << "Attempt to send TCP datagram of " << datagram.get_length()
00402       << " bytes--too long!\n";
00403     nassert_raise("Datagram too long");
00404     return false;
00405   }
00406 
00407   DatagramTCPHeader header(datagram, tcp_header_size);
00408 
00409   LightReMutexHolder holder(_write_mutex);
00410   _queued_data += header.get_header();
00411   _queued_data += datagram.get_message();
00412   _queued_count++;
00413   
00414   if (net_cat.is_debug()) {
00415     header.verify_datagram(datagram, tcp_header_size);
00416   }
00417 
00418   if (!_collect_tcp || 
00419       TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
00420     return do_flush();
00421   }
00422 
00423   return true;
00424 }
00425 
00426 ////////////////////////////////////////////////////////////////////
00427 //     Function: Connection::send_raw_datagram
00428 //       Access: Private
00429 //  Description: This method is intended only to be called by
00430 //               ConnectionWriter.  It atomically writes the given
00431 //               datagram to the socket, without the Datagram header.
00432 ////////////////////////////////////////////////////////////////////
00433 bool Connection::
00434 send_raw_datagram(const NetDatagram &datagram) {
00435   nassertr(_socket != (Socket_IP *)NULL, false);
00436 
00437   if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
00438     // We have to send UDP right away.
00439     Socket_UDP *udp;
00440     DCAST_INTO_R(udp, _socket, false);
00441 
00442     string data = datagram.get_message();
00443 
00444     LightReMutexHolder holder(_write_mutex);
00445     Socket_Address addr = datagram.get_address().get_addr();
00446     bool okflag = udp->SendTo(data, addr);
00447 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00448     while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
00449       Thread::force_yield();
00450       okflag = udp->SendTo(data, addr);
00451     }
00452 #endif  // SIMPLE_THREADS
00453     
00454     if (net_cat.is_spam()) {
00455       net_cat.spam()
00456         << "Sent UDP datagram with " 
00457         << data.size() << " bytes to " << (void *)this 
00458         << ", ok = " << okflag << "\n";
00459     }
00460 
00461     return check_send_error(okflag);
00462   }
00463 
00464   // We might queue up TCP packets for later sending.
00465   LightReMutexHolder holder(_write_mutex);
00466   _queued_data += datagram.get_message();
00467   _queued_count++;
00468 
00469   if (!_collect_tcp || 
00470       TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
00471     return do_flush();
00472   }
00473 
00474   return true;
00475 }
00476 
00477 ////////////////////////////////////////////////////////////////////
00478 //     Function: Connection::do_flush
00479 //       Access: Private
00480 //  Description: The private implementation of flush(), this assumes
00481 //               the _write_mutex is already held.
00482 ////////////////////////////////////////////////////////////////////
00483 bool Connection::
00484 do_flush() {
00485   if (_queued_data.empty()) {
00486     _queued_count = 0;
00487     _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
00488     return true;
00489   }
00490 
00491   if (net_cat.is_spam()) {
00492     net_cat.spam()
00493       << "Sending " << _queued_count << " TCP datagram(s) with " 
00494       << _queued_data.length() << " total bytes to " << (void *)this << "\n";
00495   }
00496 
00497   Socket_TCP *tcp;
00498   DCAST_INTO_R(tcp, _socket, false);
00499 
00500   string sending_data;
00501   _queued_data.swap(sending_data);
00502 
00503   _queued_count = 0;
00504   _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
00505 
00506 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
00507   int max_send = net_max_write_per_epoch;
00508   int data_sent = tcp->SendData(sending_data.data(), min((size_t)max_send, sending_data.size()));
00509   bool okflag = (data_sent == (int)sending_data.size());
00510   if (!okflag) {
00511     int total_sent = 0;
00512     if (data_sent > 0) {
00513       total_sent += data_sent;
00514     }
00515     double last_report = 0;
00516     while (!okflag && tcp->Active() &&
00517            (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
00518       if (data_sent == 0) {
00519         Thread::force_yield();
00520       } else {
00521         Thread::consider_yield();
00522       }
00523       data_sent = tcp->SendData(sending_data.data() + total_sent, min((size_t)max_send, sending_data.size() - total_sent));
00524       if (data_sent > 0) {
00525         total_sent += data_sent;
00526       }
00527       okflag = (total_sent == (int)sending_data.size());
00528     }
00529   }
00530 
00531 #else  // SIMPLE_THREADS
00532   int data_sent = tcp->SendData(sending_data);
00533   bool okflag = (data_sent == (int)sending_data.size());
00534 
00535 #endif  // SIMPLE_THREADS
00536 
00537   return check_send_error(okflag);
00538 }
00539 
00540 ////////////////////////////////////////////////////////////////////
00541 //     Function: Connection::check_send_error
00542 //       Access: Private
00543 //  Description: Checks the return value of a Send() or SendTo()
00544 //               call.
00545 ////////////////////////////////////////////////////////////////////
00546 bool Connection::
00547 check_send_error(bool okflag) {
00548   if (!okflag) {
00549     static ConfigVariableBool abort_send_error("abort-send-error", false);
00550     if (abort_send_error) {
00551       nassertr(false, false);
00552     }
00553 
00554     // Assume any error means the connection has been reset; tell
00555     // our manager about it and ignore it.
00556     if (_manager != (ConnectionManager *)NULL) {
00557       _manager->flush_read_connection(this);
00558       _manager->connection_reset(this, okflag);
00559     }
00560     return false;
00561   }
00562 
00563   return true;
00564 }
 All Classes Functions Variables Enumerations