Panda3D
connection.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file connection.cxx
10  * @author jns
11  * @date 2000-02-07
12  */
13 
14 #include "connection.h"
15 #include "connectionManager.h"
16 #include "netDatagram.h"
17 #include "datagramTCPHeader.h"
18 #include "datagramUDPHeader.h"
19 #include "config_net.h"
20 #include "config_express.h" // for collect_tcp
21 #include "trueClock.h"
22 #include "pnotify.h"
23 #include "lightReMutexHolder.h"
24 #include "socket_ip.h"
25 #include "socket_tcp.h"
26 #include "socket_udp.h"
27 #include "dcast.h"
28 
29 
30 /**
31  * Creates a connection. Normally this constructor should not be used
32  * directly by user code; use one of the methods in ConnectionManager to make
33  * a new connection.
 *
 * Stores the given manager and socket pointers, seeds the collect-tcp mode and
 * interval from the collect_tcp and collect_tcp_interval settings, and resets
 * the send-queue bookkeeping (_queued_data_start and _queued_count). When
 * built with both HAVE_THREADS and SIMPLE_THREADS, the socket is also switched
 * to non-blocking mode; a failure to do so is only logged as a warning.
34  */
36 Connection(ConnectionManager *manager, Socket_IP *socket) :
37  _manager(manager),
38  _socket(socket)
39 {
40  _collect_tcp = collect_tcp;
41  _collect_tcp_interval = collect_tcp_interval;
42  _queued_data_start = 0.0;
43  _queued_count = 0;
44 
45 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
46  // In the presence of SIMPLE_THREADS, we use non-blocking IO. We simulate
47  // blocking by yielding the thread.
48  if (_socket->SetNonBlocking() != ALL_OK) {
49  net_cat.warning()
50  << "Unable to set non-blocking status on socket\n";
51  }
52 #endif
53 }
54 
55 /**
56  * Closes a connection. Logs the deletion at info level; then, if a socket is
 * present, flushes any queued data, closes the socket and deletes it.
57  */
59 ~Connection() {
60  net_cat.info()
61  << "Deleting connection " << (void *)this << "\n";
62 
63  if (_socket != nullptr) {
 // Give queued data a last chance to go out before the socket is closed.
64  flush();
65 
66  _socket->Close();
67  delete _socket;
68  }
69 }
70 
71 /**
72  * Returns the address of the remote end of this connection, as reported by
 * the socket's GetPeerName(), wrapped in a NetAddress. This is meaningful if
 * it is a TCP connection.
73  */
75 get_address() const {
76  Socket_Address addr = _socket->GetPeerName();
77  return NetAddress(addr);
78 }
79 
80 /**
81  * Returns a pointer to the ConnectionManager object that serves this
82  * connection. This is the pointer that was given to the constructor,
 * returned unchanged.
83  */
85 get_manager() const {
86  return _manager;
87 }
88 
89 /**
90  * Returns the internal Socket_IP that defines the connection. The pointer
 * is returned as stored; the Connection's destructor closes and deletes this
 * socket, so callers should not delete it themselves.
91  */
93 get_socket() const {
94  return _socket;
95 }
96 
97 /**
98  * Enables or disables "collect-tcp" mode. In this mode, individual TCP
99  * packets are not sent immediately, but rather they are collected together
100  * and accumulated to be sent periodically as one larger TCP packet. This
101  * cuts down on overhead from the TCP/IP protocol, especially if many small
102  * packets need to be sent on the same connection, but it introduces
103  * additional latency (since packets must be held before they can be sent).
104  *
105  * See set_collect_tcp_interval() to specify the interval of time for which to
106  * hold packets before sending them.
107  *
108  * If you enable this mode, you may also need to periodically call
109  * consider_flush() to flush the queue if no packets have been sent recently.
110  */
112 set_collect_tcp(bool collect_tcp) {
 // Note: the parameter shadows the collect_tcp setting of the same name that
 // the constructor reads; here it is the caller's value that is stored.
113  _collect_tcp = collect_tcp;
114 }
115 
116 /**
117  * Returns the current setting of "collect-tcp" mode. See set_collect_tcp().
 * Until set_collect_tcp() is called, this is the value the constructor took
 * from the collect_tcp setting.
118  */
120 get_collect_tcp() const {
121  return _collect_tcp;
122 }
123 
124 /**
125  * Specifies the interval in time, in seconds, for which to hold TCP packets
126  * before sending all of the recently queued packets at once. This only has
127  * meaning if "collect-tcp" mode is enabled; see set_collect_tcp().
128  */
130 set_collect_tcp_interval(double interval) {
131  _collect_tcp_interval = interval;
132 }
133 
134 /**
135  * Returns the interval in time, in seconds, for which to hold TCP packets
136  * before sending all of the recently queued packets at once. This only has
137  * meaning if "collect-tcp" mode is enabled; see set_collect_tcp().
138  */
140 get_collect_tcp_interval() const {
141  return _collect_tcp_interval;
142 }
143 
144 /**
145  * Sends the most recently queued TCP datagram(s) if enough time has elapsed.
146  * If "collect-tcp" mode is off, whatever is queued is flushed right away.
 * Otherwise the queue is flushed only when at least the collect-tcp interval
 * has passed since the queue start time, or when the clock appears to have
 * gone backwards.
 *
 * Holds _write_mutex for the duration of the call. Returns the result of
 * do_flush() when a flush was performed, or true when no flush was due.
147  */
149 consider_flush() {
150  LightReMutexHolder holder(_write_mutex);
151 
152  if (!_collect_tcp) {
153  return do_flush();
154 
155  } else {
156  double elapsed =
157  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start;
158  // If the elapsed time is negative, someone must have reset the clock
159  // back, so just go ahead and flush.
160  if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) {
161  return do_flush();
162  }
163  }
164 
165  return true;
166 }
167 
168 /**
169  * Sends the most recently queued TCP datagram(s) now, regardless of the
170  * collect-tcp interval. Acquires _write_mutex and then delegates to
 * do_flush(), whose result is returned.
171  */
173 flush() {
174  LightReMutexHolder holder(_write_mutex);
175  return do_flush();
176 }
177 
178 
179 /*
180  * Sets whether nonblocking I/O should be in effect (disabled; see below).
181  */
182 /*
183 This method is disabled. We don't provide enough interface to use
184 non-blocking I/O effectively at this level, so we shouldn't provide
185 this call. Specifically, we don't provide a way to query whether an
186 operation failed because it would have blocked or not.
187 void Connection::
188 set_nonblock(bool flag) {
189  if (flag) {
190  _socket->SetNonBlocking();
191  } else {
192  _socket->SetBlocking();
193  }
194 }
195 */
196 
197 /**
198  * Sets the time to linger on close if data is present. If flag is false,
199  * when you close a socket with data available the system attempts to deliver
200  * the data to the peer (the default behavior). If flag is true but time is
201  * zero, the system discards any undelivered data when you close the socket.
202  * If flag is true but time is nonzero, the system waits up to time seconds
203  * to deliver the data.
 *
 * The time value is only used when flag is true, and it is truncated to a
 * whole number of seconds before being passed to SetLinger().
204  */
206 set_linger(bool flag, double time) {
207  Socket_TCP *tcp;
 // Linger is a TCP-only option; presumably DCAST_INTO_V returns early when
 // the socket is not a Socket_TCP -- confirm against dcast.h.
208  DCAST_INTO_V(tcp, _socket);
209 
210  if (flag) {
211  tcp->SetLinger((int)time);
212  } else {
213  tcp->DontLinger();
214  }
215 }
216 
217 /**
218  * Sets whether local address reuse is allowed. Forwards the flag to the
 * socket's SetReuseAddress(); that call's return value is not checked.
219  */
221 set_reuse_addr(bool flag) {
222  _socket->SetReuseAddress(flag);
223 }
224 
225 /**
226  * Sets whether the connection is periodically tested to see if it is still
227  * alive. Not yet implemented: the flag is currently ignored and the call
 * has no effect.
228  */
230 set_keep_alive(bool flag) {
231  // TODO.
232 }
233 
234 /**
235  * Sets the size of the receive buffer, in bytes. Forwards to the socket's
 * SetRecvBufferSize(); that call's return value is not checked.
236  */
238 set_recv_buffer_size(int size) {
239  _socket->SetRecvBufferSize(size);
240 }
241 
242 /**
243  * Sets the size of the send buffer, in bytes. The socket is first cast to
 * Socket_TCP, so this applies to TCP connections only. The return value of
 * SetSendBufferSize() is not checked.
244  */
246 set_send_buffer_size(int size) {
247  Socket_TCP *tcp;
 // Presumably DCAST_INTO_V returns early when the socket is not a
 // Socket_TCP -- confirm against dcast.h.
248  DCAST_INTO_V(tcp, _socket);
249 
250  tcp->SetSendBufferSize(size);
251 }
252 
253 /**
254  * Sets IP time-to-live. Not yet implemented: the value is currently ignored
 * and the call has no effect.
255  */
257 set_ip_time_to_live(int ttl) {
258  // TODO.
259 }
260 
261 /**
262  * Sets IP type-of-service and precedence. Not yet implemented: the value is
 * currently ignored and the call has no effect.
263  */
265 set_ip_type_of_service(int tos) {
266  // TODO.
267 }
268 
269 /**
270  * If flag is true, this disables the Nagle algorithm, and prevents delaying
271  * of send to coalesce packets. The socket is first cast to Socket_TCP, so
 * this applies to TCP connections only. The return value of SetNoDelay() is
 * not checked.
272  */
274 set_no_delay(bool flag) {
275  Socket_TCP *tcp;
 // Presumably DCAST_INTO_V returns early when the socket is not a
 // Socket_TCP -- confirm against dcast.h.
276  DCAST_INTO_V(tcp, _socket);
277 
278  tcp->SetNoDelay(flag);
279 }
280 
281 /**
282  * Sets the maximum segment size. Not yet implemented: the value is currently
 * ignored and the call has no effect.
283  */
285 set_max_segment(int size) {
286  // TODO.
287 }
288 
289 /**
290  * This method is intended only to be called by ConnectionWriter. It
291  * atomically writes the given datagram to the socket, returning true on
292  * success, false on failure. If the socket seems to be closed, it notifies
293  * the ConnectionManager.
294  */
295 bool Connection::
296 send_datagram(const NetDatagram &datagram, int tcp_header_size) {
297  nassertr(_socket != nullptr, false);
298 
299  if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
300  // We have to send UDP right away.
301  Socket_UDP *udp;
302  DCAST_INTO_R(udp, _socket, false);
303 
304  LightReMutexHolder holder(_write_mutex);
305  DatagramUDPHeader header(datagram);
306  std::string data;
307  data += header.get_header();
308  data += datagram.get_message();
309 
310  if (net_cat.is_debug()) {
311  header.verify_datagram(datagram);
312  }
313 
314  int bytes_to_send = data.length();
315  Socket_Address addr = datagram.get_address().get_addr();
316 
317  bool okflag = udp->SendTo(data, addr);
318 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
319  while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
321  okflag = udp->SendTo(data, addr);
322  }
323 #endif // SIMPLE_THREADS
324 
325  if (net_cat.is_spam()) {
326  net_cat.spam()
327  << "Sent UDP datagram with "
328  << bytes_to_send << " bytes to " << (void *)this
329  << ", ok = " << okflag << "\n";
330  }
331 
332  return check_send_error(okflag);
333  }
334 
335  // We might queue up TCP packets for later sending.
336  if (tcp_header_size == 2 && datagram.get_length() >= 0x10000) {
337  net_cat.error()
338  << "Attempt to send TCP datagram of " << datagram.get_length()
339  << " bytes--too long!\n";
340  nassert_raise("Datagram too long");
341  return false;
342  }
343 
344  DatagramTCPHeader header(datagram, tcp_header_size);
345 
346  LightReMutexHolder holder(_write_mutex);
347  _queued_data += header.get_header();
348  _queued_data += datagram.get_message();
349  _queued_count++;
350 
351  if (net_cat.is_debug()) {
352  header.verify_datagram(datagram, tcp_header_size);
353  }
354 
355  if (!_collect_tcp ||
356  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
357  return do_flush();
358  }
359 
360  return true;
361 }
362 
363 /**
364  * This method is intended only to be called by ConnectionWriter. It
365  * atomically writes the given datagram to the socket, without the Datagram
366  * header.
367  */
368 bool Connection::
369 send_raw_datagram(const NetDatagram &datagram) {
370  nassertr(_socket != nullptr, false);
371 
372  if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
373  // We have to send UDP right away.
374  Socket_UDP *udp;
375  DCAST_INTO_R(udp, _socket, false);
376 
377  std::string data = datagram.get_message();
378 
379  LightReMutexHolder holder(_write_mutex);
380  Socket_Address addr = datagram.get_address().get_addr();
381  bool okflag = udp->SendTo(data, addr);
382 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
383  while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
385  okflag = udp->SendTo(data, addr);
386  }
387 #endif // SIMPLE_THREADS
388 
389  if (net_cat.is_spam()) {
390  net_cat.spam()
391  << "Sent UDP datagram with "
392  << data.size() << " bytes to " << (void *)this
393  << ", ok = " << okflag << "\n";
394  }
395 
396  return check_send_error(okflag);
397  }
398 
399  // We might queue up TCP packets for later sending.
400  LightReMutexHolder holder(_write_mutex);
401  _queued_data += datagram.get_message();
402  _queued_count++;
403 
404  if (!_collect_tcp ||
405  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
406  return do_flush();
407  }
408 
409  return true;
410 }
411 
412 /**
413  * The private implementation of flush(), this assumes the _write_mutex is
414  * already held.
415  */
416 bool Connection::
417 do_flush() {
418  if (_queued_data.empty()) {
419  _queued_count = 0;
420  _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
421  return true;
422  }
423 
424  if (net_cat.is_spam()) {
425  net_cat.spam()
426  << "Sending " << _queued_count << " TCP datagram(s) with "
427  << _queued_data.length() << " total bytes to " << (void *)this << "\n";
428  }
429 
430  Socket_TCP *tcp;
431  DCAST_INTO_R(tcp, _socket, false);
432 
433  std::string sending_data;
434  _queued_data.swap(sending_data);
435 
436  _queued_count = 0;
437  _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
438 
439 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
440  int max_send = net_max_write_per_epoch;
441  int data_sent = tcp->SendData(sending_data.data(), std::min((size_t)max_send, sending_data.size()));
442  bool okflag = (data_sent == (int)sending_data.size());
443  if (!okflag) {
444  int total_sent = 0;
445  if (data_sent > 0) {
446  total_sent += data_sent;
447  }
448 
449  while (!okflag && tcp->Active() &&
450  (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
451  if (data_sent == 0) {
453  } else {
455  }
456  data_sent = tcp->SendData(sending_data.data() + total_sent, std::min((size_t)max_send, sending_data.size() - total_sent));
457  if (data_sent > 0) {
458  total_sent += data_sent;
459  }
460  okflag = (total_sent == (int)sending_data.size());
461  }
462  }
463 
464 #else // SIMPLE_THREADS
465  int data_sent = tcp->SendData(sending_data);
466  bool okflag = (data_sent == (int)sending_data.size());
467 
468 #endif // SIMPLE_THREADS
469 
470  return check_send_error(okflag);
471 }
472 
473 /**
474  * Checks the return value of a Send() or SendTo() call.
475  */
476 bool Connection::
477 check_send_error(bool okflag) {
478  if (!okflag) {
479  static ConfigVariableBool abort_send_error("abort-send-error", false);
480  if (abort_send_error) {
481  nassert_raise("send error");
482  return false;
483  }
484 
485  // Assume any error means the connection has been reset; tell our manager
486  // about it and ignore it.
487  if (_manager != nullptr) {
488  _manager->flush_read_connection(this);
489  _manager->connection_reset(this, okflag);
490  }
491  return false;
492  }
493 
494  return true;
495 }
This is a convenience class to specialize ConfigVariable as a boolean type.
The primary interface to the low-level networking layer in this package.
void set_collect_tcp(bool collect_tcp)
Enables or disables "collect-tcp" mode.
Definition: connection.cxx:112
void set_recv_buffer_size(int size)
Sets the size of the receive buffer, in bytes.
Definition: connection.cxx:238
void set_collect_tcp_interval(double interval)
Specifies the interval in time, in seconds, for which to hold TCP packets before sending all of the r...
Definition: connection.cxx:130
bool consider_flush()
Sends the most recently queued TCP datagram(s) if enough time has elapsed.
Definition: connection.cxx:149
~Connection()
Closes a connection.
Definition: connection.cxx:59
bool flush()
Sends the most recently queued TCP datagram(s) now.
Definition: connection.cxx:173
void set_keep_alive(bool flag)
Sets whether the connection is periodically tested to see if it is still alive.
Definition: connection.cxx:230
NetAddress get_address() const
Returns the address bound to this connection, if it is a TCP connection.
Definition: connection.cxx:75
void set_ip_time_to_live(int ttl)
Sets IP time-to-live.
Definition: connection.cxx:257
double get_collect_tcp_interval() const
Returns the interval in time, in seconds, for which to hold TCP packets before sending all of the rec...
Definition: connection.cxx:140
void set_reuse_addr(bool flag)
Sets whether local address reuse is allowed.
Definition: connection.cxx:221
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this connection.
Definition: connection.cxx:85
void set_linger(bool flag, double time)
Sets the time to linger on close if data is present.
Definition: connection.cxx:206
bool get_collect_tcp() const
Returns the current setting of "collect-tcp" mode.
Definition: connection.cxx:120
void set_ip_type_of_service(int tos)
Sets IP type-of-service and precedence.
Definition: connection.cxx:265
void set_no_delay(bool flag)
If flag is true, this disables the Nagle algorithm, and prevents delaying of send to coalesce packets...
Definition: connection.cxx:274
void set_send_buffer_size(int size)
Sets the size of the send buffer, in bytes.
Definition: connection.cxx:246
Connection(ConnectionManager *manager, Socket_IP *socket)
Creates a connection.
Definition: connection.cxx:36
void set_max_segment(int size)
Sets the maximum segment size.
Definition: connection.cxx:285
Socket_IP * get_socket() const
Returns the internal Socket_IP that defines the connection.
Definition: connection.cxx:93
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335
std::string get_message() const
Returns the datagram's data as a string.
Definition: datagram.I:314
Similar to MutexHolder, but for a light reentrant mutex.
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition: netAddress.h:25
const Socket_Address & get_addr() const
Returns the Socket_Address for this address.
Definition: netAddress.cxx:138
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
A simple place to store and manipulate tcp and port address for communication layer.
Base functionality for a INET domain Socket This call should be the starting point for all other unix...
Definition: socket_ip.h:27
bool SetReuseAddress(bool flag=true)
Informs a socket to reuse IP address as needed.
Definition: socket_ip.h:212
static int GetLastError()
Gets the last errcode from a socket operation.
Definition: socket_ip.h:140
void Close()
Closes a socket if it is open (allocated).
Definition: socket_ip.h:128
int SetRecvBufferSize(int size)
Ok it sets the recv buffer size for both tcp and UDP.
Definition: socket_ip.h:157
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:96
Socket_Address GetPeerName(void) const
Wrapper on berkly getpeername...
Definition: socket_ip.h:253
int SetNonBlocking()
this function will throw a socket into non-blocking mode
Definition: socket_ip.h:169
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition: socket_tcp.h:12
int SetSendBufferSize(int insize)
Just like it sounds.
Definition: socket_tcp.h:107
int DontLinger()
Turn off the linger flag.
Definition: socket_tcp.h:91
int SetLinger(int interval_seconds=0)
will control the behavior of SO_LINGER for a TCP socket
Definition: socket_tcp.h:74
int SetNoDelay(bool flag=true)
Disable Nagle algorithm.
Definition: socket_tcp.h:62
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:24
bool SendTo(const char *data, int len, const Socket_Address &address)
Send data to specified address.
Definition: socket_udp.h:107
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:212
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:68
bool is_exact_type(TypeHandle handle) const
Returns true if the current object is the indicated type exactly.
Definition: typedObject.I:38
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.