Panda3D
pStatClientImpl.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file pStatClientImpl.cxx
10  * @author drose
11  * @date 2004-12-23
12  */
13 
14 #include "pStatClientImpl.h"
15 
16 // This file only defines anything if DO_PSTATS is defined.
17 #ifdef DO_PSTATS
18 
19 #include "pStatClient.h"
22 #include "pStatCollector.h"
23 #include "pStatThread.h"
24 #include "config_pstatclient.h"
25 #include "pStatProperties.h"
26 #include "cmath.h"
27 
28 #include <algorithm>
29 
30 #if defined(WIN32_VC) || defined(WIN64_VC)
31 #include <Winsock2.h>
32 #include <windows.h>
33 #endif
34 
35 /**
36  *
37  */
38 PStatClientImpl::
39 PStatClientImpl(PStatClient *client) :
40  _clock(TrueClock::get_global_ptr()),
41  _delta(0.0),
42  _last_frame(0.0),
43  _client(client),
44  _reader(this, 0),
45  _writer(this, pstats_threaded_write ? 1 : 0)
46 {
47  _writer.set_max_queue_size(pstats_max_queue_size);
48  _reader.set_tcp_header_size(4);
49  _writer.set_tcp_header_size(4);
50  _is_connected = false;
51  _got_udp_port = false;
52  _collectors_reported = 0;
53  _threads_reported = 0;
54 
55  _client_name = pstats_name;
56  _max_rate = pstats_max_rate;
57 
58  _tcp_count = 1;
59  _udp_count = 1;
60 
61  if (pstats_tcp_ratio >= 1.0f) {
62  _tcp_count_factor = 0.0f;
63  _udp_count_factor = 1.0f;
64 
65  } else if (pstats_tcp_ratio <= 0.0f) {
66  _tcp_count_factor = 1.0f;
67  _udp_count_factor = 0.0f;
68 
69  } else {
70  csincos(pstats_tcp_ratio * (3.14159265f / 2.0f),
71  &_udp_count_factor,
72  &_tcp_count_factor);
73  }
74 }
75 
76 /**
77  *
78  */
79 PStatClientImpl::
80 ~PStatClientImpl() {
81  nassertv(!_is_connected);
82 }
83 
84 /**
85  * Called only by PStatClient::client_connect().
86  */
87 bool PStatClientImpl::
88 client_connect(std::string hostname, int port) {
89  nassertr(!_is_connected, true);
90 
91  if (hostname.empty()) {
92  hostname = pstats_host;
93  }
94  if (port < 0) {
95  port = pstats_port;
96  }
97 
98  if (!_server.set_host(hostname, port)) {
99  pstats_cat.error()
100  << "Unknown host: " << hostname << "\n";
101  return false;
102  }
103 
104  _tcp_connection = open_TCP_client_connection(_server, 5000);
105 
106  if (_tcp_connection.is_null()) {
107  pstats_cat.error()
108  << "Couldn't connect to PStatServer at " << hostname << ":"
109  << port << "\n";
110  return false;
111  }
112  // Make sure we're not queuing up multiple TCP sockets--we expect immediate
113  // writes of our TCP datagrams.
114  _tcp_connection->set_collect_tcp(false);
115 
116  _reader.add_connection(_tcp_connection);
117  _is_connected = true;
118 
119  _udp_connection = open_UDP_connection();
120 
121  send_hello();
122 
123 #ifdef DEBUG_THREADS
124  MutexDebug::increment_pstats();
125 #endif // DEBUG_THREADS
126 
127  return _is_connected;
128 }
129 
130 /**
131  * Called only by PStatClient::client_disconnect().
132  */
133 void PStatClientImpl::
134 client_disconnect() {
135  if (_is_connected) {
136 #ifdef DEBUG_THREADS
137  MutexDebug::decrement_pstats();
138 #endif // DEBUG_THREADS
139  _reader.remove_connection(_tcp_connection);
140  close_connection(_tcp_connection);
141  close_connection(_udp_connection);
142  }
143 
144  _tcp_connection.clear();
145  _udp_connection.clear();
146 
147  _is_connected = false;
148  _got_udp_port = false;
149 
150  _collectors_reported = 0;
151  _threads_reported = 0;
152 }
153 
154 /**
155  * Called by the PStatThread interface at the beginning of every frame, for
156  * each thread. This resets the clocks for the new frame and transmits the
157  * data for the previous frame.
158  */
159 void PStatClientImpl::
160 new_frame(int thread_index) {
161  nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
162 
163  PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
164 
165  // If we're the main thread, we should exchange control packets with the
166  // server.
167  if (thread_index == 0) {
168  transmit_control_data();
169  }
170 
171  // If we've got the UDP port by the time the frame starts, it's time to
172  // become active and start actually tracking data.
173  if (_got_udp_port) {
174  pthread->_is_active = true;
175  }
176 
177  if (!pthread->_is_active) {
178  return;
179  }
180 
181  double frame_start = get_real_time();
182  int frame_number = -1;
183  PStatFrameData frame_data;
184 
185  if (!pthread->_frame_data.is_empty()) {
186  // Collector 0 is the whole frame.
187  _client->stop(0, thread_index, frame_start);
188 
189  // Fill up the level data for all the collectors who have level data for
190  // this pthread.
191  int num_collectors = _client->_num_collectors;
192  PStatClient::CollectorPointer *collectors =
193  (PStatClient::CollectorPointer *)_client->_collectors;
194  for (int i = 0; i < num_collectors; i++) {
195  const PStatClient::PerThreadData &ptd =
196  collectors[i]->_per_thread[thread_index];
197  if (ptd._has_level) {
198  pthread->_frame_data.add_level(i, ptd._level);
199  }
200  }
201  pthread->_frame_data.swap(frame_data);
202  frame_number = pthread->_frame_number;
203  }
204 
205  pthread->_frame_data.clear();
206  pthread->_frame_number++;
207  _client->start(0, thread_index, frame_start);
208 
209  // Also record the time for the PStats operation itself.
210  int current_thread_index = Thread::get_current_thread()->get_pstats_index();
211  int pstats_index = PStatClient::_pstats_pcollector.get_index();
212  _client->start(pstats_index, current_thread_index, frame_start);
213 
214  if (frame_number != -1) {
215  transmit_frame_data(thread_index, frame_number, frame_data);
216  }
217  _client->stop(pstats_index, current_thread_index, get_real_time());
218 }
219 
220 /**
221  * Slightly lower-level interface than new_frame that takes a set of frame
222  * data.
223  */
224 void PStatClientImpl::
225 add_frame(int thread_index, const PStatFrameData &frame_data) {
226  nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
227 
228  PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
229 
230  // If we're the main thread, we should exchange control packets with the
231  // server.
232  if (thread_index == 0) {
233  transmit_control_data();
234  }
235 
236  // If we've got the UDP port by the time the frame starts, it's time to
237  // become active and start actually tracking data.
238  if (_got_udp_port) {
239  pthread->_is_active = true;
240  }
241 
242  if (!pthread->_is_active) {
243  return;
244  }
245 
246  int frame_number = pthread->_frame_number++;
247 
248  // Also record the time for the PStats operation itself.
249  int current_thread_index = Thread::get_current_thread()->get_pstats_index();
250  int pstats_index = PStatClient::_pstats_pcollector.get_index();
251  _client->start(pstats_index, current_thread_index);
252 
253  if (frame_number != -1) {
254  transmit_frame_data(thread_index, frame_number, frame_data);
255  }
256  _client->stop(pstats_index, current_thread_index);
257 }
258 
259 /**
260  * Should be called once per frame per thread to transmit the latest data to
261  * the PStatServer.
262  */
263 void PStatClientImpl::
264 transmit_frame_data(int thread_index, int frame_number,
265  const PStatFrameData &frame_data) {
266  nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
267  PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index);
268  if (_is_connected && thread->_is_active) {
269 
270  // We don't want to send too many packets in a hurry and flood the server.
271  // Check that enough time has elapsed for us to send a new packet. If
272  // not, we'll drop this packet on the floor and send a new one next time
273  // around.
274  double now = get_real_time();
275  if (now >= thread->_next_packet) {
276  // We don't want to send more than _max_rate UDP-size packets per
277  // second, per thread.
278  double packet_delay = 1.0 / _max_rate;
279 
280  // Send new data.
281  NetDatagram datagram;
282  // We always start with a zero byte, to differentiate it from a control
283  // message.
284  datagram.add_uint8(0);
285 
286  datagram.add_uint16(thread_index);
287  datagram.add_uint32(frame_number);
288 
289  bool sent;
290 
291  if (!frame_data.write_datagram(datagram, _client)) {
292  // Too many events to fit in a single datagram. Maybe it was a long
293  // frame load or something. Just drop the datagram.
294  sent = false;
295 
296  } else if (_writer.is_valid_for_udp(datagram)) {
297  if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) {
298  // Send this one as a UDP packet.
299  nassertv(_got_udp_port);
300  sent = _writer.send(datagram, _udp_connection, _server);
301  _udp_count++;
302 
303  if (_udp_count == 0) {
304  // Wraparound!
305  _udp_count = 1;
306  _tcp_count = 1;
307  }
308 
309  } else {
310  // Send this one as a TCP packet.
311  sent = _writer.send(datagram, _tcp_connection);
312  _tcp_count++;
313 
314  if (_tcp_count == 0) {
315  // Wraparound!
316  _udp_count = 1;
317  _tcp_count = 1;
318  }
319  }
320 
321  } else {
322  sent = _writer.send(datagram, _tcp_connection);
323  // If our packets are so large that we must ship them via TCP, then
324  // artificially slow down the packet rate even further.
325  int packet_ratio =
326  (datagram.get_length() + maximum_udp_datagram - 1) /
327  maximum_udp_datagram;
328  packet_delay *= (double)packet_ratio;
329  }
330 
331  thread->_next_packet = now + packet_delay;
332 
333  if (!sent) {
334  if (pstats_cat.is_debug()) {
335  pstats_cat.debug()
336  << "Couldn't send packet.\n";
337  }
338  }
339  }
340  }
341 }
342 
343 /**
344  * Should be called once a frame to exchange control information with the
345  * server.
346  */
347 void PStatClientImpl::
348 transmit_control_data() {
349  // Check for new messages from the server.
350  while (_is_connected && _reader.data_available()) {
351  NetDatagram datagram;
352 
353  if (_reader.get_data(datagram)) {
355  if (message.decode(datagram)) {
356  handle_server_control_message(message);
357 
358  } else {
359  pstats_cat.error()
360  << "Got unexpected message from server.\n";
361  }
362  }
363  }
364 
365  if (_is_connected) {
366  report_new_collectors();
367  report_new_threads();
368  }
369 }
370 
371 
372 /**
373  * Returns the current machine's hostname.
374  */
375 std::string PStatClientImpl::
376 get_hostname() {
377  if (_hostname.empty()) {
378  char temp_buff[1024];
379  if (gethostname(temp_buff, 1024) == 0) {
380  _hostname = temp_buff;
381  } else {
382  _hostname = "unknown";
383  }
384  }
385  return _hostname;
386 }
387 
388 /**
389  * Sends the initial greeting message to the server.
390  */
391 void PStatClientImpl::
392 send_hello() {
393  nassertv(_is_connected);
394 
396  message._type = PStatClientControlMessage::T_hello;
397  message._client_hostname = get_hostname();
398  message._client_progname = _client_name;
399  message._major_version = get_current_pstat_major_version();
400  message._minor_version = get_current_pstat_minor_version();
401 
402  Datagram datagram;
403  message.encode(datagram);
404  _writer.send(datagram, _tcp_connection, true);
405 }
406 
407 /**
408  * Sends over any information about new Collectors that the user code might
409  * have recently created.
410  */
411 void PStatClientImpl::
412 report_new_collectors() {
413  // Empirically, we determined that you can't send more than about 1400
414  // collectors at once without exceeding the 64K limit on a single datagram.
415  // So we limit ourselves here to sending only half that many.
416  static const int max_collectors_at_once = 700;
417 
418  while (_is_connected && _collectors_reported < _client->_num_collectors) {
420  message._type = PStatClientControlMessage::T_define_collectors;
421  int i = 0;
422  while (_collectors_reported < _client->_num_collectors &&
423  i < max_collectors_at_once) {
424  message._collectors.push_back(_client->get_collector_def(_collectors_reported));
425  _collectors_reported++;
426  i++;
427  }
428 
429  Datagram datagram;
430  message.encode(datagram);
431  _writer.send(datagram, _tcp_connection, true);
432  }
433 }
434 
435 /**
436  * Sends over any information about new Threads that the user code might have
437  * recently created.
438  */
439 void PStatClientImpl::
440 report_new_threads() {
441  while (_is_connected && _threads_reported < _client->_num_threads) {
443  message._type = PStatClientControlMessage::T_define_threads;
444  message._first_thread_index = _threads_reported;
445  PStatClient::ThreadPointer *threads =
446  (PStatClient::ThreadPointer *)_client->_threads;
447  while (_threads_reported < _client->_num_threads) {
448  message._names.push_back(threads[_threads_reported]->_name);
449  _threads_reported++;
450  }
451 
452  Datagram datagram;
453  message.encode(datagram);
454  _writer.send(datagram, _tcp_connection, true);
455  }
456 }
457 
458 /**
459  * Called when a control message has been received by the server over the TCP
460  * connection.
461  */
462 void PStatClientImpl::
463 handle_server_control_message(const PStatServerControlMessage &message) {
464  switch (message._type) {
465  case PStatServerControlMessage::T_hello:
466  pstats_cat.info()
467  << "Connected to " << message._server_progname << " on "
468  << message._server_hostname << "\n";
469 
470  _server.set_port(message._udp_port);
471  _got_udp_port = true;
472  break;
473 
474  default:
475  pstats_cat.error()
476  << "Invalid control message received from server.\n";
477  }
478 }
479 
480 /**
481  * Called by the internal net code when the connection has been lost.
482  */
483 void PStatClientImpl::
484 connection_reset(const PT(Connection) &connection, bool) {
485  if (connection == _tcp_connection) {
486  client_disconnect();
487  } else if (connection == _udp_connection) {
488  pstats_cat.warning()
489  << "Trouble sending UDP; switching to TCP only.\n";
490  _tcp_count_factor = 0.0f;
491  _udp_count_factor = 1.0f;
492  } else {
493  pstats_cat.warning()
494  << "Ignoring spurious connection_reset() message\n";
495  }
496 }
497 
498 #endif // DO_PSTATS
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
bool decode(const Datagram &datagram)
Extracts the message from the indicated datagram.
This kind of message is sent from the server to the client on the TCP socket to establish critical co...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
This kind of message is sent from the client to the server on the TCP socket to establish critical co...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int get_current_pstat_minor_version()
Returns the current minor version number of the PStats protocol.
void add_uint32(uint32_t value)
Adds an unsigned 32-bit integer to the datagram.
Definition: datagram.I:94
void add_uint16(uint16_t value)
Adds an unsigned 16-bit integer to the datagram.
Definition: datagram.I:85
get_current_thread
Returns a pointer to the currently-executing Thread object.
Definition: thread.h:109
void encode(Datagram &datagram) const
Writes the message into the indicated datagram.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Contains the raw timing and level data for a single frame.
An interface to whatever real-time clock we might have available in the current environment.
Definition: trueClock.h:33
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool write_datagram(Datagram &destination, PStatClient *client) const
Writes the definition of the FrameData to the datagram.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int get_current_pstat_major_version()
Returns the current major version number of the PStats protocol.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void add_uint8(uint8_t value)
Adds an unsigned 8-bit integer to the datagram.
Definition: datagram.I:50
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
Manages the communications to report statistics via a network connection to a remote PStatServer.
Definition: pStatClient.h:263
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:29
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a ...
Definition: datagram.h:38
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335