15 #include "pStatClientImpl.h"
20 #include "pStatClient.h"
21 #include "pStatClientControlMessage.h"
22 #include "pStatServerControlMessage.h"
23 #include "pStatCollector.h"
24 #include "pStatThread.h"
25 #include "config_pstats.h"
26 #include "pStatProperties.h"
31 #if defined(WIN32_VC) || defined(WIN64_VC)
// NOTE(review): this listing is elided. The enclosing constructor signature,
// the other member initializers and the closing braces are not visible here.
// Member initializer for the writer: passes 1 when pstats_threaded_write is
// set and 0 otherwise (presumably a thread count - confirm).
48 _writer(this, pstats_threaded_write ? 1 : 0)
// Cap the writer queue at the configured pstats_max_queue_size.
50 _writer.set_max_queue_size(pstats_max_queue_size);
// The reader and the writer are both given a TCP header size of 4.
51 _reader.set_tcp_header_size(4);
52 _writer.set_tcp_header_size(4);
// Initial state: not connected, no UDP port received, nothing reported yet.
53 _is_connected =
false;
54 _got_udp_port =
false;
55 _collectors_reported = 0;
56 _threads_reported = 0;
// Client name and maximum rate are copied from the pstats_name and
// pstats_max_rate settings.
58 _client_name = pstats_name;
59 _max_rate = pstats_max_rate;
// Derive the TCP and UDP count factors from pstats_tcp_ratio. A ratio of 1 or
// more sets the TCP factor to 0 and the UDP factor to 1; a ratio of 0 or less
// sets the TCP factor to 1 and the UDP factor to 0.
64 if (pstats_tcp_ratio >= 1.0f) {
65 _tcp_count_factor = 0.0f;
66 _udp_count_factor = 1.0f;
68 }
else if (pstats_tcp_ratio <= 0.0f) {
69 _tcp_count_factor = 1.0f;
70 _udp_count_factor = 0.0f;
// Ratios in between are passed to csincos as ratio * pi/2. The remaining
// arguments of this call are elided from the listing.
73 csincos(pstats_tcp_ratio * (3.14159265f / 2.0f),
// NOTE(review): the assertion below comes from a later point in the file
// (original line 86) whose enclosing function is not visible; presumably the
// destructor - confirm. It asserts that the client is not connected.
86 nassertv(!_is_connected);
// Attempts to connect to a PStatServer at the given hostname and port.
// The function ends by returning the current value of _is_connected.
// NOTE(review): several lines (log stream heads, early returns, closing
// braces, the opening of the DEBUG_THREADS guard) are elided from this listing.
94 bool PStatClientImpl::
95 client_connect(
string hostname,
int port) {
// Must not already be connected; nassertr is given true as its second argument.
96 nassertr(!_is_connected,
true);
// An empty hostname falls back to the pstats_host setting.
98 if (hostname.empty()) {
99 hostname = pstats_host;
// Point _server at the requested host and port; a failure here is reported
// with the unknown-host message below.
105 if (!_server.set_host(hostname, port)) {
107 <<
"Unknown host: " << hostname <<
"\n";
// Open the TCP connection to the server. The 5000 is presumably a timeout in
// milliseconds - confirm against open_TCP_client_connection.
111 _tcp_connection = open_TCP_client_connection(_server, 5000);
// A null connection means the TCP connection could not be opened.
113 if (_tcp_connection.is_null()) {
115 <<
"Couldn't connect to PStatServer at " << hostname <<
":"
// Turn off TCP collection on the new connection.
121 _tcp_connection->set_collect_tcp(
false);
// Register the connection with the reader and mark the client as connected.
123 _reader.add_connection(_tcp_connection);
124 _is_connected =
true;
// A UDP connection is opened alongside the TCP one.
126 _udp_connection = open_UDP_connection();
131 MutexDebug::increment_pstats();
132 #endif // DEBUG_THREADS
134 return _is_connected;
// Closes both connections and resets the connection state to the same values
// the constructor assigns.
// NOTE(review): any enclosing condition, the opening of the DEBUG_THREADS
// guard and the closing braces are elided from this listing.
142 void PStatClientImpl::
143 client_disconnect() {
146 MutexDebug::decrement_pstats();
147 #endif // DEBUG_THREADS
// Detach the TCP connection from the reader, then close both connections.
148 _reader.remove_connection(_tcp_connection);
149 close_connection(_tcp_connection);
150 close_connection(_udp_connection);
// Release the connection pointers.
153 _tcp_connection.clear();
154 _udp_connection.clear();
// Reset the flags and the reported counts so that nothing is considered
// already sent.
156 _is_connected =
false;
157 _got_udp_port =
false;
159 _collectors_reported = 0;
160 _threads_reported = 0;
// Handles a frame boundary for the thread with the given index. When the
// thread already has frame data, the current frame is finished, its levels are
// recorded and it is handed to transmit_frame_data; the per-thread frame data
// is then cleared and the frame number advanced.
// NOTE(review): the declarations of frame_data and current_thread_index, some
// conditions, early returns and closing braces are elided from this listing.
171 void PStatClientImpl::
172 new_frame(
int thread_index) {
// thread_index must be within [0, _num_threads).
173 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
175 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
// Thread 0 also services control traffic.
179 if (thread_index == 0) {
180 transmit_control_data();
// The condition guarding this assignment is not visible in the listing.
186 pthread->_is_active =
true;
189 if (!pthread->_is_active) {
193 double frame_start = get_real_time();
// -1 means there is no completed frame to transmit.
194 int frame_number = -1;
197 if (!pthread->_frame_data.is_empty()) {
// Stop collector 0 for this thread at the frame start time.
199 _client->stop(0, thread_index, frame_start);
// Record the level of every collector that has one for this thread.
203 int num_collectors = _client->_num_collectors;
204 PStatClient::CollectorPointer *collectors =
205 (PStatClient::CollectorPointer *)_client->_collectors;
206 for (
int i = 0; i < num_collectors; i++) {
207 const PStatClient::PerThreadData &ptd =
208 collectors[i]->_per_thread[thread_index];
209 if (ptd._has_level) {
210 pthread->_frame_data.add_level(i, ptd._level);
// Exchange the per-thread frame data with the local frame_data and remember
// which frame number it belongs to.
213 pthread->_frame_data.swap(frame_data);
214 frame_number = pthread->_frame_number;
// Begin the next frame: clear the data, advance the frame number and start
// collector 0 again at the same timestamp.
217 pthread->_frame_data.clear();
218 pthread->_frame_number++;
219 _client->start(0, thread_index, frame_start);
// The transmission itself is bracketed by start and stop calls on the
// _pstats_pcollector index.
223 int pstats_index = PStatClient::_pstats_pcollector.get_index();
224 _client->start(pstats_index, current_thread_index, frame_start);
226 if (frame_number != -1) {
227 transmit_frame_data(thread_index, frame_number, frame_data);
229 _client->stop(pstats_index, current_thread_index, get_real_time());
// NOTE(review): the function name and parameter list (original line 239) are
// elided from this listing; presumably this is add_frame - confirm. The body
// advances the frame number of the indicated thread and passes frame_data to
// transmit_frame_data. The declaration of current_thread_index, some
// conditions and the closing braces are also elided.
238 void PStatClientImpl::
// thread_index must be within [0, _num_threads).
240 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
242 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
// Thread 0 also services control traffic.
246 if (thread_index == 0) {
247 transmit_control_data();
// The condition guarding this assignment is not visible in the listing.
253 pthread->_is_active =
true;
256 if (!pthread->_is_active) {
// Use the current frame number for this frame, then advance it.
260 int frame_number = pthread->_frame_number++;
// The transmission is bracketed by start and stop calls on the
// _pstats_pcollector index.
264 int pstats_index = PStatClient::_pstats_pcollector.get_index();
265 _client->start(pstats_index, current_thread_index);
267 if (frame_number != -1) {
268 transmit_frame_data(thread_index, frame_number, frame_data);
270 _client->stop(pstats_index, current_thread_index);
// Sends one frame of data for the indicated thread, provided the client is
// connected, the thread is active and the next-packet time has been reached.
// NOTE(review): the last parameter, the datagram construction, the count
// updates, the declarations of sent and packet_ratio, and the closing braces
// are elided from this listing.
279 void PStatClientImpl::
280 transmit_frame_data(
int thread_index,
int frame_number,
// thread_index must be within [0, _num_threads).
282 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
283 PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index);
284 if (_is_connected && thread->_is_active) {
// Rate limiting: nothing is sent before thread->_next_packet.
290 double now = get_real_time();
291 if (now >= thread->_next_packet) {
// Base delay between packets is the reciprocal of _max_rate.
294 double packet_delay = 1.0 / _max_rate;
312 }
else if (_writer.is_valid_for_udp(datagram)) {
// The datagram is acceptable for UDP. UDP is chosen while the weighted UDP
// count is below the weighted TCP count.
313 if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) {
// The UDP port must already have been received before sending over UDP.
315 nassertv(_got_udp_port);
316 sent = _writer.send(datagram, _udp_connection, _server);
319 if (_udp_count == 0) {
// Otherwise the datagram goes over TCP.
327 sent = _writer.send(datagram, _tcp_connection);
330 if (_tcp_count == 0) {
// Datagrams that are not valid for UDP are sent over TCP.
338 sent = _writer.send(datagram, _tcp_connection);
// Scale the delay by the datagram length divided by maximum_udp_datagram,
// rounded up.
342 (datagram.
get_length() + maximum_udp_datagram - 1) /
343 maximum_udp_datagram;
344 packet_delay *= (double)packet_ratio;
// Earliest time at which the next packet for this thread may be sent.
347 thread->_next_packet = now + packet_delay;
// The failure message is only written when debug output is enabled. The
// condition that detects the failure is not visible in the listing.
350 if (pstats_cat.is_debug()) {
352 <<
"Couldn't send packet.\n";
// Processes incoming control messages from the server, then reports newly
// defined collectors and threads.
// NOTE(review): the declarations of datagram and message, the else branch
// head, the log stream head and the closing braces are elided from this
// listing, so it is not visible whether the two report calls sit inside a
// condition.
365 void PStatClientImpl::
366 transmit_control_data() {
// Drain the reader for as long as the client stays connected.
368 while (_is_connected && _reader.data_available()) {
371 if (_reader.get_data(datagram)) {
// Successfully decoded messages are dispatched; the message below is written
// on the other path.
373 if (message.
decode(datagram)) {
374 handle_server_control_message(message);
378 <<
"Got unexpected message from server.\n";
384 report_new_collectors();
385 report_new_threads();
// NOTE(review): the function name line (original line 396), the else head,
// the return statement and the closing braces are elided from this listing;
// presumably this is get_hostname - confirm.
// Fills in _hostname the first time it is found empty, so the lookup is not
// repeated once a name is stored.
395 string PStatClientImpl::
397 if (_hostname.empty()) {
// gethostname returning 0 indicates success; the buffer holds 1024 bytes.
398 char temp_buff[1024];
399 if (gethostname(temp_buff, 1024) == 0) {
400 _hostname = temp_buff;
// Fallback name stored on the other path.
402 _hostname =
"unknown";
// NOTE(review): the function name line (original line 414), the declarations
// of message and datagram, the encode step and the closing brace are elided
// from this listing; presumably this is send_hello - confirm.
// Builds a T_hello client control message and sends it over the TCP
// connection.
413 void PStatClientImpl::
// Requires an established connection.
415 nassertv(_is_connected);
// The hello message carries the client hostname, program name and the current
// pstat major and minor version numbers.
418 message._type = PStatClientControlMessage::T_hello;
419 message._client_hostname = get_hostname();
420 message._client_progname = _client_name;
421 message._major_version = get_current_pstat_major_version();
422 message._minor_version = get_current_pstat_minor_version();
// NOTE(review): the meaning of the third argument (true) is not visible here;
// confirm against the writer send signature.
426 _writer.send(datagram, _tcp_connection,
true);
// Sends T_define_collectors messages for every collector that has not yet been
// reported, advancing _collectors_reported as definitions are added.
// NOTE(review): the declarations of message, datagram and i, the increment of
// i, the encode step and the closing braces are elided from this listing.
435 void PStatClientImpl::
436 report_new_collectors() {
// Upper bound on the number of collector definitions placed in one message.
441 static const int max_collectors_at_once = 700;
// Keep sending while connected and while unreported collectors remain.
443 while (_is_connected && _collectors_reported < _client->_num_collectors) {
445 message._type = PStatClientControlMessage::T_define_collectors;
// Fill the message until the collectors run out or the batch limit is hit.
447 while (_collectors_reported < _client->_num_collectors &&
448 i < max_collectors_at_once) {
449 message._collectors.push_back(_client->get_collector_def(_collectors_reported));
450 _collectors_reported++;
// NOTE(review): the meaning of the third argument (true) is not visible here;
// confirm against the writer send signature.
456 _writer.send(datagram, _tcp_connection,
true);
// Sends T_define_threads messages naming every thread that has not yet been
// reported.
// NOTE(review): the declarations of message and datagram, the advance of
// _threads_reported inside the inner loop, the encode step and the closing
// braces are elided from this listing.
466 void PStatClientImpl::
467 report_new_threads() {
// Keep sending while connected and while unreported threads remain.
468 while (_is_connected && _threads_reported < _client->_num_threads) {
470 message._type = PStatClientControlMessage::T_define_threads;
// The message records the index of the first thread it names.
471 message._first_thread_index = _threads_reported;
472 PStatClient::ThreadPointer *threads =
473 (PStatClient::ThreadPointer *)_client->_threads;
// Append the name of each unreported thread.
474 while (_threads_reported < _client->_num_threads) {
475 message._names.push_back(threads[_threads_reported]->_name);
// NOTE(review): the meaning of the third argument (true) is not visible here;
// confirm against the writer send signature.
481 _writer.send(datagram, _tcp_connection,
true);
// NOTE(review): the function name and parameter line (original line 492), the
// log stream heads, the break statements, the default label and the closing
// braces are elided from this listing; presumably this is
// handle_server_control_message - confirm.
// Dispatches on the type of a control message received from the server.
491 void PStatClientImpl::
493 switch (message._type) {
// Hello from the server: log the server program and host names, then store
// the UDP port from the message in _server and note that it has been received.
494 case PStatServerControlMessage::T_hello:
496 <<
"Connected to " << message._server_progname <<
" on "
497 << message._server_hostname <<
"\n";
499 _server.set_port(message._udp_port);
500 _got_udp_port =
true;
// Message written for a control message that is not handled above.
505 <<
"Invalid control message received from server.\n";
// Reacts to a reset of one of the connections. The unnamed bool parameter is
// not used in the visible lines.
// NOTE(review): the action taken for the TCP connection (original line 518),
// the log stream heads, the final else head and the closing braces are elided
// from this listing.
515 void PStatClientImpl::
516 connection_reset(
const PT(
Connection) &connection,
bool) {
// Reset of the TCP connection; the handling itself is not visible here.
517 if (connection == _tcp_connection) {
519 }
else if (connection == _udp_connection) {
// Reset of the UDP connection: log it and set the count factors to the same
// values the constructor uses when pstats_tcp_ratio is 1 or more.
521 <<
"Trouble sending UDP; switching to TCP only.\n";
522 _tcp_count_factor = 0.0f;
523 _udp_count_factor = 1.0f;
// Any other connection is reported as spurious.
526 <<
"Ignoring spurious connection_reset() message\n";
void add_uint8(PN_uint8 value)
Adds an unsigned 8-bit integer to the datagram.
A specific kind of Datagram, especially for sending across or receiving from a network.
bool decode(const Datagram &datagram)
Extracts the message from the indicated datagram.
bool write_datagram(Datagram &destination, PStatClient *client) const
Writes the definition of the FrameData to the datagram.
This kind of message is sent from the server to the client on the TCP socket to establish critical control information.
void encode(Datagram &datagram) const
Writes the message into the indicated datagram.
This kind of message is sent from the client to the server on the TCP socket to establish critical control information.
size_t get_length() const
Returns the number of bytes in the datagram.
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
Contains the raw timing and level data for a single frame.
An interface to whatever real-time clock we might have available in the current environment.
int get_pstats_index() const
Returns the PStats index associated with this thread, or -1 if no index has yet been associated with this thread.
void add_uint16(PN_uint16 value)
Adds an unsigned 16-bit integer to the datagram.
void add_uint32(PN_uint32 value)
Adds an unsigned 32-bit integer to the datagram.
Manages the communications to report statistics via a network connection to a remote PStatServer...
Represents a single TCP or UDP socket for input or output.
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a data file.