Panda3D
directd.cxx
1 // Filename: directd.cxx
2 // Created by: skyler 2002.04.08
3 // Based on test_tcp_*.* by drose.
4 //
5 ////////////////////////////////////////////////////////////////////
6 //
7 // PANDA 3D SOFTWARE
8 // Copyright (c) Carnegie Mellon University. All rights reserved.
9 //
10 // All use of this software is subject to the terms of the revised BSD
11 // license. You should have received a copy of this license along
12 // with this source code in a file named "LICENSE."
13 //
14 ////////////////////////////////////////////////////////////////////
15 
16 // This define tells the windows headers to include job objects:
17 #define _WIN32_WINNT 0x0500
18 
19 #include "directd.h"
20 /*#include "pandaFramework.h"
21 #include "queuedConnectionManager.h"*/
22 
23 //#include <process.h>
24 //#include <Windows.h>
25 //#include "pandabase.h"
26 
27 //#include "queuedConnectionManager.h"
28 //#include "queuedConnectionListener.h"
29 //#include "queuedConnectionReader.h"
30 //#include "connectionWriter.h"
31 #include "netAddress.h"
32 #include "connection.h"
33 #include "datagramIterator.h"
34 #include "netDatagram.h"
35 
36 #include "pset.h"
37 
38 namespace {
39  // ...This section is part of the old stuff from the original implementation.
40  // The new stuff that uses job objects doesn't need this stuff:
41 
42  // The following is from an MSDN example:
43 
44  #define TA_FAILED 0
45  #define TA_SUCCESS_CLEAN 1
46  #define TA_SUCCESS_KILL 2
47  #define TA_SUCCESS_16 3
48 
49  BOOL CALLBACK
50  TerminateAppEnum(HWND hwnd, LPARAM lParam) {
51  DWORD dwID;
52  GetWindowThreadProcessId(hwnd, &dwID);
53  if(dwID == (DWORD)lParam) {
54  PostMessage(hwnd, WM_CLOSE, 0, 0);
55  }
56  return TRUE;
57  }
58 
59  /*
60  DWORD WINAPI TerminateApp(DWORD dwPID, DWORD dwTimeout)
61 
62  Purpose:
63  Shut down a 32-Bit Process
64 
65  Parameters:
66  dwPID
67  Process ID of the process to shut down.
68 
69  dwTimeout
70  Wait time in milliseconds before shutting down the process.
71 
72  Return Value:
73  TA_FAILED - If the shutdown failed.
74  TA_SUCCESS_CLEAN - If the process was shutdown using WM_CLOSE.
75  TA_SUCCESS_KILL - if the process was shut down with
76  TerminateProcess().
77  */
78  DWORD WINAPI
79  TerminateApp(DWORD dwPID, DWORD dwTimeout) {
80  HANDLE hProc;
81  DWORD dwRet;
82 
83  // If we can't open the process with PROCESS_TERMINATE rights,
84  // then we give up immediately.
85  hProc = OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, dwPID);
86  if(hProc == NULL) {
87  return TA_FAILED;
88  }
89 
90  // TerminateAppEnum() posts WM_CLOSE to all windows whose PID
91  // matches your process's.
92  EnumWindows((WNDENUMPROC)TerminateAppEnum, (LPARAM)dwPID);
93 
94  // Wait on the handle. If it signals, great. If it times out,
95  // then you kill it.
96  if(WaitForSingleObject(hProc, dwTimeout)!=WAIT_OBJECT_0) {
97  dwRet=(TerminateProcess(hProc,0)?TA_SUCCESS_KILL:TA_FAILED);
98  } else {
99  dwRet = TA_SUCCESS_CLEAN;
100  }
101  CloseHandle(hProc);
102 
103  return dwRet;
104  }
105 
106  /*
107  Start an application with the command line cmd.
108  returns the process id of the new process/application.
109  */
110  DWORD
111  StartApp(const string& cmd) {
112  DWORD pid=0;
113  STARTUPINFO si;
114  PROCESS_INFORMATION pi;
115  ZeroMemory(&si, sizeof(STARTUPINFO));
116  si.cb = sizeof(STARTUPINFO);
117  ZeroMemory(&pi, sizeof(PROCESS_INFORMATION));
118  if (CreateProcess(NULL, (char*)cmd.c_str(),
119  0, 0, 1, NORMAL_PRIORITY_CLASS,
120  0, 0, &si, &pi)) {
121  pid=pi.dwProcessId;
122  CloseHandle(pi.hProcess);
123  CloseHandle(pi.hThread);
124  } else {
125  nout<<"CreateProcess failed: "<<cmd<<endl;
126  }
127  return pid;
128  }
129 
130 }
131 
132 DirectD::DirectD() :
133  _reader(&_cm, 1), _writer(&_cm, 0), _listener(&_cm, 0),
134  _jobObject(0), _shutdown(false), _useOldStuff(false) {
135 }
136 
137 DirectD::~DirectD() {
138  // Close all the connections:
139  ConnectionSet::iterator ci;
140  for (ci = _connections.begin(); ci != _connections.end(); ++ci) {
141  _cm.close_connection((*ci));
142  }
143  _connections.clear();
144 
145  kill_all();
146 }
147 
148 int
149 DirectD::client_ready(const string& server_host, int port,
150  const string& cmd) {
151  stringstream ss;
152  ss<<"!"<<cmd;
153  send_one_message(server_host, port, ss.str());
154  return 0;
155 }
156 
157 int
158 DirectD::tell_server(const string& server_host, int port,
159  const string& cmd) {
160  send_one_message(server_host, port, cmd);
161  return 0;
162 }
163 
164 bool
165 DirectD::wait_for_servers(int count, int timeout_ms) {
166  if (count <= 0) {
167  return true;
168  }
169  // The timeout is a rough estimate, we may wait slightly longer.
170  const int wait_ms=200;
171  int cycles=timeout_ms/wait_ms;
172  while (cycles--) {
173  check_for_new_clients();
174  check_for_lost_connection();
175  /*
176  The following can be more generalized with a little work.
177  check_for_datagrams() could take a handler function as an arugment, maybe.
178  */
179  ////check_for_datagrams();
180  // Process all available datagrams.
181  while (_reader.data_available()) {
182  NetDatagram datagram;
183  if (_reader.get_data(datagram)) {
184  cout << count << ": Server at " << datagram.get_address()
185  << " is ready." << endl;
186  datagram.dump_hex(nout);
187  //handle_datagram(datagram);
188  DatagramIterator di(datagram);
189  string s=di.get_string();
190  if (s=="r" && !--count) {
191  return true;
192  }
193  }
194  }
195 
196  // Yield the timeslice before we poll again.
197  //PR_Sleep(PR_MillisecondsToInterval(wait_ms));
198  Sleep(wait_ms);
199  }
200  // We've waited long enough, assume they're not going to be
201  // ready in the time we want them:
202  return false;
203 }
204 
205 int
206 DirectD::server_ready(const string& client_host, int port) {
207  send_one_message(client_host, port, "r");
208  return 0;
209 }
210 
211 
212 void
213 DirectD::start_app(const string& cmd) {
214  nout<<"start_app(cmd="<<cmd<<")"<<endl;
215  if (_useOldStuff) {
216  _pids.push_back(StartApp(cmd));
217  nout<<" pid="<<_pids.back()<<endl;
218  } else {
219  if (!_jobObject) {
220  _jobObject=CreateJobObject(0, 0);
221  if (!_jobObject) {
222  nout<<"CreateProcess failed: no _jobObject: "<<GetLastError()<<endl;
223  return;
224  }
225  }
226  DWORD pid=0;
227  STARTUPINFO si;
228  PROCESS_INFORMATION pi;
229  ZeroMemory(&si, sizeof(STARTUPINFO));
230  si.cb = sizeof(STARTUPINFO);
231  ZeroMemory(&pi, sizeof(PROCESS_INFORMATION));
232  if (CreateProcess(NULL, (char*)cmd.c_str(),
233  0, 0, 1, NORMAL_PRIORITY_CLASS | CREATE_SUSPENDED,
234  0, 0, &si, &pi)) {
235  // The process must be created with CREATE_SUSPENDED to
236  // give us a chance to get the handle into our sgJobObject
237  // before the child processes starts sub-processes.
238  if (!AssignProcessToJobObject(_jobObject, pi.hProcess)) {
239  // ...The assign failed.
240  cerr<<"StartJob AssignProcessToJobObject Error: "<<GetLastError()<<endl;
241  }
242  CloseHandle(pi.hProcess); //?????
243  // Because we called CreateProcess with the CREATE_SUSPEND flag,
244  // we must explicitly resume the processes main thread.
245  if (ResumeThread(pi.hThread) == -1) {
246  cerr<<"StartJob ResumeThread Error: "<<GetLastError()<<endl;
247  }
248  CloseHandle(pi.hThread);
249  } else {
250  nout<<"StartJob CreateProcess failed: "<<cmd<<endl;
251  }
252  }
253 }
254 
255 void
256 DirectD::kill_app(int index) {
257  if (_useOldStuff) {
258  int i = _pids.size() - 1 - index % _pids.size();
259  PidStack::iterator pi = _pids.begin() + i;
260  if (pi!=_pids.end()) {
261  nout<<"trying kill "<<(*pi)<<endl;
262  TerminateApp((*pi), 1000);
263  _pids.erase(pi);
264  }
265  } else {
266  cerr<<"kill_app(index) not implemented, calling kill_all() instead."<<endl;
267  kill_all();
268  }
269 }
270 
271 void
272 DirectD::kill_all() {
273  if (_useOldStuff) {
274  PidStack::reverse_iterator pi;
275  for (pi = _pids.rbegin(); pi != _pids.rend(); ++pi) {
276  nout<<"trying kill "<<(*pi)<<endl;
277  TerminateApp((*pi), 1000);
278  }
279  _pids.clear();
280  } else {
281  if (!_jobObject) {
282  cerr<<"kill_all(): No open _jobObject"<<endl;
283  } else if (!TerminateJobObject(_jobObject, 0)) {
284  cerr<<"kill_all() TerminateJobObject Error: "<<GetLastError()<<endl;
285  }
286  CloseHandle(_jobObject);
287  _jobObject=0;
288  }
289 }
290 
291 void
292 DirectD::send_command(const string& cmd) {
293  NetDatagram datagram;
294  datagram.add_string(cmd);
295  // Send the datagram.
296  ConnectionSet::iterator ci;
297  for (ci = _connections.begin(); ci != _connections.end(); ++ci) {
298  _writer.send(datagram, (*ci));
299  }
300 }
301 
302 void
303 DirectD::handle_datagram(NetDatagram& datagram){
304  DatagramIterator di(datagram);
305  string cmd=di.get_string();
306  handle_command(cmd);
307 }
308 
309 void
310 DirectD::handle_command(const string& cmd) {
311  nout<<"DirectD::handle_command: "<<cmd<<endl;
312 }
313 
314 void
315 DirectD::send_one_message(const string& host_name,
316  int port,
317  const string& message) {
318  NetAddress host;
319  if (!host.set_host(host_name, port)) {
320  nout << "Unknown host: " << host_name << "\n";
321  }
322 
323  const int timeout_ms=5000;
324  PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms);
325  if (c.is_null()) {
326  nout << "No connection.\n";
327  return;
328  }
329 
330  nout << "Successfully opened TCP connection to " << host_name
331  << " on port "
332  << c->get_address().get_port() << " and IP "
333  << c->get_address() << "\n";
334 
335  //_reader.add_connection(c);
336 
337  NetDatagram datagram;
338  datagram.add_string(message);
339  _writer.send(datagram, c);
340 
341  //PR_Sleep(PR_MillisecondsToInterval(200));
342  //wait_for_servers(1, 10*1000);
343  //_reader.remove_connection(c);
344  _cm.close_connection(c);
345 }
346 
347 int
348 DirectD::connect_to(const string& host_name, int port) {
349  NetAddress host;
350  if (!host.set_host(host_name, port)) {
351  nout << "Unknown host: " << host_name << "\n";
352  }
353 
354  const int timeout_ms=5000;
355  PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms);
356  if (c.is_null()) {
357  nout << "No connection.\n";
358  return 0;
359  }
360 
361  nout << "Successfully opened TCP connection to " << host_name
362  << " on port "
363  << c->get_address().get_port() << " and IP "
364  << c->get_address() << "\n";
365 
366  _reader.add_connection(c);
367  _connections.insert(c);
368  return c->get_address().get_port();
369 }
370 
371 void
372 DirectD::disconnect_from(const string& host_name, int port) {
373  nout<<"disconnect_from(\""<<host_name<<", port="<<port<<")"<<endl;
374  for (ConnectionSet::iterator i=_connections.begin(); i != _connections.end(); ++i) {
375  nout<<" found "<<(*i)->get_address().get_ip_string()<<", port "<<(*i)->get_address().get_port()<<endl;
376  if ((*i)->get_address().get_ip_string()==host_name) {
377  nout<<" disconnecting."<<endl;
378  _reader.remove_connection((*i));
379  _cm.close_connection((*i));
380  _connections.erase(i);
381  break;
382  }
383  }
384 }
385 
386 void
387 DirectD::check_for_lost_connection() {
388  while (_cm.reset_connection_available()) {
389  PT(Connection) c;
390  if (_cm.get_reset_connection(c)) {
391  nout<<"Lost connection from "<<c->get_address()<<endl;
392  _connections.erase(c);
393  _cm.close_connection(c);
394  }
395  }
396 }
397 
398 void
399 DirectD::check_for_datagrams(){
400  // Process all available datagrams.
401  while (_reader.data_available()) {
402  NetDatagram datagram;
403  if (_reader.get_data(datagram)) {
404  nout << "Got datagram " /*<< datagram <<*/ "from "
405  << datagram.get_address() << endl;
406  datagram.dump_hex(nout);
407  handle_datagram(datagram);
408  }
409  }
410 }
411 
412 void
413 DirectD::listen_to(int port, int backlog) {
414  PT(Connection) rendezvous = _cm.open_TCP_server_rendezvous(port, backlog);
415  if (rendezvous.is_null()) {
416  nout << "Cannot grab port " << port << ".\n";
417  exit(1);
418  }
419  nout << "Listening for connections on port " << port << "\n";
420  _listener.add_connection(rendezvous);
421 }
422 
423 void
424 DirectD::check_for_new_clients() {
425  while (_listener.new_connection_available()) {
426  PT(Connection) rv;
427  NetAddress address;
428  PT(Connection) new_connection;
429  if (_listener.get_new_connection(rv, address, new_connection)) {
430  nout << "Got connection from " << address << "\n";
431  _reader.add_connection(new_connection);
432  _connections.insert(new_connection);
433  }
434  }
435 }
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:43
void add_string(const string &str)
Adds a variable-length string to the datagram.
Definition: datagram.I:351
void dump_hex(ostream &out, unsigned int indent=0) const
Writes a representation of the entire datagram contents, as a sequence of hex (and ASCII) values...
Definition: datagram.cxx:52
bool set_host(const string &hostname, int port)
Sets the address up to refer to a particular port on a particular host.
Definition: netAddress.cxx:83
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent...
A class to retrieve the individual data elements previously stored in a Datagram. ...
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:32
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound...
Definition: netAddress.h:27