HPCToolkit
Communication-MPI.cpp
Go to the documentation of this file.
1 // -*-Mode: C++;-*-
2 
3 // * BeginRiceCopyright *****************************************************
4 //
5 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Communication-MPI.cpp $
6 // $Id: Communication-MPI.cpp 4317 2013-07-25 16:32:22Z felipet1326@gmail.com $
7 //
8 // --------------------------------------------------------------------------
9 // Part of HPCToolkit (hpctoolkit.org)
10 //
11 // Information about sources of support for research and development of
12 // HPCToolkit is at 'hpctoolkit.org' and in 'README.Acknowledgments'.
13 // --------------------------------------------------------------------------
14 //
15 // Copyright ((c)) 2002-2019, Rice University
16 // All rights reserved.
17 //
18 // Redistribution and use in source and binary forms, with or without
19 // modification, are permitted provided that the following conditions are
20 // met:
21 //
22 // * Redistributions of source code must retain the above copyright
23 // notice, this list of conditions and the following disclaimer.
24 //
25 // * Redistributions in binary form must reproduce the above copyright
26 // notice, this list of conditions and the following disclaimer in the
27 // documentation and/or other materials provided with the distribution.
28 //
29 // * Neither the name of Rice University (RICE) nor the names of its
30 // contributors may be used to endorse or promote products derived from
31 // this software without specific prior written permission.
32 //
33 // This software is provided by RICE and contributors "as is" and any
34 // express or implied warranties, including, but not limited to, the
35 // implied warranties of merchantability and fitness for a particular
36 // purpose are disclaimed. In no event shall RICE or contributors be
37 // liable for any direct, indirect, incidental, special, exemplary, or
38 // consequential damages (including, but not limited to, procurement of
39 // substitute goods or services; loss of use, data, or profits; or
40 // business interruption) however caused and on any theory of liability,
41 // whether in contract, strict liability, or tort (including negligence
42 // or otherwise) arising in any way out of the use of this software, even
43 // if advised of the possibility of such damage.
44 //
45 // ******************************************************* EndRiceCopyright *
46 
47 //***************************************************************************
48 //
49 // File:
50 // $HeadURL: https://hpctoolkit.googlecode.com/svn/branches/hpctoolkit-hpcserver/src/tool/hpcserver/Communication-MPI.cpp $
51 //
52 // Purpose:
53 // The MPI implementation of the methods in Communication.hpp. These methods
54 // are called mostly by the SocketServer.
55 //
56 // Description:
57 // [The set of functions, macros, etc. defined in the file]
58 //
59 //***************************************************************************
60 
61 
#include "Communication.hpp"
#include "MPICommunication.hpp"
#include "Constants.hpp"
#include "DebugUtils.hpp"
#include "Server.hpp"
#include "Slave.hpp"

#include <mpi.h>

#include <algorithm> //For copy
#include <iostream> //For cerr, cout
#include <vector> //For the compressed trace-line receive buffer
73 
74 using namespace std;
75 using namespace MPI;
76 
77 namespace TraceviewerServer
78 {
79 
80 void Communication::sendParseInfo(Time minBegTime, Time maxEndTime, int headerSize)
81 {
83  Info.command = INFO;
84  Info.minfo.minBegTime = minBegTime;
85  Info.minfo.maxEndTime = maxEndTime;
86  Info.minfo.headerSize = headerSize;
87  COMM_WORLD.Bcast(&Info, sizeof(Info), MPI_PACKED, MPICommunication::SOCKET_SERVER);
88 
89 }
90 
91 void Communication::sendParseOpenDB(string pathToDB)
92 {
94  cmdPathToDB.command = OPEN;
95  if (pathToDB.length() > MAX_DB_PATH_LENGTH)
96  {
97  cerr << "Path too long" << endl;
98  throw ERROR_PATH_TOO_LONG;
99  }
100  copy(pathToDB.begin(), pathToDB.end(), cmdPathToDB.ofile.path);
101  cmdPathToDB.ofile.path[pathToDB.size()] = '\0';
102 
103  COMM_WORLD.Bcast(&cmdPathToDB, sizeof(cmdPathToDB), MPI_PACKED,
104  MPICommunication::SOCKET_SERVER);
105 
106 }
107 void Communication::sendStartGetData(SpaceTimeDataController* contr, int processStart, int processEnd,
108  Time timeStart, Time timeEnd, int verticalResolution, int horizontalResolution)
109 {
111  toBcast.command = DATA;
112  toBcast.gdata.processStart = processStart;
113  toBcast.gdata.processEnd = processEnd;
114  toBcast.gdata.timeStart = timeStart;
115  toBcast.gdata.timeEnd = timeEnd;
116  toBcast.gdata.verticalResolution = verticalResolution;
117  toBcast.gdata.horizontalResolution = horizontalResolution;
118  COMM_WORLD.Bcast(&toBcast, sizeof(toBcast), MPI_PACKED,
119  MPICommunication::SOCKET_SERVER);
120 }
121 void Communication::sendEndGetData(DataSocketStream* stream, ProgressBar* prog, SpaceTimeDataController* controller)
122 {
123  int ranksDone = 1;//1 for the MPI rank that deals with the sockets
124  int size = COMM_WORLD.Get_size();
125 
126  bool first = false;
127 
128  while (ranksDone < size)
129  {
131  COMM_WORLD.Recv(&msg, sizeof(msg), MPI_PACKED, MPI_ANY_SOURCE, MPI_ANY_TAG);
132  if (msg.tag == SLAVE_REPLY)
133  {
134  if (first)
135  {
136  LOGTIMESTAMPEDMSG("First line computed.")
137  }
138 
139  stream->writeInt(msg.data.line);
140  stream->writeInt(msg.data.entries);
141  stream->writeLong(msg.data.begtime); // Begin time
142  stream->writeLong(msg.data.endtime); //End time
143  stream->writeInt(msg.data.compressedSize);
144 
145  char CompressedTraceLine[msg.data.compressedSize];
146  COMM_WORLD.Recv(CompressedTraceLine, msg.data.compressedSize, MPI_BYTE, msg.data.rankID,
147  MPI_ANY_TAG);
148 
149  stream->writeRawData(CompressedTraceLine, msg.data.compressedSize);
150 
151  stream->flush();
152  if (first)
153  {
154  LOGTIMESTAMPEDMSG("First line sent.")
155  }
156  first = false;
157  prog->incrementProgress();
158  }
159  else if (msg.tag == SLAVE_DONE)
160  {
161  DEBUGCOUT(1) << "Rank " << msg.done.rankID << " done" << endl;
162  ranksDone++;
163  if (ranksDone == 2)
164  {
165  LOGTIMESTAMPEDMSG("First rank done.")
166  }
167  }
168  }
169  LOGTIMESTAMPEDMSG("All data done.")
170 }
171 void Communication::sendStartFilter(int count, bool excludeMatches)
172 {
174  toBcast.command = FLTR;
175  toBcast.filt.count = count;
176  toBcast.filt.excludeMatches = excludeMatches;
177  COMM_WORLD.Bcast(&toBcast, sizeof(toBcast), MPI_PACKED,
178  MPICommunication::SOCKET_SERVER);
179 
180 }
181 void Communication::sendFilter(BinaryRepresentationOfFilter filt)
182 {
183  COMM_WORLD.Bcast(&filt, sizeof(filt), MPI_PACKED, MPICommunication::SOCKET_SERVER);
184 }
185 
186 bool Communication::basicInit(int argc, char** argv)
187 {
188  MPI::Init(argc, argv);
189 
190  int size;
191 
192  size = MPI::COMM_WORLD.Get_size();
193  if (size <= 1)
194  {
195  cout << "The MPI version of hpcserver must be run with more than one process. "<<
196  "If you are looking for a single threaded version, you can compile hpcserver without MPI. "<<
197  "See the hpctoolkit documentation for more information."<<endl;
198  return false;
199  }
200  return true;
201 }
202 void Communication::run()
203 {
204  int rank;
205  rank = MPI::COMM_WORLD.Get_rank();
208  else
210 }
211 void Communication::closeServer()
212 {
213  if (COMM_WORLD.Get_rank()==MPICommunication::SOCKET_SERVER)
214  {//The slaves participate in the bcast in the slave class before they parse it
215  MPICommunication::CommandMessage serverShutdown;
216  serverShutdown.command = DONE;
217  COMM_WORLD.Bcast(&serverShutdown, sizeof(serverShutdown), MPI_PACKED, MPICommunication::SOCKET_SERVER);
218  cout<<"Server done, closing..."<<endl;
219  }
220  MPI::Finalize();
221 }
222 }
void copy(const char *dst,...)
Definition: FileUtil.cpp:233
static const unsigned int MAX_DB_PATH_LENGTH
Definition: Constants.hpp:78
virtual void writeRawData(char *, int)
void incrementProgress(ulong tasks)
Definition: ProgressBar.cpp:86
#define DEBUGCOUT(a)
Definition: DebugUtils.hpp:72
#define LOGTIMESTAMPEDMSG(msg)
Definition: DebugUtils.hpp:75