15 #include "cConnectionRepository.h" 16 #include "dcmsgtypes.h" 20 #include "config_distributed.h" 21 #include "config_downloader.h" 22 #include "httpChannel.h" 24 #include "datagramIterator.h" 25 #include "throw_event.h" 26 #include "pStatTimer.h" 32 const string CConnectionRepository::_overflow_event_name =
"CRDatagramOverflow";
35 PStatCollector CConnectionRepository::_update_pcollector(
"App:Show code:readerPollTask:Update");
43 CConnectionRepository::
44 CConnectionRepository(
bool has_owner_view,
bool threaded_net) :
45 _lock(
"CConnectionRepository::_lock"),
47 _python_repository(NULL),
53 _cw(&_qcm, threaded_net ? 1 : 0),
54 _qcr(&_qcm, threaded_net ? 1 : 0),
56 #ifdef WANT_NATIVE_NET
57 _bdc(4096000,4096000,1400),
60 _client_datagram(true),
61 _handle_datagrams_internally(handle_datagrams_internally),
62 _simulated_disconnect(false),
63 _verbose(distributed_cat.is_spam()),
68 _has_owner_view(has_owner_view),
69 _handle_c_updates(true),
70 _want_message_bundling(true),
74 #if defined(HAVE_NET) && defined(SIMULATE_NETWORK_DELAY) 75 if (min_lag != 0.0 || max_lag != 0.0) {
76 _qcr.start_delay(min_lag, max_lag);
79 _tcp_header_size = tcp_header_size;
87 CConnectionRepository::
88 ~CConnectionRepository() {
103 _tcp_header_size = tcp_header_size;
106 if (_http_conn != (SocketStream *)NULL) {
107 _http_conn->set_tcp_header_size(tcp_header_size);
112 _cw.set_tcp_header_size(tcp_header_size);
113 _qcr.set_tcp_header_size(tcp_header_size);
126 void CConnectionRepository::
127 set_connection_http(HTTPChannel *channel) {
131 nassertv(channel->is_connection_ready());
132 _http_conn = channel->get_connection();
133 _http_conn->set_tcp_header_size(_tcp_header_size);
134 #ifdef SIMULATE_NETWORK_DELAY 135 if (min_lag != 0.0 || max_lag != 0.0) {
136 _http_conn->start_delay(min_lag, max_lag);
140 #endif // HAVE_OPENSSL 150 SocketStream *CConnectionRepository::
156 #endif // HAVE_OPENSSL 167 bool CConnectionRepository::
168 try_connect_net(
const URLSpec &url) {
175 game_server_timeout_ms);
178 _net_conn->set_no_delay(
true);
179 _qcr.add_connection(_net_conn);
187 #ifdef WANT_NATIVE_NET 194 bool CConnectionRepository::
195 connect_native(
const URLSpec &url) {
201 _bdc.ClearAddresses();
202 _bdc.AddAddress(addr);
203 return _bdc.DoConnect();
206 #endif //WANT NATIVE NET 208 #ifdef SIMULATE_NETWORK_DELAY 226 void CConnectionRepository::
227 start_delay(
double min_delay,
double max_delay) {
230 if (min_delay != 0.0 || max_delay != 0.0) {
232 _qcr.start_delay(min_delay, max_delay);
235 if (_http_conn != (SocketStream *)NULL) {
236 _http_conn->start_delay(min_delay, max_delay);
238 #endif // HAVE_OPENSSL 243 #endif // SIMULATE_NETWORK_DELAY 245 #ifdef SIMULATE_NETWORK_DELAY 253 void CConnectionRepository::
261 if (_http_conn != (SocketStream *)NULL) {
262 _http_conn->stop_delay();
264 #endif // HAVE_OPENSSL 266 #endif // SIMULATE_NETWORK_DELAY 281 if (_simulated_disconnect) {
284 #ifdef WANT_NATIVE_NET 287 #endif //WANT_NATIVE_NET 289 while (do_check_datagram()) {
291 describe_message(nout,
"RECV", _dg);
297 if (!_client_datagram) {
298 unsigned char wc_cnt;
300 _msg_channels.clear();
301 for (
unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) {
303 _msg_channels.push_back(schan);
311 if (_python_repository != (PyObject *)NULL) {
312 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 313 PyGILState_STATE gstate;
314 gstate = PyGILState_Ensure();
316 PyObject *value = PyLong_FromUnsignedLongLong(_msg_sender);
317 PyObject_SetAttrString(_python_repository,
"msgSender", value);
319 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 320 PyGILState_Release(gstate);
323 #endif // HAVE_PYTHON 328 if (!_handle_datagrams_internally) {
334 case CLIENT_OBJECT_UPDATE_FIELD:
335 case STATESERVER_OBJECT_UPDATE_FIELD:
336 if (_handle_c_updates) {
337 if (_has_owner_view) {
338 if (!handle_update_field_owner()) {
342 if (!handle_update_field()) {
351 #endif // HAVE_PYTHON 377 #ifdef WANT_NATIVE_NET 379 return (_bdc.IsConnected());
384 if (_qcm.reset_connection_available()) {
386 if (_qcm.get_reset_connection(reset_connection)) {
387 _qcm.close_connection(reset_connection);
388 if (reset_connection == _net_conn) {
401 if (!_http_conn->is_closed()) {
409 #endif // HAVE_OPENSSL 426 if (_simulated_disconnect) {
427 distributed_cat.warning()
428 <<
"Unable to send datagram during simulated disconnect.\n";
433 describe_message(nout,
"SEND", dg);
441 #ifdef WANT_NATIVE_NET 443 return _bdc.SendMessage(dg);
448 _cw.send(dg, _net_conn);
455 if (!_http_conn->send_datagram(dg)) {
456 distributed_cat.warning()
457 <<
"Could not send datagram.\n";
463 #endif // HAVE_OPENSSL 465 distributed_cat.warning()
466 <<
"Unable to send datagram after connection is closed.\n";
489 nout <<
"CR::SEND:BUNDLE_START(" << _bundling_msgs <<
")" << endl;
491 if (_bundling_msgs == 0) {
492 _bundle_msgs.clear();
506 nassertv(_bundling_msgs);
511 nout <<
"CR::SEND:BUNDLE_FINISH(" << _bundling_msgs <<
")" << endl;
523 BundledMsgVector::const_iterator bmi;
524 for (bmi = _bundle_msgs.begin(); bmi != _bundle_msgs.end(); bmi++) {
544 _bundle_msgs.clear();
552 void CConnectionRepository::
571 if (_simulated_disconnect) {
575 #ifdef WANT_NATIVE_NET 582 return _net_conn->consider_flush();
588 return _http_conn->consider_flush();
590 #endif // HAVE_OPENSSL 606 if (_simulated_disconnect) {
609 #ifdef WANT_NATIVE_NET 616 return _net_conn->flush();
622 return _http_conn->flush();
624 #endif // HAVE_OPENSSL 638 #ifdef WANT_NATIVE_NET 641 _bdc.ClearAddresses();
646 _qcm.close_connection(_net_conn);
657 #endif // HAVE_OPENSSL 659 _simulated_disconnect =
false;
684 bool CConnectionRepository::
685 do_check_datagram() {
686 #ifdef WANT_NATIVE_NET 688 return _bdc.GetMessage(_dg);
693 _net_conn->consider_flush();
694 if (_qcr.get_overflow_flag()) {
696 _qcr.reset_overflow_flag();
698 return (_qcr.data_available() && _qcr.get_data(_dg));
704 _http_conn->consider_flush();
705 return _http_conn->receive_datagram(_dg);
707 #endif // HAVE_OPENSSL 723 bool CConnectionRepository::
724 handle_update_field() {
726 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 727 PyGILState_STATE gstate;
728 gstate = PyGILState_Ensure();
733 if (_python_repository != (PyObject *)NULL)
736 PyObject_GetAttrString(_python_repository,
"doId2do");
737 nassertr(doId2do != NULL,
false);
739 #ifdef USE_PYTHON_2_2_OR_EARLIER 740 PyObject *doId = PyInt_FromLong(do_id);
742 PyObject *doId = PyLong_FromUnsignedLong(do_id);
744 PyObject *distobj = PyDict_GetItem(doId2do, doId);
748 if (distobj != NULL) {
749 PyObject *dclass_obj = PyObject_GetAttrString(distobj,
"dclass");
750 nassertr(dclass_obj != NULL,
false);
753 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
754 Py_DECREF(dclass_obj);
755 nassertr(dclass_this != NULL,
false);
758 Py_DECREF(dclass_this);
762 if (_in_quiet_zone) {
763 PyObject *neverDisable = PyObject_GetAttrString(distobj,
"neverDisable");
764 nassertr(neverDisable != NULL,
false);
766 unsigned int cNeverDisable = PyLong_AsLong(neverDisable);
767 if (!cNeverDisable) {
770 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 771 PyGILState_Release(gstate);
782 dclass->receive_update(distobj, _di);
785 if (PyErr_Occurred()) {
786 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 787 PyGILState_Release(gstate);
795 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 796 PyGILState_Release(gstate);
798 #endif // HAVE_PYTHON 814 bool CConnectionRepository::
815 handle_update_field_owner() {
817 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 818 PyGILState_STATE gstate;
819 gstate = PyGILState_Ensure();
824 if (_python_repository != (PyObject *)NULL) {
826 PyObject_GetAttrString(_python_repository,
"doId2do");
827 nassertr(doId2do != NULL,
false);
829 PyObject *doId2ownerView =
830 PyObject_GetAttrString(_python_repository,
"doId2ownerView");
831 nassertr(doId2ownerView != NULL,
false);
833 #ifdef USE_PYTHON_2_2_OR_EARLIER 834 PyObject *doId = PyInt_FromLong(do_id);
836 PyObject *doId = PyLong_FromUnsignedLong(do_id);
840 PyObject *distobjOV = PyDict_GetItem(doId2ownerView, doId);
841 Py_DECREF(doId2ownerView);
843 if (distobjOV != NULL) {
844 PyObject *dclass_obj = PyObject_GetAttrString(distobjOV,
"dclass");
845 nassertr(dclass_obj != NULL,
false);
847 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
848 Py_DECREF(dclass_obj);
849 nassertr(dclass_this != NULL,
false);
852 Py_DECREF(dclass_this);
864 Py_INCREF(distobjOV);
868 dclass->receive_update(distobjOV, _odi);
869 Py_DECREF(distobjOV);
871 if (PyErr_Occurred()) {
872 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 873 PyGILState_Release(gstate);
881 PyObject *distobj = PyDict_GetItem(doId2do, doId);
885 if (distobj != NULL) {
886 PyObject *dclass_obj = PyObject_GetAttrString(distobj,
"dclass");
887 nassertr(dclass_obj != NULL,
false);
889 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
890 Py_DECREF(dclass_obj);
891 nassertr(dclass_this != NULL,
false);
894 Py_DECREF(dclass_this);
907 dclass->receive_update(distobj, _di);
910 if (PyErr_Occurred()) {
911 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 912 PyGILState_Release(gstate);
920 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) 921 PyGILState_Release(gstate);
923 #endif // HAVE_PYTHON 935 void CConnectionRepository::
936 describe_message(ostream &out,
const string &prefix,
943 bool is_update =
false;
944 string full_prefix =
"CR::" + prefix;
946 if (!_client_datagram)
949 for( ;mcnt > 0; mcnt--)
950 packer.RAW_UNPACK_CHANNEL();
952 packer.RAW_UNPACK_CHANNEL();
954 is_update = (msg_type == STATESERVER_OBJECT_UPDATE_FIELD);
958 is_update = (msg_type == CLIENT_OBJECT_UPDATE_FIELD);
967 if (_python_repository != (PyObject *)NULL) {
968 PyObject *msgId = PyLong_FromLong(msg_type);
969 nassertv(msgId != NULL);
970 #if PY_MAJOR_VERSION >= 3 971 PyObject *methodName = PyUnicode_FromString(
"_getMsgName");
973 PyObject *methodName = PyString_FromString(
"_getMsgName");
975 nassertv(methodName != NULL);
977 PyObject *result = PyObject_CallMethodObjArgs(_python_repository, methodName,
979 nassertv(result != NULL);
981 #if PY_MAJOR_VERSION >= 3 982 msgName += string(PyUnicode_AsUTF8(result));
984 msgName += string(PyString_AsString(result));
987 Py_DECREF(methodName);
992 if (msgName.length() == 0) {
993 msgName +=
"unknown message ";
997 out << full_prefix <<
":" << msgName <<
"\n";
1007 if (_python_repository != (PyObject *)NULL) {
1009 PyObject_GetAttrString(_python_repository,
"doId2do");
1010 nassertv(doId2do != NULL);
1012 #ifdef USE_PYTHON_2_2_OR_EARLIER 1013 PyObject *doId = PyInt_FromLong(do_id);
1015 PyObject *doId = PyLong_FromUnsignedLong(do_id);
1017 PyObject *distobj = PyDict_GetItem(doId2do, doId);
1021 if (distobj != NULL) {
1022 PyObject *dclass_obj = PyObject_GetAttrString(distobj,
"dclass");
1023 nassertv(dclass_obj != NULL);
1025 PyObject *dclass_this = PyObject_GetAttrString(dclass_obj,
"this");
1026 Py_DECREF(dclass_obj);
1027 nassertv(dclass_this != NULL);
1029 dclass = (
DCClass *)PyLong_AsLong(dclass_this);
1030 Py_DECREF(dclass_this);
1033 #endif // HAVE_PYTHON 1037 if (dclass == (
DCClass *)NULL) {
1038 out << full_prefix <<
"update for unknown object " << do_id
1039 <<
", field " << field_id <<
"\n";
1042 out << full_prefix <<
1043 ":" << dclass->
get_name() <<
"(" << do_id <<
").";
1045 if (field == (
DCField *)NULL) {
1046 out <<
"unknown field " << field_id <<
"\n";
void add_string(const string &str)
Adds a variable-length string to the datagram.
A container for a URL, e.g.
void abandon_message_bundles()
Throws out any messages that have been queued up for message bundles.
bool get_want_message_bundling() const
Returns true if message bundling is enabled.
void add_int8(PN_int8 value)
Adds a signed 8-bit integer to the datagram.
A single field of a Distributed Class, either atomic or molecular.
void dump_hex(ostream &out, unsigned int indent=0) const
Writes a representation of the entire datagram contents, as a sequence of hex (and ASCII) values...
unsigned int raw_unpack_uint16()
Unpacks the data from the buffer between unpacking sessions.
string get_server() const
Returns the server name specified by the URL, if any.
const string & get_name() const
Returns the name of this field, or empty string if the field is unnamed.
void disconnect()
Closes the connection to the server.
bool send_datagram(const Datagram &dg)
Queues the indicated datagram for sending to the server.
bool get_verbose() const
Returns the current setting of the verbose flag.
Defines a particular DistributedClass as read from an input .dc file.
bool consider_flush()
Sends the most recently queued data if enough time has elapsed.
void start_message_bundle()
Send a set of messages to the state server that will be processed atomically.
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
PN_uint8 get_uint8()
Extracts an unsigned 8-bit integer.
PN_uint32 get_uint32()
Extracts an unsigned 32-bit integer.
int get_port() const
Returns the port number specified by the URL, or the default port if not specified.
PN_uint16 get_uint16()
Extracts an unsigned 16-bit integer.
DCField * get_field_by_index(int index_number) const
Returns a pointer to the DCField that has the indicated index number.
bool is_connected()
Returns true if the connection to the gameserver is established and still good, false if we are not c...
bool is_bundling_messages() const
Returns true if the repository is queueing outgoing messages into a message bundle.
A lightweight class that represents a single element that may be timed and/or counted via stats...
unsigned int raw_unpack_uint8()
Unpacks the data from the buffer between unpacking sessions.
unsigned int raw_unpack_uint32()
Unpacks the data from the buffer between unpacking sessions.
void add_uint64(PN_uint64 value)
Adds an unsigned 64-bit integer to the datagram.
bool is_ownrecv() const
Returns true if the "ownrecv" flag is set for this field, false otherwise.
Similar to MutexHolder, but for a reentrant mutex.
string unpack_and_format(bool show_field_names=true)
Unpacks an object and formats its value into a syntax suitable for parsing in the dc file (e...
A simple place to store and manipulate a TCP address and port for the communication layer.
void add_uint16(PN_uint16 value)
Adds an unsigned 16-bit integer to the datagram.
bool set_host(const std::string &hostname, int port)
This function will take a port and string-based TCP address and initialize the address with this info...
void shutdown()
May be called at application shutdown to ensure all threads are cleaned up.
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
This class can be used for packing a series of numeric and string data into a binary stream...
void send_message_bundle(unsigned int channel, unsigned int sender_channel)
Send network messages queued up since startMessageBundle was called.
A class to retrieve the individual data elements previously stored in a Datagram. ...
PN_uint64 get_uint64()
Extracts an unsigned 64-bit integer.
string get_remaining_bytes() const
Returns the remaining bytes in the datagram as a string, but does not extract them from the iterator...
bool flush()
Sends the most recently queued data now.
void set_unpack_data(const string &data)
Sets up the unpack_data pointer.
string get_message() const
Returns the datagram's data as a string.
Represents a single TCP or UDP socket for input or output.
bool check_datagram()
Returns true if a new datagram is available, false otherwise.
const string & get_name() const
Returns the name of this class.
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a ...
void begin_unpack(const DCPackerInterface *root)
Begins an unpacking session.
static const string & get_overflow_event_name()
Returns the event string that will be thrown if the datagram reader queue overflows.
bool end_unpack()
Finishes the unpacking session.