00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "connectionWriter.h"
00016 #include "connectionManager.h"
00017 #include "datagramTCPHeader.h"
00018 #include "config_net.h"
00019 #include "socket_tcp.h"
00020 #include "socket_udp.h"
00021 #include "pnotify.h"
00022 #include "config_downloader.h"
00023
00024
00025
00026
00027
00028
00029 ConnectionWriter::WriterThread::
00030 WriterThread(ConnectionWriter *writer, const string &thread_name,
00031 int thread_index) :
00032 Thread(make_thread_name(thread_name, thread_index),
00033 make_thread_name(thread_name, thread_index)),
00034 _writer(writer),
00035 _thread_index(thread_index)
00036 {
00037 }
00038
00039
00040
00041
00042
00043
00044 void ConnectionWriter::WriterThread::
00045 thread_main() {
00046 _writer->thread_run(_thread_index);
00047 }
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059 ConnectionWriter::
00060 ConnectionWriter(ConnectionManager *manager, int num_threads,
00061 const string &thread_name) :
00062 _manager(manager)
00063 {
00064 if (!Thread::is_threading_supported()) {
00065 #ifndef NDEBUG
00066 if (num_threads != 0) {
00067 if (net_cat.is_debug()) {
00068 net_cat.debug()
00069 << "Threading support is not available.\n";
00070 }
00071 }
00072 #endif // NDEBUG
00073 num_threads = 0;
00074 }
00075
00076 _raw_mode = false;
00077 _tcp_header_size = tcp_header_size;
00078 _immediate = (num_threads <= 0);
00079 _shutdown = false;
00080
00081 string writer_thread_name = thread_name;
00082 if (thread_name.empty()) {
00083 writer_thread_name = "WriterThread";
00084 }
00085 int i;
00086 for (i = 0; i < num_threads; i++) {
00087 PT(WriterThread) thread = new WriterThread(this, writer_thread_name, i);
00088 _threads.push_back(thread);
00089 }
00090 for (i = 0; i < num_threads; i++) {
00091 _threads[i]->start(net_thread_priority, true);
00092 }
00093
00094 _manager->add_writer(this);
00095 }
00096
00097
00098
00099
00100
00101
00102 ConnectionWriter::
00103 ~ConnectionWriter() {
00104 if (_manager != (ConnectionManager *)NULL) {
00105 _manager->remove_writer(this);
00106 }
00107
00108 shutdown();
00109 }
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119 void ConnectionWriter::
00120 set_max_queue_size(int max_size) {
00121 _queue.set_max_queue_size(max_size);
00122 }
00123
00124
00125
00126
00127
00128
00129
00130 int ConnectionWriter::
00131 get_max_queue_size() const {
00132 return _queue.get_max_queue_size();
00133 }
00134
00135
00136
00137
00138
00139
00140 int ConnectionWriter::
00141 get_current_queue_size() const {
00142 return _queue.get_current_queue_size();
00143 }
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165 bool ConnectionWriter::
00166 send(const Datagram &datagram, const PT(Connection) &connection, bool block) {
00167 nassertr(!_shutdown, false);
00168 nassertr(connection != (Connection *)NULL, false);
00169 nassertr(connection->get_socket()->is_exact_type(Socket_TCP::get_class_type()), false);
00170
00171 NetDatagram copy(datagram);
00172 copy.set_connection(connection);
00173
00174 if (_immediate) {
00175 if (_raw_mode) {
00176 return connection->send_raw_datagram(copy);
00177 } else {
00178 return connection->send_datagram(copy, _tcp_header_size);
00179 }
00180 } else {
00181 return _queue.insert(copy, block);
00182 }
00183 }
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205 bool ConnectionWriter::
00206 send(const Datagram &datagram, const PT(Connection) &connection,
00207 const NetAddress &address, bool block) {
00208 nassertr(!_shutdown, false);
00209 nassertr(connection != (Connection *)NULL, false);
00210 nassertr(connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()), false);
00211
00212 if ((int)datagram.get_length() > maximum_udp_datagram) {
00213 net_cat.warning()
00214 << "Attempt to send UDP datagram of " << datagram.get_length()
00215 << " bytes, more than the\n"
00216 << "currently defined maximum of " << maximum_udp_datagram
00217 << " bytes.\n";
00218 }
00219
00220 NetDatagram copy(datagram);
00221 copy.set_connection(connection);
00222 copy.set_address(address);
00223
00224 if (_immediate) {
00225 if (_raw_mode) {
00226 return connection->send_raw_datagram(copy);
00227 } else {
00228 return connection->send_datagram(copy, _tcp_header_size);
00229 }
00230 } else {
00231 return _queue.insert(copy, block);
00232 }
00233 }
00234
00235
00236
00237
00238
00239
00240
00241 bool ConnectionWriter::
00242 is_valid_for_udp(const Datagram &datagram) const {
00243 return (int)datagram.get_length() <= maximum_udp_datagram;
00244 }
00245
00246
00247
00248
00249
00250
00251
00252 ConnectionManager *ConnectionWriter::
00253 get_manager() const {
00254 return _manager;
00255 }
00256
00257
00258
00259
00260
00261
00262
00263 bool ConnectionWriter::
00264 is_immediate() const {
00265 return _immediate;
00266 }
00267
00268
00269
00270
00271
00272
00273
00274 int ConnectionWriter::
00275 get_num_threads() const {
00276 return _threads.size();
00277 }
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293 void ConnectionWriter::
00294 set_raw_mode(bool mode) {
00295 _raw_mode = mode;
00296 }
00297
00298
00299
00300
00301
00302
00303
00304 bool ConnectionWriter::
00305 get_raw_mode() const {
00306 return _raw_mode;
00307 }
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318 void ConnectionWriter::
00319 set_tcp_header_size(int tcp_header_size) {
00320 _tcp_header_size = tcp_header_size;
00321 }
00322
00323
00324
00325
00326
00327
00328
00329 int ConnectionWriter::
00330 get_tcp_header_size() const {
00331 return _tcp_header_size;
00332 }
00333
00334
00335
00336
00337
00338
00339
00340
00341 void ConnectionWriter::
00342 shutdown() {
00343 if (_shutdown) {
00344 return;
00345 }
00346 _shutdown = true;
00347
00348
00349
00350 _queue.shutdown();
00351
00352
00353 Threads::iterator ti;
00354 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
00355 (*ti)->join();
00356 }
00357 _threads.clear();
00358 }
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370 void ConnectionWriter::
00371 clear_manager() {
00372 _manager = (ConnectionManager *)NULL;
00373 shutdown();
00374 }
00375
00376
00377
00378
00379
00380
00381
00382 void ConnectionWriter::
00383 thread_run(int thread_index) {
00384 nassertv(!_immediate);
00385
00386 NetDatagram datagram;
00387 while (_queue.extract(datagram)) {
00388 if (_raw_mode) {
00389 datagram.get_connection()->send_raw_datagram(datagram);
00390 } else {
00391 datagram.get_connection()->send_datagram(datagram, _tcp_header_size);
00392 }
00393 Thread::consider_yield();
00394 }
00395 }