00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "pStatReader.h"
00016 #include "pStatServer.h"
00017 #include "pStatMonitor.h"
00018
00019 #include "pStatClientControlMessage.h"
00020 #include "pStatServerControlMessage.h"
00021 #include "pStatFrameData.h"
00022 #include "pStatProperties.h"
00023 #include "datagram.h"
00024 #include "datagramIterator.h"
00025 #include "connectionManager.h"
00026
00027
00028
00029
00030
00031
00032 PStatReader::
00033 PStatReader(PStatServer *manager, PStatMonitor *monitor) :
00034 #ifdef HAVE_THREADS
00035 ConnectionReader(manager, monitor->is_thread_safe() ? 1 : 0),
00036 #else
00037 ConnectionReader(manager, 0),
00038 #endif
00039 _manager(manager),
00040 _monitor(monitor),
00041 _writer(manager, 0)
00042 {
00043 set_tcp_header_size(4);
00044 _writer.set_tcp_header_size(4);
00045 _udp_port = 0;
00046 _client_data = new PStatClientData(this);
00047 _monitor->set_client_data(_client_data);
00048 }
00049
00050
00051
00052
00053
00054
00055 PStatReader::
00056 ~PStatReader() {
00057 _manager->release_udp_port(_udp_port);
00058 }
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 void PStatReader::
00069 close() {
00070 _manager->remove_reader(_tcp_connection, this);
00071 lost_connection();
00072 }
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082 void PStatReader::
00083 set_tcp_connection(Connection *tcp_connection) {
00084 _tcp_connection = tcp_connection;
00085 add_connection(_tcp_connection);
00086
00087 _udp_port = _manager->get_udp_port();
00088 _udp_connection = _manager->open_UDP_connection(_udp_port);
00089 while (_udp_connection.is_null()) {
00090
00091 _udp_port = _manager->get_udp_port();
00092 _udp_connection = _manager->open_UDP_connection(_udp_port);
00093 }
00094
00095 add_connection(_udp_connection);
00096
00097 send_hello();
00098 }
00099
00100
00101
00102
00103
00104
00105
00106
00107 void PStatReader::
00108 lost_connection() {
00109 _client_data->_is_alive = false;
00110 _monitor->lost_connection();
00111 _client_data.clear();
00112
00113 _manager->close_connection(_tcp_connection);
00114 _manager->close_connection(_udp_connection);
00115 _tcp_connection.clear();
00116 _udp_connection.clear();
00117 }
00118
00119
00120
00121
00122
00123
00124
00125 void PStatReader::
00126 idle() {
00127 dequeue_frame_data();
00128 _monitor->idle();
00129 }
00130
00131
00132
00133
00134
00135
00136 PStatMonitor *PStatReader::
00137 get_monitor() {
00138 return _monitor;
00139 }
00140
00141
00142
00143
00144
00145
00146 string PStatReader::
00147 get_hostname() {
00148 if (_hostname.empty()) {
00149 _hostname = ConnectionManager::get_host_name();
00150 if (_hostname.empty()) {
00151 _hostname = "unknown";
00152 }
00153 }
00154 return _hostname;
00155 }
00156
00157
00158
00159
00160
00161
00162 void PStatReader::
00163 send_hello() {
00164 PStatServerControlMessage message;
00165 message._type = PStatServerControlMessage::T_hello;
00166 message._server_hostname = get_hostname();
00167 message._server_progname = _monitor->get_monitor_name();
00168 message._udp_port = _udp_port;
00169
00170 Datagram datagram;
00171 message.encode(datagram);
00172 _writer.send(datagram, _tcp_connection);
00173 }
00174
00175
00176
00177
00178
00179
00180
00181 void PStatReader::
00182 receive_datagram(const NetDatagram &datagram) {
00183 Connection *connection = datagram.get_connection();
00184
00185 if (connection == _tcp_connection) {
00186 PStatClientControlMessage message;
00187 if (message.decode(datagram, _client_data)) {
00188 handle_client_control_message(message);
00189
00190 } else if (message._type == PStatClientControlMessage::T_datagram) {
00191 handle_client_udp_data(datagram);
00192
00193 } else {
00194 nout << "Got unexpected message from client.\n";
00195 }
00196
00197 } else if (connection == _udp_connection) {
00198 handle_client_udp_data(datagram);
00199
00200 } else {
00201 nout << "Got datagram from unexpected socket.\n";
00202 }
00203 }
00204
00205
00206
00207
00208
00209
00210
00211 void PStatReader::
00212 handle_client_control_message(const PStatClientControlMessage &message) {
00213 switch (message._type) {
00214 case PStatClientControlMessage::T_hello:
00215 {
00216 _client_data->set_version(message._major_version, message._minor_version);
00217 int server_major_version = get_current_pstat_major_version();
00218 int server_minor_version = get_current_pstat_minor_version();
00219
00220 if (message._major_version != server_major_version ||
00221 (message._major_version == server_major_version &&
00222 message._minor_version > server_minor_version)) {
00223 _monitor->bad_version(message._client_hostname, message._client_progname,
00224 message._major_version, message._minor_version,
00225 server_major_version, server_minor_version);
00226 _monitor->close();
00227 } else {
00228 _monitor->hello_from(message._client_hostname, message._client_progname);
00229 }
00230 }
00231 break;
00232
00233 case PStatClientControlMessage::T_define_collectors:
00234 {
00235 for (int i = 0; i < (int)message._collectors.size(); i++) {
00236 _client_data->add_collector(message._collectors[i]);
00237 _monitor->new_collector(message._collectors[i]->_index);
00238 }
00239 }
00240 break;
00241
00242 case PStatClientControlMessage::T_define_threads:
00243 {
00244 for (int i = 0; i < (int)message._names.size(); i++) {
00245 int thread_index = message._first_thread_index + i;
00246 string name = message._names[i];
00247 _client_data->define_thread(thread_index, name);
00248 _monitor->new_thread(thread_index);
00249 }
00250 }
00251 break;
00252
00253 default:
00254 nout << "Invalid control message received from client.\n";
00255 }
00256 }
00257
00258
00259
00260
00261
00262
00263
00264
00265 void PStatReader::
00266 handle_client_udp_data(const Datagram &datagram) {
00267 if (!_monitor->is_client_known()) {
00268
00269
00270
00271
00272 return;
00273 }
00274
00275 DatagramIterator source(datagram);
00276
00277 if (_client_data->is_at_least(2, 1)) {
00278
00279 int initial_byte = source.get_uint8();
00280 nassertv(initial_byte == 0);
00281 }
00282
00283 if (!_queued_frame_data.full()) {
00284 FrameData data;
00285 data._thread_index = source.get_uint16();
00286 data._frame_number = source.get_uint32();
00287 data._frame_data = new PStatFrameData;
00288 data._frame_data->read_datagram(source, _client_data);
00289
00290
00291
00292 _queued_frame_data.push_back(data);
00293 }
00294 }
00295
00296
00297
00298
00299
00300
00301
00302
00303 void PStatReader::
00304 dequeue_frame_data() {
00305 while (!_queued_frame_data.empty()) {
00306 const FrameData &data = _queued_frame_data.front();
00307 nassertv(_client_data != (PStatClientData *)NULL);
00308
00309
00310 int num_levels = data._frame_data->get_num_levels();
00311 for (int i = 0; i < num_levels; i++) {
00312 int collector_index = data._frame_data->get_level_collector(i);
00313 if (!_client_data->get_collector_has_level(collector_index, data._thread_index)) {
00314
00315
00316 _client_data->set_collector_has_level(collector_index, data._thread_index, true);
00317 _monitor->new_collector(collector_index);
00318 }
00319 }
00320
00321 _client_data->record_new_frame(data._thread_index,
00322 data._frame_number,
00323 data._frame_data);
00324 _monitor->new_data(data._thread_index, data._frame_number);
00325
00326 _queued_frame_data.pop_front();
00327 }
00328 }
00329