Panda3D
|
00001 // Filename: directd.cxx 00002 // Created by: skyler 2002.04.08 00003 // Based on test_tcp_*.* by drose. 00004 // 00005 //////////////////////////////////////////////////////////////////// 00006 // 00007 // PANDA 3D SOFTWARE 00008 // Copyright (c) Carnegie Mellon University. All rights reserved. 00009 // 00010 // All use of this software is subject to the terms of the revised BSD 00011 // license. You should have received a copy of this license along 00012 // with this source code in a file named "LICENSE." 00013 // 00014 //////////////////////////////////////////////////////////////////// 00015 00016 // This define tells the windows headers to include job objects: 00017 #define _WIN32_WINNT 0x0500 00018 00019 #include "directd.h" 00020 /*#include "pandaFramework.h" 00021 #include "queuedConnectionManager.h"*/ 00022 00023 //#include <process.h> 00024 //#include <Windows.h> 00025 //#include "pandabase.h" 00026 00027 //#include "queuedConnectionManager.h" 00028 //#include "queuedConnectionListener.h" 00029 //#include "queuedConnectionReader.h" 00030 //#include "connectionWriter.h" 00031 #include "netAddress.h" 00032 #include "connection.h" 00033 #include "datagramIterator.h" 00034 #include "netDatagram.h" 00035 00036 #include "pset.h" 00037 00038 namespace { 00039 // ...This section is part of the old stuff from the original implementation. 00040 // The new stuff that uses job objects doesn't need this stuff: 00041 00042 // The following is from an MSDN example: 00043 00044 #define TA_FAILED 0 00045 #define TA_SUCCESS_CLEAN 1 00046 #define TA_SUCCESS_KILL 2 00047 #define TA_SUCCESS_16 3 00048 00049 BOOL CALLBACK 00050 TerminateAppEnum(HWND hwnd, LPARAM lParam) { 00051 DWORD dwID; 00052 GetWindowThreadProcessId(hwnd, &dwID); 00053 if(dwID == (DWORD)lParam) { 00054 PostMessage(hwnd, WM_CLOSE, 0, 0); 00055 } 00056 return TRUE; 00057 } 00058 00059 /* 00060 DWORD WINAPI TerminateApp(DWORD dwPID, DWORD dwTimeout) 00061 00062 Purpose: 00063 Shut down a 32-Bit Process 00064 00065 Parameters: 00066 dwPID 00067 Process ID of the process to shut down. 00068 00069 dwTimeout 00070 Wait time in milliseconds before shutting down the process. 00071 00072 Return Value: 00073 TA_FAILED - If the shutdown failed. 00074 TA_SUCCESS_CLEAN - If the process was shutdown using WM_CLOSE. 00075 TA_SUCCESS_KILL - if the process was shut down with 00076 TerminateProcess(). 00077 */ 00078 DWORD WINAPI 00079 TerminateApp(DWORD dwPID, DWORD dwTimeout) { 00080 HANDLE hProc; 00081 DWORD dwRet; 00082 00083 // If we can't open the process with PROCESS_TERMINATE rights, 00084 // then we give up immediately. 00085 hProc = OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, dwPID); 00086 if(hProc == NULL) { 00087 return TA_FAILED; 00088 } 00089 00090 // TerminateAppEnum() posts WM_CLOSE to all windows whose PID 00091 // matches your process's. 00092 EnumWindows((WNDENUMPROC)TerminateAppEnum, (LPARAM)dwPID); 00093 00094 // Wait on the handle. If it signals, great. If it times out, 00095 // then you kill it. 00096 if(WaitForSingleObject(hProc, dwTimeout)!=WAIT_OBJECT_0) { 00097 dwRet=(TerminateProcess(hProc,0)?TA_SUCCESS_KILL:TA_FAILED); 00098 } else { 00099 dwRet = TA_SUCCESS_CLEAN; 00100 } 00101 CloseHandle(hProc); 00102 00103 return dwRet; 00104 } 00105 00106 /* 00107 Start an application with the command line cmd. 00108 returns the process id of the new process/application. 00109 */ 00110 DWORD 00111 StartApp(const string& cmd) { 00112 DWORD pid=0; 00113 STARTUPINFO si; 00114 PROCESS_INFORMATION pi; 00115 ZeroMemory(&si, sizeof(STARTUPINFO)); 00116 si.cb = sizeof(STARTUPINFO); 00117 ZeroMemory(&pi, sizeof(PROCESS_INFORMATION)); 00118 if (CreateProcess(NULL, (char*)cmd.c_str(), 00119 0, 0, 1, NORMAL_PRIORITY_CLASS, 00120 0, 0, &si, &pi)) { 00121 pid=pi.dwProcessId; 00122 CloseHandle(pi.hProcess); 00123 CloseHandle(pi.hThread); 00124 } else { 00125 nout<<"CreateProcess failed: "<<cmd<<endl; 00126 } 00127 return pid; 00128 } 00129 00130 } 00131 00132 DirectD::DirectD() : 00133 _reader(&_cm, 1), _writer(&_cm, 0), _listener(&_cm, 0), 00134 _jobObject(0), _shutdown(false), _useOldStuff(false) { 00135 } 00136 00137 DirectD::~DirectD() { 00138 // Close all the connections: 00139 ConnectionSet::iterator ci; 00140 for (ci = _connections.begin(); ci != _connections.end(); ++ci) { 00141 _cm.close_connection((*ci)); 00142 } 00143 _connections.clear(); 00144 00145 kill_all(); 00146 } 00147 00148 int 00149 DirectD::client_ready(const string& server_host, int port, 00150 const string& cmd) { 00151 stringstream ss; 00152 ss<<"!"<<cmd; 00153 send_one_message(server_host, port, ss.str()); 00154 return 0; 00155 } 00156 00157 int 00158 DirectD::tell_server(const string& server_host, int port, 00159 const string& cmd) { 00160 send_one_message(server_host, port, cmd); 00161 return 0; 00162 } 00163 00164 bool 00165 DirectD::wait_for_servers(int count, int timeout_ms) { 00166 if (count <= 0) { 00167 return true; 00168 } 00169 // The timeout is a rough estimate, we may wait slightly longer. 00170 const int wait_ms=200; 00171 int cycles=timeout_ms/wait_ms; 00172 while (cycles--) { 00173 check_for_new_clients(); 00174 check_for_lost_connection(); 00175 /* 00176 The following can be more generalized with a little work. 00177 check_for_datagrams() could take a handler function as an arugment, maybe. 00178 */ 00179 ////check_for_datagrams(); 00180 // Process all available datagrams. 00181 while (_reader.data_available()) { 00182 NetDatagram datagram; 00183 if (_reader.get_data(datagram)) { 00184 cout << count << ": Server at " << datagram.get_address() 00185 << " is ready." << endl; 00186 datagram.dump_hex(nout); 00187 //handle_datagram(datagram); 00188 DatagramIterator di(datagram); 00189 string s=di.get_string(); 00190 if (s=="r" && !--count) { 00191 return true; 00192 } 00193 } 00194 } 00195 00196 // Yield the timeslice before we poll again. 00197 //PR_Sleep(PR_MillisecondsToInterval(wait_ms)); 00198 Sleep(wait_ms); 00199 } 00200 // We've waited long enough, assume they're not going to be 00201 // ready in the time we want them: 00202 return false; 00203 } 00204 00205 int 00206 DirectD::server_ready(const string& client_host, int port) { 00207 send_one_message(client_host, port, "r"); 00208 return 0; 00209 } 00210 00211 00212 void 00213 DirectD::start_app(const string& cmd) { 00214 nout<<"start_app(cmd="<<cmd<<")"<<endl; 00215 if (_useOldStuff) { 00216 _pids.push_back(StartApp(cmd)); 00217 nout<<" pid="<<_pids.back()<<endl; 00218 } else { 00219 if (!_jobObject) { 00220 _jobObject=CreateJobObject(0, 0); 00221 if (!_jobObject) { 00222 nout<<"CreateProcess failed: no _jobObject: "<<GetLastError()<<endl; 00223 return; 00224 } 00225 } 00226 DWORD pid=0; 00227 STARTUPINFO si; 00228 PROCESS_INFORMATION pi; 00229 ZeroMemory(&si, sizeof(STARTUPINFO)); 00230 si.cb = sizeof(STARTUPINFO); 00231 ZeroMemory(&pi, sizeof(PROCESS_INFORMATION)); 00232 if (CreateProcess(NULL, (char*)cmd.c_str(), 00233 0, 0, 1, NORMAL_PRIORITY_CLASS | CREATE_SUSPENDED, 00234 0, 0, &si, &pi)) { 00235 // The process must be created with CREATE_SUSPENDED to 00236 // give us a chance to get the handle into our sgJobObject 00237 // before the child processes starts sub-processes. 00238 if (!AssignProcessToJobObject(_jobObject, pi.hProcess)) { 00239 // ...The assign failed. 00240 cerr<<"StartJob AssignProcessToJobObject Error: "<<GetLastError()<<endl; 00241 } 00242 CloseHandle(pi.hProcess); //????? 00243 // Because we called CreateProcess with the CREATE_SUSPEND flag, 00244 // we must explicitly resume the processes main thread. 00245 if (ResumeThread(pi.hThread) == -1) { 00246 cerr<<"StartJob ResumeThread Error: "<<GetLastError()<<endl; 00247 } 00248 CloseHandle(pi.hThread); 00249 } else { 00250 nout<<"StartJob CreateProcess failed: "<<cmd<<endl; 00251 } 00252 } 00253 } 00254 00255 void 00256 DirectD::kill_app(int index) { 00257 if (_useOldStuff) { 00258 int i = _pids.size() - 1 - index % _pids.size(); 00259 PidStack::iterator pi = _pids.begin() + i; 00260 if (pi!=_pids.end()) { 00261 nout<<"trying kill "<<(*pi)<<endl; 00262 TerminateApp((*pi), 1000); 00263 _pids.erase(pi); 00264 } 00265 } else { 00266 cerr<<"kill_app(index) not implemented, calling kill_all() instead."<<endl; 00267 kill_all(); 00268 } 00269 } 00270 00271 void 00272 DirectD::kill_all() { 00273 if (_useOldStuff) { 00274 PidStack::reverse_iterator pi; 00275 for (pi = _pids.rbegin(); pi != _pids.rend(); ++pi) { 00276 nout<<"trying kill "<<(*pi)<<endl; 00277 TerminateApp((*pi), 1000); 00278 } 00279 _pids.clear(); 00280 } else { 00281 if (!_jobObject) { 00282 cerr<<"kill_all(): No open _jobObject"<<endl; 00283 } else if (!TerminateJobObject(_jobObject, 0)) { 00284 cerr<<"kill_all() TerminateJobObject Error: "<<GetLastError()<<endl; 00285 } 00286 CloseHandle(_jobObject); 00287 _jobObject=0; 00288 } 00289 } 00290 00291 void 00292 DirectD::send_command(const string& cmd) { 00293 NetDatagram datagram; 00294 datagram.add_string(cmd); 00295 // Send the datagram. 00296 ConnectionSet::iterator ci; 00297 for (ci = _connections.begin(); ci != _connections.end(); ++ci) { 00298 _writer.send(datagram, (*ci)); 00299 } 00300 } 00301 00302 void 00303 DirectD::handle_datagram(NetDatagram& datagram){ 00304 DatagramIterator di(datagram); 00305 string cmd=di.get_string(); 00306 handle_command(cmd); 00307 } 00308 00309 void 00310 DirectD::handle_command(const string& cmd) { 00311 nout<<"DirectD::handle_command: "<<cmd<<endl; 00312 } 00313 00314 void 00315 DirectD::send_one_message(const string& host_name, 00316 int port, 00317 const string& message) { 00318 NetAddress host; 00319 if (!host.set_host(host_name, port)) { 00320 nout << "Unknown host: " << host_name << "\n"; 00321 } 00322 00323 const int timeout_ms=5000; 00324 PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms); 00325 if (c.is_null()) { 00326 nout << "No connection.\n"; 00327 return; 00328 } 00329 00330 nout << "Successfully opened TCP connection to " << host_name 00331 << " on port " 00332 << c->get_address().get_port() << " and IP " 00333 << c->get_address() << "\n"; 00334 00335 //_reader.add_connection(c); 00336 00337 NetDatagram datagram; 00338 datagram.add_string(message); 00339 _writer.send(datagram, c); 00340 00341 //PR_Sleep(PR_MillisecondsToInterval(200)); 00342 //wait_for_servers(1, 10*1000); 00343 //_reader.remove_connection(c); 00344 _cm.close_connection(c); 00345 } 00346 00347 int 00348 DirectD::connect_to(const string& host_name, int port) { 00349 NetAddress host; 00350 if (!host.set_host(host_name, port)) { 00351 nout << "Unknown host: " << host_name << "\n"; 00352 } 00353 00354 const int timeout_ms=5000; 00355 PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms); 00356 if (c.is_null()) { 00357 nout << "No connection.\n"; 00358 return 0; 00359 } 00360 00361 nout << "Successfully opened TCP connection to " << host_name 00362 << " on port " 00363 << c->get_address().get_port() << " and IP " 00364 << c->get_address() << "\n"; 00365 00366 _reader.add_connection(c); 00367 _connections.insert(c); 00368 return c->get_address().get_port(); 00369 } 00370 00371 void 00372 DirectD::disconnect_from(const string& host_name, int port) { 00373 nout<<"disconnect_from(\""<<host_name<<", port="<<port<<")"<<endl; 00374 for (ConnectionSet::iterator i=_connections.begin(); i != _connections.end(); ++i) { 00375 nout<<" found "<<(*i)->get_address().get_ip_string()<<", port "<<(*i)->get_address().get_port()<<endl; 00376 if ((*i)->get_address().get_ip_string()==host_name) { 00377 nout<<" disconnecting."<<endl; 00378 _reader.remove_connection((*i)); 00379 _cm.close_connection((*i)); 00380 _connections.erase(i); 00381 break; 00382 } 00383 } 00384 } 00385 00386 void 00387 DirectD::check_for_lost_connection() { 00388 while (_cm.reset_connection_available()) { 00389 PT(Connection) c; 00390 if (_cm.get_reset_connection(c)) { 00391 nout<<"Lost connection from "<<c->get_address()<<endl; 00392 _connections.erase(c); 00393 _cm.close_connection(c); 00394 } 00395 } 00396 } 00397 00398 void 00399 DirectD::check_for_datagrams(){ 00400 // Process all available datagrams. 00401 while (_reader.data_available()) { 00402 NetDatagram datagram; 00403 if (_reader.get_data(datagram)) { 00404 nout << "Got datagram " /*<< datagram <<*/ "from " 00405 << datagram.get_address() << endl; 00406 datagram.dump_hex(nout); 00407 handle_datagram(datagram); 00408 } 00409 } 00410 } 00411 00412 void 00413 DirectD::listen_to(int port, int backlog) { 00414 PT(Connection) rendezvous = _cm.open_TCP_server_rendezvous(port, backlog); 00415 if (rendezvous.is_null()) { 00416 nout << "Cannot grab port " << port << ".\n"; 00417 exit(1); 00418 } 00419 nout << "Listening for connections on port " << port << "\n"; 00420 _listener.add_connection(rendezvous); 00421 } 00422 00423 void 00424 DirectD::check_for_new_clients() { 00425 while (_listener.new_connection_available()) { 00426 PT(Connection) rv; 00427 NetAddress address; 00428 PT(Connection) new_connection; 00429 if (_listener.get_new_connection(rv, address, new_connection)) { 00430 nout << "Got connection from " << address << "\n"; 00431 _reader.add_connection(new_connection); 00432 _connections.insert(new_connection); 00433 } 00434 } 00435 }