1 // -*-Mode: C++;-*-
3 // * BeginRiceCopyright *****************************************************
4 //
5 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Slave.cpp $
6 // $Id: Slave.cpp 4461 2014-03-12 16:49:28Z laksono@gmail.com $
7 //
8 // --------------------------------------------------------------------------
9 // Part of HPCToolkit (hpctoolkit.org)
10 //
11 // Information about sources of support for research and development of
12 // HPCToolkit is at 'hpctoolkit.org' and in 'README.Acknowledgments'.
13 // --------------------------------------------------------------------------
14 //
15 // Copyright ((c)) 2002-2019, Rice University
16 // All rights reserved.
17 //
18 // Redistribution and use in source and binary forms, with or without
19 // modification, are permitted provided that the following conditions are
20 // met:
21 //
22 // * Redistributions of source code must retain the above copyright
23 // notice, this list of conditions and the following disclaimer.
24 //
25 // * Redistributions in binary form must reproduce the above copyright
26 // notice, this list of conditions and the following disclaimer in the
27 // documentation and/or other materials provided with the distribution.
28 //
29 // * Neither the name of Rice University (RICE) nor the names of its
30 // contributors may be used to endorse or promote products derived from
31 // this software without specific prior written permission.
32 //
33 // This software is provided by RICE and contributors "as is" and any
34 // express or implied warranties, including, but not limited to, the
35 // implied warranties of merchantability and fitness for a particular
36 // purpose are disclaimed. In no event shall RICE or contributors be
37 // liable for any direct, indirect, incidental, special, exemplary, or
38 // consequential damages (including, but not limited to, procurement of
39 // substitute goods or services; loss of use, data, or profits; or
40 // business interruption) however caused and on any theory of liability,
41 // whether in contract, strict liability, or tort (including negligence
42 // or otherwise) arising in any way out of the use of this software, even
43 // if advised of the possibility of such damage.
44 //
45 // ******************************************************* EndRiceCopyright *
47 //***************************************************************************
48 //
49 // File:
50 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Slave.cpp $
51 //
52 // Purpose:
53 // Controls MPI processes' communication and data-getting behavior
54 //
55 // Description:
// Slave::Slave (enters the command loop), Slave::run, Slave::getData, Slave::cleanSent, Slave::~Slave
57 //
58 //***************************************************************************
60 #include "Slave.hpp"
62 #include <mpi.h>
64 #include <vector>
65 #include <list>
66 #include <cmath>
67 #include <assert.h>
69 #include "TimeCPID.hpp"
70 #include "Constants.hpp"
71 #include "DBOpener.hpp"
72 #include "ImageTraceAttributes.hpp"
73 #include "DataCompressionLayer.hpp"
74 #include "Server.hpp"
75 #include "FilterSet.hpp"
76 #include "DebugUtils.hpp"
79  #include "hpctoolkit.h"
80 #endif
82 using namespace MPI;
83 using namespace std;
85 namespace TraceviewerServer
86 {
88  Slave::Slave()
89  {
90  controller= NULL;
92  run();
94  }
/**
 * Command loop for a worker (non-socket-server) MPI rank.
 *
 * Each iteration receives one command broadcast from
 * MPICommunication::SOCKET_SERVER into `Message` and dispatches on
 * Message.command:
 *  - OPEN: deletes the current controller, then asks DBOpener for a new one
 *          for the database at Message.ofile.path.
 *  - INFO: passes the min begin time, max end time and header size to the
 *          controller.
 *  - DATA: sends this rank's trace lines via getData(), then a SLAVE_DONE
 *          message carrying this rank's ID and the number of lines sent.
 *  - FLTR: receives Message.filt.count filters, one broadcast each, and
 *          applies them to the controller.
 *  - DONE: returns (server shutdown).
 * Any other command is reported on stderr and ignored.
 *
 * NOTE(review): `Message` and `b` are not declared in the visible lines, and
 * the two `#endif` directives below have no matching `#if` here; presumably
 * they belong to lines missing from this listing - confirm against the full
 * file.
 * NOTE(review): INFO, DATA (through getData) and FLTR dereference
 * `controller` without a NULL check, so they rely on a prior successful OPEN.
 */
void Slave::run()
{
  while (true)
  {
    // Collective call: blocks until the socket server broadcasts a command.
    COMM_WORLD.Bcast(&Message, sizeof(Message), MPI_PACKED,
        MPICommunication::SOCKET_SERVER);
#endif
    switch (Message.command)
    {
      case OPEN:
        // Drop the previously opened database before opening the new one.
        delete (controller);
        {//Set an artificial context to avoid initialization crossing cases
          DBOpener DBO;
          controller = DBO.openDbAndCreateStdc(string(Message.ofile.path));
        }
        break;
      case INFO:
        controller->setInfo(Message.minfo.minBegTime, Message.minfo.maxEndTime,
            Message.minfo.headerSize);
        break;
      case DATA:
      {
        int linesSent = getData(&Message);
        // Always report completion, even when getData sent no lines.
        MPICommunication::ResultMessage nodeFinishedMsg;
        nodeFinishedMsg.tag = SLAVE_DONE;
        nodeFinishedMsg.done.rankID = COMM_WORLD.Get_rank();
        nodeFinishedMsg.done.traceLinesSent = linesSent;
        DEBUGCOUT(1) << "Rank " << nodeFinishedMsg.done.rankID << " done, having created "
            << linesSent << " trace lines." << endl;

        COMM_WORLD.Send(&nodeFinishedMsg, sizeof(nodeFinishedMsg), MPI_PACKED,
            MPICommunication::SOCKET_SERVER, 0);
        break;
      }
      case FLTR:
      {
        FilterSet f(Message.filt.excludeMatches);
        // Each filter arrives as its own broadcast, so every rank must make
        // exactly Message.filt.count Bcast calls here to stay in step.
        for (int i = 0; i < Message.filt.count; ++i) {
          COMM_WORLD.Bcast(&b, sizeof(b), MPI_PACKED, MPICommunication::SOCKET_SERVER);
          f.add(Filter(b));
        }
        controller->applyFilters(f);
        break;
      }
      case DONE: //Server shutdown
        return;
      default:
        cerr << "Unexpected message command: " << Message.command << endl;
        break;
    }
#endif
  }
}
155  int Slave::getData(MPICommunication::CommandMessage* Message)
156  {
158  ImageTraceAttributes correspondingAttributes;
160  int trueRank = COMM_WORLD.Get_rank();
161  int size = COMM_WORLD.Get_size();
163  // Keep track of all these buffers we declare so that we can free them
164  // all at the end. Allocating them on the heap lets us put out multiple
165  // ISends and overlap computation and communication at the cost of extra
166  // memory usage (a negligible amount though: < 10 MB)
167  list<MPICommunication::ResultBufferLocations*> buffers;
169  //Gives us a contiguous count of ranks from 0 to size-2 regardless of which node is the socket server
170  //If ss = 0, they are all mapped one less. If ss = size-1, no changes happen
171  int rank = trueRank > MPICommunication::SOCKET_SERVER ? trueRank - 1 : trueRank;
173  int n = gc.processEnd - gc.processStart;
174  int p = size;
175  int mod = n % (p - 1);
176  double q = ((double) n) / (p - 1);
178  //If rank > n, there are more nodes than trace lines, so this node will do nothing
179  if (rank > n)
180  {
181  DEBUGCOUT(1) << "No work to do" << endl;
182  return 0;
183  }
185  //If rank < (n % (p-1)) this node should compute ceil(n/(p-1)) trace lines,
186  //otherwise it should compute floor(n/(p-1))
188  //=MIN(F5, $D$1)*(CEILING($B$1/($B$2-1),1)) + (F5-MIN(F5, $D$1))*(FLOOR($B$1/($B$2-1),1))
189  int LowerInclusiveBound = (int) (min(mod, rank) * ceil(q)
190  + (rank - min(mod, rank)) * floor(q) + gc.processStart);
191  int UpperInclusiveBound = (int) (min(mod, rank + 1) * ceil(q)
192  + (rank + 1 - min(mod, rank + 1)) * floor(q) - 1 + gc.processStart);
194  DEBUGCOUT(1) << "Rank " << trueRank << " is getting lines [" << LowerInclusiveBound << ", "
195  << UpperInclusiveBound << "]" << endl;
197  //These have to be the originals so that the strides will be correct
198  correspondingAttributes.begProcess = gc.processStart;
199  correspondingAttributes.endProcess = gc.processEnd;
200  correspondingAttributes.numPixelsH = gc.horizontalResolution;
201  //double processsamplefreq = ((double)gc.verticalResolution)/(p-1);
202  //correspondingAttributes.numPixelsV = /*rank< mod*/ true ? ceil(processsamplefreq) : floor(processsamplefreq);
203  correspondingAttributes.numPixelsV = gc.verticalResolution;
205  correspondingAttributes.begTime = gc.timeStart;
206  correspondingAttributes.endTime = gc.timeEnd;
208  int totalTraces = min(correspondingAttributes.numPixelsV, n);
209  int autoskip;
210  /* The work distribution is a tiny bit irregular because we distribute based on process number,
211  * which because of the striding can lead to an occasional +- 1. I spent a while trying to get the
212  * skipping exact, but I don't think it's worth it. We just need to get it close, and then we can
213  * do an additional O(1) step. Even though this solution is not as elegant, the big difference is
214  * that this brings it down to O(1) from O(n).*/
216  autoskip = (int)floor(rank*totalTraces/(size - 1.0));
218  DEBUGCOUT(2) << "Was going to autoskip " <<autoskip << " traces."<<endl;
220  correspondingAttributes.lineNum = autoskip;
222  *controller->attributes = correspondingAttributes;
224  ProcessTimeline* nextTrace = controller->getNextTrace();
225  int LinesSentCount = 0;
226  int waitcount = 0;
228  //If it were NULL, the no rank check would have caught it.
229  assert (nextTrace != NULL);
232  while (nextTrace != NULL)
233  {
235  if ((nextTrace->data->rank < LowerInclusiveBound)
236  || (nextTrace->data->rank > UpperInclusiveBound))
237  {
238  nextTrace = controller->getNextTrace();
239  waitcount++;
240  continue;
241  }
242  if (waitcount != 0)
243  {
244  DEBUGCOUT(2) << trueRank << " skipped " << waitcount
245  << " processes before actually starting work. Autoskip was " <<autoskip<< " and actual skip was " << controller->attributes->lineNum -1 << endl;
247  waitcount = 0;
248  }
249  nextTrace->readInData();
251  vector<TimeCPID> ActualData = *nextTrace->data->listCPID;
256  locs->header = msg;
258  msg->tag = SLAVE_REPLY;
259  msg->data.line = nextTrace->line();
260  int entries = ActualData.size();
261  msg->data.entries = entries;
263  msg->data.begtime = ActualData[0].timestamp;
264  msg->data.endtime = ActualData[entries - 1].timestamp;
265  msg->data.rankID = trueRank;
268  int i = 0;
270  unsigned char* outputBuffer = NULL;
271  DataCompressionLayer* compr = NULL;
272  int outputBufferLen;
273  if (useCompression)
274  {
275  compr = new DataCompressionLayer();
277  locs->compressed = true;
278  locs->compMsg = compr;
280  Time currentTimestamp = msg->data.begtime;
281  for (i = 0; i < entries; i++)
282  {
283  compr->writeInt((int) (ActualData[i].timestamp - currentTimestamp));
284  compr->writeInt(ActualData[i].cpid);
285  currentTimestamp = ActualData[i].timestamp;
286  }
287  compr->flush();
288  outputBufferLen = compr->getOutputLength();
289  outputBuffer = compr->getOutputBuffer();
290  }
291  else
292  {
294  outputBuffer = new unsigned char[entries*SIZEOF_DELTASAMPLE];
296  locs->compressed = false;
297  locs->message = outputBuffer;
299  char* ptrToFirstElem = (char*)&(outputBuffer[0]);
300  char* currentPtr = ptrToFirstElem;
301  Time currentTimestamp = msg->data.begtime;
302  for (i = 0; i < entries; i++)
303  {
304  int deltaTimestamp = ActualData[i].timestamp - currentTimestamp;
305  ByteUtilities::writeInt(currentPtr, deltaTimestamp);
306  currentPtr += SIZEOF_INT;
307  ByteUtilities::writeInt(currentPtr, ActualData[i].cpid);
308  currentPtr += SIZEOF_INT;
309  }
310  outputBufferLen = entries*SIZEOF_DELTASAMPLE;
311  }
315  msg->data.compressedSize = outputBufferLen;
316  locs->headerRequest = COMM_WORLD.Isend(msg, sizeof(*msg), MPI_PACKED,
317  MPICommunication::SOCKET_SERVER, 0);
319  locs->bodyRequest = COMM_WORLD.Isend(outputBuffer, outputBufferLen,
320  MPI_BYTE, MPICommunication::SOCKET_SERVER, 0);
322  LinesSentCount++;
323  buffers.push_back(locs);
325  cleanSent(buffers, false);
327  if (LinesSentCount % 100 == 0)
328  DEBUGCOUT(2) << trueRank << " Has sent " << LinesSentCount
329  << " ranks." << endl;
331  delete nextTrace;
332  nextTrace = controller->getNextTrace();
333  }
334  //Clean up all our MPI buffers.
335  cleanSent(buffers, true);
338  return LinesSentCount;
339  }
340  void Slave::cleanSent(list<MPICommunication::ResultBufferLocations*>& buffers, bool wait)
341  {
343  while (!buffers.empty())
344  {
345  current = buffers.front();
347  bool okayToDelete = current->headerRequest.Test() && current->bodyRequest.Test();
348  if (!wait && !okayToDelete) return;
350  if (!okayToDelete)
351  {
352  current->headerRequest.Wait();
353  current->bodyRequest.Wait();
354  }
355  //Now it is safe to delete everything
356  delete (current->header);
357  if (current->compressed)
358  delete (current->compMsg);
359  else
360  delete (current->message);
361  delete (current);
362  buffers.pop_front();
363  }
364  }
365  Slave::~Slave()
366  {
367  delete (controller);
368  }
370 } /* namespace TraceviewerServer */
