// Panda3D
00001 // Filename: connectionWriter.cxx 00002 // Created by: drose (08Feb00) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "connectionWriter.h" 00016 #include "connectionManager.h" 00017 #include "datagramTCPHeader.h" 00018 #include "config_net.h" 00019 #include "socket_tcp.h" 00020 #include "socket_udp.h" 00021 #include "pnotify.h" 00022 #include "config_downloader.h" 00023 00024 //////////////////////////////////////////////////////////////////// 00025 // Function: ConnectionWriter::WriterThread::Constructor 00026 // Access: Public 00027 // Description: 00028 //////////////////////////////////////////////////////////////////// 00029 ConnectionWriter::WriterThread:: 00030 WriterThread(ConnectionWriter *writer, const string &thread_name, 00031 int thread_index) : 00032 Thread(make_thread_name(thread_name, thread_index), 00033 make_thread_name(thread_name, thread_index)), 00034 _writer(writer), 00035 _thread_index(thread_index) 00036 { 00037 } 00038 00039 //////////////////////////////////////////////////////////////////// 00040 // Function: ConnectionWriter::WriterThread::thread_main 00041 // Access: Public, Virtual 00042 // Description: 00043 //////////////////////////////////////////////////////////////////// 00044 void ConnectionWriter::WriterThread:: 00045 thread_main() { 00046 _writer->thread_run(_thread_index); 00047 } 00048 00049 //////////////////////////////////////////////////////////////////// 00050 // Function: ConnectionWriter::Constructor 00051 // Access: Public 00052 // Description: Creates a 
new ConnectionWriter with the indicated 00053 // number of threads to handle output. 00054 // 00055 // If num_threads is 0, all datagrams will be sent 00056 // immediately instead of queueing for later 00057 // transmission by a thread. 00058 //////////////////////////////////////////////////////////////////// 00059 ConnectionWriter:: 00060 ConnectionWriter(ConnectionManager *manager, int num_threads, 00061 const string &thread_name) : 00062 _manager(manager) 00063 { 00064 if (!Thread::is_threading_supported()) { 00065 #ifndef NDEBUG 00066 if (num_threads != 0) { 00067 if (net_cat.is_debug()) { 00068 net_cat.debug() 00069 << "Threading support is not available.\n"; 00070 } 00071 } 00072 #endif // NDEBUG 00073 num_threads = 0; 00074 } 00075 00076 _raw_mode = false; 00077 _tcp_header_size = tcp_header_size; 00078 _immediate = (num_threads <= 0); 00079 _shutdown = false; 00080 00081 string writer_thread_name = thread_name; 00082 if (thread_name.empty()) { 00083 writer_thread_name = "WriterThread"; 00084 } 00085 int i; 00086 for (i = 0; i < num_threads; i++) { 00087 PT(WriterThread) thread = new WriterThread(this, writer_thread_name, i); 00088 _threads.push_back(thread); 00089 } 00090 for (i = 0; i < num_threads; i++) { 00091 _threads[i]->start(net_thread_priority, true); 00092 } 00093 00094 _manager->add_writer(this); 00095 } 00096 00097 //////////////////////////////////////////////////////////////////// 00098 // Function: ConnectionWriter::Destructor 00099 // Access: Public 00100 // Description: 00101 //////////////////////////////////////////////////////////////////// 00102 ConnectionWriter:: 00103 ~ConnectionWriter() { 00104 if (_manager != (ConnectionManager *)NULL) { 00105 _manager->remove_writer(this); 00106 } 00107 00108 shutdown(); 00109 } 00110 00111 //////////////////////////////////////////////////////////////////// 00112 // Function: ConnectionWriter::set_max_queue_size 00113 // Access: Public 00114 // Description: Limits the number of packets that may be 
pending on 00115 // the outbound queue. This only has an effect when 00116 // using threads; if num_threads is 0, then all packets 00117 // are sent immediately. 00118 //////////////////////////////////////////////////////////////////// 00119 void ConnectionWriter:: 00120 set_max_queue_size(int max_size) { 00121 _queue.set_max_queue_size(max_size); 00122 } 00123 00124 //////////////////////////////////////////////////////////////////// 00125 // Function: ConnectionWriter::get_max_queue_size 00126 // Access: Public 00127 // Description: Returns the maximum size the queue is allowed to grow 00128 // to. See set_max_queue_size(). 00129 //////////////////////////////////////////////////////////////////// 00130 int ConnectionWriter:: 00131 get_max_queue_size() const { 00132 return _queue.get_max_queue_size(); 00133 } 00134 00135 //////////////////////////////////////////////////////////////////// 00136 // Function: ConnectionWriter::get_current_queue_size 00137 // Access: Public 00138 // Description: Returns the current number of things in the queue. 00139 //////////////////////////////////////////////////////////////////// 00140 int ConnectionWriter:: 00141 get_current_queue_size() const { 00142 return _queue.get_current_queue_size(); 00143 } 00144 00145 00146 //////////////////////////////////////////////////////////////////// 00147 // Function: ConnectionWriter::send 00148 // Access: Public 00149 // Description: Enqueues a datagram for transmittal on the indicated 00150 // socket. Since the host address is not specified with 00151 // this form, this function should only be used for 00152 // sending TCP packets. Use the other send() method for 00153 // sending UDP packets. 00154 // 00155 // Returns true if successful, false if there was an 00156 // error. In the normal, threaded case, this function 00157 // only returns false if the send queue is filled; it's 00158 // impossible to detect a transmission error at this 00159 // point. 
00160 // 00161 // If block is true, this will not return false if the 00162 // send queue is filled; instead, it will wait until 00163 // there is space available. 00164 //////////////////////////////////////////////////////////////////// 00165 bool ConnectionWriter:: 00166 send(const Datagram &datagram, const PT(Connection) &connection, bool block) { 00167 nassertr(!_shutdown, false); 00168 nassertr(connection != (Connection *)NULL, false); 00169 nassertr(connection->get_socket()->is_exact_type(Socket_TCP::get_class_type()), false); 00170 00171 NetDatagram copy(datagram); 00172 copy.set_connection(connection); 00173 00174 if (_immediate) { 00175 if (_raw_mode) { 00176 return connection->send_raw_datagram(copy); 00177 } else { 00178 return connection->send_datagram(copy, _tcp_header_size); 00179 } 00180 } else { 00181 return _queue.insert(copy, block); 00182 } 00183 } 00184 00185 00186 //////////////////////////////////////////////////////////////////// 00187 // Function: ConnectionWriter::send 00188 // Access: Public 00189 // Description: Enqueues a datagram for transmittal on the indicated 00190 // socket. This form of the function allows the 00191 // specification of a destination host address, and so 00192 // is appropriate for UDP packets. Use the other send() 00193 // method for sending TCP packets. 00194 // 00195 // Returns true if successful, false if there was an 00196 // error. In the normal, threaded case, this function 00197 // only returns false if the send queue is filled; it's 00198 // impossible to detect a transmission error at this 00199 // point. 00200 // 00201 // If block is true, this will not return false if the 00202 // send queue is filled; instead, it will wait until 00203 // there is space available. 
00204 //////////////////////////////////////////////////////////////////// 00205 bool ConnectionWriter:: 00206 send(const Datagram &datagram, const PT(Connection) &connection, 00207 const NetAddress &address, bool block) { 00208 nassertr(!_shutdown, false); 00209 nassertr(connection != (Connection *)NULL, false); 00210 nassertr(connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()), false); 00211 00212 if ((int)datagram.get_length() > maximum_udp_datagram) { 00213 net_cat.warning() 00214 << "Attempt to send UDP datagram of " << datagram.get_length() 00215 << " bytes, more than the\n" 00216 << "currently defined maximum of " << maximum_udp_datagram 00217 << " bytes.\n"; 00218 } 00219 00220 NetDatagram copy(datagram); 00221 copy.set_connection(connection); 00222 copy.set_address(address); 00223 00224 if (_immediate) { 00225 if (_raw_mode) { 00226 return connection->send_raw_datagram(copy); 00227 } else { 00228 return connection->send_datagram(copy, _tcp_header_size); 00229 } 00230 } else { 00231 return _queue.insert(copy, block); 00232 } 00233 } 00234 00235 //////////////////////////////////////////////////////////////////// 00236 // Function: ConnectionWriter::is_valid_for_udp 00237 // Access: Public 00238 // Description: Returns true if the datagram is small enough to be 00239 // sent over a UDP packet, false otherwise. 00240 //////////////////////////////////////////////////////////////////// 00241 bool ConnectionWriter:: 00242 is_valid_for_udp(const Datagram &datagram) const { 00243 return (int)datagram.get_length() <= maximum_udp_datagram; 00244 } 00245 00246 //////////////////////////////////////////////////////////////////// 00247 // Function: ConnectionWriter::get_manager 00248 // Access: Public 00249 // Description: Returns a pointer to the ConnectionManager object 00250 // that serves this ConnectionWriter. 
00251 //////////////////////////////////////////////////////////////////// 00252 ConnectionManager *ConnectionWriter:: 00253 get_manager() const { 00254 return _manager; 00255 } 00256 00257 //////////////////////////////////////////////////////////////////// 00258 // Function: ConnectionWriter::is_immediate 00259 // Access: Public 00260 // Description: Returns true if the writer is an immediate writer, 00261 // i.e. it has no threads. 00262 //////////////////////////////////////////////////////////////////// 00263 bool ConnectionWriter:: 00264 is_immediate() const { 00265 return _immediate; 00266 } 00267 00268 //////////////////////////////////////////////////////////////////// 00269 // Function: ConnectionWriter::get_num_threads 00270 // Access: Public 00271 // Description: Returns the number of threads the ConnectionWriter 00272 // has been created with. 00273 //////////////////////////////////////////////////////////////////// 00274 int ConnectionWriter:: 00275 get_num_threads() const { 00276 return _threads.size(); 00277 } 00278 00279 //////////////////////////////////////////////////////////////////// 00280 // Function: ConnectionWriter::set_raw_mode 00281 // Access: Public 00282 // Description: Sets the ConnectionWriter into raw mode (or turns off 00283 // raw mode). In raw mode, datagrams are not sent along 00284 // with their headers; the bytes in the datagram are 00285 // simply sent down the pipe. 00286 // 00287 // Setting the ConnectionWriter to raw mode must be done 00288 // with care. This can only be done when the matching 00289 // ConnectionReader is also set to raw mode, or when the 00290 // ConnectionWriter is communicating to a process that 00291 // does not expect datagrams. 
00292 //////////////////////////////////////////////////////////////////// 00293 void ConnectionWriter:: 00294 set_raw_mode(bool mode) { 00295 _raw_mode = mode; 00296 } 00297 00298 //////////////////////////////////////////////////////////////////// 00299 // Function: ConnectionWriter::get_raw_mode 00300 // Access: Public 00301 // Description: Returns the current setting of the raw mode flag. 00302 // See set_raw_mode(). 00303 //////////////////////////////////////////////////////////////////// 00304 bool ConnectionWriter:: 00305 get_raw_mode() const { 00306 return _raw_mode; 00307 } 00308 00309 //////////////////////////////////////////////////////////////////// 00310 // Function: ConnectionWriter::set_tcp_header_size 00311 // Access: Public 00312 // Description: Sets the header size of TCP packets. At the present, 00313 // legal values for this are 0, 2, or 4; this specifies 00314 // the number of bytes to use encode the datagram length 00315 // at the start of each TCP datagram. Sender and 00316 // receiver must independently agree on this. 00317 //////////////////////////////////////////////////////////////////// 00318 void ConnectionWriter:: 00319 set_tcp_header_size(int tcp_header_size) { 00320 _tcp_header_size = tcp_header_size; 00321 } 00322 00323 //////////////////////////////////////////////////////////////////// 00324 // Function: ConnectionWriter::get_tcp_header_size 00325 // Access: Public 00326 // Description: Returns the current setting of TCP header size. 00327 // See set_tcp_header_size(). 00328 //////////////////////////////////////////////////////////////////// 00329 int ConnectionWriter:: 00330 get_tcp_header_size() const { 00331 return _tcp_header_size; 00332 } 00333 00334 //////////////////////////////////////////////////////////////////// 00335 // Function: ConnectionWriter::shutdown 00336 // Access: Published 00337 // Description: Stops all the threads and cleans them up. 
This is 00338 // called automatically by the destructor, but it may be 00339 // called explicitly before destruction. 00340 //////////////////////////////////////////////////////////////////// 00341 void ConnectionWriter:: 00342 shutdown() { 00343 if (_shutdown) { 00344 return; 00345 } 00346 _shutdown = true; 00347 00348 // First, shutdown the queue. This will tell our threads they're 00349 // done. 00350 _queue.shutdown(); 00351 00352 // Now wait for all threads to terminate. 00353 Threads::iterator ti; 00354 for (ti = _threads.begin(); ti != _threads.end(); ++ti) { 00355 (*ti)->join(); 00356 } 00357 _threads.clear(); 00358 } 00359 00360 //////////////////////////////////////////////////////////////////// 00361 // Function: ConnectionWriter::clear_manager 00362 // Access: Protected 00363 // Description: This should normally only be called when the 00364 // associated ConnectionManager destructs. It resets 00365 // the ConnectionManager pointer to NULL so we don't 00366 // have a floating pointer. This makes the 00367 // ConnectionWriter invalid; presumably it also will be 00368 // destructed momentarily. 00369 //////////////////////////////////////////////////////////////////// 00370 void ConnectionWriter:: 00371 clear_manager() { 00372 _manager = (ConnectionManager *)NULL; 00373 shutdown(); 00374 } 00375 00376 //////////////////////////////////////////////////////////////////// 00377 // Function: ConnectionWriter::thread_run 00378 // Access: Private 00379 // Description: This is the actual executing function for each 00380 // thread. 
00381 //////////////////////////////////////////////////////////////////// 00382 void ConnectionWriter:: 00383 thread_run(int thread_index) { 00384 nassertv(!_immediate); 00385 00386 NetDatagram datagram; 00387 while (_queue.extract(datagram)) { 00388 if (_raw_mode) { 00389 datagram.get_connection()->send_raw_datagram(datagram); 00390 } else { 00391 datagram.get_connection()->send_datagram(datagram, _tcp_header_size); 00392 } 00393 Thread::consider_yield(); 00394 } 00395 }