// Panda3D
00001 // Filename: connectionReader.h 00002 // Created by: drose (08Feb00) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #ifndef CONNECTIONREADER_H 00016 #define CONNECTIONREADER_H 00017 00018 #include "pandabase.h" 00019 00020 #include "connection.h" 00021 00022 #include "pointerTo.h" 00023 #include "pmutex.h" 00024 #include "lightMutex.h" 00025 #include "pvector.h" 00026 #include "pset.h" 00027 #include "socket_fdset.h" 00028 #include "atomicAdjust.h" 00029 00030 class NetDatagram; 00031 class ConnectionManager; 00032 class Socket_Address; 00033 class Socket_IP; 00034 00035 //////////////////////////////////////////////////////////////////// 00036 // Class : ConnectionReader 00037 // Description : This is an abstract base class for a family of 00038 // classes that listen for activity on a socket and 00039 // respond to it, for instance by reading a datagram and 00040 // serving it (or queueing it up for later service). 00041 // 00042 // A ConnectionReader may define an arbitrary number of 00043 // threads (at least one) to process datagrams coming in 00044 // from an arbitrary number of sockets that it is 00045 // monitoring. The number of threads is specified at 00046 // construction time and cannot be changed, but the set 00047 // of sockets that is to be monitored may be constantly 00048 // modified at will. 00049 // 00050 // This is an abstract class because it doesn't define 00051 // how to process each received datagram. See 00052 // QueuedConnectionReader. 
Also note that 00053 // ConnectionListener derives from this class, extending 00054 // it to accept connections on a rendezvous socket 00055 // rather than read datagrams. 00056 //////////////////////////////////////////////////////////////////// 00057 class EXPCL_PANDA_NET ConnectionReader { 00058 PUBLISHED: 00059 // The implementation here used to involve NSPR's multi-wait 00060 // interface, but that got too complicated to manage. It turns out 00061 // to be difficult to protect against memory leaks caused by race 00062 // conditions in that interface, as designed. 00063 00064 // Instead, we do our own multi-wait type stuff. Only one thread at 00065 // a time can extract the next-available socket with activity on it. 00066 // That thread will either (a) simply extract the next socket from 00067 // the arrays returned by a previous call to PR_Poll(), or (b) 00068 // execute (and possibly block on) a new call to PR_Poll(). 00069 00070 ConnectionReader(ConnectionManager *manager, int num_threads, 00071 const string &thread_name = string()); 00072 virtual ~ConnectionReader(); 00073 00074 bool add_connection(Connection *connection); 00075 bool remove_connection(Connection *connection); 00076 bool is_connection_ok(Connection *connection); 00077 00078 void poll(); 00079 00080 ConnectionManager *get_manager() const; 00081 INLINE bool is_polling() const; 00082 int get_num_threads() const; 00083 00084 void set_raw_mode(bool mode); 00085 bool get_raw_mode() const; 00086 00087 void set_tcp_header_size(int tcp_header_size); 00088 int get_tcp_header_size() const; 00089 00090 void shutdown(); 00091 00092 protected: 00093 virtual void flush_read_connection(Connection *connection); 00094 virtual void receive_datagram(const NetDatagram &datagram)=0; 00095 00096 class SocketInfo { 00097 public: 00098 SocketInfo(const PT(Connection) &connection); 00099 bool is_udp() const; 00100 Socket_IP *get_socket() const; 00101 00102 PT(Connection) _connection; 00103 bool _busy; 00104 bool 
_error; 00105 }; 00106 typedef pvector<SocketInfo *> Sockets; 00107 00108 void clear_manager(); 00109 void finish_socket(SocketInfo *sinfo); 00110 00111 virtual bool process_incoming_data(SocketInfo *sinfo); 00112 virtual bool process_incoming_udp_data(SocketInfo *sinfo); 00113 virtual bool process_incoming_tcp_data(SocketInfo *sinfo); 00114 virtual bool process_raw_incoming_udp_data(SocketInfo *sinfo); 00115 virtual bool process_raw_incoming_tcp_data(SocketInfo *sinfo); 00116 00117 protected: 00118 ConnectionManager *_manager; 00119 00120 // These structures track the total set of sockets (connections) we 00121 // know about. 00122 Sockets _sockets; 00123 // This is the list of recently-removed sockets. We can't actually 00124 // delete them until they're no longer _busy. 00125 Sockets _removed_sockets; 00126 // Any operations on _sockets are protected by this mutex. 00127 LightMutex _sockets_mutex; 00128 00129 private: 00130 void thread_run(int thread_index); 00131 00132 SocketInfo *get_next_available_socket(bool allow_block, 00133 int current_thread_index); 00134 00135 void rebuild_select_list(); 00136 void accumulate_fdset(Socket_fdset &fdset); 00137 00138 private: 00139 bool _raw_mode; 00140 int _tcp_header_size; 00141 bool _shutdown; 00142 00143 class ReaderThread : public Thread { 00144 public: 00145 ReaderThread(ConnectionReader *reader, const string &thread_name, 00146 int thread_index); 00147 virtual void thread_main(); 00148 00149 ConnectionReader *_reader; 00150 int _thread_index; 00151 }; 00152 00153 typedef pvector< PT(ReaderThread) > Threads; 00154 Threads _threads; 00155 bool _polling; 00156 00157 // These structures are used to manage selecting for noise on 00158 // available sockets. 00159 Socket_fdset _fdset; 00160 Sockets _selecting_sockets; 00161 int _next_index; 00162 int _num_results; 00163 // Threads go to sleep on this mutex waiting for their chance to 00164 // read a socket. 
00165 Mutex _select_mutex; 00166 00167 // This is atomically updated with the index (in _threads) of the 00168 // thread that is currently waiting on the PR_Poll() call. It 00169 // contains -1 if no thread is so waiting. 00170 AtomicAdjust::Integer _currently_polling_thread; 00171 00172 friend class ConnectionManager; 00173 friend class ReaderThread; 00174 }; 00175 00176 #include "connectionReader.I" 00177 00178 #endif