Panda3D
connectionWriter.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file connectionWriter.cxx
10  * @author drose
11  * @date 2000-02-08
12  */
13 
14 #include "connectionWriter.h"
15 #include "connectionManager.h"
16 #include "datagramTCPHeader.h"
17 #include "config_net.h"
18 #include "socket_tcp.h"
19 #include "socket_udp.h"
20 #include "pnotify.h"
21 #include "config_downloader.h"
22 
23 /**
24  *
25  */
26 ConnectionWriter::WriterThread::
27 WriterThread(ConnectionWriter *writer, const std::string &thread_name,
28  int thread_index) :
29  Thread(make_thread_name(thread_name, thread_index),
30  make_thread_name(thread_name, thread_index)),
31  _writer(writer),
32  _thread_index(thread_index)
33 {
34 }
35 
36 /**
37  *
38  */
39 void ConnectionWriter::WriterThread::
40 thread_main() {
41  _writer->thread_run(_thread_index);
42 }
43 
44 /**
45  * Creates a new ConnectionWriter with the indicated number of threads to
46  * handle output.
47  *
48  * If num_threads is 0, all datagrams will be sent immediately instead of
49  * queueing for later transmission by a thread.
50  */
52 ConnectionWriter(ConnectionManager *manager, int num_threads,
53  const std::string &thread_name) :
54  _manager(manager)
55 {
56  if (!Thread::is_threading_supported()) {
57 #ifndef NDEBUG
58  if (num_threads != 0) {
59  if (net_cat.is_debug()) {
60  net_cat.debug()
61  << "Threading support is not available.\n";
62  }
63  }
64 #endif // NDEBUG
65  num_threads = 0;
66  }
67 
68  _raw_mode = false;
69  _tcp_header_size = tcp_header_size;
70  _immediate = (num_threads <= 0);
71  _shutdown = false;
72 
73  std::string writer_thread_name = thread_name;
74  if (thread_name.empty()) {
75  writer_thread_name = "WriterThread";
76  }
77  int i;
78  for (i = 0; i < num_threads; i++) {
79  PT(WriterThread) thread = new WriterThread(this, writer_thread_name, i);
80  _threads.push_back(thread);
81  }
82  for (i = 0; i < num_threads; i++) {
83  _threads[i]->start(net_thread_priority, true);
84  }
85 
86  _manager->add_writer(this);
87 }
88 
89 /**
90  *
91  */
92 ConnectionWriter::
93 ~ConnectionWriter() {
94  if (_manager != nullptr) {
95  _manager->remove_writer(this);
96  }
97 
98  shutdown();
99 }
100 
101 /**
102  * Limits the number of packets that may be pending on the outbound queue.
103  * This only has an effect when using threads; if num_threads is 0, then all
104  * packets are sent immediately.
105  */
107 set_max_queue_size(int max_size) {
108  _queue.set_max_queue_size(max_size);
109 }
110 
111 /**
112  * Returns the maximum size the queue is allowed to grow to. See
113  * set_max_queue_size().
114  */
117  return _queue.get_max_queue_size();
118 }
119 
120 /**
121  * Returns the current number of things in the queue.
122  */
125  return _queue.get_current_queue_size();
126 }
127 
128 
129 /**
130  * Enqueues a datagram for transmittal on the indicated socket. Since the
131  * host address is not specified with this form, this function should only be
132  * used for sending TCP packets. Use the other send() method for sending UDP
133  * packets.
134  *
135  * Returns true if successful, false if there was an error. In the normal,
136  * threaded case, this function only returns false if the send queue is
137  * filled; it's impossible to detect a transmission error at this point.
138  *
139  * If block is true, this will not return false if the send queue is filled;
140  * instead, it will wait until there is space available.
141  */
143 send(const Datagram &datagram, const PT(Connection) &connection, bool block) {
144  nassertr(!_shutdown, false);
145  nassertr(connection != nullptr, false);
146  nassertr(connection->get_socket()->is_exact_type(Socket_TCP::get_class_type()), false);
147 
148  NetDatagram copy(datagram);
149  copy.set_connection(connection);
150 
151  if (_immediate) {
152  if (_raw_mode) {
153  return connection->send_raw_datagram(copy);
154  } else {
155  return connection->send_datagram(copy, _tcp_header_size);
156  }
157  } else {
158  return _queue.insert(copy, block);
159  }
160 }
161 
162 
163 /**
164  * Enqueues a datagram for transmittal on the indicated socket. This form of
165  * the function allows the specification of a destination host address, and so
166  * is appropriate for UDP packets. Use the other send() method for sending
167  * TCP packets.
168  *
169  * Returns true if successful, false if there was an error. In the normal,
170  * threaded case, this function only returns false if the send queue is
171  * filled; it's impossible to detect a transmission error at this point.
172  *
173  * If block is true, this will not return false if the send queue is filled;
174  * instead, it will wait until there is space available.
175  */
177 send(const Datagram &datagram, const PT(Connection) &connection,
178  const NetAddress &address, bool block) {
179  nassertr(!_shutdown, false);
180  nassertr(connection != nullptr, false);
181  nassertr(connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()), false);
182 
183  if ((int)datagram.get_length() > maximum_udp_datagram) {
184  net_cat.warning()
185  << "Attempt to send UDP datagram of " << datagram.get_length()
186  << " bytes, more than the\n"
187  << "currently defined maximum of " << maximum_udp_datagram
188  << " bytes.\n";
189  }
190 
191  NetDatagram copy(datagram);
192  copy.set_connection(connection);
193  copy.set_address(address);
194 
195  if (_immediate) {
196  if (_raw_mode) {
197  return connection->send_raw_datagram(copy);
198  } else {
199  return connection->send_datagram(copy, _tcp_header_size);
200  }
201  } else {
202  return _queue.insert(copy, block);
203  }
204 }
205 
206 /**
207  * Returns true if the datagram is small enough to be sent over a UDP packet,
208  * false otherwise.
209  */
211 is_valid_for_udp(const Datagram &datagram) const {
212  return (int)datagram.get_length() <= maximum_udp_datagram;
213 }
214 
215 /**
216  * Returns a pointer to the ConnectionManager object that serves this
217  * ConnectionWriter.
218  */
220 get_manager() const {
221  return _manager;
222 }
223 
224 /**
225  * Returns true if the writer is an immediate writer, i.e. it has no threads.
226  */
228 is_immediate() const {
229  return _immediate;
230 }
231 
232 /**
233  * Returns the number of threads the ConnectionWriter has been created with.
234  */
237  return _threads.size();
238 }
239 
240 /**
241  * Sets the ConnectionWriter into raw mode (or turns off raw mode). In raw
242  * mode, datagrams are not sent along with their headers; the bytes in the
243  * datagram are simply sent down the pipe.
244  *
245  * Setting the ConnectionWriter to raw mode must be done with care. This can
246  * only be done when the matching ConnectionReader is also set to raw mode, or
247  * when the ConnectionWriter is communicating to a process that does not
248  * expect datagrams.
249  */
251 set_raw_mode(bool mode) {
252  _raw_mode = mode;
253 }
254 
255 /**
256  * Returns the current setting of the raw mode flag. See set_raw_mode().
257  */
259 get_raw_mode() const {
260  return _raw_mode;
261 }
262 
263 /**
264  * Sets the header size of TCP packets. At the present, legal values for this
265  * are 0, 2, or 4; this specifies the number of bytes to use encode the
266  * datagram length at the start of each TCP datagram. Sender and receiver
267  * must independently agree on this.
268  */
270 set_tcp_header_size(int tcp_header_size) {
271  _tcp_header_size = tcp_header_size;
272 }
273 
274 /**
275  * Returns the current setting of TCP header size. See set_tcp_header_size().
276  */
279  return _tcp_header_size;
280 }
281 
282 /**
283  * Stops all the threads and cleans them up. This is called automatically by
284  * the destructor, but it may be called explicitly before destruction.
285  */
288  if (_shutdown) {
289  return;
290  }
291  _shutdown = true;
292 
293  // First, shutdown the queue. This will tell our threads they're done.
294  _queue.shutdown();
295 
296  // Now wait for all threads to terminate.
297  Threads::iterator ti;
298  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
299  (*ti)->join();
300  }
301  _threads.clear();
302 }
303 
304 /**
305  * This should normally only be called when the associated ConnectionManager
306  * destructs. It resets the ConnectionManager pointer to NULL so we don't
307  * have a floating pointer. This makes the ConnectionWriter invalid;
308  * presumably it also will be destructed momentarily.
309  */
310 void ConnectionWriter::
311 clear_manager() {
312  _manager = nullptr;
313  shutdown();
314 }
315 
316 /**
317  * This is the actual executing function for each thread.
318  */
319 void ConnectionWriter::
320 thread_run(int thread_index) {
321  nassertv(!_immediate);
322 
323  NetDatagram datagram;
324  while (_queue.extract(datagram)) {
325  if (_raw_mode) {
326  datagram.get_connection()->send_raw_datagram(datagram);
327  } else {
328  datagram.get_connection()->send_datagram(datagram, _tcp_header_size);
329  }
331  }
332 }
void set_max_queue_size(int max_size)
Sets the maximum size the queue is allowed to grow to.
void set_max_queue_size(int max_size)
Limits the number of packets that may be pending on the outbound queue.
bool is_exact_type(TypeHandle handle) const
Returns true if the current object is the indicated type exactly.
Definition: typedObject.I:38
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
bool send(const Datagram &datagram, const PT(Connection) &connection, bool block=false)
Enqueues a datagram for transmittal on the indicated socket.
void shutdown()
Stops all the threads and cleans them up.
bool insert(const NetDatagram &data, bool block=false)
Inserts the indicated datagram onto the end of the queue, and returns.
int get_max_queue_size() const
Returns the maximum size the queue is allowed to grow to.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionWriter.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
The primary interface to the low-level networking layer in this package.
int get_current_queue_size() const
Returns the current number of things in the queue.
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:212
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
Socket_IP * get_socket() const
Returns the internal Socket_IP that defines the connection.
Definition: connection.cxx:93
bool extract(NetDatagram &result)
Extracts a datagram from the head of the queue, if one is available.
void set_raw_mode(bool mode)
Sets the ConnectionWriter into raw mode (or turns off raw mode).
bool is_valid_for_udp(const Datagram &datagram) const
Returns true if the datagram is small enough to be sent over a UDP packet, false otherwise.
This class handles threaded delivery of datagrams to various TCP or UDP sockets.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int get_max_queue_size() const
Returns the maximum size the queue is allowed to grow to.
ConnectionWriter(ConnectionManager *manager, int num_threads, const std::string &thread_name=std::string())
Creates a new ConnectionWriter with the indicated number of threads to handle output.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void shutdown()
Marks the queue as shutting down, which will eventually cause all threads blocking on extract() to re...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A thread; that is, a lightweight process.
Definition: thread.h:46
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
Definition: netDatagram.cxx:88
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int get_num_threads() const
Returns the number of threads the ConnectionWriter has been created with.
bool is_immediate() const
Returns true if the writer is an immediate writer, i.e.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:29
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a ...
Definition: datagram.h:38
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition: netAddress.h:25
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int get_current_queue_size() const
Returns the current number of things in the queue.