Panda3D

connectionWriter.cxx

00001 // Filename: connectionWriter.cxx
00002 // Created by:  drose (08Feb00)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "connectionWriter.h"
00016 #include "connectionManager.h"
00017 #include "datagramTCPHeader.h"
00018 #include "config_net.h"
00019 #include "socket_tcp.h"
00020 #include "socket_udp.h"
00021 #include "pnotify.h"
00022 #include "config_downloader.h"
00023 
00024 ////////////////////////////////////////////////////////////////////
00025 //     Function: ConnectionWriter::WriterThread::Constructor
00026 //       Access: Public
00027 //  Description:
00028 ////////////////////////////////////////////////////////////////////
00029 ConnectionWriter::WriterThread::
00030 WriterThread(ConnectionWriter *writer, const string &thread_name,
00031              int thread_index) :
00032   Thread(make_thread_name(thread_name, thread_index), 
00033          make_thread_name(thread_name, thread_index)),
00034   _writer(writer),
00035   _thread_index(thread_index)
00036 {
00037 }
00038 
00039 ////////////////////////////////////////////////////////////////////
00040 //     Function: ConnectionWriter::WriterThread::thread_main
00041 //       Access: Public, Virtual
00042 //  Description:
00043 ////////////////////////////////////////////////////////////////////
00044 void ConnectionWriter::WriterThread::
00045 thread_main() {
00046   _writer->thread_run(_thread_index);
00047 }
00048 
00049 ////////////////////////////////////////////////////////////////////
00050 //     Function: ConnectionWriter::Constructor
00051 //       Access: Public
00052 //  Description: Creates a new ConnectionWriter with the indicated
00053 //               number of threads to handle output.
00054 //
00055 //               If num_threads is 0, all datagrams will be sent
00056 //               immediately instead of queueing for later
00057 //               transmission by a thread.
00058 ////////////////////////////////////////////////////////////////////
00059 ConnectionWriter::
00060 ConnectionWriter(ConnectionManager *manager, int num_threads,
00061                  const string &thread_name) :
00062   _manager(manager)
00063 {
00064   if (!Thread::is_threading_supported()) {
00065 #ifndef NDEBUG
00066     if (num_threads != 0) {
00067       if (net_cat.is_debug()) {
00068         net_cat.debug()
00069           << "Threading support is not available.\n";
00070       }
00071     }
00072 #endif  // NDEBUG
00073     num_threads = 0;
00074   }
00075 
00076   _raw_mode = false;
00077   _tcp_header_size = tcp_header_size;
00078   _immediate = (num_threads <= 0);
00079   _shutdown = false;
00080 
00081   string writer_thread_name = thread_name;
00082   if (thread_name.empty()) {
00083     writer_thread_name = "WriterThread";
00084   }
00085   int i;
00086   for (i = 0; i < num_threads; i++) {
00087     PT(WriterThread) thread = new WriterThread(this, writer_thread_name, i);
00088     _threads.push_back(thread);
00089   }
00090   for (i = 0; i < num_threads; i++) {
00091     _threads[i]->start(net_thread_priority, true);
00092   }
00093 
00094   _manager->add_writer(this);
00095 }
00096 
00097 ////////////////////////////////////////////////////////////////////
00098 //     Function: ConnectionWriter::Destructor
00099 //       Access: Public
00100 //  Description:
00101 ////////////////////////////////////////////////////////////////////
00102 ConnectionWriter::
00103 ~ConnectionWriter() {
00104   if (_manager != (ConnectionManager *)NULL) {
00105     _manager->remove_writer(this);
00106   }
00107 
00108   shutdown();
00109 }
00110 
00111 ////////////////////////////////////////////////////////////////////
00112 //     Function: ConnectionWriter::set_max_queue_size
00113 //       Access: Public
00114 //  Description: Limits the number of packets that may be pending on
00115 //               the outbound queue.  This only has an effect when
00116 //               using threads; if num_threads is 0, then all packets
00117 //               are sent immediately.
00118 ////////////////////////////////////////////////////////////////////
00119 void ConnectionWriter::
00120 set_max_queue_size(int max_size) {
00121   _queue.set_max_queue_size(max_size);
00122 }
00123 
00124 ////////////////////////////////////////////////////////////////////
00125 //     Function: ConnectionWriter::get_max_queue_size
00126 //       Access: Public
00127 //  Description: Returns the maximum size the queue is allowed to grow
00128 //               to.  See set_max_queue_size().
00129 ////////////////////////////////////////////////////////////////////
00130 int ConnectionWriter::
00131 get_max_queue_size() const {
00132   return _queue.get_max_queue_size();
00133 }
00134 
00135 ////////////////////////////////////////////////////////////////////
00136 //     Function: ConnectionWriter::get_current_queue_size
00137 //       Access: Public
00138 //  Description: Returns the current number of things in the queue.
00139 ////////////////////////////////////////////////////////////////////
00140 int ConnectionWriter::
00141 get_current_queue_size() const {
00142   return _queue.get_current_queue_size();
00143 }
00144 
00145 
00146 ////////////////////////////////////////////////////////////////////
00147 //     Function: ConnectionWriter::send
00148 //       Access: Public
00149 //  Description: Enqueues a datagram for transmittal on the indicated
00150 //               socket.  Since the host address is not specified with
00151 //               this form, this function should only be used for
00152 //               sending TCP packets.  Use the other send() method for
00153 //               sending UDP packets.
00154 //
00155 //               Returns true if successful, false if there was an
00156 //               error.  In the normal, threaded case, this function
00157 //               only returns false if the send queue is filled; it's
00158 //               impossible to detect a transmission error at this
00159 //               point.
00160 //
00161 //               If block is true, this will not return false if the
00162 //               send queue is filled; instead, it will wait until
00163 //               there is space available.
00164 ////////////////////////////////////////////////////////////////////
00165 bool ConnectionWriter::
00166 send(const Datagram &datagram, const PT(Connection) &connection, bool block) {
00167   nassertr(!_shutdown, false);
00168   nassertr(connection != (Connection *)NULL, false);
00169   nassertr(connection->get_socket()->is_exact_type(Socket_TCP::get_class_type()), false);
00170 
00171   NetDatagram copy(datagram);
00172   copy.set_connection(connection);
00173 
00174   if (_immediate) {
00175     if (_raw_mode) {
00176       return connection->send_raw_datagram(copy);
00177     } else {
00178       return connection->send_datagram(copy, _tcp_header_size);
00179     }
00180   } else {
00181     return _queue.insert(copy, block);
00182   }
00183 }
00184 
00185 
00186 ////////////////////////////////////////////////////////////////////
00187 //     Function: ConnectionWriter::send
00188 //       Access: Public
00189 //  Description: Enqueues a datagram for transmittal on the indicated
00190 //               socket.  This form of the function allows the
00191 //               specification of a destination host address, and so
00192 //               is appropriate for UDP packets.  Use the other send()
00193 //               method for sending TCP packets.
00194 //
00195 //               Returns true if successful, false if there was an
00196 //               error.  In the normal, threaded case, this function
00197 //               only returns false if the send queue is filled; it's
00198 //               impossible to detect a transmission error at this
00199 //               point.
00200 //
00201 //               If block is true, this will not return false if the
00202 //               send queue is filled; instead, it will wait until
00203 //               there is space available.
00204 ////////////////////////////////////////////////////////////////////
00205 bool ConnectionWriter::
00206 send(const Datagram &datagram, const PT(Connection) &connection,
00207      const NetAddress &address, bool block) {
00208   nassertr(!_shutdown, false);
00209   nassertr(connection != (Connection *)NULL, false);
00210   nassertr(connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()), false);
00211 
00212   if ((int)datagram.get_length() > maximum_udp_datagram) {
00213     net_cat.warning()
00214       << "Attempt to send UDP datagram of " << datagram.get_length()
00215       << " bytes, more than the\n"
00216       << "currently defined maximum of " << maximum_udp_datagram
00217       << " bytes.\n";
00218   }
00219 
00220   NetDatagram copy(datagram);
00221   copy.set_connection(connection);
00222   copy.set_address(address);
00223 
00224   if (_immediate) {
00225     if (_raw_mode) {
00226       return connection->send_raw_datagram(copy);
00227     } else {
00228       return connection->send_datagram(copy, _tcp_header_size);
00229     }
00230   } else {
00231     return _queue.insert(copy, block);
00232   }
00233 }
00234 
00235 ////////////////////////////////////////////////////////////////////
00236 //     Function: ConnectionWriter::is_valid_for_udp
00237 //       Access: Public
00238 //  Description: Returns true if the datagram is small enough to be
00239 //               sent over a UDP packet, false otherwise.
00240 ////////////////////////////////////////////////////////////////////
00241 bool ConnectionWriter::
00242 is_valid_for_udp(const Datagram &datagram) const {
00243   return (int)datagram.get_length() <= maximum_udp_datagram;
00244 }
00245 
00246 ////////////////////////////////////////////////////////////////////
00247 //     Function: ConnectionWriter::get_manager
00248 //       Access: Public
00249 //  Description: Returns a pointer to the ConnectionManager object
00250 //               that serves this ConnectionWriter.
00251 ////////////////////////////////////////////////////////////////////
00252 ConnectionManager *ConnectionWriter::
00253 get_manager() const {
00254   return _manager;
00255 }
00256 
00257 ////////////////////////////////////////////////////////////////////
00258 //     Function: ConnectionWriter::is_immediate
00259 //       Access: Public
00260 //  Description: Returns true if the writer is an immediate writer,
00261 //               i.e. it has no threads.
00262 ////////////////////////////////////////////////////////////////////
00263 bool ConnectionWriter::
00264 is_immediate() const {
00265   return _immediate;
00266 }
00267 
00268 ////////////////////////////////////////////////////////////////////
00269 //     Function: ConnectionWriter::get_num_threads
00270 //       Access: Public
00271 //  Description: Returns the number of threads the ConnectionWriter
00272 //               has been created with.
00273 ////////////////////////////////////////////////////////////////////
00274 int ConnectionWriter::
00275 get_num_threads() const {
00276   return _threads.size();
00277 }
00278 
00279 ////////////////////////////////////////////////////////////////////
00280 //     Function: ConnectionWriter::set_raw_mode
00281 //       Access: Public
00282 //  Description: Sets the ConnectionWriter into raw mode (or turns off
00283 //               raw mode).  In raw mode, datagrams are not sent along
00284 //               with their headers; the bytes in the datagram are
00285 //               simply sent down the pipe.
00286 //
00287 //               Setting the ConnectionWriter to raw mode must be done
00288 //               with care.  This can only be done when the matching
00289 //               ConnectionReader is also set to raw mode, or when the
00290 //               ConnectionWriter is communicating to a process that
00291 //               does not expect datagrams.
00292 ////////////////////////////////////////////////////////////////////
00293 void ConnectionWriter::
00294 set_raw_mode(bool mode) {
00295   _raw_mode = mode;
00296 }
00297 
00298 ////////////////////////////////////////////////////////////////////
00299 //     Function: ConnectionWriter::get_raw_mode
00300 //       Access: Public
00301 //  Description: Returns the current setting of the raw mode flag.
00302 //               See set_raw_mode().
00303 ////////////////////////////////////////////////////////////////////
00304 bool ConnectionWriter::
00305 get_raw_mode() const {
00306   return _raw_mode;
00307 }
00308 
00309 ////////////////////////////////////////////////////////////////////
00310 //     Function: ConnectionWriter::set_tcp_header_size
00311 //       Access: Public
00312 //  Description: Sets the header size of TCP packets.  At the present,
00313 //               legal values for this are 0, 2, or 4; this specifies
00314 //               the number of bytes to use encode the datagram length
00315 //               at the start of each TCP datagram.  Sender and
00316 //               receiver must independently agree on this.
00317 ////////////////////////////////////////////////////////////////////
00318 void ConnectionWriter::
00319 set_tcp_header_size(int tcp_header_size) {
00320   _tcp_header_size = tcp_header_size;
00321 }
00322 
00323 ////////////////////////////////////////////////////////////////////
00324 //     Function: ConnectionWriter::get_tcp_header_size
00325 //       Access: Public
00326 //  Description: Returns the current setting of TCP header size.
00327 //               See set_tcp_header_size().
00328 ////////////////////////////////////////////////////////////////////
00329 int ConnectionWriter::
00330 get_tcp_header_size() const {
00331   return _tcp_header_size;
00332 }
00333 
00334 ////////////////////////////////////////////////////////////////////
00335 //     Function: ConnectionWriter::shutdown
00336 //       Access: Published
00337 //  Description: Stops all the threads and cleans them up.  This is
00338 //               called automatically by the destructor, but it may be
00339 //               called explicitly before destruction.
00340 ////////////////////////////////////////////////////////////////////
00341 void ConnectionWriter::
00342 shutdown() {
00343   if (_shutdown) {
00344     return;
00345   }
00346   _shutdown = true;
00347 
00348   // First, shutdown the queue.  This will tell our threads they're
00349   // done.
00350   _queue.shutdown();
00351 
00352   // Now wait for all threads to terminate.
00353   Threads::iterator ti;
00354   for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
00355     (*ti)->join();
00356   }
00357   _threads.clear();
00358 }
00359 
00360 ////////////////////////////////////////////////////////////////////
00361 //     Function: ConnectionWriter::clear_manager
00362 //       Access: Protected
00363 //  Description: This should normally only be called when the
00364 //               associated ConnectionManager destructs.  It resets
00365 //               the ConnectionManager pointer to NULL so we don't
00366 //               have a floating pointer.  This makes the
00367 //               ConnectionWriter invalid; presumably it also will be
00368 //               destructed momentarily.
00369 ////////////////////////////////////////////////////////////////////
00370 void ConnectionWriter::
00371 clear_manager() {
00372   _manager = (ConnectionManager *)NULL;
00373   shutdown();
00374 }
00375 
00376 ////////////////////////////////////////////////////////////////////
00377 //     Function: ConnectionWriter::thread_run
00378 //       Access: Private
00379 //  Description: This is the actual executing function for each
00380 //               thread.
00381 ////////////////////////////////////////////////////////////////////
00382 void ConnectionWriter::
00383 thread_run(int thread_index) {
00384   nassertv(!_immediate);
00385 
00386   NetDatagram datagram;
00387   while (_queue.extract(datagram)) {
00388     if (_raw_mode) {
00389       datagram.get_connection()->send_raw_datagram(datagram);
00390     } else {
00391       datagram.get_connection()->send_datagram(datagram, _tcp_header_size);
00392     }
00393     Thread::consider_yield();
00394   }
00395 }
 All Classes Functions Variables Enumerations