HPCToolkit
Slave.cpp
Go to the documentation of this file.
1 // -*-Mode: C++;-*-
2 
3 // * BeginRiceCopyright *****************************************************
4 //
5 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Slave.cpp $
6 // $Id: Slave.cpp 4461 2014-03-12 16:49:28Z laksono@gmail.com $
7 //
8 // --------------------------------------------------------------------------
9 // Part of HPCToolkit (hpctoolkit.org)
10 //
11 // Information about sources of support for research and development of
12 // HPCToolkit is at 'hpctoolkit.org' and in 'README.Acknowledgments'.
13 // --------------------------------------------------------------------------
14 //
15 // Copyright ((c)) 2002-2019, Rice University
16 // All rights reserved.
17 //
18 // Redistribution and use in source and binary forms, with or without
19 // modification, are permitted provided that the following conditions are
20 // met:
21 //
22 // * Redistributions of source code must retain the above copyright
23 // notice, this list of conditions and the following disclaimer.
24 //
25 // * Redistributions in binary form must reproduce the above copyright
26 // notice, this list of conditions and the following disclaimer in the
27 // documentation and/or other materials provided with the distribution.
28 //
29 // * Neither the name of Rice University (RICE) nor the names of its
30 // contributors may be used to endorse or promote products derived from
31 // this software without specific prior written permission.
32 //
33 // This software is provided by RICE and contributors "as is" and any
34 // express or implied warranties, including, but not limited to, the
35 // implied warranties of merchantability and fitness for a particular
36 // purpose are disclaimed. In no event shall RICE or contributors be
37 // liable for any direct, indirect, incidental, special, exemplary, or
38 // consequential damages (including, but not limited to, procurement of
39 // substitute goods or services; loss of use, data, or profits; or
40 // business interruption) however caused and on any theory of liability,
41 // whether in contract, strict liability, or tort (including negligence
42 // or otherwise) arising in any way out of the use of this software, even
43 // if advised of the possibility of such damage.
44 //
45 // ******************************************************* EndRiceCopyright *
46 
47 //***************************************************************************
48 //
49 // File:
50 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Slave.cpp $
51 //
52 // Purpose:
53 // Controls MPI processes' communication and data-getting behavior
54 //
55 // Description:
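56 // Implements Slave: the command loop executed by each non-server MPI rank (run), extraction and transmission of that rank's trace lines (getData), and release of completed send buffers (cleanSent).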
57 //
58 //***************************************************************************
59 
60 #include "Slave.hpp"
61 
62 #include <mpi.h>
63 
64 #include <vector>
65 #include <list>
66 #include <cmath>
67 #include <assert.h>
68 
69 #include "TimeCPID.hpp"
70 #include "Constants.hpp"
71 #include "DBOpener.hpp"
72 #include "ImageTraceAttributes.hpp"
73 #include "DataCompressionLayer.hpp"
74 #include "Server.hpp"
75 #include "FilterSet.hpp"
76 #include "DebugUtils.hpp"
77 
78 #ifdef HPCTOOLKIT_PROFILE
79  #include "hpctoolkit.h"
80 #endif
81 
82 using namespace MPI;
83 using namespace std;
84 
85 namespace TraceviewerServer
86 {
87 
88  Slave::Slave()
89  {
90  controller= NULL;
91 
92  run();
93 
94  }
// Command loop: repeatedly receives a broadcast command message from the
// SOCKET_SERVER rank and dispatches on its `command` field. The loop only
// exits when a DONE command arrives.
//
// NOTE(review): this listing skips original lines 99, 103, 136 and 150, so the
// declarations of `Message` and `b` and the contents of the two
// HPCTOOLKIT_PROFILE sections are not visible here.
// NOTE(review): INFO, DATA and FLTR use `controller` without a null check;
// presumably an OPEN command always precedes them - verify against the sender.
95  void Slave::run()
96  {
97  while (true)
98  {
// Receive the next command from the socket-server rank.
100  COMM_WORLD.Bcast(&Message, sizeof(Message), MPI_PACKED,
101  MPICommunication::SOCKET_SERVER);
102 #ifdef HPCTOOLKIT_PROFILE
104 #endif
105  switch (Message.command)
106  {
// OPEN: discard any existing controller and open the database at the given path.
107  case OPEN:
108  delete (controller);
109  {//Set an artificial context to avoid initialization crossing cases
110  DBOpener DBO;
111  controller = DBO.openDbAndCreateStdc(string(Message.ofile.path));
112  }
113  break;
// INFO: forward the time range and header size to the controller.
114  case INFO:
115  controller->setInfo(Message.minfo.minBegTime, Message.minfo.maxEndTime,
116  Message.minfo.headerSize);
117  break;
// DATA: send this rank's trace lines, then report this rank's ID and the
// number of lines sent to the socket server in a SLAVE_DONE message.
118  case DATA:
119  {
120  int linesSent = getData(&Message);
121  MPICommunication::ResultMessage nodeFinishedMsg;
122  nodeFinishedMsg.tag = SLAVE_DONE;
123  nodeFinishedMsg.done.rankID = COMM_WORLD.Get_rank();
124  nodeFinishedMsg.done.traceLinesSent = linesSent;
125  DEBUGCOUT(1) << "Rank " << nodeFinishedMsg.done.rankID << " done, having created "
126  << linesSent << " trace lines." << endl;
127 
128  COMM_WORLD.Send(&nodeFinishedMsg, sizeof(nodeFinishedMsg), MPI_PACKED,
129  MPICommunication::SOCKET_SERVER, 0);
130  break;
131  }
// FLTR: the command carries the filter count; each filter then arrives in
// its own broadcast and is collected into a FilterSet for the controller.
132  case FLTR:
133  {
134  FilterSet f(Message.filt.excludeMatches);
135  for (int i = 0; i < Message.filt.count; ++i) {
137  COMM_WORLD.Bcast(&b, sizeof(b), MPI_PACKED, MPICommunication::SOCKET_SERVER);
138  f.add(Filter(b));
139  }
140  controller->applyFilters(f);
141  break;
142  }
143  case DONE: //Server shutdown
144  return;
// Unknown commands are reported on stderr and otherwise ignored.
145  default:
146  cerr << "Unexpected message command: " << Message.command << endl;
147  break;
148  }
149 #ifdef HPCTOOLKIT_PROFILE
151 #endif
152  }
153  }
154 
155  int Slave::getData(MPICommunication::CommandMessage* Message)
156  {
158  ImageTraceAttributes correspondingAttributes;
159 
160  int trueRank = COMM_WORLD.Get_rank();
161  int size = COMM_WORLD.Get_size();
162 
163  // Keep track of all these buffers we declare so that we can free them
164  // all at the end. Allocating them on the heap lets us put out multiple
165  // ISends and overlap computation and communication at the cost of extra
166  // memory usage (a negligible amount though: < 10 MB)
167  list<MPICommunication::ResultBufferLocations*> buffers;
168 
169  //Gives us a contiguous count of ranks from 0 to size-2 regardless of which node is the socket server
170  //If ss = 0, they are all mapped one less. If ss = size-1, no changes happen
171  int rank = trueRank > MPICommunication::SOCKET_SERVER ? trueRank - 1 : trueRank;
172 
173  int n = gc.processEnd - gc.processStart;
174  int p = size;
175  int mod = n % (p - 1);
176  double q = ((double) n) / (p - 1);
177 
178  //If rank > n, there are more nodes than trace lines, so this node will do nothing
179  if (rank > n)
180  {
181  DEBUGCOUT(1) << "No work to do" << endl;
182  return 0;
183  }
184 
185  //If rank < (n % (p-1)) this node should compute ceil(n/(p-1)) trace lines,
186  //otherwise it should compute floor(n/(p-1))
187 
188  //=MIN(F5, $D$1)*(CEILING($B$1/($B$2-1),1)) + (F5-MIN(F5, $D$1))*(FLOOR($B$1/($B$2-1),1))
189  int LowerInclusiveBound = (int) (min(mod, rank) * ceil(q)
190  + (rank - min(mod, rank)) * floor(q) + gc.processStart);
191  int UpperInclusiveBound = (int) (min(mod, rank + 1) * ceil(q)
192  + (rank + 1 - min(mod, rank + 1)) * floor(q) - 1 + gc.processStart);
193 
194  DEBUGCOUT(1) << "Rank " << trueRank << " is getting lines [" << LowerInclusiveBound << ", "
195  << UpperInclusiveBound << "]" << endl;
196 
197  //These have to be the originals so that the strides will be correct
198  correspondingAttributes.begProcess = gc.processStart;
199  correspondingAttributes.endProcess = gc.processEnd;
200  correspondingAttributes.numPixelsH = gc.horizontalResolution;
201  //double processsamplefreq = ((double)gc.verticalResolution)/(p-1);
202  //correspondingAttributes.numPixelsV = /*rank< mod*/ true ? ceil(processsamplefreq) : floor(processsamplefreq);
203  correspondingAttributes.numPixelsV = gc.verticalResolution;
204 
205  correspondingAttributes.begTime = gc.timeStart;
206  correspondingAttributes.endTime = gc.timeEnd;
207 
208  int totalTraces = min(correspondingAttributes.numPixelsV, n);
209  int autoskip;
210  /* The work distribution is a tiny bit irregular because we distribute based on process number,
211  * which because of the striding can lead to an occasional +- 1. I spent a while trying to get the
212  * skipping exact, but I don't think it's worth it. We just need to get it close, and then we can
213  * do an additional O(1) step. Even though this solution is not as elegant, the big difference is
214  * that this brings it down to O(1) from O(n).*/
215 
216  autoskip = (int)floor(rank*totalTraces/(size - 1.0));
217 
218  DEBUGCOUT(2) << "Was going to autoskip " <<autoskip << " traces."<<endl;
219 
220  correspondingAttributes.lineNum = autoskip;
221 
222  *controller->attributes = correspondingAttributes;
223 
224  ProcessTimeline* nextTrace = controller->getNextTrace();
225  int LinesSentCount = 0;
226  int waitcount = 0;
227 
228  //If it were NULL, the no rank check would have caught it.
229  assert (nextTrace != NULL);
230 
231 
232  while (nextTrace != NULL)
233  {
234 
235  if ((nextTrace->data->rank < LowerInclusiveBound)
236  || (nextTrace->data->rank > UpperInclusiveBound))
237  {
238  nextTrace = controller->getNextTrace();
239  waitcount++;
240  continue;
241  }
242  if (waitcount != 0)
243  {
244  DEBUGCOUT(2) << trueRank << " skipped " << waitcount
245  << " processes before actually starting work. Autoskip was " <<autoskip<< " and actual skip was " << controller->attributes->lineNum -1 << endl;
246 
247  waitcount = 0;
248  }
249  nextTrace->readInData();
250 
251  vector<TimeCPID> ActualData = *nextTrace->data->listCPID;
252 
254 
256  locs->header = msg;
257 
258  msg->tag = SLAVE_REPLY;
259  msg->data.line = nextTrace->line();
260  int entries = ActualData.size();
261  msg->data.entries = entries;
262 
263  msg->data.begtime = ActualData[0].timestamp;
264  msg->data.endtime = ActualData[entries - 1].timestamp;
265  msg->data.rankID = trueRank;
266 
267 
268  int i = 0;
269 
270  unsigned char* outputBuffer = NULL;
271  DataCompressionLayer* compr = NULL;
272  int outputBufferLen;
273  if (useCompression)
274  {
275  compr = new DataCompressionLayer();
276 
277  locs->compressed = true;
278  locs->compMsg = compr;
279 
280  Time currentTimestamp = msg->data.begtime;
281  for (i = 0; i < entries; i++)
282  {
283  compr->writeInt((int) (ActualData[i].timestamp - currentTimestamp));
284  compr->writeInt(ActualData[i].cpid);
285  currentTimestamp = ActualData[i].timestamp;
286  }
287  compr->flush();
288  outputBufferLen = compr->getOutputLength();
289  outputBuffer = compr->getOutputBuffer();
290  }
291  else
292  {
293 
294  outputBuffer = new unsigned char[entries*SIZEOF_DELTASAMPLE];
295 
296  locs->compressed = false;
297  locs->message = outputBuffer;
298 
299  char* ptrToFirstElem = (char*)&(outputBuffer[0]);
300  char* currentPtr = ptrToFirstElem;
301  Time currentTimestamp = msg->data.begtime;
302  for (i = 0; i < entries; i++)
303  {
304  int deltaTimestamp = ActualData[i].timestamp - currentTimestamp;
305  ByteUtilities::writeInt(currentPtr, deltaTimestamp);
306  currentPtr += SIZEOF_INT;
307  ByteUtilities::writeInt(currentPtr, ActualData[i].cpid);
308  currentPtr += SIZEOF_INT;
309  }
310  outputBufferLen = entries*SIZEOF_DELTASAMPLE;
311  }
312 
313 
314 
315  msg->data.compressedSize = outputBufferLen;
316  locs->headerRequest = COMM_WORLD.Isend(msg, sizeof(*msg), MPI_PACKED,
317  MPICommunication::SOCKET_SERVER, 0);
318 
319  locs->bodyRequest = COMM_WORLD.Isend(outputBuffer, outputBufferLen,
320  MPI_BYTE, MPICommunication::SOCKET_SERVER, 0);
321 
322  LinesSentCount++;
323  buffers.push_back(locs);
324 
325  cleanSent(buffers, false);
326 
327  if (LinesSentCount % 100 == 0)
328  DEBUGCOUT(2) << trueRank << " Has sent " << LinesSentCount
329  << " ranks." << endl;
330 
331  delete nextTrace;
332  nextTrace = controller->getNextTrace();
333  }
334  //Clean up all our MPI buffers.
335  cleanSent(buffers, true);
336 
337 
338  return LinesSentCount;
339  }
340  void Slave::cleanSent(list<MPICommunication::ResultBufferLocations*>& buffers, bool wait)
341  {
343  while (!buffers.empty())
344  {
345  current = buffers.front();
346 
347  bool okayToDelete = current->headerRequest.Test() && current->bodyRequest.Test();
348  if (!wait && !okayToDelete) return;
349 
350  if (!okayToDelete)
351  {
352  current->headerRequest.Wait();
353  current->bodyRequest.Wait();
354  }
355  //Now it is safe to delete everything
356  delete (current->header);
357  if (current->compressed)
358  delete (current->compMsg);
359  else
360  delete (current->message);
361  delete (current);
362  buffers.pop_front();
363  }
364  }
365  Slave::~Slave()
366  {
367  delete (controller);
368  }
369 
370 } /* namespace TraceviewerServer */
void hpctoolkit_sampling_start(void)
Definition: start-stop.c:107
void hpctoolkit_sampling_stop(void)
Definition: start-stop.c:118
SpaceTimeDataController * openDbAndCreateStdc(string)
Definition: DBOpener.cpp:84
#define SIZEOF_INT
Definition: Constants.hpp:67
bool useCompression
Definition: Server.cpp:89
#define SIZEOF_DELTASAMPLE
Definition: Constants.hpp:71
void add(Filter toAdd)
Definition: FilterSet.hpp:78
#define DEBUGCOUT(a)
Definition: DebugUtils.hpp:72
#define NULL
Definition: ElfHelper.cpp:85
<!-- ********************************************************************--> n<!-- HPCToolkit Experiment DTD --> n<!-- Version 2.1 --> n<!-- ********************************************************************--> n<!ELEMENT HPCToolkitExperiment(Header,(SecCallPathProfile|SecFlatProfile) *)> n<!ATTLIST HPCToolkitExperiment\n version CDATA #REQUIRED > n n<!-- ******************************************************************--> n n<!-- Info/NV:flexible name-value pairs:(n) ame;(t) ype;(v) alue --> n<!ELEMENT Info(NV *)> n<!ATTLIST Info\n n CDATA #IMPLIED > n<!ELEMENT NV EMPTY > n<!ATTLIST NV\n n CDATA #REQUIRED\n t CDATA #IMPLIED\n v CDATA #REQUIRED > n n<!-- ******************************************************************--> n<!-- Header --> n<!-- ******************************************************************--> n<!ELEMENT Header(Info *)> n<!ATTLIST Header\n n CDATA #REQUIRED > n n<!-- ******************************************************************--> n<!-- Section Header --> n<!-- ******************************************************************--> n<!ELEMENT SecHeader(MetricTable?, MetricDBTable?, TraceDBTable?, LoadModuleTable?, FileTable?, ProcedureTable?, Info *)> n n<!-- MetricTable:--> n<!ELEMENT MetricTable(Metric) * > n n<!-- Metric:(i) d;(n) ame --> n<!--(v) alue-type:transient type of values --> n<!--(t) ype:persistent type of metric --> n<!-- fmt:format;show;--> n<!ELEMENT Metric(MetricFormula *, Info?)> n<!ATTLIST Metric\n i CDATA #REQUIRED\n n CDATA #REQUIRED\n es CDATA #IMPLIED\n em CDATA #IMPLIED\n ep CDATA #IMPLIED\n v(raw|final|derived-incr|derived) \"raw\\ t (inclusive|exclusive|nil) \nil\\ partner CDATA #IMPLIED\ fmt CDATA #IMPLIED\ show (1|0) \1\\ show-percent (1|0) \1> n n<!-- MetricFormula represents derived metrics: (t)ype; (frm): formula --> n<!ELEMENT MetricFormula (Info?)> n<!ATTLIST MetricFormula\ t (combine|finalize) \finalize\\ i CDATA #IMPLIED\ frm CDATA #REQUIRED> n n<!-- Metric data, used in sections: (n)ame [from Metric]; 
(v)alue --> n<!ELEMENT M EMPTY> n<!ATTLIST M\ n CDATA #REQUIRED\ v CDATA #REQUIRED> n n<!-- MetricDBTable: --> n<!ELEMENT MetricDBTable (MetricDB)*> n n<!-- MetricDB: (i)d; (n)ame --> n<!-- (t)ype: persistent type of metric --> n<!-- db-glob: file glob describing files in metric db --> n<!-- db-id: id within metric db --> n<!-- db-num-metrics: number of metrics in db --> n<!-- db-header-sz: size (in bytes) of a db file header --> n<!ELEMENT MetricDB EMPTY> n<!ATTLIST MetricDB\ i CDATA #REQUIRED\ n CDATA #REQUIRED\ t (inclusive|exclusive|nil) \nil\\ partner CDATA #IMPLIED\ db-glob CDATA #IMPLIED\ db-id CDATA #IMPLIED\ db-num-metrics CDATA #IMPLIED\ db-header-sz CDATA #IMPLIED> n n<!-- TraceDBTable: --> n<!ELEMENT TraceDBTable (TraceDB)> n n<!-- TraceDB: (i)d --> n<!-- db-min-time: min beginning time stamp (global) --> n<!-- db-max-time: max ending time stamp (global) --> n<!ELEMENT TraceDB EMPTY> n<!ATTLIST TraceDB\ i CDATA #REQUIRED\ db-glob CDATA #IMPLIED\ db-min-time CDATA #IMPLIED\ db-max-time CDATA #IMPLIED\ db-header-sz CDATA #IMPLIED> n n<!-- LoadModuleTable assigns a short name to a load module --> n<!ELEMENT LoadModuleTable (LoadModule)*> n n<!ELEMENT LoadModule (Info?)> n<!ATTLIST LoadModule\ i CDATA #REQUIRED\ n CDATA #REQUIRED> n n<!-- FileTable assigns a short name to a file --> n<!ELEMENT FileTable (File)*> n n<!ELEMENT File (Info?)> n<!ATTLIST File\ i CDATA #REQUIRED\ n CDATA #REQUIRED> n n<!-- ProcedureTable assigns a short name to a procedure --> n<!ELEMENT ProcedureTable (Procedure)*> n n<!-- Info/NV: flexible name-value pairs: (n)ame; (t)ype; (v)alue --> n<!-- f: family of the procedure (fake, root, ...)--> n<!ELEMENT Procedure (Info?)> n<!ATTLIST Procedure\ i CDATA #REQUIRED\ n CDATA #REQUIRED\ f CDATA #IMPLIED> n n<!-- ****************************************************************** --> n<!-- Section: Call path profile --> n<!-- ****************************************************************** --> n<!ELEMENT SecCallPathProfile (SecHeader, 
SecCallPathProfileData)> n<!ATTLIST SecCallPathProfile\ i CDATA #REQUIRED\ n CDATA #REQUIRED> n n<!ELEMENT SecCallPathProfileData (PF|M)*> n<!-- Procedure frame --> n<!-- (i)d: unique identifier for cross referencing --> n<!-- (s)tatic scope id --> n<!-- (n)ame: a string or an id in ProcedureTable --> n<!-- (lm) load module: a string or an id in LoadModuleTable --> n<!-- (f)ile name: a string or an id in LoadModuleTable --> n<!-- (l)ine range: \beg-end\ (inclusive range) --> n<!-- (a)lien: whether frame is alien to enclosing P --> n<!-- (str)uct: hpcstruct node id --> n<!-- (t)ype: hpcrun node type: memory access, variable declaration, ... --> n<!-- (v)ma-range-set: \{[beg-end), [beg-end)...}\ --> n<!ELEMENT PF (PF|Pr|L|C|S|M)*> n<!ATTLIST PF\ i CDATA #IMPLIED\ s CDATA #IMPLIED\ n CDATA #REQUIRED\ lm CDATA #IMPLIED\ f CDATA #IMPLIED\ l CDATA #IMPLIED\ str CDATA #IMPLIED\ v CDATA #IMPLIED> n<!-- Procedure (static): GOAL: replace with 'P' --> n<!ELEMENT Pr (Pr|L|C|S|M)*> n<!ATTLIST Pr\ i CDATA #IMPLIED\ s CDATA #IMPLIED\ n CDATA #REQUIRED\ lm CDATA #IMPLIED\ f CDATA #IMPLIED\ l CDATA #IMPLIED\ a (1|0) \0\\ str CDATA #IMPLIED\ v CDATA #IMPLIED> n<!-- Callsite (a special StatementRange) --> n<!ELEMENT C (PF|M)*> n<!ATTLIST C\ i CDATA #IMPLIED\ s CDATA #IMPLIED\ l CDATA #IMPLIED\ str CDATA #IMPLIED\ v CDATA #IMPLIED> n n<!-- ****************************************************************** --> n<!-- Section: Flat profile --> n<!-- ****************************************************************** --> n<!ELEMENT SecFlatProfile (SecHeader, SecFlatProfileData)> n<!ATTLIST SecFlatProfile\ i CDATA #REQUIRED\ n CDATA #REQUIRED> n n<!ELEMENT SecFlatProfileData (LM|M)*> n<!-- Load module: (i)d; (n)ame; (v)ma-range-set --> n<!ELEMENT LM (F|P|M)*> n<!ATTLIST LM\ i CDATA #IMPLIED\ n CDATA #REQUIRED\ v CDATA #IMPLIED> n<!-- File --> n<!ELEMENT F (P|L|S|M)*> n<!ATTLIST F\ i CDATA #IMPLIED\ n CDATA #REQUIRED> n<!-- Procedure (Note 1) --> n<!ELEMENT P (P|A|L|S|C|M)*> n<!ATTLIST P\ i 
CDATA #IMPLIED\ n CDATA #REQUIRED\ l CDATA #IMPLIED\ str CDATA #IMPLIED\ v CDATA #IMPLIED> n<!-- Alien (Note 1) --> n<!ELEMENT A (A|L|S|C|M)*> n<!ATTLIST A\ i CDATA #IMPLIED\ f CDATA #IMPLIED\ n CDATA #IMPLIED\ l CDATA #IMPLIED\ str CDATA #IMPLIED\ v CDATA #IMPLIED> n<!-- Loop (Note 1,2) --> n<!ELEMENT L (A|Pr|L|S|C|M)*> n<!ATTLIST L\ i CDATA #IMPLIED\ s CDATA #IMPLIED\ l CDATA #IMPLIED\ f CDATA #IMPLIED\ str CDATA #IMPLIED\ v CDATA #IMPLIED> n<!-- Statement (Note 2) --> n<!-- (it): trace record identifier --> n<!ELEMENT S (S|M)*> n<!ATTLIST S\ i CDATA #IMPLIED\ it CDATA #IMPLIED\ s CDATA #IMPLIED\ l CDATA #IMPLIED\ str CDATA #IMPLIED\ v CDATA #IMPLIED> n<!-- Note 1: Contained Cs may not contain PFs --> n<!-- Note 2: The 's' attribute is not used for flat profiles --> n