HPCToolkit
Server.cpp
Go to the documentation of this file.
1 
3 // * BeginRiceCopyright *****************************************************
4 //
5 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Server.cpp $
6 // $Id: Server.cpp 4461 2014-03-12 16:49:28Z laksono@gmail.com $
7 //
8 // --------------------------------------------------------------------------
9 // Part of HPCToolkit (hpctoolkit.org)
10 //
11 // Information about sources of support for research and development of
12 // HPCToolkit is at 'hpctoolkit.org' and in 'README.Acknowledgments'.
13 // --------------------------------------------------------------------------
14 //
15 // Copyright ((c)) 2002-2019, Rice University
16 // All rights reserved.
17 //
18 // Redistribution and use in source and binary forms, with or without
19 // modification, are permitted provided that the following conditions are
20 // met:
21 //
22 // * Redistributions of source code must retain the above copyright
23 // notice, this list of conditions and the following disclaimer.
24 //
25 // * Redistributions in binary form must reproduce the above copyright
26 // notice, this list of conditions and the following disclaimer in the
27 // documentation and/or other materials provided with the distribution.
28 //
29 // * Neither the name of Rice University (RICE) nor the names of its
30 // contributors may be used to endorse or promote products derived from
31 // this software without specific prior written permission.
32 //
33 // This software is provided by RICE and contributors "as is" and any
34 // express or implied warranties, including, but not limited to, the
35 // implied warranties of merchantability and fitness for a particular
36 // purpose are disclaimed. In no event shall RICE or contributors be
37 // liable for any direct, indirect, incidental, special, exemplary, or
38 // consequential damages (including, but not limited to, procurement of
39 // substitute goods or services; loss of use, data, or profits; or
40 // business interruption) however caused and on any theory of liability,
41 // whether in contract, strict liability, or tort (including negligence
42 // or otherwise) arising in any way out of the use of this software, even
43 // if advised of the possibility of such damage.
44 //
45 // ******************************************************* EndRiceCopyright *
46 
47 //***************************************************************************
48 //
49 // File:
50 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Server.cpp $
51 //
52 // Purpose:
53 // Handles the socket communication for the most part and the conversion
54 // between messages and work to be done
55 //
56 // Description:
57 // [The set of functions, macros, etc. defined in the file]
58 //
59 //***************************************************************************
60 
61 #include "Server.hpp"
62 #include "DataSocketStream.hpp"
63 #include "DBOpener.hpp"
64 #include "Constants.hpp"
65 #include "DataCompressionLayer.hpp"
66 #include "ProgressBar.hpp"
67 #include "FileUtils.hpp"
68 #include "Communication.hpp"
69 #include "DebugUtils.hpp"
70 #include "Filter.hpp"
71 #include "FilterSet.hpp"
73 #include "TimeCPID.hpp" //For Time
74 
75 #ifdef HPCTOOLKIT_PROFILE
76  #include "hpctoolkit.h"
77 #endif
78 
79 #include <iostream>
80 #include <cstdio>
81 #include <zlib.h>
82 #include <algorithm> //for min of int64_t
83 #include <string>
84 
85 using namespace std;
86 
87 namespace TraceviewerServer
88 {
89  bool useCompression = true;
91  int xmlPortNumber = 0;
92 
93  Server::Server()
94  {
95  DataSocketStream* socketptr = NULL;
96  DataSocketStream* xmlSocketPtr = NULL;
97 
98  //Port 21590 is used by vofr-gateway. Do we want to change it?
99  DataSocketStream socket(mainPortNumber, true);
100  socketptr = &socket;
101 
102  mainPortNumber = socketptr->getPort();
103  cout << "Received connection" << endl;
104 
105  int command = socketptr->readInt();
106  if (command == OPEN)
107  {
108  // Laksono 2014.11.11: Somehow the class Args.cpp cannot accept -1 as an integer argument
109  // (not sure if this is a feature or it's a bug to confuse between a flag and negative number)
110  // Temporary, we can specify that if the xml port is 1 then it will be the same as the main port
111  if (xmlPortNumber == 1)
112  xmlPortNumber = mainPortNumber;
113 
114  if (xmlPortNumber != mainPortNumber)
115  {//On a different port. Create another socket.
116  xmlSocketPtr = new DataSocketStream(xmlPortNumber, false);
117  }
118  else
119  {
120  //Same port, simply use this socket
121  xmlSocketPtr = socketptr;
122  }
123 
124  // ----------------------------------------------------------------------------------
125  // Loop for the Server: As long as the client doesn't close the socket communication
126  // we'll remain in this loop
127  // ----------------------------------------------------------------------------------
128 
129  while( runConnection(socketptr, xmlSocketPtr)==START_NEW_CONNECTION_IMMEDIATELY) ;
130 
131  if (xmlPortNumber != mainPortNumber)
132  {
133  delete (xmlSocketPtr);
134  }
135  }
136  else
137  {
138  cerr << "Expected an open command, got " << command << endl;
140  }
141  }
142 
143  Server::~Server()
144  {
145  delete (controller);
146  }
147 
148 
149  int Server::runConnection(DataSocketStream* socketptr, DataSocketStream* xmlSocket)
150  {
151 #ifdef HPCTOOLKIT_PROFILE
153 #endif
154  controller = parseOpenDB(socketptr);
155 
156  if (controller == NULL)
157  {
158  cout << "Could not open database" << endl;
159  sendDBOpenFailed(socketptr);
160  //Now wait until we get the next tag. If we don't read it here, the message stream will
161  //be off by 4 bytes because parseOpenDB (which reads from the stream next) does not expect OPEN.
162  int tag = socketptr->readInt();
163  if (tag == OPEN)
165  else
166  return CLOSE_SERVER;
167  }
168  else
169  {
170  DEBUGCOUT(1) << "Database opened" << endl;
171  sendDBOpenedSuccessfully(socketptr, xmlSocket);
172  }
173 
174  int Message = socketptr->readInt();
175  if (Message == INFO)
176  parseInfo(socketptr);
177  else
178  cerr << "Did not receive info packet" << endl;
179 
180 #ifdef HPCTOOLKIT_PROFILE
182 #endif
183  // ------------------------------------------------------------------
184  // main loop for a communication session
185  // as long as the client doesn't send OPEN or DONE, we remain in
186  // in this loop
187  // ------------------------------------------------------------------
188  while (true)
189  {
190  int nextCommand = socketptr->readInt();
191  switch (nextCommand)
192  {
193  case DATA:
194 #ifdef HPCTOOLKIT_PROFILE
196 #endif
197  getAndSendData(socketptr);
198 #ifdef HPCTOOLKIT_PROFILE
200 #endif
201  break;
202  case FLTR:
203 #ifdef HPCTOOLKIT_PROFILE
205 #endif
206  filter(socketptr);
207 #ifdef HPCTOOLKIT_PROFILE
209 #endif
210  break;
211  case DONE:
212  return CLOSE_SERVER;
213  case OPEN:
215  default:
216  cerr << "Unknown command received" << endl;
217  return ERROR_UNKNOWN_COMMAND;
218  }
219  }
220 
221  return CLOSE_SERVER;
222  }
223  void Server::parseInfo(DataSocketStream* socket)
224  {
225 
226  Time minBegTime = socket->readLong();
227  Time maxEndTime = socket->readLong();
228  int headerSize = socket->readInt();
229  controller->setInfo(minBegTime, maxEndTime, headerSize);
230 
231  Communication::sendParseInfo(minBegTime, maxEndTime, headerSize);//Send to MPI if necessary
232  }
233 
234  void Server::sendDBOpenedSuccessfully(DataSocketStream* socket, DataSocketStream* xmlSocket)
235  {
236  socket->writeInt(DBOK);
237 
238  int actualXMLPort = xmlSocket->getPort();
239  socket->writeInt(actualXMLPort);
240 
241  int numFiles = controller->getNumRanks();
242  socket->writeInt(numFiles);
243 
244  // This is an int so that it is possible to have different compression
245  // algorithms in the future. For now, it is just
246  // 0=no compression, 1= normal compression
247  int compressionType;
248  compressionType = useCompression ? 1 : 0;
249  socket->writeInt(compressionType);
250 
251  //Send ValuesX
252  int* rankProcessIds = controller->getValuesXProcessID();
253  short* rankThreadIds = controller->getValuesXThreadID();
254  for (int i = 0; i < numFiles; i++)
255  {
256  socket->writeInt(rankProcessIds[i]);
257  socket->writeShort(rankThreadIds[i]);
258  }
259 
260  socket->flush();
261 
262  cout << "Waiting to send XML on port " << actualXMLPort << endl;
263  if (actualXMLPort != mainPortNumber)
264  {
265  xmlSocket->acceptSocket();
266  }
267 
268 
269  sendXML(xmlSocket);
270  }
271 
272  void Server::sendXML(DataSocketStream* xmlSocket)
273  {
274  xmlSocket->writeInt(EXML);
275  //If this overflows, we may have problems...
276  int uncompressedFileSize = FileUtils::getFileSize(controller->getExperimentXML());
277  //Set up a subscope so that the ProgressBar will get deleted (which
278  //cleans up the output) before we write to cout again.
279  {
280  ProgressBar prog("Compressing XML", uncompressedFileSize);
281  FILE* in = fopen(controller->getExperimentXML().c_str(), "r");
282  //From http://zlib.net/zpipe.c with some editing
283  z_stream compressor;
284  compressor.zalloc = Z_NULL;
285  compressor.zfree = Z_NULL;
286  compressor.opaque = Z_NULL;
287 
288  //This makes a gzip stream with a window of 15 bits
289  int ret = deflateInit2(&compressor, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 16+15, 8, Z_DEFAULT_STRATEGY);
290  if (ret != Z_OK)
291  throw ret;
292 
293  DataCompressionLayer compL(compressor, &prog);
294  compL.writeFile(in);
295 
296  fclose(in);
297  int compressedSize = compL.getOutputLength();
298  DEBUGCOUT(2)<<"Compressed XML Size: "<<compressedSize<<endl;
299 
300  xmlSocket->writeInt(compressedSize);
301  xmlSocket->writeRawData((char*)compL.getOutputBuffer(), compressedSize);
302 
303  xmlSocket->flush();
304  }
305  cout << "XML Sent" << endl;
306  }
307 
308  void Server::checkProtocolVersions(DataSocketStream* receiver)
309  {
310  int clientProtocolVersion = receiver->readInt();
311 
312  if (clientProtocolVersion != SERVER_PROTOCOL_MAX_VERSION)
313  cout << "The client is using protocol version 0x" << hex << clientProtocolVersion<<
314  " and the server supports version 0x" << SERVER_PROTOCOL_MAX_VERSION << endl;
315 
316  if (clientProtocolVersion < SERVER_PROTOCOL_MAX_VERSION) {
317  cout << "Warning: The server is running in compatibility mode." << endl;
318  agreedUponProtocolVersion = clientProtocolVersion;
319  }
320  else if (clientProtocolVersion > SERVER_PROTOCOL_MAX_VERSION) {
321  cout << "The client protocol version is not supported by this server."<<
322  "Please upgrade the server. This session may be buggy and problematic." << endl;
323  agreedUponProtocolVersion = SERVER_PROTOCOL_MAX_VERSION;
324  }
325  cout << dec;//Switch it back to decimal mode
326  }
327 
328  SpaceTimeDataController* Server::parseOpenDB(DataSocketStream* receiver)
329  {
330  checkProtocolVersions(receiver);
331 
332  string pathToDB = receiver->readString();
333  DBOpener DBO;
334  cout << "Opening database: " << pathToDB << endl;
335  SpaceTimeDataController* controller = DBO.openDbAndCreateStdc(pathToDB);
336 
337  if (controller != NULL)
338  {
339  Communication::sendParseOpenDB(pathToDB);
340  }
341 
342  return controller;
343 
344  }
345 
346  void Server::sendDBOpenFailed(DataSocketStream* socket)
347  {
348  socket->writeInt(NODB);
349  socket->writeInt(0);
350  socket->flush();
351  }
352 
353 
354 
355  void Server::getAndSendData(DataSocketStream* stream)
356  {
357  LOGTIMESTAMPEDMSG("Front end received data request.")
358  int processStart = stream->readInt();
359  int processEnd = stream->readInt();
360  Time timeStart = stream->readLong();
361  Time timeEnd = stream->readLong();
362  int verticalResolution = stream->readInt();
363  int horizontalResolution = stream->readInt();
364 
365  DEBUGCOUT(2) << "Time end: " << timeEnd <<endl;
366 
367 
368  if ((processStart < 0) || (processEnd<0) || (processStart > processEnd)
369  || (verticalResolution<0) || (horizontalResolution<0)
370  || (timeEnd < timeStart))
371  {
372  cerr
373  << "A data request with invalid parameters was received. This sometimes happens if the client shuts down in the middle of a request. The server will now shut down."
374  << endl;
376  }
377  Communication::sendStartGetData(controller, processStart, processEnd, timeStart, timeEnd, verticalResolution, horizontalResolution);
378  LOGTIMESTAMPEDMSG("Back end received data request.")
379 
380  stream->writeInt(HERE);
381  stream->flush();
382 
383  ProgressBar prog("Computing traces", min(processEnd - processStart, verticalResolution));
384 
385  Communication::sendEndGetData(stream, &prog, controller);
386 
387  }
388 
389  void Server::filter(DataSocketStream* stream)
390  {
391  stream->readByte();//Padding
392  bool excludeMatches = stream->readByte();
393  int count = stream->readShort();
394  Communication::sendStartFilter(count, excludeMatches);
395  FilterSet filters(excludeMatches);
396  for (int i = 0; i < count; ++i) {
397  BinaryRepresentationOfFilter filt;//This makes the MPI code easier and the non-mpi code about the same
398  filt.processMin = stream->readInt();
399  filt.processMax = stream->readInt();
400  filt.processStride = stream->readInt();
401  filt.threadMin = stream->readInt();
402  filt.threadMax = stream->readInt();
403  filt.threadStride = stream->readInt();
404  DEBUGCOUT(2) << "Filter proc: " << filt.processMin <<":" << filt.processMax <<":"<<filt.processStride<<",";
405  DEBUGCOUT(2) << "Filter thread: " << filt.threadMax <<":" << filt.threadMax <<":"<<filt.threadStride<<endl;
406 
407  Communication::sendFilter(filt);
408 
409  filters.add(Filter(filt));
410  }
411  controller->applyFilters(filters);
412  }
413 
414 } /* namespace TraceviewerServer */
void hpctoolkit_sampling_start(void)
Definition: start-stop.c:107
static const int DEFAULT_PORT
Definition: Args.cpp:134
void hpctoolkit_sampling_stop(void)
Definition: start-stop.c:118
SpaceTimeDataController * openDbAndCreateStdc(string)
Definition: DBOpener.cpp:84
bool useCompression
Definition: Server.cpp:89
virtual void writeRawData(char *, int)
void add(Filter toAdd)
Definition: FilterSet.hpp:78
#define DEBUGCOUT(a)
Definition: DebugUtils.hpp:72
#define NULL
Definition: ElfHelper.cpp:85
#define LOGTIMESTAMPEDMSG(msg)
Definition: DebugUtils.hpp:75