Panda3D
Loading...
Searching...
No Matches
connectionWriter.cxx
Go to the documentation of this file.
1/**
2 * PANDA 3D SOFTWARE
3 * Copyright (c) Carnegie Mellon University. All rights reserved.
4 *
5 * All use of this software is subject to the terms of the revised BSD
6 * license. You should have received a copy of this license along
7 * with this source code in a file named "LICENSE."
8 *
9 * @file connectionWriter.cxx
10 * @author drose
11 * @date 2000-02-08
12 */
13
14#include "connectionWriter.h"
15#include "connectionManager.h"
16#include "datagramTCPHeader.h"
17#include "config_net.h"
18#include "socket_tcp.h"
19#include "socket_udp.h"
20#include "pnotify.h"
21#include "config_downloader.h"
22
23/**
24 * Creates a thread that runs the given writer's thread_run(thread_index).
25 */
26ConnectionWriter::WriterThread::
27WriterThread(ConnectionWriter *writer, const std::string &thread_name,
28 int thread_index) :
29 Thread(make_thread_name(thread_name, thread_index),
30 make_thread_name(thread_name, thread_index)),
31 _writer(writer),
32 _thread_index(thread_index)
33{
34}
35
36/**
37 * Forwards to the owning writer's thread_run(), passing this thread's index.
38 */
39void ConnectionWriter::WriterThread::
40thread_main() {
41 _writer->thread_run(_thread_index);
42}
43
44/**
45 * Creates a new ConnectionWriter with the indicated number of threads to
46 * handle output.
47 *
48 * If num_threads is 0 (or threading is not supported), all datagrams will be
49 * sent immediately instead of queueing for later transmission by a thread.
50 */
51ConnectionWriter::
52ConnectionWriter(ConnectionManager *manager, int num_threads,
53 const std::string &thread_name) :
54 _manager(manager)
55{
56 if (!Thread::is_threading_supported()) {
57#ifndef NDEBUG
58 if (num_threads != 0) {
59 if (net_cat.is_debug()) {
60 net_cat.debug()
61 << "Threading support is not available.\n";
62 }
63 }
64#endif // NDEBUG
65 num_threads = 0;
66 }
67
68 _raw_mode = false;
69 _tcp_header_size = tcp_header_size;
70 _immediate = (num_threads <= 0);
71 _shutdown = false;
72
73 std::string writer_thread_name = thread_name;
74 if (thread_name.empty()) {
75 writer_thread_name = "WriterThread";
76 }
77 int i;
78 for (i = 0; i < num_threads; i++) {
79 PT(WriterThread) thread = new WriterThread(this, writer_thread_name, i);
80 _threads.push_back(thread);
81 }
82 for (i = 0; i < num_threads; i++) {
83 _threads[i]->start(net_thread_priority, true);
84 }
85
86 _manager->add_writer(this);
87}
88
89/**
90 * Removes this writer from its manager, if any, and then calls shutdown().
91 */
92ConnectionWriter::
93~ConnectionWriter() {
94 if (_manager != nullptr) {
95 _manager->remove_writer(this);
96 }
97
98 shutdown();
99}
100
101/**
102 * Limits the number of packets that may be pending on the outbound queue.
103 * This only has an effect when using threads; if num_threads is 0, then all
104 * packets are sent immediately.
105 */
106void ConnectionWriter::
107set_max_queue_size(int max_size) {
108 _queue.set_max_queue_size(max_size);
109}
110
111/**
112 * Returns the maximum size the queue is allowed to grow to. See
113 * set_max_queue_size().
114 */
115int ConnectionWriter::
116get_max_queue_size() const {
117 return _queue.get_max_queue_size();
118}
119
120/**
121 * Returns the current number of things in the queue.
122 */
123int ConnectionWriter::
124get_current_queue_size() const {
125 return _queue.get_current_queue_size();
126}
127
128
129/**
130 * Enqueues a datagram for transmittal on the indicated socket. Since the
131 * host address is not specified with this form, this function should only be
132 * used for sending TCP packets. Use the other send() method for sending UDP
133 * packets.
134 *
135 * Returns true if successful, false if there was an error. In the normal,
136 * threaded case, this function only returns false if the send queue is
137 * filled; it's impossible to detect a transmission error at this point.
138 *
139 * If block is true, this will not return false if the send queue is filled;
140 * instead, it will wait until there is space available.
141 */
142bool ConnectionWriter::
143send(const Datagram &datagram, const PT(Connection) &connection, bool block) {
144 nassertr(!_shutdown, false);
145 nassertr(connection != nullptr, false);
146 nassertr(connection->get_socket()->is_exact_type(Socket_TCP::get_class_type()), false);
147
148 NetDatagram copy(datagram);
149 copy.set_connection(connection);
150
151 if (_immediate) {
152 if (_raw_mode) {
153 return connection->send_raw_datagram(copy);
154 } else {
155 return connection->send_datagram(copy, _tcp_header_size);
156 }
157 } else {
158 return _queue.insert(copy, block);
159 }
160}
161
162
163/**
164 * Enqueues a datagram for transmittal on the indicated socket. This form of
165 * the function allows the specification of a destination host address, and so
166 * is appropriate for UDP packets. Use the other send() method for sending
167 * TCP packets.
168 *
169 * Returns true if successful, false if there was an error. In the normal,
170 * threaded case, this function only returns false if the send queue is
171 * filled; it's impossible to detect a transmission error at this point.
172 *
173 * If block is true, this will not return false if the send queue is filled;
174 * instead, it will wait until there is space available.
175 */
176bool ConnectionWriter::
177send(const Datagram &datagram, const PT(Connection) &connection,
178 const NetAddress &address, bool block) {
179 nassertr(!_shutdown, false);
180 nassertr(connection != nullptr, false);
181 nassertr(connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()), false);
182
183 if ((int)datagram.get_length() > maximum_udp_datagram) {
184 net_cat.warning()
185 << "Attempt to send UDP datagram of " << datagram.get_length()
186 << " bytes, more than the\n"
187 << "currently defined maximum of " << maximum_udp_datagram
188 << " bytes.\n";
189 }
190
191 NetDatagram copy(datagram);
192 copy.set_connection(connection);
193 copy.set_address(address);
194
195 if (_immediate) {
196 if (_raw_mode) {
197 return connection->send_raw_datagram(copy);
198 } else {
199 return connection->send_datagram(copy, _tcp_header_size);
200 }
201 } else {
202 return _queue.insert(copy, block);
203 }
204}
205
206/**
207 * Returns true if the datagram is small enough to be sent over a UDP packet,
208 * false otherwise.
209 */
210bool ConnectionWriter::
211is_valid_for_udp(const Datagram &datagram) const {
212 return (int)datagram.get_length() <= maximum_udp_datagram;
213}
214
215/**
216 * Returns a pointer to the ConnectionManager object that serves this
217 * ConnectionWriter.
218 */
219ConnectionManager *ConnectionWriter::
220get_manager() const {
221 return _manager;
222}
223
224/**
225 * Returns true if the writer is an immediate writer, i.e. it has no threads.
226 */
227bool ConnectionWriter::
228is_immediate() const {
229 return _immediate;
230}
231
232/**
233 * Returns the number of threads the ConnectionWriter has been created with.
234 */
235int ConnectionWriter::
236get_num_threads() const {
237 return _threads.size();
238}
239
240/**
241 * Sets the ConnectionWriter into raw mode (or turns off raw mode). In raw
242 * mode, datagrams are not sent along with their headers; the bytes in the
243 * datagram are simply sent down the pipe.
244 *
245 * Setting the ConnectionWriter to raw mode must be done with care. This can
246 * only be done when the matching ConnectionReader is also set to raw mode, or
247 * when the ConnectionWriter is communicating to a process that does not
248 * expect datagrams.
249 */
250void ConnectionWriter::
251set_raw_mode(bool mode) {
252 _raw_mode = mode;
253}
254
255/**
256 * Returns the current setting of the raw mode flag. See set_raw_mode().
257 */
258bool ConnectionWriter::
259get_raw_mode() const {
260 return _raw_mode;
261}
262
263/**
264 * Sets the header size of TCP packets. At present, legal values for this
265 * are 0, 2, or 4; this specifies the number of bytes to use to encode the
266 * datagram length at the start of each TCP datagram. Sender and receiver
267 * must independently agree on this.
268 */
269void ConnectionWriter::
270set_tcp_header_size(int tcp_header_size) {
271 _tcp_header_size = tcp_header_size;
272}
273
274/**
275 * Returns the current setting of TCP header size. See set_tcp_header_size().
276 */
277int ConnectionWriter::
278get_tcp_header_size() const {
279 return _tcp_header_size;
280}
281
282/**
283 * Stops all the threads and cleans them up. This is called automatically by
284 * the destructor, but it may be called explicitly before destruction.
285 */
286void ConnectionWriter::
287shutdown() {
288 if (_shutdown) {
289 return;
290 }
291 _shutdown = true;
292
293 // First, shutdown the queue. This will tell our threads they're done.
294 _queue.shutdown();
295
296 // Now wait for all threads to terminate.
297 Threads::iterator ti;
298 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
299 (*ti)->join();
300 }
301 _threads.clear();
302}
303
304/**
305 * This should normally only be called when the associated ConnectionManager
306 * destructs. It resets the ConnectionManager pointer to nullptr so we don't
307 * keep a dangling pointer, and then calls shutdown(). This makes the
308 * ConnectionWriter invalid; presumably it also will be destructed momentarily.
309 */
310void ConnectionWriter::
311clear_manager() {
312 _manager = nullptr;
313 shutdown();
314}
315
316/**
317 * This is the actual executing function for each thread.
318 */
319void ConnectionWriter::
320thread_run(int thread_index) {
321 nassertv(!_immediate);
322
323 NetDatagram datagram;
324 while (_queue.extract(datagram)) {
325 if (_raw_mode) {
326 datagram.get_connection()->send_raw_datagram(datagram);
327 } else {
328 datagram.get_connection()->send_datagram(datagram, _tcp_header_size);
329 }
330 Thread::consider_yield();
331 }
332}
The primary interface to the low-level networking layer in this package.
This class handles threaded delivery of datagrams to various TCP or UDP sockets.
void set_max_queue_size(int max_size)
Limits the number of packets that may be pending on the outbound queue.
int get_num_threads() const
Returns the number of threads the ConnectionWriter has been created with.
bool send(const Datagram &datagram, const PT(Connection) &connection, bool block=false)
Enqueues a datagram for transmittal on the indicated socket.
bool is_valid_for_udp(const Datagram &datagram) const
Returns true if the datagram is small enough to be sent over a UDP packet, false otherwise.
void set_raw_mode(bool mode)
Sets the ConnectionWriter into raw mode (or turns off raw mode).
void shutdown()
Stops all the threads and cleans them up.
int get_max_queue_size() const
Returns the maximum size the queue is allowed to grow to.
bool is_immediate() const
Returns true if the writer is an immediate writer, i.e. it has no threads.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionWriter.
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
ConnectionWriter(ConnectionManager *manager, int num_threads, const std::string &thread_name=std::string())
Creates a new ConnectionWriter with the indicated number of threads to handle output.
int get_current_queue_size() const
Returns the current number of things in the queue.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
Represents a single TCP or UDP socket for input or output.
Definition connection.h:29
Socket_IP * get_socket() const
Returns the internal Socket_IP that defines the connection.
int get_max_queue_size() const
Returns the maximum size the queue is allowed to grow to.
void shutdown()
Marks the queue as shutting down, which will eventually cause all threads blocking on extract() to re...
void set_max_queue_size(int max_size)
Sets the maximum size the queue is allowed to grow to.
bool extract(NetDatagram &result)
Extracts a datagram from the head of the queue, if one is available.
bool insert(const NetDatagram &data, bool block=false)
Inserts the indicated datagram onto the end of the queue, and returns.
int get_current_queue_size() const
Returns the current number of things in the queue.
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a ...
Definition datagram.h:38
size_t get_length() const
Returns the number of bytes in the datagram.
Definition datagram.I:335
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition netAddress.h:25
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition netDatagram.h:40
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
A thread; that is, a lightweight process.
Definition thread.h:46
is_threading_supported
Returns true if threading support has been compiled in and enabled, or false if no threading is available.
Definition thread.h:112
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition thread.I:212
bool is_exact_type(TypeHandle handle) const
Returns true if the current object is the indicated type exactly.
Definition typedObject.I:38
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.