Panda3D
 All Classes Functions Variables Enumerations
pStatClientImpl.cxx
00001 // Filename: pStatClientImpl.cxx
00002 // Created by:  drose (23Dec04)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00008 //
00009 // All use of this software is subject to the terms of the revised BSD
00010 // license.  You should have received a copy of this license along
00011 // with this source code in a file named "LICENSE."
00012 //
00013 ////////////////////////////////////////////////////////////////////
00014 
00015 #include "pStatClientImpl.h"
00016 
00017 // This file only defines anything if DO_PSTATS is defined.
00018 #ifdef DO_PSTATS
00019 
00020 #include "pStatClient.h"
00021 #include "pStatClientControlMessage.h"
00022 #include "pStatServerControlMessage.h"
00023 #include "pStatCollector.h"
00024 #include "pStatThread.h"
00025 #include "config_pstats.h"
00026 #include "pStatProperties.h"
00027 #include "cmath.h"
00028 
00029 #include <algorithm>
00030 
00031 #if defined(WIN32_VC) || defined(WIN64_VC)
00032 #include <Winsock2.h>
00033 #include <windows.h>
00034 #endif
00035 
00036 ////////////////////////////////////////////////////////////////////
00037 //     Function: PStatClientImpl::Constructor
00038 //       Access: Public
00039 //  Description:
00040 ////////////////////////////////////////////////////////////////////
00041 PStatClientImpl::
00042 PStatClientImpl(PStatClient *client) :
00043   _clock(TrueClock::get_global_ptr()),
00044   _delta(0.0),
00045   _last_frame(0.0),
00046   _client(client),
00047   _reader(this, 0),
00048   _writer(this, pstats_threaded_write ? 1 : 0)
00049 {
00050   _writer.set_max_queue_size(pstats_max_queue_size); 
00051   _reader.set_tcp_header_size(4);
00052   _writer.set_tcp_header_size(4);
00053   _is_connected = false;
00054   _got_udp_port = false;
00055   _collectors_reported = 0;
00056   _threads_reported = 0;
00057 
00058   _client_name = pstats_name;
00059   _max_rate = pstats_max_rate;
00060 
00061   _tcp_count = 1;
00062   _udp_count = 1;
00063 
00064   if (pstats_tcp_ratio >= 1.0f) {
00065     _tcp_count_factor = 0.0f;
00066     _udp_count_factor = 1.0f;
00067 
00068   } else if (pstats_tcp_ratio <= 0.0f) {
00069     _tcp_count_factor = 1.0f;
00070     _udp_count_factor = 0.0f;
00071 
00072   } else {
00073     csincos(pstats_tcp_ratio * (3.14159265f / 2.0f),
00074             &_udp_count_factor,
00075             &_tcp_count_factor);
00076   }
00077 }
00078 
00079 ////////////////////////////////////////////////////////////////////
00080 //     Function: PStatClientImpl::Destructor
00081 //       Access: Public
00082 //  Description:
00083 ////////////////////////////////////////////////////////////////////
00084 PStatClientImpl::
00085 ~PStatClientImpl() {
00086   nassertv(!_is_connected);
00087 }
00088 
00089 ////////////////////////////////////////////////////////////////////
00090 //     Function: PStatClientImpl::client_connect
00091 //       Access: Public
00092 //  Description: Called only by PStatClient::client_connect().
00093 ////////////////////////////////////////////////////////////////////
00094 bool PStatClientImpl::
00095 client_connect(string hostname, int port) {
00096   nassertr(!_is_connected, true);
00097 
00098   if (hostname.empty()) {
00099     hostname = pstats_host;
00100   }
00101   if (port < 0) {
00102     port = pstats_port;
00103   }
00104 
00105   if (!_server.set_host(hostname, port)) {
00106     pstats_cat.error()
00107       << "Unknown host: " << hostname << "\n";
00108     return false;
00109   }
00110 
00111   _tcp_connection = open_TCP_client_connection(_server, 5000);
00112 
00113   if (_tcp_connection.is_null()) {
00114     pstats_cat.error()
00115       << "Couldn't connect to PStatServer at " << hostname << ":"
00116       << port << "\n";
00117     return false;
00118   }
00119   // Make sure we're not queuing up multiple TCP sockets--we expect
00120   // immediate writes of our TCP datagrams.
00121   _tcp_connection->set_collect_tcp(false);
00122 
00123   _reader.add_connection(_tcp_connection);
00124   _is_connected = true;
00125 
00126   _udp_connection = open_UDP_connection();
00127 
00128   send_hello();
00129 
00130 #ifdef DEBUG_THREADS
00131   MutexDebug::increment_pstats();
00132 #endif // DEBUG_THREADS
00133 
00134   return _is_connected;
00135 }
00136 
00137 ////////////////////////////////////////////////////////////////////
00138 //     Function: PStatClientImpl::client_disconnect
00139 //       Access: Public
00140 //  Description: Called only by PStatClient::client_disconnect().
00141 ////////////////////////////////////////////////////////////////////
00142 void PStatClientImpl::
00143 client_disconnect() {
00144   if (_is_connected) {
00145 #ifdef DEBUG_THREADS
00146     MutexDebug::decrement_pstats();
00147 #endif // DEBUG_THREADS
00148     _reader.remove_connection(_tcp_connection);
00149     close_connection(_tcp_connection);
00150     close_connection(_udp_connection);
00151   }
00152 
00153   _tcp_connection.clear();
00154   _udp_connection.clear();
00155 
00156   _is_connected = false;
00157   _got_udp_port = false;
00158 
00159   _collectors_reported = 0;
00160   _threads_reported = 0;
00161 }
00162 
00163 ////////////////////////////////////////////////////////////////////
00164 //     Function: PStatClientImpl::new_frame
00165 //       Access: Public
00166 //  Description: Called by the PStatThread interface at the beginning
00167 //               of every frame, for each thread.  This resets the
00168 //               clocks for the new frame and transmits the data for
00169 //               the previous frame.
00170 ////////////////////////////////////////////////////////////////////
00171 void PStatClientImpl::
00172 new_frame(int thread_index) {
00173   nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
00174 
00175   PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
00176 
00177   // If we're the main thread, we should exchange control packets with
00178   // the server.
00179   if (thread_index == 0) {
00180     transmit_control_data();
00181   }
00182 
00183   // If we've got the UDP port by the time the frame starts, it's
00184   // time to become active and start actually tracking data.
00185   if (_got_udp_port) {
00186     pthread->_is_active = true;
00187   }
00188 
00189   if (!pthread->_is_active) {
00190     return;
00191   }
00192 
00193   double frame_start = get_real_time();
00194   int frame_number = -1;
00195   PStatFrameData frame_data;
00196 
00197   if (!pthread->_frame_data.is_empty()) {
00198     // Collector 0 is the whole frame.
00199     _client->stop(0, thread_index, frame_start);
00200 
00201     // Fill up the level data for all the collectors who have level
00202     // data for this pthread.
00203     int num_collectors = _client->_num_collectors;
00204     PStatClient::CollectorPointer *collectors = 
00205       (PStatClient::CollectorPointer *)_client->_collectors;
00206     for (int i = 0; i < num_collectors; i++) {
00207       const PStatClient::PerThreadData &ptd = 
00208         collectors[i]->_per_thread[thread_index];
00209       if (ptd._has_level) {
00210         pthread->_frame_data.add_level(i, ptd._level);
00211       }
00212     }
00213     pthread->_frame_data.swap(frame_data);
00214     frame_number = pthread->_frame_number;
00215   }
00216 
00217   pthread->_frame_data.clear();
00218   pthread->_frame_number++;
00219   _client->start(0, thread_index, frame_start);
00220 
00221   // Also record the time for the PStats operation itself.
00222   int pstats_index = PStatClient::_pstats_pcollector.get_index();
00223   _client->start(pstats_index, thread_index, frame_start);
00224 
00225   if (frame_number != -1) {
00226     transmit_frame_data(thread_index, frame_number, frame_data);
00227   }
00228   _client->stop(pstats_index, thread_index, get_real_time());
00229 }
00230 
00231 ////////////////////////////////////////////////////////////////////
00232 //     Function: PStatClientImpl::transmit_frame_data
00233 //       Access: Private
00234 //  Description: Should be called once per frame per thread to
00235 //               transmit the latest data to the PStatServer.
00236 ////////////////////////////////////////////////////////////////////
00237 void PStatClientImpl::
00238 transmit_frame_data(int thread_index, int frame_number, 
00239                     const PStatFrameData &frame_data) {
00240   nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
00241   PStatClient::InternalThread *thread = _client->get_thread_ptr(thread_index);
00242   if (_is_connected && thread->_is_active) {
00243 
00244     // We don't want to send too many packets in a hurry and flood the
00245     // server.  Check that enough time has elapsed for us to send a
00246     // new packet.  If not, we'll drop this packet on the floor and
00247     // send a new one next time around.
00248     double now = get_real_time();
00249     if (now >= thread->_next_packet) {
00250       // We don't want to send more than _max_rate UDP-size packets
00251       // per second, per thread.
00252       double packet_delay = 1.0 / _max_rate;
00253 
00254       // Send new data.
00255       NetDatagram datagram;
00256       // We always start with a zero byte, to differentiate it from a
00257       // control message.
00258       datagram.add_uint8(0);
00259 
00260       datagram.add_uint16(thread_index);
00261       datagram.add_uint32(frame_number);
00262 
00263       bool sent;
00264 
00265       if (!frame_data.write_datagram(datagram, _client)) {
00266         // Too many events to fit in a single datagram.  Maybe it was
00267         // a long frame load or something.  Just drop the datagram.
00268         sent = false;
00269 
00270       } else if (_writer.is_valid_for_udp(datagram)) {
00271         if (_udp_count * _udp_count_factor < _tcp_count * _tcp_count_factor) {
00272           // Send this one as a UDP packet.
00273           nassertv(_got_udp_port);
00274           sent = _writer.send(datagram, _udp_connection, _server);
00275           _udp_count++;
00276 
00277           if (_udp_count == 0) {
00278             // Wraparound!
00279             _udp_count = 1;
00280             _tcp_count = 1;
00281           }
00282 
00283         } else {
00284           // Send this one as a TCP packet.
00285           sent = _writer.send(datagram, _tcp_connection);
00286           _tcp_count++;
00287 
00288           if (_tcp_count == 0) {
00289             // Wraparound!
00290             _udp_count = 1;
00291             _tcp_count = 1;
00292           }
00293         }
00294 
00295       } else {
00296         sent = _writer.send(datagram, _tcp_connection);
00297         // If our packets are so large that we must ship them via TCP,
00298         // then artificially slow down the packet rate even further.
00299         int packet_ratio =
00300           (datagram.get_length() + maximum_udp_datagram - 1) /
00301           maximum_udp_datagram;
00302         packet_delay *= (double)packet_ratio;
00303       }
00304 
00305       thread->_next_packet = now + packet_delay;
00306 
00307       if (!sent) {
00308         if (pstats_cat.is_debug()) {
00309           pstats_cat.debug()
00310             << "Couldn't send packet.\n";
00311         }
00312       }
00313     }
00314   }
00315 }
00316 
00317 ////////////////////////////////////////////////////////////////////
00318 //     Function: PStatClientImpl::transmit_control_data
00319 //       Access: Private
00320 //  Description: Should be called once a frame to exchange control
00321 //               information with the server.
00322 ////////////////////////////////////////////////////////////////////
00323 void PStatClientImpl::
00324 transmit_control_data() {
00325   // Check for new messages from the server.
00326   while (_is_connected && _reader.data_available()) {
00327     NetDatagram datagram;
00328 
00329     if (_reader.get_data(datagram)) {
00330       PStatServerControlMessage message;
00331       if (message.decode(datagram)) {
00332         handle_server_control_message(message);
00333 
00334       } else {
00335         pstats_cat.error()
00336           << "Got unexpected message from server.\n";
00337       }
00338     }
00339   }
00340 
00341   if (_is_connected) {
00342     report_new_collectors();
00343     report_new_threads();
00344   }
00345 }
00346 
00347 
00348 ////////////////////////////////////////////////////////////////////
00349 //     Function: PStatClientImpl::get_hostname
00350 //       Access: Private
00351 //  Description: Returns the current machine's hostname.
00352 ////////////////////////////////////////////////////////////////////
00353 string PStatClientImpl::
00354 get_hostname() {
00355   if (_hostname.empty()) {
00356     char temp_buff[1024];
00357     if (gethostname(temp_buff, 1024) == 0) {
00358       _hostname = temp_buff;
00359     } else {
00360       _hostname = "unknown";
00361     }
00362   }
00363   return _hostname;
00364 }
00365 
00366 ////////////////////////////////////////////////////////////////////
00367 //     Function: PStatClientImpl::send_hello
00368 //       Access: Private
00369 //  Description: Sends the initial greeting message to the server.
00370 ////////////////////////////////////////////////////////////////////
00371 void PStatClientImpl::
00372 send_hello() {
00373   nassertv(_is_connected);
00374 
00375   PStatClientControlMessage message;
00376   message._type = PStatClientControlMessage::T_hello;
00377   message._client_hostname = get_hostname();
00378   message._client_progname = _client_name;
00379   message._major_version = get_current_pstat_major_version();
00380   message._minor_version = get_current_pstat_minor_version();
00381 
00382   Datagram datagram;
00383   message.encode(datagram);
00384   _writer.send(datagram, _tcp_connection, true);
00385 }
00386 
00387 ////////////////////////////////////////////////////////////////////
00388 //     Function: PStatClientImpl::report_new_collectors
00389 //       Access: Private
00390 //  Description: Sends over any information about new Collectors that
00391 //               the user code might have recently created.
00392 ////////////////////////////////////////////////////////////////////
00393 void PStatClientImpl::
00394 report_new_collectors() {
00395   // Empirically, we determined that you can't send more than about
00396   // 1400 collectors at once without exceeding the 64K limit on a
00397   // single datagram.  So we limit ourselves here to sending only
00398   // half that many.
00399   static const int max_collectors_at_once = 700;
00400   
00401   while (_is_connected && _collectors_reported < _client->_num_collectors) {
00402     PStatClientControlMessage message;
00403     message._type = PStatClientControlMessage::T_define_collectors;
00404     int i = 0;
00405     while (_collectors_reported < _client->_num_collectors &&
00406            i < max_collectors_at_once) {
00407       message._collectors.push_back(_client->get_collector_def(_collectors_reported));
00408       _collectors_reported++;
00409       i++;
00410     }
00411     
00412     Datagram datagram;
00413     message.encode(datagram);
00414     _writer.send(datagram, _tcp_connection, true);
00415   }
00416 }
00417 
00418 ////////////////////////////////////////////////////////////////////
00419 //     Function: PStatClientImpl::report_new_threads
00420 //       Access: Private
00421 //  Description: Sends over any information about new Threads that
00422 //               the user code might have recently created.
00423 ////////////////////////////////////////////////////////////////////
00424 void PStatClientImpl::
00425 report_new_threads() {
00426   while (_is_connected && _threads_reported < _client->_num_threads) {
00427     PStatClientControlMessage message;
00428     message._type = PStatClientControlMessage::T_define_threads;
00429     message._first_thread_index = _threads_reported;
00430     PStatClient::ThreadPointer *threads = 
00431       (PStatClient::ThreadPointer *)_client->_threads;
00432     while (_threads_reported < _client->_num_threads) {
00433       message._names.push_back(threads[_threads_reported]->_name);
00434       _threads_reported++;
00435     }
00436 
00437     Datagram datagram;
00438     message.encode(datagram);
00439     _writer.send(datagram, _tcp_connection, true);
00440   }
00441 }
00442 
00443 ////////////////////////////////////////////////////////////////////
00444 //     Function: PStatClientImpl::handle_server_control_message
00445 //       Access: Private
00446 //  Description: Called when a control message has been received by
00447 //               the server over the TCP connection.
00448 ////////////////////////////////////////////////////////////////////
00449 void PStatClientImpl::
00450 handle_server_control_message(const PStatServerControlMessage &message) {
00451   switch (message._type) {
00452   case PStatServerControlMessage::T_hello:
00453     pstats_cat.info()
00454       << "Connected to " << message._server_progname << " on "
00455       << message._server_hostname << "\n";
00456 
00457     _server.set_port(message._udp_port);
00458     _got_udp_port = true;
00459     break;
00460 
00461   default:
00462     pstats_cat.error()
00463       << "Invalid control message received from server.\n";
00464   }
00465 }
00466 
00467 ////////////////////////////////////////////////////////////////////
00468 //     Function: PStatClientImpl::connection_reset
00469 //       Access: Private, Virtual
00470 //  Description: Called by the internal net code when the connection
00471 //               has been lost.
00472 ////////////////////////////////////////////////////////////////////
00473 void PStatClientImpl::
00474 connection_reset(const PT(Connection) &connection, bool) {
00475   if (connection == _tcp_connection) {
00476     client_disconnect();
00477   } else if (connection == _udp_connection) {
00478     pstats_cat.warning()
00479       << "Trouble sending UDP; switching to TCP only.\n";
00480     _tcp_count_factor = 0.0f;
00481     _udp_count_factor = 1.0f;
00482   } else {
00483     pstats_cat.warning()
00484       << "Ignoring spurious connection_reset() message\n";
00485   }
00486 }
00487 
00488 #endif // DO_PSTATS
 All Classes Functions Variables Enumerations