00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifndef CONNECTIONREADER_H
00016 #define CONNECTIONREADER_H
00017
00018 #include "pandabase.h"
00019
00020 #include "connection.h"
00021
00022 #include "pointerTo.h"
00023 #include "pmutex.h"
00024 #include "lightMutex.h"
00025 #include "pvector.h"
00026 #include "pset.h"
00027 #include "socket_fdset.h"
00028 #include "atomicAdjust.h"
00029
00030 class NetDatagram;
00031 class ConnectionManager;
00032 class Socket_Address;
00033 class Socket_IP;
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057 class EXPCL_PANDA_NET ConnectionReader {
00058 PUBLISHED:
00059 // Abstract base class for objects that read datagrams from Connections.
00060 // It is abstract because receive_datagram(), declared below, is pure
00061 // virtual and must be supplied by a subclass.
00062 //
00063 // Constructor parameters:
00064 //   manager     - the ConnectionManager this reader is associated with
00065 //                 (NOTE(review): presumably stored in _manager -- confirm).
00066 //   num_threads - NOTE(review): presumably the number of ReaderThread
00067 //                 objects to start; confirm what 0 means in the .cxx file.
00068 //   thread_name - optional name for the threads; defaults to an empty
00069 //                 string.
00070 ConnectionReader(ConnectionManager *manager, int num_threads,
00071 const string &thread_name = string());
00072 virtual ~ConnectionReader();
00073 // Connection bookkeeping.  Each call takes a raw Connection pointer and returns a bool; NOTE(review): presumably true means success/healthy -- confirm.
00074 bool add_connection(Connection *connection);
00075 bool remove_connection(Connection *connection);
00076 bool is_connection_ok(Connection *connection);
00077 // NOTE(review): presumably services the sockets explicitly when no reader threads are running -- confirm.
00078 void poll();
00079 // Read-only accessors.  is_polling() is INLINE, so its body is expected in connectionReader.I (included at the end of this header).
00080 ConnectionManager *get_manager() const;
00081 INLINE bool is_polling() const;
00082 int get_num_threads() const;
00083 // Raw-mode flag accessors (NOTE(review): presumably backed by _raw_mode, and selecting the process_raw_* handlers -- confirm).
00084 void set_raw_mode(bool mode);
00085 bool get_raw_mode() const;
00086 // TCP header size accessors (NOTE(review): presumably backed by _tcp_header_size; units and valid values are not visible here).
00087 void set_tcp_header_size(int tcp_header_size);
00088 int get_tcp_header_size() const;
00089 // NOTE(review): presumably stops the reader and its threads (see _shutdown) -- confirm.
00090 void shutdown();
00091
00092 protected:
00093 virtual void flush_read_connection(Connection *connection); // Overridable hook; behavior is defined in the .cxx file.
00094 virtual void receive_datagram(const NetDatagram &datagram)=0; // Pure virtual: every concrete subclass must implement this.
00095 // Per-connection record: a PT(Connection) plus two bool status flags.
00096 class SocketInfo {
00097 public:
00098 SocketInfo(const PT(Connection) &connection);
00099 bool is_udp() const;
00100 Socket_IP *get_socket() const;
00101
00102 PT(Connection) _connection;
00103 bool _busy; // NOTE(review): presumably set while a thread is servicing this socket -- confirm.
00104 bool _error; // NOTE(review): presumably set once an error has been seen on this socket -- confirm.
00105 };
00106 typedef pvector<SocketInfo *> Sockets; // Vector of raw SocketInfo pointers; who deletes them is not visible in this header.
00107
00108 void clear_manager();
00109 void finish_socket(SocketInfo *sinfo);
00110 // Overridable data handlers, one per transport/mode combination.  Each takes the SocketInfo to service and returns a bool (meaning not visible here).
00111 virtual bool process_incoming_data(SocketInfo *sinfo);
00112 virtual bool process_incoming_udp_data(SocketInfo *sinfo);
00113 virtual bool process_incoming_tcp_data(SocketInfo *sinfo);
00114 virtual bool process_raw_incoming_udp_data(SocketInfo *sinfo);
00115 virtual bool process_raw_incoming_tcp_data(SocketInfo *sinfo);
00116
00117 protected:
00118 ConnectionManager *_manager;
00119
00120 // NOTE(review): presumably the full set of sockets this reader currently
00121 // knows about -- confirm.
00122 Sockets _sockets;
00123 // NOTE(review): presumably sockets that were removed but cannot be freed
00124 // yet (e.g. while still _busy) -- confirm.
00125 Sockets _removed_sockets;
00126 // NOTE(review): presumably guards _sockets and _removed_sockets -- confirm.
00127 LightMutex _sockets_mutex;
00128
00129 private:
00130 void thread_run(int thread_index);
00131
00132 SocketInfo *get_next_available_socket(bool allow_block,
00133 int current_thread_index);
00134
00135 void rebuild_select_list();
00136 void accumulate_fdset(Socket_fdset &fdset);
00137
00138 private:
00139 bool _raw_mode;
00140 int _tcp_header_size;
00141 bool _shutdown;
00142 // Worker thread type: derives from Thread and stores a raw back-pointer to its reader plus an integer thread index.
00143 class ReaderThread : public Thread {
00144 public:
00145 ReaderThread(ConnectionReader *reader, const string &thread_name,
00146 int thread_index);
00147 virtual void thread_main();
00148
00149 ConnectionReader *_reader;
00150 int _thread_index;
00151 };
00152
00153 typedef pvector< PT(ReaderThread) > Threads;
00154 Threads _threads;
00155 bool _polling;
00156
00157 // NOTE(review): the members below presumably hold the state of the most
00158 // recent select() pass over the sockets -- confirm.
00159 Socket_fdset _fdset;
00160 Sockets _selecting_sockets;
00161 int _next_index;
00162 int _num_results;
00163
00164 // NOTE(review): presumably serializes access to the select state above -- confirm.
00165 Mutex _select_mutex;
00166
00167 // Stored as an AtomicAdjust::Integer.  NOTE(review): presumably the index
00168 // of the thread currently polling; the sentinel for "none" is not
00169 // visible in this header.
00170 AtomicAdjust::Integer _currently_polling_thread;
00171 // Both friends get access to the private members above.
00172 friend class ConnectionManager;
00173 friend class ReaderThread;
00174 };
00175
00176 #include "connectionReader.I"
00177
00178 #endif