Panda3D
connection.cxx
Go to the documentation of this file.
1/**
2 * PANDA 3D SOFTWARE
3 * Copyright (c) Carnegie Mellon University. All rights reserved.
4 *
5 * All use of this software is subject to the terms of the revised BSD
6 * license. You should have received a copy of this license along
7 * with this source code in a file named "LICENSE."
8 *
9 * @file connection.cxx
10 * @author jns
11 * @date 2000-02-07
12 */
13
14#include "connection.h"
15#include "connectionManager.h"
16#include "netDatagram.h"
17#include "datagramTCPHeader.h"
18#include "datagramUDPHeader.h"
19#include "config_net.h"
20#include "config_express.h" // for collect_tcp
21#include "trueClock.h"
22#include "pnotify.h"
23#include "lightReMutexHolder.h"
24#include "socket_ip.h"
25#include "socket_tcp.h"
26#include "socket_udp.h"
27#include "dcast.h"
28
29
30/**
31 * Creates a connection. Normally this constructor should not be used
32 * directly by user code; use one of the methods in ConnectionManager to make
33 * a new connection.
34 */
36Connection(ConnectionManager *manager, Socket_IP *socket) :
37 _manager(manager),
38 _socket(socket)
39{
40 _collect_tcp = collect_tcp;
41 _collect_tcp_interval = collect_tcp_interval;
42 _queued_data_start = 0.0;
43 _queued_count = 0;
44
45#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
46 // In the presence of SIMPLE_THREADS, we use non-blocking IO. We simulate
47 // blocking by yielding the thread.
48 if (_socket->SetNonBlocking() != ALL_OK) {
49 net_cat.warning()
50 << "Unable to set non-blocking status on socket\n";
51 }
52#endif
53}
54
55/**
56 * Closes a connection.
57 */
60 net_cat.info()
61 << "Deleting connection " << (void *)this << "\n";
62
63 if (_socket != nullptr) {
64 flush();
65
66 _socket->Close();
67 delete _socket;
68 }
69}
70
71/**
72 * Returns the address bound to this connection, if it is a TCP connection.
73 */
75get_address() const {
76 Socket_Address addr = _socket->GetPeerName();
77 return NetAddress(addr);
78}
79
80/**
81 * Returns a pointer to the ConnectionManager object that serves this
82 * connection.
83 */
85get_manager() const {
86 return _manager;
87}
88
89/**
90 * Returns the internal Socket_IP that defines the connection.
91 */
93get_socket() const {
94 return _socket;
95}
96
97/**
98 * Enables or disables "collect-tcp" mode. In this mode, individual TCP
99 * packets are not sent immediately, but rather they are collected together
100 * and accumulated to be sent periodically as one larger TCP packet. This
101 * cuts down on overhead from the TCP/IP protocol, especially if many small
102 * packets need to be sent on the same connection, but it introduces
103 * additional latency (since packets must be held before they can be sent).
104 *
105 * See set_collect_tcp_interval() to specify the interval of time for which to
106 * hold packets before sending them.
107 *
108 * If you enable this mode, you may also need to periodically call
109 * consider_flush() to flush the queue if no packets have been sent recently.
110 */
112set_collect_tcp(bool collect_tcp) {
113 _collect_tcp = collect_tcp;
114}
115
116/**
117 * Returns the current setting of "collect-tcp" mode. See set_collect_tcp().
118 */
120get_collect_tcp() const {
121 return _collect_tcp;
122}
123
124/**
125 * Specifies the interval in time, in seconds, for which to hold TCP packets
126 * before sending all of the recently received packets at once. This only has
127 * meaning if "collect-tcp" mode is enabled; see set_collect_tcp().
128 */
130set_collect_tcp_interval(double interval) {
131 _collect_tcp_interval = interval;
132}
133
134/**
135 * Returns the interval in time, in seconds, for which to hold TCP packets
136 * before sending all of the recently received packets at once. This only has
137 * meaning if "collect-tcp" mode is enabled; see set_collect_tcp().
138 */
141 return _collect_tcp_interval;
142}
143
144/**
145 * Sends the most recently queued TCP datagram(s) if enough time has elapsed.
146 * This only has meaning if set_collect_tcp() has been set to true.
147 */
150 LightReMutexHolder holder(_write_mutex);
151
152 if (!_collect_tcp) {
153 return do_flush();
154
155 } else {
156 double elapsed =
157 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start;
158 // If the elapsed time is negative, someone must have reset the clock
159 // back, so just go ahead and flush.
160 if (elapsed < 0.0 || elapsed >= _collect_tcp_interval) {
161 return do_flush();
162 }
163 }
164
165 return true;
166}
167
168/**
169 * Sends the most recently queued TCP datagram(s) now. This only has meaning
170 * if set_collect_tcp() has been set to true.
171 */
173flush() {
174 LightReMutexHolder holder(_write_mutex);
175 return do_flush();
176}
177
178
179/**
180 * Sets whether nonblocking I/O should be in effect.
181 */
182/*
183This method is disabled. We don't provide enough interface to use
184non-blocking I/O effectively at this level, so we shouldn't provide
185this call. Specifically, we don't provide a way to query whether an
186operation failed because it would have blocked or not.
187void Connection::
188set_nonblock(bool flag) {
189 if (flag) {
190 _socket->SetNonBlocking();
191 } else {
192 _socket->SetBlocking();
193 }
194}
195*/
196
197/**
198 * Sets the time to linger on close if data is present. If flag is false,
199 * when you close a socket with data available the system attempts to deliver
200 * the data to the peer (the default behavior). If flag is false but time is
201 * zero, the system discards any undelivered data when you close the socket.
202 * If flag is false but time is nonzero, the system waits up to time seconds
203 * to deliver the data.
204 */
206set_linger(bool flag, double time) {
207 Socket_TCP *tcp;
208 DCAST_INTO_V(tcp, _socket);
209
210 if (flag) {
211 tcp->SetLinger((int)time);
212 } else {
213 tcp->DontLinger();
214 }
215}
216
217/**
218 * Sets whether local address reuse is allowed.
219 */
221set_reuse_addr(bool flag) {
222 _socket->SetReuseAddress(flag);
223}
224
225/**
226 * Sets whether the connection is periodically tested to see if it is still
227 * alive.
228 */
230set_keep_alive(bool flag) {
231 // TODO.
232}
233
234/**
235 * Sets the size of the receive buffer, in bytes.
236 */
238set_recv_buffer_size(int size) {
239 _socket->SetRecvBufferSize(size);
240}
241
242/**
243 * Sets the size of the send buffer, in bytes.
244 */
246set_send_buffer_size(int size) {
247 Socket_TCP *tcp;
248 DCAST_INTO_V(tcp, _socket);
249
250 tcp->SetSendBufferSize(size);
251}
252
253/**
254 * Sets IP time-to-live.
255 */
257set_ip_time_to_live(int ttl) {
258 // TODO.
259}
260
261/**
262 * Sets IP type-of-service and precedence.
263 */
265set_ip_type_of_service(int tos) {
266 // TODO.
267}
268
269/**
270 * If flag is true, this disables the Nagle algorithm, and prevents delaying
271 * of send to coalesce packets.
272 */
274set_no_delay(bool flag) {
275 Socket_TCP *tcp;
276 DCAST_INTO_V(tcp, _socket);
277
278 tcp->SetNoDelay(flag);
279}
280
281/**
282 * Sets the maximum segment size.
283 */
285set_max_segment(int size) {
286 // TODO.
287}
288
289/**
290 * This method is intended only to be called by ConnectionWriter. It
291 * atomically writes the given datagram to the socket, returning true on
292 * success, false on failure. If the socket seems to be closed, it notifies
293 * the ConnectionManager.
294 */
295bool Connection::
296send_datagram(const NetDatagram &datagram, int tcp_header_size) {
297 nassertr(_socket != nullptr, false);
298
299 if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
300 // We have to send UDP right away.
301 Socket_UDP *udp;
302 DCAST_INTO_R(udp, _socket, false);
303
304 LightReMutexHolder holder(_write_mutex);
305 DatagramUDPHeader header(datagram);
306 std::string data;
307 data += header.get_header();
308 data += datagram.get_message();
309
310 if (net_cat.is_debug()) {
311 header.verify_datagram(datagram);
312 }
313
314 int bytes_to_send = data.length();
315 Socket_Address addr = datagram.get_address().get_addr();
316
317 bool okflag = udp->SendTo(data, addr);
318#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
319 while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
321 okflag = udp->SendTo(data, addr);
322 }
323#endif // SIMPLE_THREADS
324
325 if (net_cat.is_spam()) {
326 net_cat.spam()
327 << "Sent UDP datagram with "
328 << bytes_to_send << " bytes to " << (void *)this
329 << ", ok = " << okflag << "\n";
330 }
331
332 return check_send_error(okflag);
333 }
334
335 // We might queue up TCP packets for later sending.
336 if (tcp_header_size == 2 && datagram.get_length() >= 0x10000) {
337 net_cat.error()
338 << "Attempt to send TCP datagram of " << datagram.get_length()
339 << " bytes--too long!\n";
340 nassert_raise("Datagram too long");
341 return false;
342 }
343
344 DatagramTCPHeader header(datagram, tcp_header_size);
345
346 LightReMutexHolder holder(_write_mutex);
347 _queued_data += header.get_header();
348 _queued_data += datagram.get_message();
349 _queued_count++;
350
351 if (net_cat.is_debug()) {
352 header.verify_datagram(datagram, tcp_header_size);
353 }
354
355 if (!_collect_tcp ||
356 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
357 return do_flush();
358 }
359
360 return true;
361}
362
363/**
364 * This method is intended only to be called by ConnectionWriter. It
365 * atomically writes the given datagram to the socket, without the Datagram
366 * header.
367 */
368bool Connection::
369send_raw_datagram(const NetDatagram &datagram) {
370 nassertr(_socket != nullptr, false);
371
372 if (_socket->is_exact_type(Socket_UDP::get_class_type())) {
373 // We have to send UDP right away.
374 Socket_UDP *udp;
375 DCAST_INTO_R(udp, _socket, false);
376
377 std::string data = datagram.get_message();
378
379 LightReMutexHolder holder(_write_mutex);
380 Socket_Address addr = datagram.get_address().get_addr();
381 bool okflag = udp->SendTo(data, addr);
382#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
383 while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR && udp->Active()) {
385 okflag = udp->SendTo(data, addr);
386 }
387#endif // SIMPLE_THREADS
388
389 if (net_cat.is_spam()) {
390 net_cat.spam()
391 << "Sent UDP datagram with "
392 << data.size() << " bytes to " << (void *)this
393 << ", ok = " << okflag << "\n";
394 }
395
396 return check_send_error(okflag);
397 }
398
399 // We might queue up TCP packets for later sending.
400 LightReMutexHolder holder(_write_mutex);
401 _queued_data += datagram.get_message();
402 _queued_count++;
403
404 if (!_collect_tcp ||
405 TrueClock::get_global_ptr()->get_short_time() - _queued_data_start >= _collect_tcp_interval) {
406 return do_flush();
407 }
408
409 return true;
410}
411
412/**
413 * The private implementation of flush(), this assumes the _write_mutex is
414 * already held.
415 */
416bool Connection::
417do_flush() {
418 if (_queued_data.empty()) {
419 _queued_count = 0;
420 _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
421 return true;
422 }
423
424 if (net_cat.is_spam()) {
425 net_cat.spam()
426 << "Sending " << _queued_count << " TCP datagram(s) with "
427 << _queued_data.length() << " total bytes to " << (void *)this << "\n";
428 }
429
430 Socket_TCP *tcp;
431 DCAST_INTO_R(tcp, _socket, false);
432
433 std::string sending_data;
434 _queued_data.swap(sending_data);
435
436 _queued_count = 0;
437 _queued_data_start = TrueClock::get_global_ptr()->get_short_time();
438
439#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
440 int max_send = net_max_write_per_epoch;
441 int data_sent = tcp->SendData(sending_data.data(), std::min((size_t)max_send, sending_data.size()));
442 bool okflag = (data_sent == (int)sending_data.size());
443 if (!okflag) {
444 int total_sent = 0;
445 if (data_sent > 0) {
446 total_sent += data_sent;
447 }
448
449 while (!okflag && tcp->Active() &&
450 (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
451 if (data_sent == 0) {
453 } else {
455 }
456 data_sent = tcp->SendData(sending_data.data() + total_sent, std::min((size_t)max_send, sending_data.size() - total_sent));
457 if (data_sent > 0) {
458 total_sent += data_sent;
459 }
460 okflag = (total_sent == (int)sending_data.size());
461 }
462 }
463
464#else // SIMPLE_THREADS
465 int data_sent = tcp->SendData(sending_data);
466 bool okflag = (data_sent == (int)sending_data.size());
467
468#endif // SIMPLE_THREADS
469
470 return check_send_error(okflag);
471}
472
473/**
474 * Checks the return value of a Send() or SendTo() call.
475 */
476bool Connection::
477check_send_error(bool okflag) {
478 if (!okflag) {
479 static ConfigVariableBool abort_send_error("abort-send-error", false);
480 if (abort_send_error) {
481 nassert_raise("send error");
482 return false;
483 }
484
485 // Assume any error means the connection has been reset; tell our manager
486 // about it and ignore it.
487 if (_manager != nullptr) {
488 _manager->flush_read_connection(this);
489 _manager->connection_reset(this, okflag);
490 }
491 return false;
492 }
493
494 return true;
495}
This is a convenience class to specialize ConfigVariable as a boolean type.
The primary interface to the low-level networking layer in this package.
void set_collect_tcp(bool collect_tcp)
Enables or disables "collect-tcp" mode.
Definition: connection.cxx:112
void set_recv_buffer_size(int size)
Sets the size of the receive buffer, in bytes.
Definition: connection.cxx:238
void set_collect_tcp_interval(double interval)
Specifies the interval in time, in seconds, for which to hold TCP packets before sending all of the r...
Definition: connection.cxx:130
bool consider_flush()
Sends the most recently queued TCP datagram(s) if enough time has elapsed.
Definition: connection.cxx:149
~Connection()
Closes a connection.
Definition: connection.cxx:59
bool flush()
Sends the most recently queued TCP datagram(s) now.
Definition: connection.cxx:173
void set_keep_alive(bool flag)
Sets whether the connection is periodically tested to see if it is still alive.
Definition: connection.cxx:230
NetAddress get_address() const
Returns the address bound to this connection, if it is a TCP connection.
Definition: connection.cxx:75
void set_ip_time_to_live(int ttl)
Sets IP time-to-live.
Definition: connection.cxx:257
double get_collect_tcp_interval() const
Returns the interval in time, in seconds, for which to hold TCP packets before sending all of the rec...
Definition: connection.cxx:140
void set_reuse_addr(bool flag)
Sets whether local address reuse is allowed.
Definition: connection.cxx:221
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this connection.
Definition: connection.cxx:85
void set_linger(bool flag, double time)
Sets the time to linger on close if data is present.
Definition: connection.cxx:206
bool get_collect_tcp() const
Returns the current setting of "collect-tcp" mode.
Definition: connection.cxx:120
void set_ip_type_of_service(int tos)
Sets IP type-of-service and precedence.
Definition: connection.cxx:265
void set_no_delay(bool flag)
If flag is true, this disables the Nagle algorithm, and prevents delaying of send to coalesce packets...
Definition: connection.cxx:274
void set_send_buffer_size(int size)
Sets the size of the send buffer, in bytes.
Definition: connection.cxx:246
Connection(ConnectionManager *manager, Socket_IP *socket)
Creates a connection.
Definition: connection.cxx:36
void set_max_segment(int size)
Sets the maximum segment size.
Definition: connection.cxx:285
Socket_IP * get_socket() const
Returns the internal Socket_IP that defines the connection.
Definition: connection.cxx:93
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
A class that encapsulates the extra bytes that are sent in front of each datagram to identify it when...
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335
std::string get_message() const
Returns the datagram's data as a string.
Definition: datagram.I:314
Similar to MutexHolder, but for a light reentrant mutex.
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition: netAddress.h:25
const Socket_Address & get_addr() const
Returns the Socket_Address for this address.
Definition: netAddress.cxx:138
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
A simple place to store and manipulate tcp and port address for communication layer.
Base functionality for a INET domain Socket This call should be the starting point for all other unix...
Definition: socket_ip.h:27
bool SetReuseAddress(bool flag=true)
Informs a socket to reuse IP address as needed.
Definition: socket_ip.h:212
static int GetLastError()
Gets the last errcode from a socket operation.
Definition: socket_ip.h:140
void Close()
Closes a socket if it is open (allocated).
Definition: socket_ip.h:128
int SetRecvBufferSize(int size)
Sets the receive buffer size for both TCP and UDP.
Definition: socket_ip.h:157
bool Active()
Ask if the socket is open (allocated)
Definition: socket_ip.h:96
Socket_Address GetPeerName(void) const
Wrapper on Berkeley getpeername...
Definition: socket_ip.h:253
int SetNonBlocking()
this function will throw a socket into non-blocking mode
Definition: socket_ip.h:169
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
Definition: socket_tcp.h:12
int SetSendBufferSize(int insize)
Just like it sounds.
Definition: socket_tcp.h:107
int DontLinger()
Turn off the linger flag.
Definition: socket_tcp.h:91
int SetLinger(int interval_seconds=0)
will control the behavior of SO_LINGER for a TCP socket
Definition: socket_tcp.h:74
int SetNoDelay(bool flag=true)
Disable Nagle algorithm.
Definition: socket_tcp.h:62
Base functionality for a combination UDP Reader and Writer.
Definition: socket_udp.h:24
bool SendTo(const char *data, int len, const Socket_Address &address)
Send data to specified address.
Definition: socket_udp.h:107
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:212
static void force_yield()
Suspends the current thread for the rest of the current epoch.
Definition: thread.I:201
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
Definition: trueClock.I:68
bool is_exact_type(TypeHandle handle) const
Returns true if the current object is the indicated type exactly.
Definition: typedObject.I:38
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.