Panda3D

connectionReader.h

00001 // Filename: connectionReader.h
00002 // Created by:  drose (08Feb00)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #ifndef CONNECTIONREADER_H
00016 #define CONNECTIONREADER_H
00017 
00018 #include "pandabase.h"
00019 
00020 #include "connection.h"
00021 
00022 #include "pointerTo.h"
00023 #include "pmutex.h"
00024 #include "lightMutex.h"
00025 #include "pvector.h"
00026 #include "pset.h"
00027 #include "socket_fdset.h"
00028 #include "atomicAdjust.h"
00029 
00030 class NetDatagram;
00031 class ConnectionManager;
00032 class Socket_Address;
00033 class Socket_IP;
00034 
00035 ////////////////////////////////////////////////////////////////////
00036 //       Class : ConnectionReader
00037 // Description : This is an abstract base class for a family of
00038 //               classes that listen for activity on a socket and
00039 //               respond to it, for instance by reading a datagram and
00040 //               serving it (or queueing it up for later service).
00041 //
00042 //               A ConnectionReader may define an arbitrary number of
00043 //               threads (at least one) to process datagrams coming in
00044 //               from an arbitrary number of sockets that it is
00045 //               monitoring.  The number of threads is specified at
00046 //               construction time and cannot be changed, but the set
00047 //               of sockets that is to be monitored may be constantly
00048 //               modified at will.
00049 //
00050 //               This is an abstract class because it doesn't define
00051 //               how to process each received datagram.  See
00052 //               QueuedConnectionReader.  Also note that
00053 //               ConnectionListener derives from this class, extending
00054 //               it to accept connections on a rendezvous socket
00055 //               rather than read datagrams.
00056 ////////////////////////////////////////////////////////////////////
00057 class EXPCL_PANDA_NET ConnectionReader {
00058 PUBLISHED:
00059   // The implementation here used to involve NSPR's multi-wait
00060   // interface, but that got too complicated to manage.  It turns out
00061   // to be difficult to protect against memory leaks caused by race
00062   // conditions in that interface, as designed.
00063 
00064   // Instead, we do our own multi-wait type stuff.  Only one thread at
00065   // a time can extract the next-available socket with activity on it.
00066   // That thread will either (a) simply extract the next socket from
00067   // the arrays returned by a previous call to PR_Poll(), or (b)
00068   // execute (and possibly block on) a new call to PR_Poll().
00069 
00070   ConnectionReader(ConnectionManager *manager, int num_threads,
00071                    const string &thread_name = string());
00072   virtual ~ConnectionReader();
00073 
00074   bool add_connection(Connection *connection);
00075   bool remove_connection(Connection *connection);
00076   bool is_connection_ok(Connection *connection);
00077 
00078   void poll();
00079 
00080   ConnectionManager *get_manager() const;
00081   INLINE bool is_polling() const;
00082   int get_num_threads() const;
00083 
00084   void set_raw_mode(bool mode);
00085   bool get_raw_mode() const;
00086 
00087   void set_tcp_header_size(int tcp_header_size);
00088   int get_tcp_header_size() const;
00089 
00090   void shutdown();
00091 
00092 protected:
00093   virtual void flush_read_connection(Connection *connection);
00094   virtual void receive_datagram(const NetDatagram &datagram)=0;
00095 
00096   class SocketInfo {
00097   public:
00098     SocketInfo(const PT(Connection) &connection);
00099     bool is_udp() const;
00100     Socket_IP *get_socket() const;
00101 
00102     PT(Connection) _connection;
00103     bool _busy;
00104     bool _error;
00105   };
00106   typedef pvector<SocketInfo *> Sockets;
00107 
00108   void clear_manager();
00109   void finish_socket(SocketInfo *sinfo);
00110 
00111   virtual bool process_incoming_data(SocketInfo *sinfo);
00112   virtual bool process_incoming_udp_data(SocketInfo *sinfo);
00113   virtual bool process_incoming_tcp_data(SocketInfo *sinfo);
00114   virtual bool process_raw_incoming_udp_data(SocketInfo *sinfo);
00115   virtual bool process_raw_incoming_tcp_data(SocketInfo *sinfo);
00116 
00117 protected:
00118   ConnectionManager *_manager;
00119 
00120   // These structures track the total set of sockets (connections) we
00121   // know about.
00122   Sockets _sockets;
00123   // This is the list of recently-removed sockets.  We can't actually
00124   // delete them until they're no longer _busy.
00125   Sockets _removed_sockets;
00126   // Any operations on _sockets are protected by this mutex.
00127   LightMutex _sockets_mutex;
00128 
00129 private:
00130   void thread_run(int thread_index);
00131 
00132   SocketInfo *get_next_available_socket(bool allow_block, 
00133                                         int current_thread_index);
00134 
00135   void rebuild_select_list();
00136   void accumulate_fdset(Socket_fdset &fdset);
00137 
00138 private:
00139   bool _raw_mode;
00140   int _tcp_header_size;
00141   bool _shutdown;
00142 
00143   class ReaderThread : public Thread {
00144   public:
00145     ReaderThread(ConnectionReader *reader, const string &thread_name, 
00146                  int thread_index);
00147     virtual void thread_main();
00148 
00149     ConnectionReader *_reader;
00150     int _thread_index;
00151   };
00152 
00153   typedef pvector< PT(ReaderThread) > Threads;
00154   Threads _threads;
00155   bool _polling;
00156 
00157   // These structures are used to manage selecting for noise on
00158   // available sockets.
00159   Socket_fdset _fdset;
00160   Sockets _selecting_sockets;
00161   int _next_index;
00162   int _num_results;
00163   // Threads go to sleep on this mutex waiting for their chance to
00164   // read a socket.
00165   Mutex _select_mutex;
00166 
00167   // This is atomically updated with the index (in _threads) of the
00168   // thread that is currently waiting on the PR_Poll() call.  It
00169   // contains -1 if no thread is so waiting.
00170   AtomicAdjust::Integer _currently_polling_thread;
00171 
00172   friend class ConnectionManager;
00173   friend class ReaderThread;
00174 };
00175 
00176 #include "connectionReader.I"
00177 
00178 #endif
 All Classes Functions Variables Enumerations