Panda3D
directd.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file directd.cxx
10  * @author skyler
11  * @date 2002-04-08
12  * Based on test_tcp_*.* by drose.
13  */
14 
15 // This define tells the windows headers to include job objects:
16 #define _WIN32_WINNT 0x0500
17 
18 #include "directd.h"
19 /*#include "pandaFramework.h"
20 #include "queuedConnectionManager.h"*/
21 
22 // #include <process.h> #include <Windows.h> #include "pandabase.h"
23 
24 // #include "queuedConnectionManager.h" #include "queuedConnectionListener.h"
25 // #include "queuedConnectionReader.h" #include "connectionWriter.h"
26 #include "netAddress.h"
27 #include "connection.h"
28 #include "datagramIterator.h"
29 #include "netDatagram.h"
30 
31 #include "pset.h"
32 
33 #if !defined(CPPPARSER) && !defined(LINK_ALL_STATIC) && !defined(BUILDING_DIRECT_DIRECTD)
34  #error Buildsystem error: BUILDING_DIRECT_DIRECTD not defined
35 #endif
36 
37 using std::cerr;
38 using std::cout;
39 using std::endl;
40 using std::string;
41 
42 namespace {
43  // ...This section is part of the old stuff from the original
44  // implementation. The new stuff that uses job objects doesn't need this
45  // stuff:
46 
47  // The following is from an MSDN example:
48 
49  #define TA_FAILED 0
50  #define TA_SUCCESS_CLEAN 1
51  #define TA_SUCCESS_KILL 2
52  #define TA_SUCCESS_16 3
53 
54  BOOL CALLBACK
55  TerminateAppEnum(HWND hwnd, LPARAM lParam) {
56  DWORD dwID;
57  GetWindowThreadProcessId(hwnd, &dwID);
58  if(dwID == (DWORD)lParam) {
59  PostMessage(hwnd, WM_CLOSE, 0, 0);
60  }
61  return TRUE;
62  }
63 
64  /*
65  DWORD WINAPI TerminateApp(DWORD dwPID, DWORD dwTimeout)
66 
67  Purpose:
68  Shut down a 32-Bit Process
69 
70  Parameters:
71  dwPID
72  Process ID of the process to shut down.
73 
74  dwTimeout
75  Wait time in milliseconds before shutting down the process.
76 
77  Return Value:
78  TA_FAILED - If the shutdown failed.
79  TA_SUCCESS_CLEAN - If the process was shutdown using WM_CLOSE.
80  TA_SUCCESS_KILL - if the process was shut down with
81  TerminateProcess().
82  */
83  DWORD WINAPI
84  TerminateApp(DWORD dwPID, DWORD dwTimeout) {
85  HANDLE hProc;
86  DWORD dwRet;
87 
88  // If we can't open the process with PROCESS_TERMINATE rights, then we
89  // give up immediately.
90  hProc = OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, dwPID);
91  if(hProc == nullptr) {
92  return TA_FAILED;
93  }
94 
95  // TerminateAppEnum() posts WM_CLOSE to all windows whose PID matches your
96  // process's.
97  EnumWindows((WNDENUMPROC)TerminateAppEnum, (LPARAM)dwPID);
98 
99  // Wait on the handle. If it signals, great. If it times out, then you
100  // kill it.
101  if(WaitForSingleObject(hProc, dwTimeout)!=WAIT_OBJECT_0) {
102  dwRet=(TerminateProcess(hProc,0)?TA_SUCCESS_KILL:TA_FAILED);
103  } else {
104  dwRet = TA_SUCCESS_CLEAN;
105  }
106  CloseHandle(hProc);
107 
108  return dwRet;
109  }
110 
111  /*
112  Start an application with the command line cmd.
113  returns the process id of the new process/application.
114  */
115  DWORD
116  StartApp(const string& cmd) {
117  DWORD pid=0;
118  STARTUPINFO si;
119  PROCESS_INFORMATION pi;
120  ZeroMemory(&si, sizeof(STARTUPINFO));
121  si.cb = sizeof(STARTUPINFO);
122  ZeroMemory(&pi, sizeof(PROCESS_INFORMATION));
123  if (CreateProcess(nullptr, (char*)cmd.c_str(),
124  0, 0, 1, NORMAL_PRIORITY_CLASS,
125  0, 0, &si, &pi)) {
126  pid=pi.dwProcessId;
127  CloseHandle(pi.hProcess);
128  CloseHandle(pi.hThread);
129  } else {
130  nout<<"CreateProcess failed: "<<cmd<<endl;
131  }
132  return pid;
133  }
134 
135 }
136 
137 DirectD::DirectD() :
138  _reader(&_cm, 1), _writer(&_cm, 0), _listener(&_cm, 0),
139  _jobObject(0), _shutdown(false), _useOldStuff(false) {
140 }
141 
142 DirectD::~DirectD() {
143  // Close all the connections:
144  ConnectionSet::iterator ci;
145  for (ci = _connections.begin(); ci != _connections.end(); ++ci) {
146  _cm.close_connection((*ci));
147  }
148  _connections.clear();
149 
150  kill_all();
151 }
152 
153 int
154 DirectD::client_ready(const string& server_host, int port,
155  const string& cmd) {
156  std::stringstream ss;
157  ss<<"!"<<cmd;
158  send_one_message(server_host, port, ss.str());
159  return 0;
160 }
161 
162 int
163 DirectD::tell_server(const string& server_host, int port,
164  const string& cmd) {
165  send_one_message(server_host, port, cmd);
166  return 0;
167 }
168 
169 bool
170 DirectD::wait_for_servers(int count, int timeout_ms) {
171  if (count <= 0) {
172  return true;
173  }
174  // The timeout is a rough estimate, we may wait slightly longer.
175  const int wait_ms=200;
176  int cycles=timeout_ms/wait_ms;
177  while (cycles--) {
178  check_for_new_clients();
179  check_for_lost_connection();
180  /*
181  The following can be more generalized with a little work.
182  check_for_datagrams() could take a handler function as an arugment, maybe.
183  */
184  // check_for_datagrams(); Process all available datagrams.
185  while (_reader.data_available()) {
186  NetDatagram datagram;
187  if (_reader.get_data(datagram)) {
188  cout << count << ": Server at " << datagram.get_address()
189  << " is ready." << endl;
190  datagram.dump_hex(nout);
191  // handle_datagram(datagram);
192  DatagramIterator di(datagram);
193  string s=di.get_string();
194  if (s=="r" && !--count) {
195  return true;
196  }
197  }
198  }
199 
200  // Yield the timeslice before we poll again.
201  // PR_Sleep(PR_MillisecondsToInterval(wait_ms));
202  Sleep(wait_ms);
203  }
204  // We've waited long enough, assume they're not going to be ready in the
205  // time we want them:
206  return false;
207 }
208 
209 int
210 DirectD::server_ready(const string& client_host, int port) {
211  send_one_message(client_host, port, "r");
212  return 0;
213 }
214 
215 
216 void
217 DirectD::start_app(const string& cmd) {
218  nout<<"start_app(cmd="<<cmd<<")"<<endl;
219  if (_useOldStuff) {
220  _pids.push_back(StartApp(cmd));
221  nout<<" pid="<<_pids.back()<<endl;
222  } else {
223  if (!_jobObject) {
224  _jobObject=CreateJobObject(0, 0);
225  if (!_jobObject) {
226  nout<<"CreateProcess failed: no _jobObject: "<<GetLastError()<<endl;
227  return;
228  }
229  }
230  DWORD pid=0;
231  STARTUPINFO si;
232  PROCESS_INFORMATION pi;
233  ZeroMemory(&si, sizeof(STARTUPINFO));
234  si.cb = sizeof(STARTUPINFO);
235  ZeroMemory(&pi, sizeof(PROCESS_INFORMATION));
236  if (CreateProcess(nullptr, (char*)cmd.c_str(),
237  0, 0, 1, NORMAL_PRIORITY_CLASS | CREATE_SUSPENDED,
238  0, 0, &si, &pi)) {
239  // The process must be created with CREATE_SUSPENDED to give us a chance
240  // to get the handle into our sgJobObject before the child processes
241  // starts sub-processes.
242  if (!AssignProcessToJobObject(_jobObject, pi.hProcess)) {
243  // ...The assign failed.
244  cerr<<"StartJob AssignProcessToJobObject Error: "<<GetLastError()<<endl;
245  }
246  CloseHandle(pi.hProcess); //?????
247  // Because we called CreateProcess with the CREATE_SUSPEND flag, we must
248  // explicitly resume the processes main thread.
249  if (ResumeThread(pi.hThread) == -1) {
250  cerr<<"StartJob ResumeThread Error: "<<GetLastError()<<endl;
251  }
252  CloseHandle(pi.hThread);
253  } else {
254  nout<<"StartJob CreateProcess failed: "<<cmd<<endl;
255  }
256  }
257 }
258 
259 void
260 DirectD::kill_app(int index) {
261  if (_useOldStuff) {
262  int i = _pids.size() - 1 - index % _pids.size();
263  PidStack::iterator pi = _pids.begin() + i;
264  if (pi!=_pids.end()) {
265  nout<<"trying kill "<<(*pi)<<endl;
266  TerminateApp((*pi), 1000);
267  _pids.erase(pi);
268  }
269  } else {
270  cerr<<"kill_app(index) not implemented, calling kill_all() instead."<<endl;
271  kill_all();
272  }
273 }
274 
275 void
276 DirectD::kill_all() {
277  if (_useOldStuff) {
278  PidStack::reverse_iterator pi;
279  for (pi = _pids.rbegin(); pi != _pids.rend(); ++pi) {
280  nout<<"trying kill "<<(*pi)<<endl;
281  TerminateApp((*pi), 1000);
282  }
283  _pids.clear();
284  } else {
285  if (!_jobObject) {
286  cerr<<"kill_all(): No open _jobObject"<<endl;
287  } else if (!TerminateJobObject(_jobObject, 0)) {
288  cerr<<"kill_all() TerminateJobObject Error: "<<GetLastError()<<endl;
289  }
290  CloseHandle(_jobObject);
291  _jobObject=0;
292  }
293 }
294 
295 void
296 DirectD::send_command(const string& cmd) {
297  NetDatagram datagram;
298  datagram.add_string(cmd);
299  // Send the datagram.
300  ConnectionSet::iterator ci;
301  for (ci = _connections.begin(); ci != _connections.end(); ++ci) {
302  _writer.send(datagram, (*ci));
303  }
304 }
305 
306 void
307 DirectD::handle_datagram(NetDatagram& datagram){
308  DatagramIterator di(datagram);
309  string cmd=di.get_string();
310  handle_command(cmd);
311 }
312 
313 void
314 DirectD::handle_command(const string& cmd) {
315  nout<<"DirectD::handle_command: "<<cmd<<endl;
316 }
317 
318 void
319 DirectD::send_one_message(const string& host_name,
320  int port,
321  const string& message) {
322  NetAddress host;
323  if (!host.set_host(host_name, port)) {
324  nout << "Unknown host: " << host_name << "\n";
325  }
326 
327  const int timeout_ms=5000;
328  PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms);
329  if (c.is_null()) {
330  nout << "No connection.\n";
331  return;
332  }
333 
334  nout << "Successfully opened TCP connection to " << host_name
335  << " on port "
336  << c->get_address().get_port() << " and IP "
337  << c->get_address() << "\n";
338 
339  // _reader.add_connection(c);
340 
341  NetDatagram datagram;
342  datagram.add_string(message);
343  _writer.send(datagram, c);
344 
345  // PR_Sleep(PR_MillisecondsToInterval(200)); wait_for_servers(1, 10*1000);
346  // _reader.remove_connection(c);
347  _cm.close_connection(c);
348 }
349 
350 int
351 DirectD::connect_to(const string& host_name, int port) {
352  NetAddress host;
353  if (!host.set_host(host_name, port)) {
354  nout << "Unknown host: " << host_name << "\n";
355  }
356 
357  const int timeout_ms=5000;
358  PT(Connection) c = _cm.open_TCP_client_connection(host, timeout_ms);
359  if (c.is_null()) {
360  nout << "No connection.\n";
361  return 0;
362  }
363 
364  nout << "Successfully opened TCP connection to " << host_name
365  << " on port "
366  << c->get_address().get_port() << " and IP "
367  << c->get_address() << "\n";
368 
369  _reader.add_connection(c);
370  _connections.insert(c);
371  return c->get_address().get_port();
372 }
373 
374 void
375 DirectD::disconnect_from(const string& host_name, int port) {
376  nout<<"disconnect_from(\""<<host_name<<", port="<<port<<")"<<endl;
377  for (ConnectionSet::iterator i=_connections.begin(); i != _connections.end(); ++i) {
378  nout<<" found "<<(*i)->get_address().get_ip_string()<<", port "<<(*i)->get_address().get_port()<<endl;
379  if ((*i)->get_address().get_ip_string()==host_name) {
380  nout<<" disconnecting."<<endl;
381  _reader.remove_connection((*i));
382  _cm.close_connection((*i));
383  _connections.erase(i);
384  break;
385  }
386  }
387 }
388 
389 void
390 DirectD::check_for_lost_connection() {
391  while (_cm.reset_connection_available()) {
392  PT(Connection) c;
393  if (_cm.get_reset_connection(c)) {
394  nout<<"Lost connection from "<<c->get_address()<<endl;
395  _connections.erase(c);
396  _cm.close_connection(c);
397  }
398  }
399 }
400 
401 void
402 DirectD::check_for_datagrams(){
403  // Process all available datagrams.
404  while (_reader.data_available()) {
405  NetDatagram datagram;
406  if (_reader.get_data(datagram)) {
407  nout << "Got datagram " /*<< datagram <<*/ "from "
408  << datagram.get_address() << endl;
409  datagram.dump_hex(nout);
410  handle_datagram(datagram);
411  }
412  }
413 }
414 
415 void
416 DirectD::listen_to(int port, int backlog) {
417  PT(Connection) rendezvous = _cm.open_TCP_server_rendezvous(port, backlog);
418  if (rendezvous.is_null()) {
419  nout << "Cannot grab port " << port << ".\n";
420  exit(1);
421  }
422  nout << "Listening for connections on port " << port << "\n";
423  _listener.add_connection(rendezvous);
424 }
425 
426 void
427 DirectD::check_for_new_clients() {
428  while (_listener.new_connection_available()) {
429  PT(Connection) rv;
430  NetAddress address;
431  PT(Connection) new_connection;
432  if (_listener.get_new_connection(rv, address, new_connection)) {
433  nout << "Got connection from " << address << "\n";
434  _reader.add_connection(new_connection);
435  _connections.insert(new_connection);
436  }
437  }
438 }
A specific kind of Datagram, especially for sending across or receiving from a network.
Definition: netDatagram.h:40
bool send(const Datagram &datagram, const PT(Connection) &connection, bool block=false)
Enqueues a datagram for transmittal on the indicated socket.
int tell_server(const std::string &server_host, int port, const std::string &cmd)
Tell the server to do the command cmd.
Definition: directd.cxx:163
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void dump_hex(std::ostream &out, unsigned int indent=0) const
Writes a representation of the entire datagram contents, as a sequence of hex (and ASCII) values.
Definition: datagram.cxx:44
void disconnect_from(const std::string &server_host, int port)
This is the counterpart to connect_to().
Definition: directd.cxx:375
bool get_reset_connection(PT(Connection) &connection)
If a previous call to reset_connection_available() returned true, this function will return informati...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int server_ready(const std::string &client_host, int port)
Call this function from the server when import ShowbaseGlobal is nearly finished.
Definition: directd.cxx:210
std::string get_string()
Extracts a variable-length string.
bool reset_connection_available() const
Returns true if one of the readers/writers/listeners reported a connection reset recently.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
int connect_to(const std::string &server_host, int port)
Call connect_to from client for each server.
Definition: directd.cxx:351
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
int client_ready(const std::string &server_host, int port, const std::string &cmd)
Call this function from the client when import ShowbaseGlobal is nearly finished.
Definition: directd.cxx:154
bool data_available()
Returns true if a datagram is available on the queue; call get_data() to extract the datagram.
bool set_host(const std::string &hostname, int port)
Sets the address up to refer to a particular port on a particular host.
Definition: netAddress.cxx:66
void listen_to(int port, int backlog=8)
Call listen_to in the server.
Definition: directd.cxx:416
void add_string(const std::string &str)
Adds a variable-length string to the datagram.
Definition: datagram.I:219
bool new_connection_available()
Returns true if a new connection was recently established; the connection information may then be ret...
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
bool wait_for_servers(int count, int timeout_ms=2 *60 *1000)
Call this function from the client after calling <count> client_ready() calls.
Definition: directd.cxx:170
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void send_command(const std::string &cmd)
Send the same command string to all current connections.
Definition: directd.cxx:296
bool get_data(NetDatagram &result)
If a previous call to data_available() returned true, this function will return the datagram that has...
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
A class to retrieve the individual data elements previously stored in a Datagram.
Represents a single TCP or UDP socket for input or output.
Definition: connection.h:29
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool get_new_connection(PT(Connection) &rendezvous, NetAddress &address, PT(Connection) &new_connection)
If a previous call to new_connection_available() returned true, this function will return information...
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.
Definition: netAddress.h:25