Panda3D
connection.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file connection.cxx
10  * @author jns
11  * @date 2000-02-07
12  */
13 
14 #include "connection.h"
15 #include "connectionManager.h"
16 #include "netDatagram.h"
17 #include "datagramTCPHeader.h"
18 #include "datagramUDPHeader.h"
19 #include "config_net.h"
20 #include "config_express.h" // for collect_tcp
21 #include "trueClock.h"
22 #include "pnotify.h"
23 #include "lightReMutexHolder.h"
24 #include "socket_ip.h"
25 #include "socket_tcp.h"
26 #include "socket_udp.h"
27 #include "dcast.h"
28 
29 
30 /**
31  * Creates a connection. Normally this constructor should not be used
32  * directly by user code; use one of the methods in ConnectionManager to make
33  * a new connection.
34  */
37  _manager(manager),
38  _socket(socket)
39 {
40  _collect_tcp = collect_tcp;
41  _collect_tcp_interval = collect_tcp_interval;
42  _queued_data_start = 0.0;
43  _queued_count = 0;
44 
45 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
46  // In the presence of SIMPLE_THREADS, we use non-blocking IO. We simulate
47  // blocking by yielding the thread.
48  if (_socket->SetNonBlocking() != ALL_OK) {
49  net_cat.warning()
50  << "Unable to set non-blocking status on socket\n";
51  }
52 #endif
53 }
54 
55 /**
56  * Closes a connection.
57  */
60  net_cat.info()
61  << "Deleting connection " << (void *)this << "\n";
62 
63  if (_socket != nullptr) {
64  flush();
65 
66  _socket->Close();
67  delete _socket;
68  }
69 }
70 
71 /**
72  * Returns the address bound to this connection, if it is a TCP connection.
73  */
75 get_address() const {
76  Socket_Address addr = _socket->GetPeerName();
77  return NetAddress(addr);
78 }
79 
80 /**
81  * Returns a pointer to the ConnectionManager object that serves this
82  * connection.
83  */
85 get_manager() const {
86  return _manager;
87 }
88 
89 /**
90  * Returns the internal Socket_IP that defines the connection.
91  */
93 get_socket() const {
94  return _socket;
95 }
96 
97 /**
98  * Enables or disables "collect-tcp" mode. In this mode, individual TCP
99  * packets are not sent immediately, but rather they are collected together
100  * and accumulated to be sent periodically as one larger TCP packet. This
101  * cuts down on overhead from the TCP/IP protocol, especially if many small
102  * packets need to be sent on the same connection, but it introduces
103  * additional latency (since packets must be held before they can be sent).
104  *
105  * See set_collect_tcp_interval() to specify the interval of time for which to
106  * hold packets before sending them.
107  *
108  * If you enable this mode, you may also need to periodically call
109  * consider_flush() to flush the queue if no packets have been sent recently.
110  */
111 void Connection::
112 set_collect_tcp(bool collect_tcp) {
113  _collect_tcp = collect_tcp;
114 }
115 
116 /**
117  * Returns the current setting of "collect-tcp" mode. See set_collect_tcp().
118  */
119 bool Connection::
121  return _collect_tcp;
122 }
123 
124 /**
125  * Specifies the interval in time, in seconds, for which to hold TCP packets
126  * before sending all of the recently received packets at once. This only has
127  * meaning if "collect-tcp" mode is enabled; see set_collect_tcp().
128  */
129 void Connection::
130 set_collect_tcp_interval(double interval) {
131  _collect_tcp_interval = interval;
132 }
133 
134 /**
135  * Returns the interval in time, in seconds, for which to hold TCP packets
136  * before sending all of the recently received packets at once. This only has
137  * meaning if "collect-tcp" mode is enabled; see set_collect_tcp().
138  */
139 double Connection::
141  return _collect_tcp_interval;
142 }
143 
144 /**
145  * Sends the most recently queued TCP datagram(s) if enough time has elapsed.
146  * This only has meaning if set_collect_tcp() has been set to true.
147  */
148 bool Connection::
150  LightReMutexHolder holder(_write_mutex);
151 
152  if (!_collect_tcp) {
153  return do_flush();
154 
155  } else {
156  double elapsed =
157  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start;
158  // If the elapsed time is negative, someone must have reset the clock
159  // back, so just go ahead and flush.
160  if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) {
161  return do_flush();
162  }
163  }
164 
165  return true;
166 }
167 
168 /**
169  * Sends the most recently queued TCP datagram(s) now. This only has meaning
170  * if set_collect_tcp() has been set to true.
171  */
172 bool Connection::
173 flush() {
174  LightReMutexHolder holder(_write_mutex);
175  return do_flush();
176 }
177 
178 
179 /**
180  * Sets whether nonblocking I/O should be in effect.
181  */
182 /*
183 This method is disabled. We don't provide enough interface to use
184 non-blocking I/O effectively at this level, so we shouldn't provide
185 this call. Specifically, we don't provide a way to query whether an
186 operation failed because it would have blocked or not.
187 void Connection::
188 set_nonblock(bool flag) {
189  if (flag) {
190  _socket->SetNonBlocking();
191  } else {
192  _socket->SetBlocking();
193  }
194 }
195 */
196 
197 /**
198  * Sets the time to linger on close if data is present. If flag is false,
199  * when you close a socket with data available the system attempts to deliver
200  * the data to the peer (the default behavior). If flag is false but time is
201  * zero, the system discards any undelivered data when you close the socket.
202  * If flag is false but time is nonzero, the system waits up to time seconds
203  * to deliver the data.
204  */
205 void Connection::
206 set_linger(bool flag, double time) {
207  Socket_TCP *tcp;
208  DCAST_INTO_V(tcp, _socket);
209 
210  if (flag) {
211  tcp->SetLinger((int)time);
212  } else {
213  tcp->DontLinger();
214  }
215 }
216 
217 /**
218  * Sets whether local address reuse is allowed.
219  */
220 void Connection::
221 set_reuse_addr(bool flag) {
222  _socket->SetReuseAddress(flag);
223 }
224 
225 /**
226  * Sets whether the connection is periodically tested to see if it is still
227  * alive.
228  */
229 void Connection::
230 set_keep_alive(bool flag) {
231  // TODO.
232 }
233 
234 /**
235  * Sets the size of the receive buffer, in bytes.
236  */
237 void Connection::
239  _socket->SetRecvBufferSize(size);
240 }
241 
242 /**
243  * Sets the size of the send buffer, in bytes.
244  */
245 void Connection::
247  Socket_TCP *tcp;
248  DCAST_INTO_V(tcp, _socket);
249 
250  tcp->SetSendBufferSize(size);
251 }
252 
253 /**
254  * Sets IP time-to-live.
255  */
256 void Connection::
258  // TODO.
259 }
260 
261 /**
262  * Sets IP type-of-service and precedence.
263  */
264 void Connection::
266  // TODO.
267 }
268 
269 /**
270  * If flag is true, this disables the Nagle algorithm, and prevents delaying
271  * of send to coalesce packets.
272  */
273 void Connection::
274 set_no_delay(bool flag) {
275  Socket_TCP *tcp;
276  DCAST_INTO_V(tcp, _socket);
277 
278  tcp->SetNoDelay(flag);
279 }
280 
281 /**
282  * Sets the maximum segment size.
283  */
284 void Connection::
285 set_max_segment(int size) {
286  // TODO.
287 }
288 
289 /**
290  * This method is intended only to be called by ConnectionWriter. It
291  * atomically writes the given datagram to the socket, returning true on
292  * success, false on failure. If the socket seems to be closed, it notifies
293  * the ConnectionManager.
294  */
295 bool Connection::
296 send_datagram(const NetDatagram &datagram, int tcp_header_size) {
297  nassertr(_socket != nullptr, false);
298 
299  if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
300  // We have to send UDP right away.
301  Socket_UDP *udp;
302  DCAST_INTO_R(udp, _socket, false);
303 
304  LightReMutexHolder holder(_write_mutex);
305  DatagramUDPHeader header(datagram);
306  std::string data;
307  data += header.get_header();
308  data += datagram.get_message();
309 
310  if (net_cat.is_debug()) {
311  header.verify_datagram(datagram);
312  }
313 
314  int bytes_to_send = data.length();
315  Socket_Address addr = datagram.get_address().get_addr();
316 
317  bool okflag = udp->SendTo(data, addr);
318 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
319  while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
321  okflag = udp->SendTo(data, addr);
322  }
323 #endif // SIMPLE_THREADS
324 
325  if (net_cat.is_spam()) {
326  net_cat.spam()
327  << "Sent UDP datagram with "
328  << bytes_to_send << " bytes to " << (void *)this
329  << ", ok = " << okflag << "\n";
330  }
331 
332  return check_send_error(okflag);
333  }
334 
335  // We might queue up TCP packets for later sending.
336  if (tcp_header_size == 2 && datagram.get_length() >= 0x10000) {
337  net_cat.error()
338  << "Attempt to send TCP datagram of " << datagram.get_length()
339  << " bytes--too long!\n";
340  nassert_raise("Datagram too long");
341  return false;
342  }
343 
344  DatagramTCPHeader header(datagram, tcp_header_size);
345 
346  LightReMutexHolder holder(_write_mutex);
347  _queued_data += header.get_header();
348  _queued_data += datagram.get_message();
349  _queued_count++;
350 
351  if (net_cat.is_debug()) {
352  header.verify_datagram(datagram, tcp_header_size);
353  }
354 
355  if (!_collect_tcp ||
356  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
357  return do_flush();
358  }
359 
360  return true;
361 }
362 
363 /**
364  * This method is intended only to be called by ConnectionWriter. It
365  * atomically writes the given datagram to the socket, without the Datagram
366  * header.
367  */
368 bool Connection::
369 send_raw_datagram(const NetDatagram &datagram) {
370  nassertr(_socket != nullptr, false);
371 
372  if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
373  // We have to send UDP right away.
374  Socket_UDP *udp;
375  DCAST_INTO_R(udp, _socket, false);
376 
377  std::string data = datagram.get_message();
378 
379  LightReMutexHolder holder(_write_mutex);
380  Socket_Address addr = datagram.get_address().get_addr();
381  bool okflag = udp->SendTo(data, addr);
382 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
383  while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
385  okflag = udp->SendTo(data, addr);
386  }
387 #endif // SIMPLE_THREADS
388 
389  if (net_cat.is_spam()) {
390  net_cat.spam()
391  << "Sent UDP datagram with "
392  << data.size() << " bytes to " << (void *)this
393  << ", ok = " << okflag << "\n";
394  }
395 
396  return check_send_error(okflag);
397  }
398 
399  // We might queue up TCP packets for later sending.
400  LightReMutexHolder holder(_write_mutex);
401  _queued_data += datagram.get_message();
402  _queued_count++;
403 
404  if (!_collect_tcp ||
405  TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
406  return do_flush();
407  }
408 
409  return true;
410 }
411 
412 /**
413  * The private implementation of flush(), this assumes the _write_mutex is
414  * already held.
415  */
416 bool Connection::
417 do_flush() {
418  if (_queued_data.empty()) {
419  _queued_count = 0;
420  _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
421  return true;
422  }
423 
424  if (net_cat.is_spam()) {
425  net_cat.spam()
426  << "Sending " << _queued_count << " TCP datagram(s) with "
427  << _queued_data.length() << " total bytes to " << (void *)this << "\n";
428  }
429 
430  Socket_TCP *tcp;
431  DCAST_INTO_R(tcp, _socket, false);
432 
433  std::string sending_data;
434  _queued_data.swap(sending_data);
435 
436  _queued_count = 0;
437  _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
438 
439 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
440  int max_send = net_max_write_per_epoch;
441  int data_sent = tcp->SendData(sending_data.data(), std::min((size_t)max_send, sending_data.size()));
442  bool okflag = (data_sent == (int)sending_data.size());
443  if (!okflag) {
444  int total_sent = 0;
445  if (data_sent > 0) {
446  total_sent += data_sent;
447  }
448 
449  while (!okflag && tcp->Active() &&
450  (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
451  if (data_sent == 0) {
453  } else {
455  }
456  data_sent = tcp->SendData(sending_data.data() + total_sent, std::min((size_t)max_send, sending_data.size() - total_sent));
457  if (data_sent > 0) {
458  total_sent += data_sent;
459  }
460  okflag = (total_sent == (int)sending_data.size());
461  }
462  }
463 
464 #else // SIMPLE_THREADS
465  int data_sent = tcp->SendData(sending_data);
466  bool okflag = (data_sent == (int)sending_data.size());
467 
468 #endif // SIMPLE_THREADS
469 
470  return check_send_error(okflag);
471 }
472 
473 /**
474  * Checks the return value of a Send() or SendTo() call.
475  */
476 bool Connection::
477 check_send_error(bool okflag) {
478  if (!okflag) {
479  static ConfigVariableBool abort_send_error("abort-send-error", false);
480  if (abort_send_error) {
481  nassert_raise("send error");
482  return false;
483  }
484 
485  // Assume any error means the connection has been reset; tell our manager
486  // about it and ignore it.
487  if (_manager != nullptr) {
488  _manager->flush_read_connection(this);
489  _manager->connection_reset(this, okflag);
490  }
491  return false;
492  }
493 
494  return true;
495 }
int SetRecvBufferSize(int size)
Ok it sets the recv buffer size for both tcp and UDP.
Definition: socket_ip.h:157
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:68
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition: socket_tcp.h:12
bool is_exact_type(TypeHandle handle) const
Returns true if the current object is the indicated type exactly.
Definition: typedObject.I:38
void set_keep_alive(bool flag)
Sets whether the connection is periodically tested to see if it is still alive.
Definition: connection.cxx:230
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
Base functionality for a INET domain Socket This call should be the starting point for all other unix...
Definition: socket_ip.h:27
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
int SetNonBlocking()
this function will throw a socket into non-blocking mode
Definition: socket_ip.h:169
This is a convenience class to specialize ConfigVariable as a boolean type.
void set_ip_type_of_service(int tos)
Sets IP type-of-service and precedence.
Definition: connection.cxx:265
void set_linger(bool flag, double time)
Sets whether nonblocking I/O should be in effect.
Definition: connection.cxx:206
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
The primary interface to the low-level networking layer in this package.
~Connection()
Closes a connection.
Definition: connection.cxx:59
int DontLinger()
Turn off the linger flag.
Definition: socket_tcp.h:91
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:212
Socket_Address GetPeerName(void) const
Wrapper on berkly getpeername...
Definition: socket_ip.h:253
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:96
void set_max_segment(int size)
Sets the maximum segment size.
Definition: connection.cxx:285
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
void set_ip_time_to_live(int ttl)
Sets IP time-to-live.
Definition: connection.cxx:257
Connection(ConnectionManager *manager, Socket_IP *socket)
Creates a connection.
Definition: connection.cxx:36
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
bool get_collect_tcp() const
Returns the current setting of "collect-tcp" mode.
Definition: connection.cxx:120
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:24
Socket_IP * get_socket() const
Returns the internal Socket_IP that defines the connection.
Definition: connection.cxx:93
void set_recv_buffer_size(int size)
Sets the size of the receive buffer, in bytes.
Definition: connection.cxx:238
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void set_collect_tcp(bool collect_tcp)
Enables or disables "collect-tcp" mode.
Definition: connection.cxx:112
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this connection.
Definition: connection.cxx:85
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
double get_collect_tcp_interval() const
Returns the interval in time, in seconds, for which to hold TCP packets before sending all of the rec...
Definition: connection.cxx:140
void set_reuse_addr(bool flag)
Sets whether local address reuse is allowed.
Definition: connection.cxx:221
void set_no_delay(bool flag)
If flag is true, this disables the Nagle algorithm, and prevents delaying of send to coalesce packets...
Definition: connection.cxx:274
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool consider_flush()
Sends the most recently queued TCP datagram(s) if enough time has elapsed.
Definition: connection.cxx:149
A simple place to store and manipulate tcp and port address for communication layer.
Similar to MutexHolder, but for a light reentrant mutex.
NetAddress get_address() const
Returns the address bound to this connection, if it is a TCP connection.
Definition: connection.cxx:75
int SetLinger(int interval_seconds=0)
will control the behavior of SO_LINGER for a TCP socket
Definition: socket_tcp.h:74
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
static int GetLastError()
Gets the last errcode from a socket operation.
Definition: socket_ip.h:140
void set_collect_tcp_interval(double interval)
Specifies the interval in time, in seconds, for which to hold TCP packets before sending all of the r...
Definition: connection.cxx:130
bool SetReuseAddress(bool flag=true)
Informs a socket to reuse IP address as needed.
Definition: socket_ip.h:212
void Close()
Closes a socket if it is open (allocated).
Definition: socket_ip.h:128
void set_send_buffer_size(int size)
Sets the size of the send buffer, in bytes.
Definition: connection.cxx:246
std::string get_message() const
Returns the datagram's data as a string.
Definition: datagram.I:314
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
const Socket_Address & get_addr() const
Returns the Socket_Address for this address.
Definition: netAddress.cxx:138
bool SendTo(const char *data, int len, const Socket_Address &address)
Send data to specified address.
Definition: socket_udp.h:107
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int SetSendBufferSize(int insize)
Just like it sounds.
Definition: socket_tcp.h:107
bool flush()
Sends the most recently queued TCP datagram(s) now.
Definition: connection.cxx:173
int SetNoDelay(bool flag=true)
Disable Nagle algorithm.
Definition: socket_tcp.h:62
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition: netAddress.h:25
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.