Panda3D
 All Classes Functions Variables Enumerations
directd.cxx
00001 // Filename: directd.cxx
00002 // Created by:  skyler 2002.04.08
00003 // Based on test_tcp_*.* by drose.
00004 //
00005 ////////////////////////////////////////////////////////////////////
00006 //
00007 // PANDA 3D SOFTWARE
00008 // Copyright (c) Carnegie Mellon University.  All rights reserved.
00009 //
00010 // All use of this software is subject to the terms of the revised BSD
00011 // license.  You should have received a copy of this license along
00012 // with this source code in a file named "LICENSE."
00013 //
00014 ////////////////////////////////////////////////////////////////////
00015 
00016 // This define tells the windows headers to include job objects:
00017 #define _WIN32_WINNT 0x0500
00018 
00019 #include "directd.h"
00020 /*#include "pandaFramework.h"
00021 #include "queuedConnectionManager.h"*/
00022 
00023 //#include <process.h>
00024 //#include <Windows.h>
00025 //#include "pandabase.h"
00026 
00027 //#include "queuedConnectionManager.h"
00028 //#include "queuedConnectionListener.h"
00029 //#include "queuedConnectionReader.h"
00030 //#include "connectionWriter.h"
00031 #include "netAddress.h"
00032 #include "connection.h"
00033 #include "datagramIterator.h"
00034 #include "netDatagram.h"
00035 
00036 #include "pset.h"
00037 
00038 namespace {
00039   // ...This section is part of the old stuff from the original implementation.
00040   // The new stuff that uses job objects doesn't need this stuff:
00041 
00042   // The following is from an MSDN example:
00043   
00044   #define TA_FAILED 0
00045   #define TA_SUCCESS_CLEAN 1
00046   #define TA_SUCCESS_KILL 2
00047   #define TA_SUCCESS_16 3
00048 
00049   BOOL CALLBACK
00050   TerminateAppEnum(HWND hwnd, LPARAM lParam) {
00051     DWORD dwID;
00052     GetWindowThreadProcessId(hwnd, &dwID);
00053     if(dwID == (DWORD)lParam) {
00054        PostMessage(hwnd, WM_CLOSE, 0, 0);
00055     }
00056     return TRUE;
00057   }
00058 
00059   /*
00060       DWORD WINAPI TerminateApp(DWORD dwPID, DWORD dwTimeout)
00061 
00062       Purpose:
00063         Shut down a 32-Bit Process
00064 
00065       Parameters:
00066         dwPID
00067            Process ID of the process to shut down.
00068 
00069         dwTimeout
00070            Wait time in milliseconds before shutting down the process.
00071 
00072       Return Value:
00073         TA_FAILED - If the shutdown failed.
00074         TA_SUCCESS_CLEAN - If the process was shutdown using WM_CLOSE.
00075         TA_SUCCESS_KILL - if the process was shut down with
00076            TerminateProcess().
00077   */ 
00078   DWORD WINAPI
00079   TerminateApp(DWORD dwPID, DWORD dwTimeout) {
00080     HANDLE   hProc;
00081     DWORD   dwRet;
00082 
00083     // If we can't open the process with PROCESS_TERMINATE rights,
00084     // then we give up immediately.
00085     hProc = OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, dwPID);
00086     if(hProc == NULL) {
00087        return TA_FAILED;
00088     }
00089 
00090     // TerminateAppEnum() posts WM_CLOSE to all windows whose PID
00091     // matches your process's.
00092     EnumWindows((WNDENUMPROC)TerminateAppEnum, (LPARAM)dwPID);
00093 
00094     // Wait on the handle. If it signals, great. If it times out,
00095     // then you kill it.
00096     if(WaitForSingleObject(hProc, dwTimeout)!=WAIT_OBJECT_0) {
00097        dwRet=(TerminateProcess(hProc,0)?TA_SUCCESS_KILL:TA_FAILED);
00098     } else {
00099        dwRet = TA_SUCCESS_CLEAN;
00100     }
00101     CloseHandle(hProc);
00102 
00103     return dwRet;
00104   }
00105 
00106   /*
00107       Start an application with the command line cmd.
00108       returns the process id of the new process/application.
00109   */
00110   DWORD
00111   StartApp(const string& cmd) {
00112     DWORD pid=0;
00113     STARTUPINFO si; 
00114     PROCESS_INFORMATION pi; 
00115     ZeroMemory(&si, sizeof(STARTUPINFO));
00116     si.cb = sizeof(STARTUPINFO); 
00117     ZeroMemory(&pi, sizeof(PROCESS_INFORMATION));
00118     if (CreateProcess(NULL, (char*)cmd.c_str(), 
00119         0, 0, 1, NORMAL_PRIORITY_CLASS, 
00120         0, 0, &si, &pi)) {
00121       pid=pi.dwProcessId;
00122       CloseHandle(pi.hProcess);
00123       CloseHandle(pi.hThread);
00124     } else {
00125       nout<<"CreateProcess failed: "<<cmd<<endl;
00126     }
00127     return pid;
00128   }
00129 
00130 }
00131 
00132 DirectD::DirectD() :
00133     _reader(&_cm, 1), _writer(&_cm, 0), _listener(&_cm, 0),
00134     _jobObject(0), _shutdown(false), _useOldStuff(false) {
00135 }
00136 
00137 DirectD::~DirectD() {
00138   // Close all the connections:
00139   ConnectionSet::iterator ci;
00140   for (ci = _connections.begin(); ci != _connections.end(); ++ci) {
00141     _cm.close_connection((*ci));
00142   }
00143   _connections.clear();
00144   
00145   kill_all();
00146 }
00147 
00148 int 
00149 DirectD::client_ready(const string& server_host, int port,
00150     const string& cmd) {
00151   stringstream ss;
00152   ss<<"!"<<cmd;
00153   send_one_message(server_host, port, ss.str());
00154   return 0;
00155 }
00156 
00157 int 
00158 DirectD::tell_server(const string& server_host, int port,
00159     const string& cmd) {
00160   send_one_message(server_host, port, cmd);
00161   return 0;
00162 }
00163 
00164 bool
00165 DirectD::wait_for_servers(int count, int timeout_ms) {
00166   if (count <= 0) {
00167     return true;
00168   }
00169   // The timeout is a rough estimate, we may wait slightly longer.
00170   const int wait_ms=200;
00171   int cycles=timeout_ms/wait_ms;
00172   while (cycles--) {
00173     check_for_new_clients();
00174     check_for_lost_connection();
00175     /*
00176         The following can be more generalized with a little work.
00177         check_for_datagrams() could take a handler function as an arugment, maybe.
00178     */
00179     ////check_for_datagrams();
00180     // Process all available datagrams.
00181     while (_reader.data_available()) {
00182       NetDatagram datagram;
00183       if (_reader.get_data(datagram)) {
00184         cout << count << ": Server at " << datagram.get_address()
00185             << " is ready." << endl;
00186         datagram.dump_hex(nout);
00187         //handle_datagram(datagram);
00188         DatagramIterator di(datagram);
00189         string s=di.get_string();
00190         if (s=="r" && !--count) {
00191           return true;
00192         }
00193       }
00194     }
00195 
00196     // Yield the timeslice before we poll again.
00197     //PR_Sleep(PR_MillisecondsToInterval(wait_ms));
00198     Sleep(wait_ms);
00199   }
00200   // We've waited long enough, assume they're not going to be
00201   // ready in the time we want them:
00202   return false;
00203 }
00204 
00205 int 
00206 DirectD::server_ready(const string& client_host, int port) {
00207   send_one_message(client_host, port, "r");
00208   return 0;
00209 }
00210 
00211 
00212 void
00213 DirectD::start_app(const string& cmd) {
00214   nout<<"start_app(cmd="<<cmd<<")"<<endl;
00215   if (_useOldStuff) {
00216     _pids.push_back(StartApp(cmd));
00217     nout<<"    pid="<<_pids.back()<<endl;
00218   } else {
00219     if (!_jobObject) {
00220       _jobObject=CreateJobObject(0, 0);
00221       if (!_jobObject) {
00222         nout<<"CreateProcess failed: no _jobObject: "<<GetLastError()<<endl;
00223         return;
00224       }
00225     }
00226     DWORD pid=0;
00227     STARTUPINFO si; 
00228     PROCESS_INFORMATION pi; 
00229     ZeroMemory(&si, sizeof(STARTUPINFO));
00230     si.cb = sizeof(STARTUPINFO); 
00231     ZeroMemory(&pi, sizeof(PROCESS_INFORMATION));
00232     if (CreateProcess(NULL, (char*)cmd.c_str(), 
00233         0, 0, 1, NORMAL_PRIORITY_CLASS | CREATE_SUSPENDED, 
00234         0, 0, &si, &pi)) {
00235       // The process must be created with CREATE_SUSPENDED to
00236       // give us a chance to get the handle into our sgJobObject
00237       // before the child processes starts sub-processes.
00238       if (!AssignProcessToJobObject(_jobObject, pi.hProcess)) {
00239         // ...The assign failed.
00240         cerr<<"StartJob AssignProcessToJobObject Error: "<<GetLastError()<<endl;
00241       }
00242       CloseHandle(pi.hProcess); //?????
00243       // Because we called CreateProcess with the CREATE_SUSPEND flag,
00244       // we must explicitly resume the processes main thread.
00245       if (ResumeThread(pi.hThread) == -1) {
00246         cerr<<"StartJob ResumeThread Error: "<<GetLastError()<<endl;
00247       }
00248       CloseHandle(pi.hThread);
00249     } else {
00250       nout<<"StartJob CreateProcess failed: "<<cmd<<endl;
00251     }
00252   }
00253 }
00254 
00255 void
00256 DirectD::kill_app(int index) {
00257   if (_useOldStuff) {
00258     int i = _pids.size() - 1 - index % _pids.size();
00259     PidStack::iterator pi = _pids.begin() + i;
00260     if (pi!=_pids.end()) {
00261       nout<<"trying kill "<<(*pi)<<endl;
00262       TerminateApp((*pi), 1000);
00263       _pids.erase(pi);
00264     }
00265   } else {
00266     cerr<<"kill_app(index) not implemented, calling kill_all() instead."<<endl;
00267     kill_all();
00268   }
00269 }
00270 
00271 void
00272 DirectD::kill_all() {
00273   if (_useOldStuff) {
00274     PidStack::reverse_iterator pi;
00275     for (pi = _pids.rbegin(); pi != _pids.rend(); ++pi) {
00276       nout<<"trying kill "<<(*pi)<<endl;
00277       TerminateApp((*pi), 1000);
00278     }
00279     _pids.clear();
00280   } else {
00281     if (!_jobObject) {
00282       cerr<<"kill_all(): No open _jobObject"<<endl;
00283     } else if (!TerminateJobObject(_jobObject, 0)) {
00284       cerr<<"kill_all() TerminateJobObject Error: "<<GetLastError()<<endl;
00285     }
00286     CloseHandle(_jobObject);
00287     _jobObject=0;
00288   }
00289 }
00290 
00291 void
00292 DirectD::send_command(const string& cmd) {
00293   NetDatagram datagram;
00294   datagram.add_string(cmd);
00295   // Send the datagram.
00296   ConnectionSet::iterator ci;
00297   for (ci = _connections.begin(); ci != _connections.end(); ++ci) {
00298     _writer.send(datagram, (*ci));
00299   }
00300 }
00301 
00302 void
00303 DirectD::handle_datagram(NetDatagram& datagram){
00304   DatagramIterator di(datagram);
00305   string cmd=di.get_string();
00306   handle_command(cmd);
00307 }
00308 
00309 void
00310 DirectD::handle_command(const string& cmd) {
00311   nout<<"DirectD::handle_command: "<<cmd<<endl;
00312 }
00313 
00314 void
00315 DirectD::send_one_message(const string& host_name, 
00316     int port,
00317     const string& message) {
00318   NetAddress host;
00319   if (!host.set_host(host_name, port)) {
00320     nout << "Unknown host: " << host_name << "\n";
00321   }
00322 
00323   const int timeout_ms=5000;
00324   PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms);
00325   if (c.is_null()) {
00326     nout << "No connection.\n";
00327     return;
00328   }
00329 
00330   nout << "Successfully opened TCP connection to " << host_name
00331        << " on port "
00332        << c->get_address().get_port() << " and IP "
00333        << c->get_address() << "\n";
00334 
00335   //_reader.add_connection(c);
00336   
00337   NetDatagram datagram;
00338   datagram.add_string(message);
00339   _writer.send(datagram, c);
00340   
00341   //PR_Sleep(PR_MillisecondsToInterval(200));
00342   //wait_for_servers(1, 10*1000);
00343   //_reader.remove_connection(c);
00344   _cm.close_connection(c);
00345 }
00346 
00347 int
00348 DirectD::connect_to(const string& host_name, int port) {
00349   NetAddress host;
00350   if (!host.set_host(host_name, port)) {
00351     nout << "Unknown host: " << host_name << "\n";
00352   }
00353 
00354   const int timeout_ms=5000;
00355   PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms);
00356   if (c.is_null()) {
00357     nout << "No connection.\n";
00358     return 0;
00359   }
00360 
00361   nout << "Successfully opened TCP connection to " << host_name
00362        << " on port "
00363        << c->get_address().get_port() << " and IP "
00364        << c->get_address() << "\n";
00365 
00366   _reader.add_connection(c);
00367   _connections.insert(c);
00368   return  c->get_address().get_port();
00369 }
00370 
00371 void
00372 DirectD::disconnect_from(const string& host_name, int port) {
00373   nout<<"disconnect_from(\""<<host_name<<", port="<<port<<")"<<endl;
00374   for (ConnectionSet::iterator i=_connections.begin(); i != _connections.end(); ++i) {
00375     nout<<"    found "<<(*i)->get_address().get_ip_string()<<", port "<<(*i)->get_address().get_port()<<endl;
00376     if ((*i)->get_address().get_ip_string()==host_name) {
00377       nout<<"    disconnecting."<<endl;
00378       _reader.remove_connection((*i));
00379       _cm.close_connection((*i));
00380       _connections.erase(i);
00381       break;
00382     }
00383   }
00384 }
00385 
00386 void
00387 DirectD::check_for_lost_connection() {
00388   while (_cm.reset_connection_available()) {
00389     PT(Connection) c;
00390     if (_cm.get_reset_connection(c)) {
00391       nout<<"Lost connection from "<<c->get_address()<<endl;
00392       _connections.erase(c);
00393       _cm.close_connection(c);
00394     }
00395   }
00396 }
00397 
00398 void
00399 DirectD::check_for_datagrams(){
00400   // Process all available datagrams.
00401   while (_reader.data_available()) {
00402     NetDatagram datagram;
00403     if (_reader.get_data(datagram)) {
00404       nout << "Got datagram " /*<< datagram <<*/ "from "
00405            << datagram.get_address() << endl;
00406       datagram.dump_hex(nout);
00407       handle_datagram(datagram);
00408     }
00409   }
00410 }
00411 
00412 void
00413 DirectD::listen_to(int port, int backlog) {
00414   PT(Connection) rendezvous = _cm.open_TCP_server_rendezvous(port, backlog);
00415   if (rendezvous.is_null()) {
00416     nout << "Cannot grab port " << port << ".\n";
00417     exit(1);
00418   }
00419   nout << "Listening for connections on port " << port << "\n";
00420   _listener.add_connection(rendezvous);
00421 }
00422 
00423 void
00424 DirectD::check_for_new_clients() {
00425   while (_listener.new_connection_available()) {
00426     PT(Connection) rv;
00427     NetAddress address;
00428     PT(Connection) new_connection;
00429     if (_listener.get_new_connection(rv, address, new_connection)) {
00430       nout << "Got connection from " << address << "\n";
00431       _reader.add_connection(new_connection);
00432       _connections.insert(new_connection);
00433     }
00434   }
00435 }
 All Classes Functions Variables Enumerations