Panda3D
connection.cxx
1 // Filename: connection.cxx
2 // Created by: jns (07Feb00)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "connection.h"
16 #include "connectionManager.h"
17 #include "netDatagram.h"
18 #include "datagramTCPHeader.h"
19 #include "datagramUDPHeader.h"
20 #include "config_net.h"
21 #include "config_express.h" // for collect_tcp
22 #include "trueClock.h"
23 #include "pnotify.h"
24 #include "lightReMutexHolder.h"
25 #include "socket_ip.h"
26 #include "socket_tcp.h"
27 #include "socket_udp.h"
28 #include "dcast.h"
29 
30 
31 ////////////////////////////////////////////////////////////////////
32 // Function: Connection::Constructor
33 // Access: Published
34 // Description: Creates a connection. Normally this constructor
35 // should not be used directly by user code; use one of
36 // the methods in ConnectionManager to make a new
37 // connection.
38 ////////////////////////////////////////////////////////////////////
41  _manager(manager),
42  _socket(socket)
43 {
44  _collect_tcp = collect_tcp;
45  _collect_tcp_interval = collect_tcp_interval;
46  _queued_data_start = 0.0;
47  _queued_count = 0;
48 
49 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
50  // In the presence of SIMPLE_THREADS, we use non-blocking I/O. We
51  // simulate blocking by yielding the thread.
52  if (_socket->SetNonBlocking() != ALL_OK) {
53  net_cat.warning()
54  << "Unable to set non-blocking status on socket\n";
55  }
56 #endif
57 }
58 
59 ////////////////////////////////////////////////////////////////////
60 // Function: Connection::Destructor
61 // Access: Published
62 // Description: Closes a connection.
63 ////////////////////////////////////////////////////////////////////
66  net_cat.info()
67  << "Deleting connection " << (void *)this << "\n";
68 
69  if (_socket != (Socket_IP *)NULL) {
70  flush();
71 
72  _socket->Close();
73  delete _socket;
74  }
75 }
76 
77 ////////////////////////////////////////////////////////////////////
78 // Function: Connection::get_address
79 // Access: Published
80 // Description: Returns the address bound to this connection, if it
81 // is a TCP connection.
82 ////////////////////////////////////////////////////////////////////
84 get_address() const {
85  Socket_Address addr = _socket->GetPeerName();
86  return NetAddress(addr);
87 }
88 
89 ////////////////////////////////////////////////////////////////////
90 // Function: Connection::get_manager
91 // Access: Published
92 // Description: Returns a pointer to the ConnectionManager object
93 // that serves this connection.
94 ////////////////////////////////////////////////////////////////////
96 get_manager() const {
97  return _manager;
98 }
99 
100 ////////////////////////////////////////////////////////////////////
101 // Function: Connection::get_socket
102 // Access: Published
103 // Description: Returns the internal Socket_IP that defines the
104 // connection.
105 ////////////////////////////////////////////////////////////////////
107 get_socket() const {
108  return _socket;
109 }
110 
111 ////////////////////////////////////////////////////////////////////
112 // Function: Connection::set_collect_tcp
113 // Access: Published
114 // Description: Enables or disables "collect-tcp" mode. In this
115 // mode, individual TCP packets are not sent
116 // immediately, but rather they are collected together
117 // and accumulated to be sent periodically as one larger
118 // TCP packet. This cuts down on overhead from the
119 // TCP/IP protocol, especially if many small packets
120 // need to be sent on the same connection, but it
121 // introduces additional latency (since packets must be
122 // held before they can be sent).
123 //
124 // See set_collect_tcp_interval() to specify the
125 // interval of time for which to hold packets before
126 // sending them.
127 //
128 // If you enable this mode, you may also need to
129 // periodically call consider_flush() to flush the queue
130 // if no packets have been sent recently.
131 ////////////////////////////////////////////////////////////////////
132 void Connection::
133 set_collect_tcp(bool collect_tcp) {
134  _collect_tcp = collect_tcp;
135 }
136 
137 ////////////////////////////////////////////////////////////////////
138 // Function: Connection::get_collect_tcp
139 // Access: Published
140 // Description: Returns the current setting of "collect-tcp" mode.
141 // See set_collect_tcp().
142 ////////////////////////////////////////////////////////////////////
143 bool Connection::
145  return _collect_tcp;
146 }
147 
148 ////////////////////////////////////////////////////////////////////
149 // Function: Connection::set_collect_tcp_interval
150 // Access: Published
151 // Description: Specifies the interval in time, in seconds, for which
152 // to hold TCP packets before sending all of the
153 // recently received packets at once. This only has
154 // meaning if "collect-tcp" mode is enabled; see
155 // set_collect_tcp().
156 ////////////////////////////////////////////////////////////////////
157 void Connection::
158 set_collect_tcp_interval(double interval) {
159  _collect_tcp_interval = interval;
160 }
161 
162 ////////////////////////////////////////////////////////////////////
163 // Function: Connection::get_collect_tcp_interval
164 // Access: Published
165 // Description: Returns the interval in time, in seconds, for which
166 // to hold TCP packets before sending all of the
167 // recently received packets at once. This only has
168 // meaning if "collect-tcp" mode is enabled; see
169 // set_collect_tcp().
170 ////////////////////////////////////////////////////////////////////
171 double Connection::
173  return _collect_tcp_interval;
174 }
175 
176 ////////////////////////////////////////////////////////////////////
177 // Function: Connection::consider_flush
178 // Access: Published
179 // Description: Sends the most recently queued TCP datagram(s) if
180 // enough time has elapsed. This only has meaning if
181 // set_collect_tcp() has been set to true.
182 ////////////////////////////////////////////////////////////////////
183 bool Connection::
185  LightReMutexHolder holder(_write_mutex);
186 
187  if (!_collect_tcp) {
188  return do_flush();
189 
190  } else {
191  double elapsed =
192  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start;
193  // If the elapsed time is negative, someone must have reset the
194  // clock back, so just go ahead and flush.
195  if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) {
196  return do_flush();
197  }
198  }
199 
200  return true;
201 }
202 
203 ////////////////////////////////////////////////////////////////////
204 // Function: Connection::flush
205 // Access: Published
206 // Description: Sends the most recently queued TCP datagram(s) now.
207 // This only has meaning if set_collect_tcp() has been
208 // set to true.
209 ////////////////////////////////////////////////////////////////////
210 bool Connection::
211 flush() {
212  LightReMutexHolder holder(_write_mutex);
213  return do_flush();
214 }
215 
216 /*
217 This method is disabled. We don't provide enough interface to use
218 non-blocking I/O effectively at this level, so we shouldn't provide
219 this call. Specifically, we don't provide a way to query whether an
220 operation failed because it would have blocked or not.
221 
222 ////////////////////////////////////////////////////////////////////
223 // Function: Connection::set_nonblock
224 // Access: Published
225 // Description: Sets whether nonblocking I/O should be in effect.
226 ////////////////////////////////////////////////////////////////////
227 void Connection::
228 set_nonblock(bool flag) {
229  if (flag) {
230  _socket->SetNonBlocking();
231  } else {
232  _socket->SetBlocking();
233  }
234 }
235 */
236 
237 ////////////////////////////////////////////////////////////////////
238 // Function: Connection::set_linger
239 // Access: Published
240 // Description: Sets the time to linger on close if data is present.
241 // If flag is false, when you close a socket with data
242 // available the system attempts to deliver the data to
243 // the peer (the default behavior). If flag is false
244 // but time is zero, the system discards any undelivered
245 // data when you close the socket. If flag is false but
246 // time is nonzero, the system waits up to time seconds
247 // to deliver the data.
248 ////////////////////////////////////////////////////////////////////
249 void Connection::
250 set_linger(bool flag, double time) {
251  Socket_TCP *tcp;
252  DCAST_INTO_V(tcp, _socket);
253 
254  if (flag) {
255  tcp->SetLinger((int)time);
256  } else {
257  tcp->DontLinger();
258  }
259 }
260 
261 ////////////////////////////////////////////////////////////////////
262 // Function: Connection::set_reuse_addr
263 // Access: Published
264 // Description: Sets whether local address reuse is allowed.
265 ////////////////////////////////////////////////////////////////////
266 void Connection::
267 set_reuse_addr(bool flag) {
268  _socket->SetReuseAddress(flag);
269 }
270 
271 ////////////////////////////////////////////////////////////////////
272 // Function: Connection::set_keep_alive
273 // Access: Published
274 // Description: Sets whether the connection is periodically tested to
275 // see if it is still alive.
276 ////////////////////////////////////////////////////////////////////
277 void Connection::
278 set_keep_alive(bool flag) {
279  // TODO.
280 }
281 
282 ////////////////////////////////////////////////////////////////////
283 // Function: Connection::set_recv_buffer_size
284 // Access: Published
285 // Description: Sets the size of the receive buffer, in bytes.
286 ////////////////////////////////////////////////////////////////////
287 void Connection::
289  _socket->SetRecvBufferSize(size);
290 }
291 
292 ////////////////////////////////////////////////////////////////////
293 // Function: Connection::set_send_buffer_size
294 // Access: Published
295 // Description: Sets the size of the send buffer, in bytes.
296 ////////////////////////////////////////////////////////////////////
297 void Connection::
299  Socket_TCP *tcp;
300  DCAST_INTO_V(tcp, _socket);
301 
302  tcp->SetSendBufferSize(size);
303 }
304 
305 ////////////////////////////////////////////////////////////////////
306 // Function: Connection::set_ip_time_to_live
307 // Access: Published
308 // Description: Sets IP time-to-live.
309 ////////////////////////////////////////////////////////////////////
310 void Connection::
312  // TODO.
313 }
314 
315 ////////////////////////////////////////////////////////////////////
316 // Function: Connection::set_ip_type_of_service
317 // Access: Published
318 // Description: Sets IP type-of-service and precedence.
319 ////////////////////////////////////////////////////////////////////
320 void Connection::
322  // TODO.
323 }
324 
325 ////////////////////////////////////////////////////////////////////
326 // Function: Connection::set_no_delay
327 // Access: Published
328 // Description: If flag is true, this disables the Nagle algorithm,
329 // and prevents delaying of send to coalesce packets.
330 ////////////////////////////////////////////////////////////////////
331 void Connection::
332 set_no_delay(bool flag) {
333  Socket_TCP *tcp;
334  DCAST_INTO_V(tcp, _socket);
335 
336  tcp->SetNoDelay(flag);
337 }
338 
339 ////////////////////////////////////////////////////////////////////
340 // Function: Connection::set_max_segment
341 // Access: Published
342 // Description: Sets the maximum segment size.
343 ////////////////////////////////////////////////////////////////////
344 void Connection::
345 set_max_segment(int size) {
346  // TODO.
347 }
348 
349 ////////////////////////////////////////////////////////////////////
350 // Function: Connection::send_datagram
351 // Access: Private
352 // Description: This method is intended only to be called by
353 // ConnectionWriter. It atomically writes the given
354 // datagram to the socket, returning true on success,
355 // false on failure. If the socket seems to be closed,
356 // it notifies the ConnectionManager.
357 ////////////////////////////////////////////////////////////////////
358 bool Connection::
359 send_datagram(const NetDatagram &datagram, int tcp_header_size) {
360  nassertr(_socket != (Socket_IP *)NULL, false);
361 
362  if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
363  // We have to send UDP right away.
364  Socket_UDP *udp;
365  DCAST_INTO_R(udp, _socket, false);
366 
367  LightReMutexHolder holder(_write_mutex);
368  DatagramUDPHeader header(datagram);
369  string data;
370  data += header.get_header();
371  data += datagram.get_message();
372 
373  if (net_cat.is_debug()) {
374  header.verify_datagram(datagram);
375  }
376 
377  int bytes_to_send = data.length();
378  Socket_Address addr = datagram.get_address().get_addr();
379 
380  bool okflag = udp->SendTo(data, addr);
381 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
382  while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
384  okflag = udp->SendTo(data, addr);
385  }
386 #endif // SIMPLE_THREADS
387 
388  if (net_cat.is_spam()) {
389  net_cat.spam()
390  << "Sent UDP datagram with "
391  << bytes_to_send << " bytes to " << (void *)this
392  << ", ok = " << okflag << "\n";
393  }
394 
395  return check_send_error(okflag);
396  }
397 
398  // We might queue up TCP packets for later sending.
399  if (tcp_header_size == 2 && datagram.get_length() >= 0x10000) {
400  net_cat.error()
401  << "Attempt to send TCP datagram of " << datagram.get_length()
402  << " bytes--too long!\n";
403  nassert_raise("Datagram too long");
404  return false;
405  }
406 
407  DatagramTCPHeader header(datagram, tcp_header_size);
408 
409  LightReMutexHolder holder(_write_mutex);
410  _queued_data += header.get_header();
411  _queued_data += datagram.get_message();
412  _queued_count++;
413 
414  if (net_cat.is_debug()) {
415  header.verify_datagram(datagram, tcp_header_size);
416  }
417 
418  if (!_collect_tcp ||
419  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
420  return do_flush();
421  }
422 
423  return true;
424 }
425 
426 ////////////////////////////////////////////////////////////////////
427 // Function: Connection::send_raw_datagram
428 // Access: Private
429 // Description: This method is intended only to be called by
430 // ConnectionWriter. It atomically writes the given
431 // datagram to the socket, without the Datagram header.
432 ////////////////////////////////////////////////////////////////////
433 bool Connection::
434 send_raw_datagram(const NetDatagram &datagram) {
435  nassertr(_socket != (Socket_IP *)NULL, false);
436 
437  if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
438  // We have to send UDP right away.
439  Socket_UDP *udp;
440  DCAST_INTO_R(udp, _socket, false);
441 
442  string data = datagram.get_message();
443 
444  LightReMutexHolder holder(_write_mutex);
445  Socket_Address addr = datagram.get_address().get_addr();
446  bool okflag = udp->SendTo(data, addr);
447 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
448  while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
450  okflag = udp->SendTo(data, addr);
451  }
452 #endif // SIMPLE_THREADS
453 
454  if (net_cat.is_spam()) {
455  net_cat.spam()
456  << "Sent UDP datagram with "
457  << data.size() << " bytes to " << (void *)this
458  << ", ok = " << okflag << "\n";
459  }
460 
461  return check_send_error(okflag);
462  }
463 
464  // We might queue up TCP packets for later sending.
465  LightReMutexHolder holder(_write_mutex);
466  _queued_data += datagram.get_message();
467  _queued_count++;
468 
469  if (!_collect_tcp ||
470  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
471  return do_flush();
472  }
473 
474  return true;
475 }
476 
477 ////////////////////////////////////////////////////////////////////
478 // Function: Connection::do_flush
479 // Access: Private
480 // Description: The private implementation of flush(), this assumes
481 // the _write_mutex is already held.
482 ////////////////////////////////////////////////////////////////////
483 bool Connection::
484 do_flush() {
485  if (_queued_data.empty()) {
486  _queued_count = 0;
487  _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
488  return true;
489  }
490 
491  if (net_cat.is_spam()) {
492  net_cat.spam()
493  << "Sending " << _queued_count << " TCP datagram(s) with "
494  << _queued_data.length() << " total bytes to " << (void *)this << "\n";
495  }
496 
497  Socket_TCP *tcp;
498  DCAST_INTO_R(tcp, _socket, false);
499 
500  string sending_data;
501  _queued_data.swap(sending_data);
502 
503  _queued_count = 0;
504  _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
505 
506 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
507  int max_send = net_max_write_per_epoch;
508  int data_sent = tcp->SendData(sending_data.data(), min((size_t)max_send, sending_data.size()));
509  bool okflag = (data_sent == (int)sending_data.size());
510  if (!okflag) {
511  int total_sent = 0;
512  if (data_sent > 0) {
513  total_sent += data_sent;
514  }
515  double last_report = 0;
516  while (!okflag && tcp->Active() &&
517  (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
518  if (data_sent == 0) {
520  } else {
522  }
523  data_sent = tcp->SendData(sending_data.data() + total_sent, min((size_t)max_send, sending_data.size() - total_sent));
524  if (data_sent > 0) {
525  total_sent += data_sent;
526  }
527  okflag = (total_sent == (int)sending_data.size());
528  }
529  }
530 
531 #else // SIMPLE_THREADS
532  int data_sent = tcp->SendData(sending_data);
533  bool okflag = (data_sent == (int)sending_data.size());
534 
535 #endif // SIMPLE_THREADS
536 
537  return check_send_error(okflag);
538 }
539 
540 ////////////////////////////////////////////////////////////////////
541 // Function: Connection::check_send_error
542 // Access: Private
543 // Description: Checks the return value of a Send() or SendTo()
544 // call.
545 ////////////////////////////////////////////////////////////////////
546 bool Connection::
547 check_send_error(bool okflag) {
548  if (!okflag) {
549  static ConfigVariableBool abort_send_error("abort-send-error", false);
550  if (abort_send_error) {
551  nassertr(false, false);
552  }
553 
554  // Assume any error means the connection has been reset; tell
555  // our manager about it and ignore it.
556  if (_manager != (ConnectionManager *)NULL) {
557  _manager->flush_read_connection(this);
558  _manager->connection_reset(this, okflag);
559  }
560  return false;
561  }
562 
563  return true;
564 }
int SetRecvBufferSize(int size)
Ok it sets the recv buffer size for both tcp and UDP.
Definition: socket_ip.h:171
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:81
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition: socket_tcp.h:15
bool is_exact_type(TypeHandle handle) const
Returns true if the current object is the indicated type exactly.
Definition: typedObject.I:74
void set_keep_alive(bool flag)
Sets whether the connection is periodically tested to see if it is still alive.
Definition: connection.cxx:278
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:43
bool verify_datagram(const NetDatagram &datagram) const
Verifies that the indicated datagram has the appropriate length and checksum.
Base functionality for a INET domain Socket this call should be the starting point for all other unix...
Definition: socket_ip.h:34
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
int SetNonBlocking()
this function will throw a socket into non-blocking mode
Definition: socket_ip.h:183
This is a convenience class to specialize ConfigVariable as a boolean type.
void set_ip_type_of_service(int tos)
Sets IP type-of-service and precedence.
Definition: connection.cxx:321
The primary interface to the low-level networking layer in this package.
~Connection()
Closes a connection.
Definition: connection.cxx:65
int DontLinger()
Turn off the linger flag.
Definition: socket_tcp.h:102
void set_linger(bool flag, double time)
Sets the time to linger on close if data is present.
Definition: connection.cxx:250
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:263
Socket_Address GetPeerName(void) const
Wrapper on berkly getpeername...
Definition: socket_ip.h:257
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:105
void set_max_segment(int size)
Sets the maximum segment size.
Definition: connection.cxx:345
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:248
void set_ip_time_to_live(int ttl)
Sets IP time-to-live.
Definition: connection.cxx:311
Connection(ConnectionManager *manager, Socket_IP *socket)
Creates a connection.
Definition: connection.cxx:40
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent...
bool get_collect_tcp() const
Returns the current setting of "collect-tcp" mode.
Definition: connection.cxx:144
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:28
Socket_IP * get_socket() const
Returns the internal Socket_IP that defines the connection.
Definition: connection.cxx:107
void set_recv_buffer_size(int size)
Sets the size of the receive buffer, in bytes.
Definition: connection.cxx:288
void set_collect_tcp(bool collect_tcp)
Enables or disables "collect-tcp" mode.
Definition: connection.cxx:133
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this connection.
Definition: connection.cxx:96
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
double get_collect_tcp_interval() const
Returns the interval in time, in seconds, for which to hold TCP packets before sending all of the rec...
Definition: connection.cxx:172
void set_reuse_addr(bool flag)
Sets whether local address reuse is allowed.
Definition: connection.cxx:267
void set_no_delay(bool flag)
If flag is true, this disables the Nagle algorithm, and prevents delaying of send to coalesce packets...
Definition: connection.cxx:332
bool consider_flush()
Sends the most recently queued TCP datagram(s) if enough time has elapsed.
Definition: connection.cxx:184
A simple place to store and munipulate tcp and port address for communication layer.
Similar to MutexHolder, but for a light reentrant mutex.
NetAddress get_address() const
Returns the address bound to this connection, if it is a TCP connection.
Definition: connection.cxx:84
int SetLinger(int interval_seconds=0)
will control the behavior of SO_LINGER for a TCP socket
Definition: socket_tcp.h:84
static int GetLastError()
gets the last errcode from a socket operation
Definition: socket_ip.h:152
void set_collect_tcp_interval(double interval)
Specifies the interval in time, in seconds, for which to hold TCP packets before sending all of the r...
Definition: connection.cxx:158
bool SetReuseAddress(bool flag=true)
Informs a socket to reuse IP address as needed.
Definition: socket_ip.h:228
void Close()
closes a socket if it is open (allocated)
Definition: socket_ip.h:141
string get_header() const
Returns a pointer to a block of data of length datagram_tcp_header_size, which can be written to the ...
void set_send_buffer_size(int size)
Sets the size of the send buffer, in bytes.
Definition: connection.cxx:298
string get_header() const
Returns a pointer to a block of data of length datagram_udp_header_size, which can be written to the ...
const Socket_Address & get_addr() const
Returns the Socket_Address for this address.
Definition: netAddress.cxx:163
bool SendTo(const char *data, int len, const Socket_Address &address)
Send data to specified address.
Definition: socket_udp.h:147
string get_message() const
Returns the datagram&#39;s data as a string.
Definition: datagram.I:431
bool verify_datagram(const NetDatagram &datagram, int header_size) const
Verifies that the indicated datagram has the appropriate length.
int SetSendBufferSize(int insize)
Just like it sounds.
Definition: socket_tcp.h:119
bool flush()
Sends the most recently queued TCP datagram(s) now.
Definition: connection.cxx:211
int SetNoDelay(bool flag=true)
Disable Nagle algorithm.
Definition: socket_tcp.h:68
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:457
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound...
Definition: netAddress.h:27