Panda3D
connectionWriter.cxx
1 // Filename: connectionWriter.cxx
2 // Created by: drose (08Feb00)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "connectionWriter.h"
16 #include "connectionManager.h"
17 #include "datagramTCPHeader.h"
18 #include "config_net.h"
19 #include "socket_tcp.h"
20 #include "socket_udp.h"
21 #include "pnotify.h"
22 #include "config_downloader.h"
23 
24 ////////////////////////////////////////////////////////////////////
25 // Function: ConnectionWriter::WriterThread::Constructor
26 // Access: Public
27 // Description:
28 ////////////////////////////////////////////////////////////////////
29 ConnectionWriter::WriterThread::
30 WriterThread(ConnectionWriter *writer, const string &thread_name,
31  int thread_index) :
32  Thread(make_thread_name(thread_name, thread_index),
33  make_thread_name(thread_name, thread_index)),
34  _writer(writer),
35  _thread_index(thread_index)
36 {
37 }
38 
39 ////////////////////////////////////////////////////////////////////
40 // Function: ConnectionWriter::WriterThread::thread_main
41 // Access: Public, Virtual
42 // Description:
43 ////////////////////////////////////////////////////////////////////
44 void ConnectionWriter::WriterThread::
45 thread_main() {
46  _writer->thread_run(_thread_index);
47 }
48 
49 ////////////////////////////////////////////////////////////////////
50 // Function: ConnectionWriter::Constructor
51 // Access: Public
52 // Description: Creates a new ConnectionWriter with the indicated
53 // number of threads to handle output.
54 //
55 // If num_threads is 0, all datagrams will be sent
56 // immediately instead of queueing for later
57 // transmission by a thread.
58 ////////////////////////////////////////////////////////////////////
60 ConnectionWriter(ConnectionManager *manager, int num_threads,
61  const string &thread_name) :
62  _manager(manager)
63 {
65 #ifndef NDEBUG
66  if (num_threads != 0) {
67  if (net_cat.is_debug()) {
68  net_cat.debug()
69  << "Threading support is not available.\n";
70  }
71  }
72 #endif // NDEBUG
73  num_threads = 0;
74  }
75 
76  _raw_mode = false;
77  _tcp_header_size = tcp_header_size;
78  _immediate = (num_threads <= 0);
79  _shutdown = false;
80 
81  string writer_thread_name = thread_name;
82  if (thread_name.empty()) {
83  writer_thread_name = "WriterThread";
84  }
85  int i;
86  for (i = 0; i < num_threads; i++) {
87  PT(WriterThread) thread = new WriterThread(this, writer_thread_name, i);
88  _threads.push_back(thread);
89  }
90  for (i = 0; i < num_threads; i++) {
91  _threads[i]->start(net_thread_priority, true);
92  }
93 
94  _manager->add_writer(this);
95 }
96 
97 ////////////////////////////////////////////////////////////////////
98 // Function: ConnectionWriter::Destructor
99 // Access: Public
100 // Description:
101 ////////////////////////////////////////////////////////////////////
102 ConnectionWriter::
103 ~ConnectionWriter() {
104  if (_manager != (ConnectionManager *)NULL) {
105  _manager->remove_writer(this);
106  }
107 
108  shutdown();
109 }
110 
111 ////////////////////////////////////////////////////////////////////
112 // Function: ConnectionWriter::set_max_queue_size
113 // Access: Public
114 // Description: Limits the number of packets that may be pending on
115 // the outbound queue. This only has an effect when
116 // using threads; if num_threads is 0, then all packets
117 // are sent immediately.
118 ////////////////////////////////////////////////////////////////////
120 set_max_queue_size(int max_size) {
121  _queue.set_max_queue_size(max_size);
122 }
123 
124 ////////////////////////////////////////////////////////////////////
125 // Function: ConnectionWriter::get_max_queue_size
126 // Access: Public
127 // Description: Returns the maximum size the queue is allowed to grow
128 // to. See set_max_queue_size().
129 ////////////////////////////////////////////////////////////////////
132  return _queue.get_max_queue_size();
133 }
134 
135 ////////////////////////////////////////////////////////////////////
136 // Function: ConnectionWriter::get_current_queue_size
137 // Access: Public
138 // Description: Returns the current number of things in the queue.
139 ////////////////////////////////////////////////////////////////////
142  return _queue.get_current_queue_size();
143 }
144 
145 
146 ////////////////////////////////////////////////////////////////////
147 // Function: ConnectionWriter::send
148 // Access: Public
149 // Description: Enqueues a datagram for transmittal on the indicated
150 // socket. Since the host address is not specified with
151 // this form, this function should only be used for
152 // sending TCP packets. Use the other send() method for
153 // sending UDP packets.
154 //
155 // Returns true if successful, false if there was an
156 // error. In the normal, threaded case, this function
157 // only returns false if the send queue is filled; it's
158 // impossible to detect a transmission error at this
159 // point.
160 //
161 // If block is true, this will not return false if the
162 // send queue is filled; instead, it will wait until
163 // there is space available.
164 ////////////////////////////////////////////////////////////////////
166 send(const Datagram &datagram, const PT(Connection) &connection, bool block) {
167  nassertr(!_shutdown, false);
168  nassertr(connection != (Connection *)NULL, false);
169  nassertr(connection->get_socket()->is_exact_type(Socket_TCP::get_class_type()), false);
170 
171  NetDatagram copy(datagram);
172  copy.set_connection(connection);
173 
174  if (_immediate) {
175  if (_raw_mode) {
176  return connection->send_raw_datagram(copy);
177  } else {
178  return connection->send_datagram(copy, _tcp_header_size);
179  }
180  } else {
181  return _queue.insert(copy, block);
182  }
183 }
184 
185 
186 ////////////////////////////////////////////////////////////////////
187 // Function: ConnectionWriter::send
188 // Access: Public
189 // Description: Enqueues a datagram for transmittal on the indicated
190 // socket. This form of the function allows the
191 // specification of a destination host address, and so
192 // is appropriate for UDP packets. Use the other send()
193 // method for sending TCP packets.
194 //
195 // Returns true if successful, false if there was an
196 // error. In the normal, threaded case, this function
197 // only returns false if the send queue is filled; it's
198 // impossible to detect a transmission error at this
199 // point.
200 //
201 // If block is true, this will not return false if the
202 // send queue is filled; instead, it will wait until
203 // there is space available.
204 ////////////////////////////////////////////////////////////////////
206 send(const Datagram &datagram, const PT(Connection) &connection,
207  const NetAddress &address, bool block) {
208  nassertr(!_shutdown, false);
209  nassertr(connection != (Connection *)NULL, false);
210  nassertr(connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()), false);
211 
212  if ((int)datagram.get_length() > maximum_udp_datagram) {
213  net_cat.warning()
214  << "Attempt to send UDP datagram of " << datagram.get_length()
215  << " bytes, more than the\n"
216  << "currently defined maximum of " << maximum_udp_datagram
217  << " bytes.\n";
218  }
219 
220  NetDatagram copy(datagram);
221  copy.set_connection(connection);
222  copy.set_address(address);
223 
224  if (_immediate) {
225  if (_raw_mode) {
226  return connection->send_raw_datagram(copy);
227  } else {
228  return connection->send_datagram(copy, _tcp_header_size);
229  }
230  } else {
231  return _queue.insert(copy, block);
232  }
233 }
234 
235 ////////////////////////////////////////////////////////////////////
236 // Function: ConnectionWriter::is_valid_for_udp
237 // Access: Public
238 // Description: Returns true if the datagram is small enough to be
239 // sent over a UDP packet, false otherwise.
240 ////////////////////////////////////////////////////////////////////
242 is_valid_for_udp(const Datagram &datagram) const {
243  return (int)datagram.get_length() <= maximum_udp_datagram;
244 }
245 
246 ////////////////////////////////////////////////////////////////////
247 // Function: ConnectionWriter::get_manager
248 // Access: Public
249 // Description: Returns a pointer to the ConnectionManager object
250 // that serves this ConnectionWriter.
251 ////////////////////////////////////////////////////////////////////
253 get_manager() const {
254  return _manager;
255 }
256 
257 ////////////////////////////////////////////////////////////////////
258 // Function: ConnectionWriter::is_immediate
259 // Access: Public
260 // Description: Returns true if the writer is an immediate writer,
261 // i.e. it has no threads.
262 ////////////////////////////////////////////////////////////////////
264 is_immediate() const {
265  return _immediate;
266 }
267 
268 ////////////////////////////////////////////////////////////////////
269 // Function: ConnectionWriter::get_num_threads
270 // Access: Public
271 // Description: Returns the number of threads the ConnectionWriter
272 // has been created with.
273 ////////////////////////////////////////////////////////////////////
276  return _threads.size();
277 }
278 
279 ////////////////////////////////////////////////////////////////////
280 // Function: ConnectionWriter::set_raw_mode
281 // Access: Public
282 // Description: Sets the ConnectionWriter into raw mode (or turns off
283 // raw mode). In raw mode, datagrams are not sent along
284 // with their headers; the bytes in the datagram are
285 // simply sent down the pipe.
286 //
287 // Setting the ConnectionWriter to raw mode must be done
288 // with care. This can only be done when the matching
289 // ConnectionReader is also set to raw mode, or when the
290 // ConnectionWriter is communicating to a process that
291 // does not expect datagrams.
292 ////////////////////////////////////////////////////////////////////
294 set_raw_mode(bool mode) {
295  _raw_mode = mode;
296 }
297 
298 ////////////////////////////////////////////////////////////////////
299 // Function: ConnectionWriter::get_raw_mode
300 // Access: Public
301 // Description: Returns the current setting of the raw mode flag.
302 // See set_raw_mode().
303 ////////////////////////////////////////////////////////////////////
305 get_raw_mode() const {
306  return _raw_mode;
307 }
308 
309 ////////////////////////////////////////////////////////////////////
310 // Function: ConnectionWriter::set_tcp_header_size
311 // Access: Public
312 // Description: Sets the header size of TCP packets. At the present,
313 // legal values for this are 0, 2, or 4; this specifies
314 // the number of bytes to use encode the datagram length
315 // at the start of each TCP datagram. Sender and
316 // receiver must independently agree on this.
317 ////////////////////////////////////////////////////////////////////
319 set_tcp_header_size(int tcp_header_size) {
320  _tcp_header_size = tcp_header_size;
321 }
322 
323 ////////////////////////////////////////////////////////////////////
324 // Function: ConnectionWriter::get_tcp_header_size
325 // Access: Public
326 // Description: Returns the current setting of TCP header size.
327 // See set_tcp_header_size().
328 ////////////////////////////////////////////////////////////////////
331  return _tcp_header_size;
332 }
333 
334 ////////////////////////////////////////////////////////////////////
335 // Function: ConnectionWriter::shutdown
336 // Access: Published
337 // Description: Stops all the threads and cleans them up. This is
338 // called automatically by the destructor, but it may be
339 // called explicitly before destruction.
340 ////////////////////////////////////////////////////////////////////
343  if (_shutdown) {
344  return;
345  }
346  _shutdown = true;
347 
348  // First, shutdown the queue. This will tell our threads they're
349  // done.
350  _queue.shutdown();
351 
352  // Now wait for all threads to terminate.
353  Threads::iterator ti;
354  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
355  (*ti)->join();
356  }
357  _threads.clear();
358 }
359 
360 ////////////////////////////////////////////////////////////////////
361 // Function: ConnectionWriter::clear_manager
362 // Access: Protected
363 // Description: This should normally only be called when the
364 // associated ConnectionManager destructs. It resets
365 // the ConnectionManager pointer to NULL so we don't
366 // have a floating pointer. This makes the
367 // ConnectionWriter invalid; presumably it also will be
368 // destructed momentarily.
369 ////////////////////////////////////////////////////////////////////
370 void ConnectionWriter::
371 clear_manager() {
372  _manager = (ConnectionManager *)NULL;
373  shutdown();
374 }
375 
376 ////////////////////////////////////////////////////////////////////
377 // Function: ConnectionWriter::thread_run
378 // Access: Private
379 // Description: This is the actual executing function for each
380 // thread.
381 ////////////////////////////////////////////////////////////////////
382 void ConnectionWriter::
383 thread_run(int thread_index) {
384  nassertv(!_immediate);
385 
386  NetDatagram datagram;
387  while (_queue.extract(datagram)) {
388  if (_raw_mode) {
389  datagram.get_connection()->send_raw_datagram(datagram);
390  } else {
391  datagram.get_connection()->send_datagram(datagram, _tcp_header_size);
392  }
394  }
395 }
void set_max_queue_size(int max_size)
Sets the maximum size the queue is allowed to grow to.
void set_max_queue_size(int max_size)
Limits the number of packets that may be pending on the outbound queue.
bool is_exact_type(TypeHandle handle) const
Returns true if the current object is the indicated type exactly.
Definition: typedObject.I:74
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:43
bool send(const Datagram &datagram, const PT(Connection) &connection, bool block=false)
Enqueues a datagram for transmittal on the indicated socket.
void shutdown()
Stops all the threads and cleans them up.
bool insert(const NetDatagram &data, bool block=false)
Inserts the indicated datagram onto the end of the queue, and returns.
int get_max_queue_size() const
Returns the maximum size the queue is allowed to grow to.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionWriter.
The primary interface to the low-level networking layer in this package.
int get_current_queue_size() const
Returns the current number of things in the queue.
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:263
ConnectionWriter(ConnectionManager *manager, int num_threads, const string &thread_name=string())
Creates a new ConnectionWriter with the indicated number of threads to handle output.
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
static bool is_threading_supported()
Returns true if threading support has been compiled in and enabled, or false if no threading is avail...
Definition: thread.I:185
Socket_IP * get_socket() const
Returns the internal Socket_IP that defines the connection.
Definition: connection.cxx:107
bool extract(NetDatagram &result)
Extracts a datagram from the head of the queue, if one is available.
void set_raw_mode(bool mode)
Sets the ConnectionWriter into raw mode (or turns off raw mode).
bool is_valid_for_udp(const Datagram &datagram) const
Returns true if the datagram is small enough to be sent over a UDP packet, false otherwise.
This class handles threaded delivery of datagrams to various TCP or UDP sockets.
int get_max_queue_size() const
Returns the maximum size the queue is allowed to grow to.
void shutdown()
Marks the queue as shutting down, which will eventually cause all threads blocking on extract() to re...
A thread; that is, a lightweight process.
Definition: thread.h:51
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
int get_num_threads() const
Returns the number of threads the ConnectionWriter has been created with.
bool is_immediate() const
Returns true if the writer is an immediate writer, i.e.
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:32
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a ...
Definition: datagram.h:43
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:457
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound...
Definition: netAddress.h:27
int get_current_queue_size() const
Returns the current number of things in the queue.