Panda3D
|
00001 // Filename: pStatClientImpl.cxx 00002 // Created by: drose (23Dec04) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) Carnegie Mellon University. All rights reserved. 00008 // 00009 // All use of this software is subject to the terms of the revised BSD 00010 // license. You should have received a copy of this license along 00011 // with this source code in a file named "LICENSE." 00012 // 00013 //////////////////////////////////////////////////////////////////// 00014 00015 #include "pStatClientImpl.h" 00016 00017 // This file only defines anything if DO_PSTATS is defined. 00018 #ifdef DO_PSTATS 00019 00020 #include "pStatClient.h" 00021 #include "pStatClientControlMessage.h" 00022 #include "pStatServerControlMessage.h" 00023 #include "pStatCollector.h" 00024 #include "pStatThread.h" 00025 #include "config_pstats.h" 00026 #include "pStatProperties.h" 00027 #include "cmath.h" 00028 00029 #include <algorithm> 00030 00031 #if defined(WIN32_VC) || defined(WIN64_VC) 00032 #include <Winsock2.h> 00033 #include <windows.h> 00034 #endif 00035 00036 //////////////////////////////////////////////////////////////////// 00037 // Function: PStatClientImpl::Constructor 00038 // Access: Public 00039 // Description: 00040 //////////////////////////////////////////////////////////////////// 00041 PStatClientImpl:: 00042 PStatClientImpl(PStatClient *client) : 00043 _clock(TrueClock::get_global_ptr()), 00044 _delta(0.0), 00045 _last_frame(0.0), 00046 _client(client), 00047 _reader(this, 0), 00048 _writer(this, pstats_threaded_write ? 1 : 0) 00049 { 00050 _writer.set_max_queue_size(pstats_max_queue_size); 00051 _reader.set_tcp_header_size(4); 00052 _writer.set_tcp_header_size(4); 00053 _is_connected = false; 00054 _got_udp_port = false; 00055 _collectors_reported = 0; 00056 _threads_reported = 0; 00057 00058 _client_name = pstats_name; 00059 _max_rate = pstats_max_rate; 00060 00061 _tcp_count = 1; 00062 _udp_count = 1; 00063 00064 if (pstats_tcp_ratio >= 1.0f) { 00065 _tcp_count_factor = 0.0f; 00066 _udp_count_factor = 1.0f; 00067 00068 } else if (pstats_tcp_ratio <= 0.0f) { 00069 _tcp_count_factor = 1.0f; 00070 _udp_count_factor = 0.0f; 00071 00072 } else { 00073 csincos(pstats_tcp_ratio * (3.14159265f / 2.0f), 00074 &_udp_count_factor, 00075 &_tcp_count_factor); 00076 } 00077 } 00078 00079 //////////////////////////////////////////////////////////////////// 00080 // Function: PStatClientImpl::Destructor 00081 // Access: Public 00082 // Description: 00083 //////////////////////////////////////////////////////////////////// 00084 PStatClientImpl:: 00085 ~PStatClientImpl() { 00086 nassertv(!_is_connected); 00087 } 00088 00089 //////////////////////////////////////////////////////////////////// 00090 // Function: PStatClientImpl::client_connect 00091 // Access: Public 00092 // Description: Called only by PStatClient::client_connect(). 00093 //////////////////////////////////////////////////////////////////// 00094 bool PStatClientImpl:: 00095 client_connect(string hostname, int port) { 00096 nassertr(!_is_connected, true); 00097 00098 if (hostname.empty()) { 00099 hostname = pstats_host; 00100 } 00101 if (port < 0) { 00102 port = pstats_port; 00103 } 00104 00105 if (!_server.set_host(hostname, port)) { 00106 pstats_cat.error() 00107 << "Unknown host: " << hostname << "\n"; 00108 return false; 00109 } 00110 00111 _tcp_connection = open_TCP_client_connection(_server, 5000); 00112 00113 if (_tcp_connection.is_null()) { 00114 pstats_cat.error() 00115 << "Couldn't connect to PStatServer at " << hostname << ":" 00116 << port << "\n"; 00117 return false; 00118 } 00119 // Make sure we're not queuing up multiple TCP sockets--we expect 00120 // immediate writes of our TCP datagrams. 00121 _tcp_connection->set_collect_tcp(false); 00122 00123 _reader.add_connection(_tcp_connection); 00124 _is_connected = true; 00125 00126 _udp_connection = open_UDP_connection(); 00127 00128 send_hello(); 00129 00130 #ifdef DEBUG_THREADS 00131 MutexDebug::increment_pstats(); 00132 #endif // DEBUG_THREADS 00133 00134 return _is_connected; 00135 } 00136 00137 //////////////////////////////////////////////////////////////////// 00138 // Function: PStatClientImpl::client_disconnect 00139 // Access: Public 00140 // Description: Called only by PStatClient::client_disconnect(). 00141 //////////////////////////////////////////////////////////////////// 00142 void PStatClientImpl:: 00143 client_disconnect() { 00144 if (_is_connected) { 00145 #ifdef DEBUG_THREADS 00146 MutexDebug::decrement_pstats(); 00147 #endif // DEBUG_THREADS 00148 _reader.remove_connection(_tcp_connection); 00149 close_connection(_tcp_connection); 00150 close_connection(_udp_connection); 00151 } 00152 00153 _tcp_connection.clear(); 00154 _udp_connection.clear(); 00155 00156 _is_connected = false; 00157 _got_udp_port = false; 00158 00159 _collectors_reported = 0; 00160 _threads_reported = 0; 00161 } 00162 00163 //////////////////////////////////////////////////////////////////// 00164 // Function: PStatClientImpl::new_frame 00165 // Access: Public 00166 // Description: Called by the PStatThread interface at the beginning 00167 // of every frame, for each thread. This resets the 00168 // clocks for the new frame and transmits the data for 00169 // the previous frame. 00170 //////////////////////////////////////////////////////////////////// 00171 void PStatClientImpl:: 00172 new_frame(int thread_index) { 00173 nassertv(thread_index >= 0 && thread_index < _client->_num_threads); 00174 00175 PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index); 00176 00177 // If we're the main thread, we should exchange control packets with 00178 // the server. 00179 if (thread_index == 0) { 00180 transmit_control_data(); 00181 } 00182 00183 // If we've got the UDP port by the time the frame starts, it's 00184 // time to become active and start actually tracking data. 00185 if (_got_udp_port) { 00186 pthread->_is_active = true; 00187 } 00188 00189 if (!pthread->_is_active) { 00190 return; 00191 } 00192 00193 double frame_start = get_real_time(); 00194 int frame_number = -1; 00195 PStatFrameData frame_data; 00196 00197 if (!pthread->_frame_data.is_empty()) { 00198 // Collector 0 is the whole frame. 00199 _client->stop(0, thread_index, frame_start); 00200 00201 // Fill up the level data for all the collectors who have level 00202 // data for this pthread. 00203 int num_collectors = _client->_num_collectors; 00204 PStatClient::CollectorPointer *collectors = 00205 (PStatClient::CollectorPointer *)_client->_collectors; 00206 for (int i = 0; i < num_collectors; i++) { 00207 const PStatClient::PerThreadData &ptd = 00208 collectors[i]->_per_thread[thread_index]; 00209 if (ptd._has_level) { 00210 pthread->_frame_data.add_level(i, ptd._level); 00211 } 00212 } 00213 pthread->_frame_data.swap(frame_data); 00214 frame_number = pthread->_frame_number; 00215 } 00216 00217 pthread->_frame_data.clear(); 00218 pthread->_frame_number++; 00219 _client->start(0, thread_index, frame_start); 00220 00221 // Also record the time for the PStats operation itself. 00222 int pstats_index = PStatClient::_pstats_pcollector.get_index(); 00223 _client->start(pstats_index, thread_index, frame_start); 00224 00225 if (frame_number != -1) { 00226 transmit_frame_data(thread_index, frame_number, frame_data); 00227 } 00228 _client->stop(pstats_index, thread_index, get_real_time()); 00229 } 00230 00231 //////////////////////////////////////////////////////////////////// 00232 // Function: PStatClientImpl::transmit_frame_data 00233 // Access: Private 00234 // Description: Should be called once per frame per thread to 00235 // transmit the latest data to the PStatServer. 00236 //////////////////////////////////////////////////////////////////// 00237 void PStatClientImpl:: 00238 transmit_frame_data(int thread_index, int frame_number, 00239 const PStatFrameData &frame_data) { 00240 nassertv(thread_index >= 0 && thread_index < _client->_num_threads); 00241 PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index); 00242 if (_is_connected && thread->_is_active) { 00243 00244 // We don't want to send too many packets in a hurry and flood the 00245 // server. Check that enough time has elapsed for us to send a 00246 // new packet. If not, we'll drop this packet on the floor and 00247 // send a new one next time around. 00248 double now = get_real_time(); 00249 if (now >= thread->_next_packet) { 00250 // We don't want to send more than _max_rate UDP-size packets 00251 // per second, per thread. 00252 double packet_delay = 1.0 / _max_rate; 00253 00254 // Send new data. 00255 NetDatagram datagram; 00256 // We always start with a zero byte, to differentiate it from a 00257 // control message. 00258 datagram.add_uint8(0); 00259 00260 datagram.add_uint16(thread_index); 00261 datagram.add_uint32(frame_number); 00262 00263 bool sent; 00264 00265 if (!frame_data.write_datagram(datagram, _client)) { 00266 // Too many events to fit in a single datagram. Maybe it was 00267 // a long frame load or something. Just drop the datagram. 00268 sent = false; 00269 00270 } else if (_writer.is_valid_for_udp(datagram)) { 00271 if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) { 00272 // Send this one as a UDP packet. 00273 nassertv(_got_udp_port); 00274 sent = _writer.send(datagram, _udp_connection, _server); 00275 _udp_count++; 00276 00277 if (_udp_count == 0) { 00278 // Wraparound! 00279 _udp_count = 1; 00280 _tcp_count = 1; 00281 } 00282 00283 } else { 00284 // Send this one as a TCP packet. 00285 sent = _writer.send(datagram, _tcp_connection); 00286 _tcp_count++; 00287 00288 if (_tcp_count == 0) { 00289 // Wraparound! 00290 _udp_count = 1; 00291 _tcp_count = 1; 00292 } 00293 } 00294 00295 } else { 00296 sent = _writer.send(datagram, _tcp_connection); 00297 // If our packets are so large that we must ship them via TCP, 00298 // then artificially slow down the packet rate even further. 00299 int packet_ratio = 00300 (datagram.get_length() + maximum_udp_datagram - 1) / 00301 maximum_udp_datagram; 00302 packet_delay *= (double)packet_ratio; 00303 } 00304 00305 thread->_next_packet = now + packet_delay; 00306 00307 if (!sent) { 00308 if (pstats_cat.is_debug()) { 00309 pstats_cat.debug() 00310 << "Couldn't send packet.\n"; 00311 } 00312 } 00313 } 00314 } 00315 } 00316 00317 //////////////////////////////////////////////////////////////////// 00318 // Function: PStatClientImpl::transmit_control_data 00319 // Access: Private 00320 // Description: Should be called once a frame to exchange control 00321 // information with the server. 00322 //////////////////////////////////////////////////////////////////// 00323 void PStatClientImpl:: 00324 transmit_control_data() { 00325 // Check for new messages from the server. 00326 while (_is_connected && _reader.data_available()) { 00327 NetDatagram datagram; 00328 00329 if (_reader.get_data(datagram)) { 00330 PStatServerControlMessage message; 00331 if (message.decode(datagram)) { 00332 handle_server_control_message(message); 00333 00334 } else { 00335 pstats_cat.error() 00336 << "Got unexpected message from server.\n"; 00337 } 00338 } 00339 } 00340 00341 if (_is_connected) { 00342 report_new_collectors(); 00343 report_new_threads(); 00344 } 00345 } 00346 00347 00348 //////////////////////////////////////////////////////////////////// 00349 // Function: PStatClientImpl::get_hostname 00350 // Access: Private 00351 // Description: Returns the current machine's hostname. 00352 //////////////////////////////////////////////////////////////////// 00353 string PStatClientImpl:: 00354 get_hostname() { 00355 if (_hostname.empty()) { 00356 char temp_buff[1024]; 00357 if (gethostname(temp_buff, 1024) == 0) { 00358 _hostname = temp_buff; 00359 } else { 00360 _hostname = "unknown"; 00361 } 00362 } 00363 return _hostname; 00364 } 00365 00366 //////////////////////////////////////////////////////////////////// 00367 // Function: PStatClientImpl::send_hello 00368 // Access: Private 00369 // Description: Sends the initial greeting message to the server. 00370 //////////////////////////////////////////////////////////////////// 00371 void PStatClientImpl:: 00372 send_hello() { 00373 nassertv(_is_connected); 00374 00375 PStatClientControlMessage message; 00376 message._type = PStatClientControlMessage::T_hello; 00377 message._client_hostname = get_hostname(); 00378 message._client_progname = _client_name; 00379 message._major_version = get_current_pstat_major_version(); 00380 message._minor_version = get_current_pstat_minor_version(); 00381 00382 Datagram datagram; 00383 message.encode(datagram); 00384 _writer.send(datagram, _tcp_connection, true); 00385 } 00386 00387 //////////////////////////////////////////////////////////////////// 00388 // Function: PStatClientImpl::report_new_collectors 00389 // Access: Private 00390 // Description: Sends over any information about new Collectors that 00391 // the user code might have recently created. 00392 //////////////////////////////////////////////////////////////////// 00393 void PStatClientImpl:: 00394 report_new_collectors() { 00395 // Empirically, we determined that you can't send more than about 00396 // 1400 collectors at once without exceeding the 64K limit on a 00397 // single datagram. So we limit ourselves here to sending only 00398 // half that many. 00399 static const int max_collectors_at_once = 700; 00400 00401 while (_is_connected && _collectors_reported < _client->_num_collectors) { 00402 PStatClientControlMessage message; 00403 message._type = PStatClientControlMessage::T_define_collectors; 00404 int i = 0; 00405 while (_collectors_reported < _client->_num_collectors && 00406 i < max_collectors_at_once) { 00407 message._collectors.push_back(_client->get_collector_def(_collectors_reported)); 00408 _collectors_reported++; 00409 i++; 00410 } 00411 00412 Datagram datagram; 00413 message.encode(datagram); 00414 _writer.send(datagram, _tcp_connection, true); 00415 } 00416 } 00417 00418 //////////////////////////////////////////////////////////////////// 00419 // Function: PStatClientImpl::report_new_threads 00420 // Access: Private 00421 // Description: Sends over any information about new Threads that 00422 // the user code might have recently created. 00423 //////////////////////////////////////////////////////////////////// 00424 void PStatClientImpl:: 00425 report_new_threads() { 00426 while (_is_connected && _threads_reported < _client->_num_threads) { 00427 PStatClientControlMessage message; 00428 message._type = PStatClientControlMessage::T_define_threads; 00429 message._first_thread_index = _threads_reported; 00430 PStatClient::ThreadPointer *threads = 00431 (PStatClient::ThreadPointer *)_client->_threads; 00432 while (_threads_reported < _client->_num_threads) { 00433 message._names.push_back(threads[_threads_reported]->_name); 00434 _threads_reported++; 00435 } 00436 00437 Datagram datagram; 00438 message.encode(datagram); 00439 _writer.send(datagram, _tcp_connection, true); 00440 } 00441 } 00442 00443 //////////////////////////////////////////////////////////////////// 00444 // Function: PStatClientImpl::handle_server_control_message 00445 // Access: Private 00446 // Description: Called when a control message has been received by 00447 // the server over the TCP connection. 00448 //////////////////////////////////////////////////////////////////// 00449 void PStatClientImpl:: 00450 handle_server_control_message(const PStatServerControlMessage &message) { 00451 switch (message._type) { 00452 case PStatServerControlMessage::T_hello: 00453 pstats_cat.info() 00454 << "Connected to " << message._server_progname << " on " 00455 << message._server_hostname << "\n"; 00456 00457 _server.set_port(message._udp_port); 00458 _got_udp_port = true; 00459 break; 00460 00461 default: 00462 pstats_cat.error() 00463 << "Invalid control message received from server.\n"; 00464 } 00465 } 00466 00467 //////////////////////////////////////////////////////////////////// 00468 // Function: PStatClientImpl::connection_reset 00469 // Access: Private, Virtual 00470 // Description: Called by the internal net code when the connection 00471 // has been lost. 00472 //////////////////////////////////////////////////////////////////// 00473 void PStatClientImpl:: 00474 connection_reset(const PT(Connection) &connection, bool) { 00475 if (connection == _tcp_connection) { 00476 client_disconnect(); 00477 } else if (connection == _udp_connection) { 00478 pstats_cat.warning() 00479 << "Trouble sending UDP; switching to TCP only.\n"; 00480 _tcp_count_factor = 0.0f; 00481 _udp_count_factor = 1.0f; 00482 } else { 00483 pstats_cat.warning() 00484 << "Ignoring spurious connection_reset() message\n"; 00485 } 00486 } 00487 00488 #endif // DO_PSTATS