30 #if defined(WIN32_VC) || defined(WIN64_VC)
// Fragment of what appears to be a constructor (the header and several
// lines lie outside this excerpt).  It configures the network reader and
// writer and resets the connection state.
//
// Member initializer: the writer is built with `this` and either 1 or 0
// depending on pstats_threaded_write (presumably a thread count -- TODO
// confirm against the writer's constructor).
45 _writer(this, pstats_threaded_write ? 1 : 0)
// Cap the writer's queue at the configured maximum.
47 _writer.set_max_queue_size(pstats_max_queue_size);
// Both the reader and the writer are given the same TCP header size of 4.
48 _reader.set_tcp_header_size(4);
49 _writer.set_tcp_header_size(4);
// Start out disconnected, with no UDP port learned from the server yet.
50 _is_connected =
false;
51 _got_udp_port =
false;
// Nothing has been reported to the server so far.
52 _collectors_reported = 0;
53 _threads_reported = 0;
// Copy the configured client name and maximum rate.
55 _client_name = pstats_name;
56 _max_rate = pstats_max_rate;
// Derive the TCP/UDP weighting factors from pstats_tcp_ratio.  These
// factors are compared as (_udp_count * _udp_count_factor) <
// (_tcp_count * _tcp_count_factor) when a frame is sent, so a TCP factor of
// 0 means UDP is never selected (assuming non-negative counts).
61 if (pstats_tcp_ratio >= 1.0f) {
62 _tcp_count_factor = 0.0f;
63 _udp_count_factor = 1.0f;
65 }
else if (pstats_tcp_ratio <= 0.0f) {
// Ratio at or below zero: the mirror-image assignment of the factors.
66 _tcp_count_factor = 1.0f;
67 _udp_count_factor = 0.0f;
// Intermediate ratios: the ratio is scaled by pi/2 and passed to csincos
// (the remaining arguments and the enclosing branch are not visible here).
70 csincos(pstats_tcp_ratio * (3.14159265f / 2.0f),
81 nassertv(!_is_connected);
// Tries to establish a connection to the PStats server at the given host
// and port.  Returns the value of _is_connected at the end of the visible
// path.  Several lines of this function (including the early-return paths
// after the error messages) are not visible in this excerpt.
87 bool PStatClientImpl::
88 client_connect(std::string hostname,
int port) {
// Must not already be connected; `true` is the value handed to nassertr
// for the failure case.
89 nassertr(!_is_connected,
true);
// An empty hostname falls back to the configured pstats_host.
91 if (hostname.empty()) {
92 hostname = pstats_host;
// Resolve the server address; a failure is reported as an unknown host.
98 if (!_server.set_host(hostname, port)) {
100 <<
"Unknown host: " << hostname <<
"\n";
// Open the TCP connection to the server.  The 5000 is presumably a timeout
// in milliseconds -- TODO confirm.
104 _tcp_connection = open_TCP_client_connection(_server, 5000);
// A null connection means the server could not be reached.
106 if (_tcp_connection.is_null()) {
108 <<
"Couldn't connect to PStatServer at " << hostname <<
":"
// Turn off TCP collection on this connection.
114 _tcp_connection->set_collect_tcp(
false);
// Listen for server messages on the TCP connection and mark us connected.
116 _reader.add_connection(_tcp_connection);
117 _is_connected =
true;
// Also open a UDP connection alongside the TCP one.
119 _udp_connection = open_UDP_connection();
// Under DEBUG_THREADS (see the #endif below), bump the MutexDebug pstats
// counter.
124 MutexDebug::increment_pstats();
125 #endif // DEBUG_THREADS
127 return _is_connected;
// Tears down the connection to the server: closes and releases both
// connections and resets all per-connection state.  Any guard around these
// statements (e.g. a check of _is_connected) is not visible in this excerpt.
133 void PStatClientImpl::
134 client_disconnect() {
// Under DEBUG_THREADS (see the #endif below), undo the counter increment
// made when connecting.
137 MutexDebug::decrement_pstats();
138 #endif // DEBUG_THREADS
// Stop reading from the TCP connection, then close both connections.
139 _reader.remove_connection(_tcp_connection);
140 close_connection(_tcp_connection);
141 close_connection(_udp_connection);
// Drop our references to the connection objects.
144 _tcp_connection.clear();
145 _udp_connection.clear();
// Back to the disconnected state.
147 _is_connected =
false;
148 _got_udp_port =
false;
// Reset the reported counts so that reporting starts again from zero.
150 _collectors_reported = 0;
151 _threads_reported = 0;
// Marks the start of a new frame for the indicated thread.  If the thread
// has accumulated frame data, that data is finalized and transmitted, and a
// fresh frame is begun.  Several lines (branch conditions, early returns,
// the declarations of `frame_data` and `current_thread_index`) are not
// visible in this excerpt.
159 void PStatClientImpl::
160 new_frame(
int thread_index) {
// The index must refer to a thread known to the client.
161 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
163 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
// Thread 0 additionally exchanges control data with the server.
167 if (thread_index == 0) {
168 transmit_control_data();
// The thread is flagged active here (the condition is not visible).
174 pthread->_is_active =
true;
// Handling for an inactive thread (body not visible).
177 if (!pthread->_is_active) {
// Timestamp shared by the stop/start calls below; -1 is the "nothing to
// send" sentinel checked before transmitting.
181 double frame_start = get_real_time();
182 int frame_number = -1;
// Only finalize a frame if some data was actually recorded.
185 if (!pthread->_frame_data.is_empty()) {
// Stop collector 0 for this thread at the frame boundary.
187 _client->stop(0, thread_index, frame_start);
// Record the current level of every collector that has one for this
// thread.
191 int num_collectors = _client->_num_collectors;
192 PStatClient::CollectorPointer *collectors =
193 (PStatClient::CollectorPointer *)_client->_collectors;
194 for (
int i = 0; i < num_collectors; i++) {
195 const PStatClient::PerThreadData &ptd =
196 collectors[i]->_per_thread[thread_index];
197 if (ptd._has_level) {
198 pthread->_frame_data.add_level(i, ptd._level);
// Swap the finished data with `frame_data` and remember which frame it
// belongs to.
201 pthread->_frame_data.swap(frame_data);
202 frame_number = pthread->_frame_number;
// Begin the next frame: empty data, next frame number, collector 0
// restarted at the same timestamp at which it was stopped.
205 pthread->_frame_data.clear();
206 pthread->_frame_number++;
207 _client->start(0, thread_index, frame_start);
// Time the transmission itself under the _pstats_pcollector collector,
// against current_thread_index rather than thread_index.
211 int pstats_index = PStatClient::_pstats_pcollector.get_index();
212 _client->start(pstats_index, current_thread_index, frame_start);
// Send only if a frame was actually finalized above.
214 if (frame_number != -1) {
215 transmit_frame_data(thread_index, frame_number, frame_data);
217 _client->stop(pstats_index, current_thread_index, get_real_time());
// A PStatClientImpl method whose name and parameter list are on a line not
// visible in this excerpt.  From the visible body it transmits `frame_data`
// for the indicated thread under the thread's next frame number, without
// finalizing collectors itself.
224 void PStatClientImpl::
// The index must refer to a thread known to the client.
226 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
228 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
// Thread 0 additionally exchanges control data with the server.
232 if (thread_index == 0) {
233 transmit_control_data();
// The thread is flagged active here (the condition is not visible).
239 pthread->_is_active =
true;
// Handling for an inactive thread (body not visible).
242 if (!pthread->_is_active) {
// Post-increment: this frame uses the current number, and the thread's
// counter advances for the next one.
246 int frame_number = pthread->_frame_number++;
// Time the transmission itself under the _pstats_pcollector collector.
250 int pstats_index = PStatClient::_pstats_pcollector.get_index();
251 _client->start(pstats_index, current_thread_index);
253 if (frame_number != -1) {
254 transmit_frame_data(thread_index, frame_number, frame_data);
256 _client->stop(pstats_index, current_thread_index);
// Sends one frame's worth of data for the indicated thread to the server,
// subject to rate limiting, choosing between UDP and TCP.  The rest of the
// parameter list, the datagram construction and several braces are not
// visible in this excerpt.
263 void PStatClientImpl::
264 transmit_frame_data(
int thread_index,
int frame_number,
266 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
267 PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index);
// Nothing is sent unless we are connected and the thread is active.
268 if (_is_connected && thread->_is_active) {
// Rate limiting: send only once the thread's next-packet time has arrived.
274 double now = get_real_time();
275 if (now >= thread->_next_packet) {
// Base delay between packets, from the configured maximum rate.
278 double packet_delay = 1.0 / _max_rate;
296 }
else if (_writer.is_valid_for_udp(datagram)) {
// The datagram can go over UDP; pick the transport by comparing the
// weighted counts of packets sent so far on each.
297 if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) {
// A UDP send requires that the server has already told us its UDP port.
299 nassertv(_got_udp_port);
300 sent = _writer.send(datagram, _udp_connection, _server);
// Special handling when the UDP counter is zero (body not visible).
303 if (_udp_count == 0) {
// Otherwise the datagram goes over TCP.
311 sent = _writer.send(datagram, _tcp_connection);
// Special handling when the TCP counter is zero (body not visible).
314 if (_tcp_count == 0) {
// A further TCP send path (presumably for datagrams not valid for UDP --
// TODO confirm; the enclosing branch is not visible).
322 sent = _writer.send(datagram, _tcp_connection);
// Tail of an expression performing a rounded-up integer division of the
// datagram length by maximum_udp_datagram; its left-hand side is on a line
// not visible here.
326 (datagram.
get_length() + maximum_udp_datagram - 1) /
327 maximum_udp_datagram;
// Scale the delay by packet_ratio.
328 packet_delay *= (double)packet_ratio;
// Schedule the earliest time the next packet may be sent for this thread.
331 thread->_next_packet = now + packet_delay;
// Debug-level report of a failed send.
334 if (pstats_cat.is_debug()) {
336 <<
"Couldn't send packet.\n";
// Processes any pending control messages from the server, then reports
// newly added collectors and threads.  Some local declarations and braces
// are not visible in this excerpt.
347 void PStatClientImpl::
348 transmit_control_data() {
// Drain the reader for as long as we stay connected; _is_connected is
// re-tested on every iteration.
350 while (_is_connected && _reader.data_available()) {
353 if (_reader.get_data(datagram)) {
// Successfully decoded messages are dispatched to the handler.
355 if (message.
decode(datagram)) {
356 handle_server_control_message(message);
// Message reported for a datagram that is not an expected server message
// (the enclosing else is not visible).
360 <<
"Got unexpected message from server.\n";
// Tell the server about any collectors and threads not yet reported.
366 report_new_collectors();
367 report_new_threads();
// A PStatClientImpl method returning std::string whose name line is not
// visible in this excerpt.  From the visible body it fills in the cached
// _hostname member the first time it is needed.
375 std::string PStatClientImpl::
// Only look the name up while the cached value is still empty.
377 if (_hostname.empty()) {
378 char temp_buff[1024];
// NOTE(review): POSIX leaves it unspecified whether the buffer is
// NUL-terminated when the name is truncated; consider terminating
// temp_buff explicitly before assigning it.
379 if (gethostname(temp_buff, 1024) == 0) {
380 _hostname = temp_buff;
// Placeholder name (presumably the branch taken when gethostname fails;
// the else line is not visible).
382 _hostname =
"unknown";
// A PStatClientImpl method whose name line is not visible in this excerpt.
// From the visible body it sends a T_hello control message identifying this
// client to the server.
391 void PStatClientImpl::
// Only valid once a connection has been established.
393 nassertv(_is_connected);
// Fill in the hello message with our hostname and client name.
396 message._type = PStatClientControlMessage::T_hello;
397 message._client_hostname = get_hostname();
398 message._client_progname = _client_name;
// Sent over the TCP connection; the meaning of the trailing `true` is
// defined by the writer's send() -- TODO confirm.
404 _writer.send(datagram, _tcp_connection,
true);
// Sends the server the definitions of all collectors that have not been
// reported yet, in batches.  The declarations of `message`, `datagram` and
// the batch counter `i` are not visible in this excerpt.
411 void PStatClientImpl::
412 report_new_collectors() {
// Upper bound on the number of collector definitions placed in one message.
416 static const int max_collectors_at_once = 700;
// Keep sending batches while connected and unreported collectors remain.
418 while (_is_connected && _collectors_reported < _client->_num_collectors) {
420 message._type = PStatClientControlMessage::T_define_collectors;
// Fill one batch, stopping at the per-message limit.
422 while (_collectors_reported < _client->_num_collectors &&
423 i < max_collectors_at_once) {
424 message._collectors.push_back(_client->get_collector_def(_collectors_reported));
425 _collectors_reported++;
// The batch goes over the TCP connection.
431 _writer.send(datagram, _tcp_connection,
true);
// Sends the server the names of all threads that have not been reported
// yet.  The declarations of `message` and `datagram`, and the line that
// advances _threads_reported, are not visible in this excerpt.
439 void PStatClientImpl::
440 report_new_threads() {
441 while (_is_connected && _threads_reported < _client->_num_threads) {
443 message._type = PStatClientControlMessage::T_define_threads;
// Tell the server the index of the first thread named in this message.
444 message._first_thread_index = _threads_reported;
445 PStatClient::ThreadPointer *threads =
446 (PStatClient::ThreadPointer *)_client->_threads;
// Append the name of each remaining unreported thread.
447 while (_threads_reported < _client->_num_threads) {
448 message._names.push_back(threads[_threads_reported]->_name);
// The message goes over the TCP connection.
454 _writer.send(datagram, _tcp_connection,
true);
// A PStatClientImpl method whose name line is not visible in this excerpt.
// From the visible body it acts on a control message received from the
// server, dispatching on the message type.
462 void PStatClientImpl::
464 switch (message._type) {
// The server's hello: report who we connected to, then record the UDP port
// it advertises.
465 case PStatServerControlMessage::T_hello:
467 <<
"Connected to " << message._server_progname <<
" on "
468 << message._server_hostname <<
"\n";
// From here on the UDP port is known (checked by nassertv before UDP sends).
470 _server.set_port(message._udp_port);
471 _got_udp_port =
true;
// Message reported for other message types (the case/default label is not
// visible).
476 <<
"Invalid control message received from server.\n";
// Called with a connection that has been reset; reacts according to whether
// it is our TCP connection, our UDP connection, or neither.  The second
// (bool) parameter is unnamed and unused in the visible code.
483 void PStatClientImpl::
484 connection_reset(
const PT(
Connection) &connection,
bool) {
// Reset of the TCP connection (the action taken is on a line not visible
// here).
485 if (connection == _tcp_connection) {
487 }
else if (connection == _udp_connection) {
// UDP trouble: fall back to TCP only.
489 <<
"Trouble sending UDP; switching to TCP only.\n";
// A TCP factor of 0 keeps the weighted UDP count from ever comparing below
// the weighted TCP count when frames are sent (assuming non-negative
// counts).
490 _tcp_count_factor = 0.0f;
491 _udp_count_factor = 1.0f;
// Message for a connection that is neither of ours.
494 <<
"Ignoring spurious connection_reset() message\n";