Panda3D
 All Classes Functions Variables Enumerations
pStatClientImpl.cxx
1 // Filename: pStatClientImpl.cxx
2 // Created by: drose (23Dec04)
3 //
4 ////////////////////////////////////////////////////////////////////
5 //
6 // PANDA 3D SOFTWARE
7 // Copyright (c) Carnegie Mellon University. All rights reserved.
8 //
9 // All use of this software is subject to the terms of the revised BSD
10 // license. You should have received a copy of this license along
11 // with this source code in a file named "LICENSE."
12 //
13 ////////////////////////////////////////////////////////////////////
14 
15 #include "pStatClientImpl.h"
16 
17 // This file only defines anything if DO_PSTATS is defined.
18 #ifdef DO_PSTATS
19 
20 #include "pStatClient.h"
21 #include "pStatClientControlMessage.h"
22 #include "pStatServerControlMessage.h"
23 #include "pStatCollector.h"
24 #include "pStatThread.h"
25 #include "config_pstats.h"
26 #include "pStatProperties.h"
27 #include "cmath.h"
28 
29 #include <algorithm>
30 
31 #if defined(WIN32_VC) || defined(WIN64_VC)
32 #include <Winsock2.h>
33 #include <windows.h>
34 #endif
35 
36 ////////////////////////////////////////////////////////////////////
37 // Function: PStatClientImpl::Constructor
38 // Access: Public
39 // Description:
40 ////////////////////////////////////////////////////////////////////
41 PStatClientImpl::
42 PStatClientImpl(PStatClient *client) :
43  _clock(TrueClock::get_global_ptr()),
44  _delta(0.0),
45  _last_frame(0.0),
46  _client(client),
47  _reader(this, 0),
48  _writer(this, pstats_threaded_write ? 1 : 0)
49 {
50  _writer.set_max_queue_size(pstats_max_queue_size);
51  _reader.set_tcp_header_size(4);
52  _writer.set_tcp_header_size(4);
53  _is_connected = false;
54  _got_udp_port = false;
55  _collectors_reported = 0;
56  _threads_reported = 0;
57 
58  _client_name = pstats_name;
59  _max_rate = pstats_max_rate;
60 
61  _tcp_count = 1;
62  _udp_count = 1;
63 
64  if (pstats_tcp_ratio >= 1.0f) {
65  _tcp_count_factor = 0.0f;
66  _udp_count_factor = 1.0f;
67 
68  } else if (pstats_tcp_ratio <= 0.0f) {
69  _tcp_count_factor = 1.0f;
70  _udp_count_factor = 0.0f;
71 
72  } else {
73  csincos(pstats_tcp_ratio * (3.14159265f / 2.0f),
74  &_udp_count_factor,
75  &_tcp_count_factor);
76  }
77 }
78 
79 ////////////////////////////////////////////////////////////////////
80 // Function: PStatClientImpl::Destructor
81 // Access: Public
82 // Description:
83 ////////////////////////////////////////////////////////////////////
84 PStatClientImpl::
85 ~PStatClientImpl() {
86  nassertv(!_is_connected);
87 }
88 
89 ////////////////////////////////////////////////////////////////////
90 // Function: PStatClientImpl::client_connect
91 // Access: Public
92 // Description: Called only by PStatClient::client_connect().
93 ////////////////////////////////////////////////////////////////////
94 bool PStatClientImpl::
95 client_connect(string hostname, int port) {
96  nassertr(!_is_connected, true);
97 
98  if (hostname.empty()) {
99  hostname = pstats_host;
100  }
101  if (port < 0) {
102  port = pstats_port;
103  }
104 
105  if (!_server.set_host(hostname, port)) {
106  pstats_cat.error()
107  << "Unknown host: " << hostname << "\n";
108  return false;
109  }
110 
111  _tcp_connection = open_TCP_client_connection(_server, 5000);
112 
113  if (_tcp_connection.is_null()) {
114  pstats_cat.error()
115  << "Couldn't connect to PStatServer at " << hostname << ":"
116  << port << "\n";
117  return false;
118  }
119  // Make sure we're not queuing up multiple TCP sockets--we expect
120  // immediate writes of our TCP datagrams.
121  _tcp_connection->set_collect_tcp(false);
122 
123  _reader.add_connection(_tcp_connection);
124  _is_connected = true;
125 
126  _udp_connection = open_UDP_connection();
127 
128  send_hello();
129 
130 #ifdef DEBUG_THREADS
131  MutexDebug::increment_pstats();
132 #endif // DEBUG_THREADS
133 
134  return _is_connected;
135 }
136 
137 ////////////////////////////////////////////////////////////////////
138 // Function: PStatClientImpl::client_disconnect
139 // Access: Public
140 // Description: Called only by PStatClient::client_disconnect().
141 ////////////////////////////////////////////////////////////////////
142 void PStatClientImpl::
143 client_disconnect() {
144  if (_is_connected) {
145 #ifdef DEBUG_THREADS
146  MutexDebug::decrement_pstats();
147 #endif // DEBUG_THREADS
148  _reader.remove_connection(_tcp_connection);
149  close_connection(_tcp_connection);
150  close_connection(_udp_connection);
151  }
152 
153  _tcp_connection.clear();
154  _udp_connection.clear();
155 
156  _is_connected = false;
157  _got_udp_port = false;
158 
159  _collectors_reported = 0;
160  _threads_reported = 0;
161 }
162 
163 ////////////////////////////////////////////////////////////////////
164 // Function: PStatClientImpl::new_frame
165 // Access: Public
166 // Description: Called by the PStatThread interface at the beginning
167 // of every frame, for each thread. This resets the
168 // clocks for the new frame and transmits the data for
169 // the previous frame.
170 ////////////////////////////////////////////////////////////////////
171 void PStatClientImpl::
172 new_frame(int thread_index) {
173  nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
174 
175  PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
176 
177  // If we're the main thread, we should exchange control packets with
178  // the server.
179  if (thread_index == 0) {
180  transmit_control_data();
181  }
182 
183  // If we've got the UDP port by the time the frame starts, it's
184  // time to become active and start actually tracking data.
185  if (_got_udp_port) {
186  pthread->_is_active = true;
187  }
188 
189  if (!pthread->_is_active) {
190  return;
191  }
192 
193  double frame_start = get_real_time();
194  int frame_number = -1;
195  PStatFrameData frame_data;
196 
197  if (!pthread->_frame_data.is_empty()) {
198  // Collector 0 is the whole frame.
199  _client->stop(0, thread_index, frame_start);
200 
201  // Fill up the level data for all the collectors who have level
202  // data for this pthread.
203  int num_collectors = _client->_num_collectors;
204  PStatClient::CollectorPointer *collectors =
205  (PStatClient::CollectorPointer *)_client->_collectors;
206  for (int i = 0; i < num_collectors; i++) {
207  const PStatClient::PerThreadData &ptd =
208  collectors[i]->_per_thread[thread_index];
209  if (ptd._has_level) {
210  pthread->_frame_data.add_level(i, ptd._level);
211  }
212  }
213  pthread->_frame_data.swap(frame_data);
214  frame_number = pthread->_frame_number;
215  }
216 
217  pthread->_frame_data.clear();
218  pthread->_frame_number++;
219  _client->start(0, thread_index, frame_start);
220 
221  // Also record the time for the PStats operation itself.
222  int current_thread_index = Thread::get_current_thread()->get_pstats_index();
223  int pstats_index = PStatClient::_pstats_pcollector.get_index();
224  _client->start(pstats_index, current_thread_index, frame_start);
225 
226  if (frame_number != -1) {
227  transmit_frame_data(thread_index, frame_number, frame_data);
228  }
229  _client->stop(pstats_index, current_thread_index, get_real_time());
230 }
231 
232 ////////////////////////////////////////////////////////////////////
233 // Function: PStatClientImpl::add_frame
234 // Access: Public
235 // Description: Slightly lower-level interface than new_frame that
236 // takes a set of frame data.
237 ////////////////////////////////////////////////////////////////////
238 void PStatClientImpl::
239 add_frame(int thread_index, const PStatFrameData &frame_data) {
240  nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
241 
242  PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
243 
244  // If we're the main thread, we should exchange control packets with
245  // the server.
246  if (thread_index == 0) {
247  transmit_control_data();
248  }
249 
250  // If we've got the UDP port by the time the frame starts, it's
251  // time to become active and start actually tracking data.
252  if (_got_udp_port) {
253  pthread->_is_active = true;
254  }
255 
256  if (!pthread->_is_active) {
257  return;
258  }
259 
260  int frame_number = pthread->_frame_number++;
261 
262  // Also record the time for the PStats operation itself.
263  int current_thread_index = Thread::get_current_thread()->get_pstats_index();
264  int pstats_index = PStatClient::_pstats_pcollector.get_index();
265  _client->start(pstats_index, current_thread_index);
266 
267  if (frame_number != -1) {
268  transmit_frame_data(thread_index, frame_number, frame_data);
269  }
270  _client->stop(pstats_index, current_thread_index);
271 }
272 
273 ////////////////////////////////////////////////////////////////////
274 // Function: PStatClientImpl::transmit_frame_data
275 // Access: Private
276 // Description: Should be called once per frame per thread to
277 // transmit the latest data to the PStatServer.
278 ////////////////////////////////////////////////////////////////////
279 void PStatClientImpl::
280 transmit_frame_data(int thread_index, int frame_number,
281  const PStatFrameData &frame_data) {
282  nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
283  PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index);
284  if (_is_connected && thread->_is_active) {
285 
286  // We don't want to send too many packets in a hurry and flood the
287  // server. Check that enough time has elapsed for us to send a
288  // new packet. If not, we'll drop this packet on the floor and
289  // send a new one next time around.
290  double now = get_real_time();
291  if (now >= thread->_next_packet) {
292  // We don't want to send more than _max_rate UDP-size packets
293  // per second, per thread.
294  double packet_delay = 1.0 / _max_rate;
295 
296  // Send new data.
297  NetDatagram datagram;
298  // We always start with a zero byte, to differentiate it from a
299  // control message.
300  datagram.add_uint8(0);
301 
302  datagram.add_uint16(thread_index);
303  datagram.add_uint32(frame_number);
304 
305  bool sent;
306 
307  if (!frame_data.write_datagram(datagram, _client)) {
308  // Too many events to fit in a single datagram. Maybe it was
309  // a long frame load or something. Just drop the datagram.
310  sent = false;
311 
312  } else if (_writer.is_valid_for_udp(datagram)) {
313  if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) {
314  // Send this one as a UDP packet.
315  nassertv(_got_udp_port);
316  sent = _writer.send(datagram, _udp_connection, _server);
317  _udp_count++;
318 
319  if (_udp_count == 0) {
320  // Wraparound!
321  _udp_count = 1;
322  _tcp_count = 1;
323  }
324 
325  } else {
326  // Send this one as a TCP packet.
327  sent = _writer.send(datagram, _tcp_connection);
328  _tcp_count++;
329 
330  if (_tcp_count == 0) {
331  // Wraparound!
332  _udp_count = 1;
333  _tcp_count = 1;
334  }
335  }
336 
337  } else {
338  sent = _writer.send(datagram, _tcp_connection);
339  // If our packets are so large that we must ship them via TCP,
340  // then artificially slow down the packet rate even further.
341  int packet_ratio =
342  (datagram.get_length() + maximum_udp_datagram - 1) /
343  maximum_udp_datagram;
344  packet_delay *= (double)packet_ratio;
345  }
346 
347  thread->_next_packet = now + packet_delay;
348 
349  if (!sent) {
350  if (pstats_cat.is_debug()) {
351  pstats_cat.debug()
352  << "Couldn't send packet.\n";
353  }
354  }
355  }
356  }
357 }
358 
359 ////////////////////////////////////////////////////////////////////
360 // Function: PStatClientImpl::transmit_control_data
361 // Access: Private
362 // Description: Should be called once a frame to exchange control
363 // information with the server.
364 ////////////////////////////////////////////////////////////////////
365 void PStatClientImpl::
366 transmit_control_data() {
367  // Check for new messages from the server.
368  while (_is_connected && _reader.data_available()) {
369  NetDatagram datagram;
370 
371  if (_reader.get_data(datagram)) {
373  if (message.decode(datagram)) {
374  handle_server_control_message(message);
375 
376  } else {
377  pstats_cat.error()
378  << "Got unexpected message from server.\n";
379  }
380  }
381  }
382 
383  if (_is_connected) {
384  report_new_collectors();
385  report_new_threads();
386  }
387 }
388 
389 
390 ////////////////////////////////////////////////////////////////////
391 // Function: PStatClientImpl::get_hostname
392 // Access: Private
393 // Description: Returns the current machine's hostname.
394 ////////////////////////////////////////////////////////////////////
395 string PStatClientImpl::
396 get_hostname() {
397  if (_hostname.empty()) {
398  char temp_buff[1024];
399  if (gethostname(temp_buff, 1024) == 0) {
400  _hostname = temp_buff;
401  } else {
402  _hostname = "unknown";
403  }
404  }
405  return _hostname;
406 }
407 
408 ////////////////////////////////////////////////////////////////////
409 // Function: PStatClientImpl::send_hello
410 // Access: Private
411 // Description: Sends the initial greeting message to the server.
412 ////////////////////////////////////////////////////////////////////
413 void PStatClientImpl::
414 send_hello() {
415  nassertv(_is_connected);
416 
418  message._type = PStatClientControlMessage::T_hello;
419  message._client_hostname = get_hostname();
420  message._client_progname = _client_name;
421  message._major_version = get_current_pstat_major_version();
422  message._minor_version = get_current_pstat_minor_version();
423 
424  Datagram datagram;
425  message.encode(datagram);
426  _writer.send(datagram, _tcp_connection, true);
427 }
428 
429 ////////////////////////////////////////////////////////////////////
430 // Function: PStatClientImpl::report_new_collectors
431 // Access: Private
432 // Description: Sends over any information about new Collectors that
433 // the user code might have recently created.
434 ////////////////////////////////////////////////////////////////////
435 void PStatClientImpl::
436 report_new_collectors() {
437  // Empirically, we determined that you can't send more than about
438  // 1400 collectors at once without exceeding the 64K limit on a
439  // single datagram. So we limit ourselves here to sending only
440  // half that many.
441  static const int max_collectors_at_once = 700;
442 
443  while (_is_connected && _collectors_reported < _client->_num_collectors) {
445  message._type = PStatClientControlMessage::T_define_collectors;
446  int i = 0;
447  while (_collectors_reported < _client->_num_collectors &&
448  i < max_collectors_at_once) {
449  message._collectors.push_back(_client->get_collector_def(_collectors_reported));
450  _collectors_reported++;
451  i++;
452  }
453 
454  Datagram datagram;
455  message.encode(datagram);
456  _writer.send(datagram, _tcp_connection, true);
457  }
458 }
459 
460 ////////////////////////////////////////////////////////////////////
461 // Function: PStatClientImpl::report_new_threads
462 // Access: Private
463 // Description: Sends over any information about new Threads that
464 // the user code might have recently created.
465 ////////////////////////////////////////////////////////////////////
466 void PStatClientImpl::
467 report_new_threads() {
468  while (_is_connected && _threads_reported < _client->_num_threads) {
470  message._type = PStatClientControlMessage::T_define_threads;
471  message._first_thread_index = _threads_reported;
472  PStatClient::ThreadPointer *threads =
473  (PStatClient::ThreadPointer *)_client->_threads;
474  while (_threads_reported < _client->_num_threads) {
475  message._names.push_back(threads[_threads_reported]->_name);
476  _threads_reported++;
477  }
478 
479  Datagram datagram;
480  message.encode(datagram);
481  _writer.send(datagram, _tcp_connection, true);
482  }
483 }
484 
485 ////////////////////////////////////////////////////////////////////
486 // Function: PStatClientImpl::handle_server_control_message
487 // Access: Private
488 // Description: Called when a control message has been received by
489 // the server over the TCP connection.
490 ////////////////////////////////////////////////////////////////////
491 void PStatClientImpl::
492 handle_server_control_message(const PStatServerControlMessage &message) {
493  switch (message._type) {
494  case PStatServerControlMessage::T_hello:
495  pstats_cat.info()
496  << "Connected to " << message._server_progname << " on "
497  << message._server_hostname << "\n";
498 
499  _server.set_port(message._udp_port);
500  _got_udp_port = true;
501  break;
502 
503  default:
504  pstats_cat.error()
505  << "Invalid control message received from server.\n";
506  }
507 }
508 
509 ////////////////////////////////////////////////////////////////////
510 // Function: PStatClientImpl::connection_reset
511 // Access: Private, Virtual
512 // Description: Called by the internal net code when the connection
513 // has been lost.
514 ////////////////////////////////////////////////////////////////////
515 void PStatClientImpl::
516 connection_reset(const PT(Connection) &connection, bool) {
517  if (connection == _tcp_connection) {
518  client_disconnect();
519  } else if (connection == _udp_connection) {
520  pstats_cat.warning()
521  << "Trouble sending UDP; switching to TCP only.\n";
522  _tcp_count_factor = 0.0f;
523  _udp_count_factor = 1.0f;
524  } else {
525  pstats_cat.warning()
526  << "Ignoring spurious connection_reset() message\n";
527  }
528 }
529 
530 #endif // DO_PSTATS
void add_uint8(PN_uint8 value)
Adds an unsigned 8-bit integer to the datagram.
Definition: datagram.I:138
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:43
bool decode(const Datagram &datagram)
Extracts the message from the indicated datagram.
bool write_datagram(Datagram &destination, PStatClient *client) const
Writes the definition of the FrameData to the datagram.
This kind of message is sent from the server to the client on the TCP socket to establish critical co...
void encode(Datagram &datagram) const
Writes the message into the indicated datagram.
This kind of message is sent from the client to the server on the TCP socket to establish critical co...
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:457
static Thread * get_current_thread()
Returns a pointer to the currently-executing Thread object.
Definition: thread.I:145
Contains the raw timing and level data for a single frame.
An interface to whatever real-time clock we might have available in the current environment.
Definition: trueClock.h:38
int get_pstats_index() const
Returns the PStats index associated with this thread, or -1 if no index has yet been associated with ...
Definition: thread.I:60
void add_uint16(PN_uint16 value)
Adds an unsigned 16-bit integer to the datagram.
Definition: datagram.I:181
void add_uint32(PN_uint32 value)
Adds an unsigned 32-bit integer to the datagram.
Definition: datagram.I:192
Manages the communications to report statistics via a network connection to a remote PStatServer...
Definition: pStatClient.h:261
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:32
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a ...
Definition: datagram.h:43