Panda3D
pStatClientImpl.cxx
Go to the documentation of this file.
1/**
2 * PANDA 3D SOFTWARE
3 * Copyright (c) Carnegie Mellon University. All rights reserved.
4 *
5 * All use of this software is subject to the terms of the revised BSD
6 * license. You should have received a copy of this license along
7 * with this source code in a file named "LICENSE."
8 *
9 * @file pStatClientImpl.cxx
10 * @author drose
11 * @date 2004-12-23
12 */
13
14#include "pStatClientImpl.h"
15
16// This file only defines anything if DO_PSTATS is defined.
17#ifdef DO_PSTATS
18
19#include "pStatClient.h"
22#include "pStatCollector.h"
23#include "pStatThread.h"
24#include "config_pstatclient.h"
25#include "pStatProperties.h"
26#include "cmath.h"
27
28#include <algorithm>
29
30#if defined(WIN32_VC) || defined(WIN64_VC)
31#include <winsock2.h>
32#include <windows.h>
33#endif
34
35/**
36 *
37 */
38PStatClientImpl::
39PStatClientImpl(PStatClient *client) :
40 _clock(TrueClock::get_global_ptr()),
41 _delta(0.0),
42 _last_frame(0.0),
43 _client(client),
44 _reader(this, 0),
45 _writer(this, pstats_threaded_write ? 1 : 0)
46{
47 _writer.set_max_queue_size(pstats_max_queue_size);
48 _reader.set_tcp_header_size(4);
49 _writer.set_tcp_header_size(4);
50 _is_connected = false;
51 _got_udp_port = false;
52 _collectors_reported = 0;
53 _threads_reported = 0;
54
55 _client_name = pstats_name;
56 _max_rate = pstats_max_rate;
57
58 _tcp_count = 1;
59 _udp_count = 1;
60
61 if (pstats_tcp_ratio >= 1.0f) {
62 _tcp_count_factor = 0.0f;
63 _udp_count_factor = 1.0f;
64
65 } else if (pstats_tcp_ratio <= 0.0f) {
66 _tcp_count_factor = 1.0f;
67 _udp_count_factor = 0.0f;
68
69 } else {
70 csincos(pstats_tcp_ratio * (3.14159265f / 2.0f),
71 &_udp_count_factor,
72 &_tcp_count_factor);
73 }
74}
75
76/**
77 *
78 */
79PStatClientImpl::
80~PStatClientImpl() {
81 nassertv(!_is_connected);
82}
83
84/**
85 * Called only by PStatClient::client_connect().
86 */
87bool PStatClientImpl::
88client_connect(std::string hostname, int port) {
89 nassertr(!_is_connected, true);
90
91 if (hostname.empty()) {
92 hostname = pstats_host;
93 }
94 if (port < 0) {
95 port = pstats_port;
96 }
97
98 if (!_server.set_host(hostname, port)) {
99 pstats_cat.error()
100 << "Unknown host: " << hostname << "\n";
101 return false;
102 }
103
104 _tcp_connection = open_TCP_client_connection(_server, 5000);
105
106 if (_tcp_connection.is_null()) {
107 pstats_cat.error()
108 << "Couldn't connect to PStatServer at " << hostname << ":"
109 << port << "\n";
110 return false;
111 }
112 // Make sure we're not queuing up multiple TCP sockets--we expect immediate
113 // writes of our TCP datagrams.
114 _tcp_connection->set_collect_tcp(false);
115
116 _reader.add_connection(_tcp_connection);
117 _is_connected = true;
118
119 _udp_connection = open_UDP_connection();
120
121 send_hello();
122
123#ifdef DEBUG_THREADS
124 MutexDebug::increment_pstats();
125#endif // DEBUG_THREADS
126
127 return _is_connected;
128}
129
130/**
131 * Called only by PStatClient::client_disconnect().
132 */
133void PStatClientImpl::
134client_disconnect() {
135 if (_is_connected) {
136#ifdef DEBUG_THREADS
137 MutexDebug::decrement_pstats();
138#endif // DEBUG_THREADS
139 _reader.remove_connection(_tcp_connection);
140 close_connection(_tcp_connection);
141 close_connection(_udp_connection);
142 }
143
144 _tcp_connection.clear();
145 _udp_connection.clear();
146
147 _is_connected = false;
148 _got_udp_port = false;
149
150 _collectors_reported = 0;
151 _threads_reported = 0;
152}
153
154/**
155 * Called by the PStatThread interface at the beginning of every frame, for
156 * each thread. This resets the clocks for the new frame and transmits the
157 * data for the previous frame.
158 */
159void PStatClientImpl::
160new_frame(int thread_index) {
161 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
162
163 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
164
165 // If we're the main thread, we should exchange control packets with the
166 // server.
167 if (thread_index == 0) {
168 transmit_control_data();
169 }
170
171 // If we've got the UDP port by the time the frame starts, it's time to
172 // become active and start actually tracking data.
173 if (_got_udp_port) {
174 pthread->_is_active = true;
175 }
176
177 if (!pthread->_is_active) {
178 return;
179 }
180
181 double frame_start = get_real_time();
182 int frame_number = -1;
183 PStatFrameData frame_data;
184
185 if (!pthread->_frame_data.is_empty()) {
186 // Collector 0 is the whole frame.
187 _client->stop(0, thread_index, frame_start);
188
189 // Fill up the level data for all the collectors who have level data for
190 // this pthread.
191 int num_collectors = _client->_num_collectors;
192 PStatClient::CollectorPointer *collectors =
193 (PStatClient::CollectorPointer *)_client->_collectors;
194 for (int i = 0; i < num_collectors; i++) {
195 const PStatClient::PerThreadData &ptd =
196 collectors[i]->_per_thread[thread_index];
197 if (ptd._has_level) {
198 pthread->_frame_data.add_level(i, ptd._level);
199 }
200 }
201 pthread->_frame_data.swap(frame_data);
202 frame_number = pthread->_frame_number;
203 }
204
205 pthread->_frame_data.clear();
206 pthread->_frame_number++;
207 _client->start(0, thread_index, frame_start);
208
209 // Also record the time for the PStats operation itself.
210 int current_thread_index = Thread::get_current_thread()->get_pstats_index();
211 int pstats_index = PStatClient::_pstats_pcollector.get_index();
212 _client->start(pstats_index, current_thread_index, frame_start);
213
214 if (frame_number != -1) {
215 transmit_frame_data(thread_index, frame_number, frame_data);
216 }
217 _client->stop(pstats_index, current_thread_index, get_real_time());
218}
219
220/**
221 * Slightly lower-level interface than new_frame that takes a set of frame
222 * data.
223 */
224void PStatClientImpl::
225add_frame(int thread_index, const PStatFrameData &frame_data) {
226 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
227
228 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
229
230 // If we're the main thread, we should exchange control packets with the
231 // server.
232 if (thread_index == 0) {
233 transmit_control_data();
234 }
235
236 // If we've got the UDP port by the time the frame starts, it's time to
237 // become active and start actually tracking data.
238 if (_got_udp_port) {
239 pthread->_is_active = true;
240 }
241
242 if (!pthread->_is_active) {
243 return;
244 }
245
246 int frame_number = pthread->_frame_number++;
247
248 // Also record the time for the PStats operation itself.
249 int current_thread_index = Thread::get_current_thread()->get_pstats_index();
250 int pstats_index = PStatClient::_pstats_pcollector.get_index();
251 _client->start(pstats_index, current_thread_index);
252
253 if (frame_number != -1) {
254 transmit_frame_data(thread_index, frame_number, frame_data);
255 }
256 _client->stop(pstats_index, current_thread_index);
257}
258
259/**
260 * Should be called once per frame per thread to transmit the latest data to
261 * the PStatServer.
262 */
263void PStatClientImpl::
264transmit_frame_data(int thread_index, int frame_number,
265 const PStatFrameData &frame_data) {
266 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
267 PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index);
268 if (_is_connected && thread->_is_active) {
269
270 // We don't want to send too many packets in a hurry and flood the server.
271 // Check that enough time has elapsed for us to send a new packet. If
272 // not, we'll drop this packet on the floor and send a new one next time
273 // around.
274 double now = get_real_time();
275 if (now >= thread->_next_packet) {
276 // We don't want to send more than _max_rate UDP-size packets per
277 // second, per thread.
278 double packet_delay = 1.0 / _max_rate;
279
280 // Send new data.
281 NetDatagram datagram;
282 // We always start with a zero byte, to differentiate it from a control
283 // message.
284 datagram.add_uint8(0);
285
286 datagram.add_uint16(thread_index);
287 datagram.add_uint32(frame_number);
288
289 bool sent;
290
291 if (!frame_data.write_datagram(datagram, _client)) {
292 // Too many events to fit in a single datagram. Maybe it was a long
293 // frame load or something. Just drop the datagram.
294 sent = false;
295
296 } else if (_writer.is_valid_for_udp(datagram)) {
297 if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) {
298 // Send this one as a UDP packet.
299 nassertv(_got_udp_port);
300 sent = _writer.send(datagram, _udp_connection, _server);
301 _udp_count++;
302
303 if (_udp_count == 0) {
304 // Wraparound!
305 _udp_count = 1;
306 _tcp_count = 1;
307 }
308
309 } else {
310 // Send this one as a TCP packet.
311 sent = _writer.send(datagram, _tcp_connection);
312 _tcp_count++;
313
314 if (_tcp_count == 0) {
315 // Wraparound!
316 _udp_count = 1;
317 _tcp_count = 1;
318 }
319 }
320
321 } else {
322 sent = _writer.send(datagram, _tcp_connection);
323 // If our packets are so large that we must ship them via TCP, then
324 // artificially slow down the packet rate even further.
325 int packet_ratio =
326 (datagram.get_length() + maximum_udp_datagram - 1) /
327 maximum_udp_datagram;
328 packet_delay *= (double)packet_ratio;
329 }
330
331 thread->_next_packet = now + packet_delay;
332
333 if (!sent) {
334 if (pstats_cat.is_debug()) {
335 pstats_cat.debug()
336 << "Couldn't send packet.\n";
337 }
338 }
339 }
340 }
341}
342
343/**
344 * Should be called once a frame to exchange control information with the
345 * server.
346 */
347void PStatClientImpl::
348transmit_control_data() {
349 // Check for new messages from the server.
350 while (_is_connected && _reader.data_available()) {
351 NetDatagram datagram;
352
353 if (_reader.get_data(datagram)) {
355 if (message.decode(datagram)) {
356 handle_server_control_message(message);
357
358 } else {
359 pstats_cat.error()
360 << "Got unexpected message from server.\n";
361 }
362 }
363 }
364
365 if (_is_connected) {
366 report_new_collectors();
367 report_new_threads();
368 }
369}
370
371
372/**
373 * Returns the current machine's hostname.
374 */
375std::string PStatClientImpl::
376get_hostname() {
377 if (_hostname.empty()) {
378 char temp_buff[1024];
379 if (gethostname(temp_buff, 1024) == 0) {
380 _hostname = temp_buff;
381 } else {
382 _hostname = "unknown";
383 }
384 }
385 return _hostname;
386}
387
388/**
389 * Sends the initial greeting message to the server.
390 */
391void PStatClientImpl::
392send_hello() {
393 nassertv(_is_connected);
394
396 message._type = PStatClientControlMessage::T_hello;
397 message._client_hostname = get_hostname();
398 message._client_progname = _client_name;
399 message._major_version = get_current_pstat_major_version();
400 message._minor_version = get_current_pstat_minor_version();
401
402 Datagram datagram;
403 message.encode(datagram);
404 _writer.send(datagram, _tcp_connection, true);
405}
406
407/**
408 * Sends over any information about new Collectors that the user code might
409 * have recently created.
410 */
411void PStatClientImpl::
412report_new_collectors() {
413 // Empirically, we determined that you can't send more than about 1400
414 // collectors at once without exceeding the 64K limit on a single datagram.
415 // So we limit ourselves here to sending only half that many.
416 static const int max_collectors_at_once = 700;
417
418 while (_is_connected && _collectors_reported < _client->_num_collectors) {
420 message._type = PStatClientControlMessage::T_define_collectors;
421 int i = 0;
422 while (_collectors_reported < _client->_num_collectors &&
423 i < max_collectors_at_once) {
424 message._collectors.push_back(_client->get_collector_def(_collectors_reported));
425 _collectors_reported++;
426 i++;
427 }
428
429 Datagram datagram;
430 message.encode(datagram);
431 _writer.send(datagram, _tcp_connection, true);
432 }
433}
434
435/**
436 * Sends over any information about new Threads that the user code might have
437 * recently created.
438 */
439void PStatClientImpl::
440report_new_threads() {
441 while (_is_connected && _threads_reported < _client->_num_threads) {
443 message._type = PStatClientControlMessage::T_define_threads;
444 message._first_thread_index = _threads_reported;
445 PStatClient::ThreadPointer *threads =
446 (PStatClient::ThreadPointer *)_client->_threads;
447 while (_threads_reported < _client->_num_threads) {
448 message._names.push_back(threads[_threads_reported]->_name);
449 _threads_reported++;
450 }
451
452 Datagram datagram;
453 message.encode(datagram);
454 _writer.send(datagram, _tcp_connection, true);
455 }
456}
457
458/**
459 * Called when a control message has been received by the server over the TCP
460 * connection.
461 */
462void PStatClientImpl::
463handle_server_control_message(const PStatServerControlMessage &message) {
464 switch (message._type) {
465 case PStatServerControlMessage::T_hello:
466 pstats_cat.info()
467 << "Connected to " << message._server_progname << " on "
468 << message._server_hostname << "\n";
469
470 _server.set_port(message._udp_port);
471 _got_udp_port = true;
472 break;
473
474 default:
475 pstats_cat.error()
476 << "Invalid control message received from server.\n";
477 }
478}
479
480/**
481 * Called by the internal net code when the connection has been lost.
482 */
483void PStatClientImpl::
484connection_reset(const PT(Connection) &connection, bool) {
485 if (connection == _tcp_connection) {
486 client_disconnect();
487 } else if (connection == _udp_connection) {
488 pstats_cat.warning()
489 << "Trouble sending UDP; switching to TCP only.\n";
490 _tcp_count_factor = 0.0f;
491 _udp_count_factor = 1.0f;
492 } else {
493 pstats_cat.warning()
494 << "Ignoring spurious connection_reset() message\n";
495 }
496}
497
498#endif // DO_PSTATS
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:29
An ordered list of data elements, formatted in memory for transmission over a socket or writing to a ...
Definition: datagram.h:38
void add_uint32(uint32_t value)
Adds an unsigned 32-bit integer to the datagram.
Definition: datagram.I:94
size_t get_length() const
Returns the number of bytes in the datagram.
Definition: datagram.I:335
void add_uint8(uint8_t value)
Adds an unsigned 8-bit integer to the datagram.
Definition: datagram.I:50
void add_uint16(uint16_t value)
Adds an unsigned 16-bit integer to the datagram.
Definition: datagram.I:85
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
This kind of message is sent from the client to the server on the TCP socket to establish critical co...
void encode(Datagram &datagram) const
Writes the message into the indicated datagram.
Manages the communications to report statistics via a network connection to a remote PStatServer.
Definition: pStatClient.h:263
Contains the raw timing and level data for a single frame.
bool write_datagram(Datagram &destination, PStatClient *client) const
Writes the definition of the FrameData to the datagram.
This kind of message is sent from the server to the client on the TCP socket to establish critical co...
bool decode(const Datagram &datagram)
Extracts the message from the indicated datagram.
get_current_thread
Returns a pointer to the currently-executing Thread object.
Definition: thread.h:109
An interface to whatever real-time clock we might have available in the current environment.
Definition: trueClock.h:33
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int get_current_pstat_major_version()
Returns the current major version number of the PStats protocol.
int get_current_pstat_minor_version()
Returns the current minor version number of the PStats protocol.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.