00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "pStatClientImpl.h"
00016
00017
00018 #ifdef DO_PSTATS
00019
00020 #include "pStatClient.h"
00021 #include "pStatClientControlMessage.h"
00022 #include "pStatServerControlMessage.h"
00023 #include "pStatCollector.h"
00024 #include "pStatThread.h"
00025 #include "config_pstats.h"
00026 #include "pStatProperties.h"
00027 #include "cmath.h"
00028
00029 #include <algorithm>
00030
00031 #if defined(WIN32_VC) || defined(WIN64_VC)
00032 #include <Winsock2.h>
00033 #include <windows.h>
00034 #endif
00035
00036
00037
00038
00039
00040
00041 PStatClientImpl::
00042 PStatClientImpl(PStatClient *client) :
00043 _clock(TrueClock::get_global_ptr()),
00044 _delta(0.0),
00045 _last_frame(0.0),
00046 _client(client),
00047 _reader(this, 0),
00048 _writer(this, pstats_threaded_write ? 1 : 0)
00049 {
00050 _writer.set_max_queue_size(pstats_max_queue_size);
00051 _reader.set_tcp_header_size(4);
00052 _writer.set_tcp_header_size(4);
00053 _is_connected = false;
00054 _got_udp_port = false;
00055 _collectors_reported = 0;
00056 _threads_reported = 0;
00057
00058 _client_name = pstats_name;
00059 _max_rate = pstats_max_rate;
00060
00061 _tcp_count = 1;
00062 _udp_count = 1;
00063
00064 if (pstats_tcp_ratio >= 1.0f) {
00065 _tcp_count_factor = 0.0f;
00066 _udp_count_factor = 1.0f;
00067
00068 } else if (pstats_tcp_ratio <= 0.0f) {
00069 _tcp_count_factor = 1.0f;
00070 _udp_count_factor = 0.0f;
00071
00072 } else {
00073 csincos(pstats_tcp_ratio * (3.14159265f / 2.0f),
00074 &_udp_count_factor,
00075 &_tcp_count_factor);
00076 }
00077 }
00078
00079
00080
00081
00082
00083
00084 PStatClientImpl::
00085 ~PStatClientImpl() {
00086 nassertv(!_is_connected);
00087 }
00088
00089
00090
00091
00092
00093
00094 bool PStatClientImpl::
00095 client_connect(string hostname, int port) {
00096 nassertr(!_is_connected, true);
00097
00098 if (hostname.empty()) {
00099 hostname = pstats_host;
00100 }
00101 if (port < 0) {
00102 port = pstats_port;
00103 }
00104
00105 if (!_server.set_host(hostname, port)) {
00106 pstats_cat.error()
00107 << "Unknown host: " << hostname << "\n";
00108 return false;
00109 }
00110
00111 _tcp_connection = open_TCP_client_connection(_server, 5000);
00112
00113 if (_tcp_connection.is_null()) {
00114 pstats_cat.error()
00115 << "Couldn't connect to PStatServer at " << hostname << ":"
00116 << port << "\n";
00117 return false;
00118 }
00119
00120
00121 _tcp_connection->set_collect_tcp(false);
00122
00123 _reader.add_connection(_tcp_connection);
00124 _is_connected = true;
00125
00126 _udp_connection = open_UDP_connection();
00127
00128 send_hello();
00129
00130 #ifdef DEBUG_THREADS
00131 MutexDebug::increment_pstats();
00132 #endif // DEBUG_THREADS
00133
00134 return _is_connected;
00135 }
00136
00137
00138
00139
00140
00141
00142 void PStatClientImpl::
00143 client_disconnect() {
00144 if (_is_connected) {
00145 #ifdef DEBUG_THREADS
00146 MutexDebug::decrement_pstats();
00147 #endif // DEBUG_THREADS
00148 _reader.remove_connection(_tcp_connection);
00149 close_connection(_tcp_connection);
00150 close_connection(_udp_connection);
00151 }
00152
00153 _tcp_connection.clear();
00154 _udp_connection.clear();
00155
00156 _is_connected = false;
00157 _got_udp_port = false;
00158
00159 _collectors_reported = 0;
00160 _threads_reported = 0;
00161 }
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 void PStatClientImpl::
00172 new_frame(int thread_index) {
00173 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
00174
00175 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
00176
00177
00178
00179 if (thread_index == 0) {
00180 transmit_control_data();
00181 }
00182
00183
00184
00185 if (_got_udp_port) {
00186 pthread->_is_active = true;
00187 }
00188
00189 if (!pthread->_is_active) {
00190 return;
00191 }
00192
00193 double frame_start = get_real_time();
00194 int frame_number = -1;
00195 PStatFrameData frame_data;
00196
00197 if (!pthread->_frame_data.is_empty()) {
00198
00199 _client->stop(0, thread_index, frame_start);
00200
00201
00202
00203 int num_collectors = _client->_num_collectors;
00204 PStatClient::CollectorPointer *collectors =
00205 (PStatClient::CollectorPointer *)_client->_collectors;
00206 for (int i = 0; i < num_collectors; i++) {
00207 const PStatClient::PerThreadData &ptd =
00208 collectors[i]->_per_thread[thread_index];
00209 if (ptd._has_level) {
00210 pthread->_frame_data.add_level(i, ptd._level);
00211 }
00212 }
00213 pthread->_frame_data.swap(frame_data);
00214 frame_number = pthread->_frame_number;
00215 }
00216
00217 pthread->_frame_data.clear();
00218 pthread->_frame_number++;
00219 _client->start(0, thread_index, frame_start);
00220
00221
00222 int pstats_index = PStatClient::_pstats_pcollector.get_index();
00223 _client->start(pstats_index, thread_index, frame_start);
00224
00225 if (frame_number != -1) {
00226 transmit_frame_data(thread_index, frame_number, frame_data);
00227 }
00228 _client->stop(pstats_index, thread_index, get_real_time());
00229 }
00230
00231
00232
00233
00234
00235
00236
00237 void PStatClientImpl::
00238 transmit_frame_data(int thread_index, int frame_number,
00239 const PStatFrameData &frame_data) {
00240 nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
00241 PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index);
00242 if (_is_connected && thread->_is_active) {
00243
00244
00245
00246
00247
00248 double now = get_real_time();
00249 if (now >= thread->_next_packet) {
00250
00251
00252 double packet_delay = 1.0 / _max_rate;
00253
00254
00255 NetDatagram datagram;
00256
00257
00258 datagram.add_uint8(0);
00259
00260 datagram.add_uint16(thread_index);
00261 datagram.add_uint32(frame_number);
00262
00263 bool sent;
00264
00265 if (!frame_data.write_datagram(datagram, _client)) {
00266
00267
00268 sent = false;
00269
00270 } else if (_writer.is_valid_for_udp(datagram)) {
00271 if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) {
00272
00273 nassertv(_got_udp_port);
00274 sent = _writer.send(datagram, _udp_connection, _server);
00275 _udp_count++;
00276
00277 if (_udp_count == 0) {
00278
00279 _udp_count = 1;
00280 _tcp_count = 1;
00281 }
00282
00283 } else {
00284
00285 sent = _writer.send(datagram, _tcp_connection);
00286 _tcp_count++;
00287
00288 if (_tcp_count == 0) {
00289
00290 _udp_count = 1;
00291 _tcp_count = 1;
00292 }
00293 }
00294
00295 } else {
00296 sent = _writer.send(datagram, _tcp_connection);
00297
00298
00299 int packet_ratio =
00300 (datagram.get_length() + maximum_udp_datagram - 1) /
00301 maximum_udp_datagram;
00302 packet_delay *= (double)packet_ratio;
00303 }
00304
00305 thread->_next_packet = now + packet_delay;
00306
00307 if (!sent) {
00308 if (pstats_cat.is_debug()) {
00309 pstats_cat.debug()
00310 << "Couldn't send packet.\n";
00311 }
00312 }
00313 }
00314 }
00315 }
00316
00317
00318
00319
00320
00321
00322
00323 void PStatClientImpl::
00324 transmit_control_data() {
00325
00326 while (_is_connected && _reader.data_available()) {
00327 NetDatagram datagram;
00328
00329 if (_reader.get_data(datagram)) {
00330 PStatServerControlMessage message;
00331 if (message.decode(datagram)) {
00332 handle_server_control_message(message);
00333
00334 } else {
00335 pstats_cat.error()
00336 << "Got unexpected message from server.\n";
00337 }
00338 }
00339 }
00340
00341 if (_is_connected) {
00342 report_new_collectors();
00343 report_new_threads();
00344 }
00345 }
00346
00347
00348
00349
00350
00351
00352
00353 string PStatClientImpl::
00354 get_hostname() {
00355 if (_hostname.empty()) {
00356 char temp_buff[1024];
00357 if (gethostname(temp_buff, 1024) == 0) {
00358 _hostname = temp_buff;
00359 } else {
00360 _hostname = "unknown";
00361 }
00362 }
00363 return _hostname;
00364 }
00365
00366
00367
00368
00369
00370
00371 void PStatClientImpl::
00372 send_hello() {
00373 nassertv(_is_connected);
00374
00375 PStatClientControlMessage message;
00376 message._type = PStatClientControlMessage::T_hello;
00377 message._client_hostname = get_hostname();
00378 message._client_progname = _client_name;
00379 message._major_version = get_current_pstat_major_version();
00380 message._minor_version = get_current_pstat_minor_version();
00381
00382 Datagram datagram;
00383 message.encode(datagram);
00384 _writer.send(datagram, _tcp_connection, true);
00385 }
00386
00387
00388
00389
00390
00391
00392
00393 void PStatClientImpl::
00394 report_new_collectors() {
00395
00396
00397
00398
00399 static const int max_collectors_at_once = 700;
00400
00401 while (_is_connected && _collectors_reported < _client->_num_collectors) {
00402 PStatClientControlMessage message;
00403 message._type = PStatClientControlMessage::T_define_collectors;
00404 int i = 0;
00405 while (_collectors_reported < _client->_num_collectors &&
00406 i < max_collectors_at_once) {
00407 message._collectors.push_back(_client->get_collector_def(_collectors_reported));
00408 _collectors_reported++;
00409 i++;
00410 }
00411
00412 Datagram datagram;
00413 message.encode(datagram);
00414 _writer.send(datagram, _tcp_connection, true);
00415 }
00416 }
00417
00418
00419
00420
00421
00422
00423
00424 void PStatClientImpl::
00425 report_new_threads() {
00426 while (_is_connected && _threads_reported < _client->_num_threads) {
00427 PStatClientControlMessage message;
00428 message._type = PStatClientControlMessage::T_define_threads;
00429 message._first_thread_index = _threads_reported;
00430 PStatClient::ThreadPointer *threads =
00431 (PStatClient::ThreadPointer *)_client->_threads;
00432 while (_threads_reported < _client->_num_threads) {
00433 message._names.push_back(threads[_threads_reported]->_name);
00434 _threads_reported++;
00435 }
00436
00437 Datagram datagram;
00438 message.encode(datagram);
00439 _writer.send(datagram, _tcp_connection, true);
00440 }
00441 }
00442
00443
00444
00445
00446
00447
00448
00449 void PStatClientImpl::
00450 handle_server_control_message(const PStatServerControlMessage &message) {
00451 switch (message._type) {
00452 case PStatServerControlMessage::T_hello:
00453 pstats_cat.info()
00454 << "Connected to " << message._server_progname << " on "
00455 << message._server_hostname << "\n";
00456
00457 _server.set_port(message._udp_port);
00458 _got_udp_port = true;
00459 break;
00460
00461 default:
00462 pstats_cat.error()
00463 << "Invalid control message received from server.\n";
00464 }
00465 }
00466
00467
00468
00469
00470
00471
00472
00473 void PStatClientImpl::
00474 connection_reset(const PT(Connection) &connection, bool) {
00475 if (connection == _tcp_connection) {
00476 client_disconnect();
00477 } else if (connection == _udp_connection) {
00478 pstats_cat.warning()
00479 << "Trouble sending UDP; switching to TCP only.\n";
00480 _tcp_count_factor = 0.0f;
00481 _udp_count_factor = 1.0f;
00482 } else {
00483 pstats_cat.warning()
00484 << "Ignoring spurious connection_reset() message\n";
00485 }
00486 }
00487
00488 #endif // DO_PSTATS