RemoteDataRetriever.java
Go to the documentation of this file.00001 package edu.rice.cs.hpc.traceviewer.db.remote;
00002
00003 import java.io.BufferedInputStream;
00004 import java.io.BufferedOutputStream;
00005 import java.io.DataInputStream;
00006 import java.io.DataOutputStream;
00007 import java.io.IOException;
00008 import java.net.Socket;
00009 import java.util.HashMap;
00010 import java.util.concurrent.ConcurrentLinkedQueue;
00011
00012 import org.eclipse.core.runtime.IProgressMonitor;
00013 import org.eclipse.core.runtime.IStatus;
00014 import org.eclipse.core.runtime.Status;
00015 import org.eclipse.core.runtime.jobs.Job;
00016 import org.eclipse.swt.widgets.Shell;
00017 import edu.rice.cs.hpc.traceviewer.data.graph.CallPath;
00018 import edu.rice.cs.hpc.traceviewer.data.util.Constants;
00019 import edu.rice.cs.hpc.traceviewer.data.util.Debugger;
00020 import edu.rice.cs.hpc.traceviewer.db.remote.DecompressionThread.DecompressionItemToDo;
00021 import edu.rice.cs.hpc.traceviewer.painter.ImageTraceAttributes;
00022
00033 public class RemoteDataRetriever {
00034
00035
00036
00037
00038
00039
00040 private static final int DATA = 0x44415441;
00041 private static final int HERE = 0x48455245;
00042
00043
00044
00045
00046 private static final int TIME_OUT = 2000;
00047
00048 private static final int TIME_SLEEP = 50;
00049
00050
00051
00052
00053
00054 private final Socket socket;
00055 DataInputStream receiver;
00056 BufferedInputStream rcvBacking;
00057 DataOutputStream sender;
00058
00059 final int compressionType;
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071 public RemoteDataRetriever(Socket _serverConnection, Shell _shell, int _compressionType) throws IOException {
00072 socket = _serverConnection;
00073
00074 compressionType = _compressionType;
00075
00076 rcvBacking = new BufferedInputStream(socket.getInputStream());
00077 receiver = new DataInputStream(rcvBacking);
00078
00079 sender = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
00080 }
00081
00082
00094 public void getData( ImageTraceAttributes attributes,
00095 HashMap<Integer, CallPath> _scopeMap,
00096 final ConcurrentLinkedQueue<DecompressionItemToDo> workToDo) throws IOException
00097 {
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107 int P0 = attributes.getProcessBegin();
00108 int Pn = attributes.getProcessEnd();
00109 long t0 = attributes.getTimeBegin();
00110 long tn = attributes.getTimeEnd();
00111 int vertRes = attributes.numPixelsV;
00112 int horizRes = attributes.numPixelsH;
00113
00114 Debugger.printTimestampDebug("Requesting data");
00115 requestData(P0, Pn, t0, tn, vertRes, horizRes);
00116 Debugger.printTimestampDebug("Data request finished");
00117
00118 int responseCommand = waitAndReadInt(receiver);
00119 if (responseCommand != HERE)
00120 throw new IOException("The server did not send back data");
00121
00122 Debugger.printTimestampDebug("Data receive begin");
00123
00124 final int ranksExpected = Math.min(Pn-P0, vertRes);
00125
00126 Job unpacker = new Job("Receiving data") {
00127
00128 @Override
00129 public IStatus run(IProgressMonitor monitor) {
00130 DataInputStream dataReader;
00131 int ranksReceived = 0;
00132 dataReader = receiver;
00133 boolean first = true;
00134
00135 try {
00136 while (ranksReceived < ranksExpected) {
00137
00138 int rankNumber = dataReader.readInt();
00139 if (first){
00140 Debugger.printTimestampDebug("First real data byte received.");
00141 first = false;
00142 }
00143 int length = dataReader.readInt();
00144
00145 long startTimeForThisTimeline = dataReader.readLong();
00146 long endTimeForThisTimeline = dataReader.readLong();
00147 int compressedSize = dataReader.readInt();
00148
00149
00150
00151
00152 if (compressedSize >0) {
00153 byte[] compressedTraceLine = new byte[compressedSize];
00154
00155 int numRead = 0;
00156 while (numRead < compressedSize) {
00157 numRead += dataReader.read(compressedTraceLine,
00158 numRead, compressedSize - numRead);
00159
00160 }
00161
00162 workToDo.add(
00163 new DecompressionThread.DecompressionItemToDo(
00164 compressedTraceLine, length,
00165 startTimeForThisTimeline,
00166 endTimeForThisTimeline, rankNumber,
00167 compressionType));
00168
00169 ranksReceived++;
00170 monitor.worked(1);
00171 }
00172 }
00173 } catch (IOException e) {
00174
00175 e.printStackTrace();
00176 }
00177 monitor.done();
00178
00179
00180
00181 Debugger.printTimestampDebug("Data receive end");
00182 return Status.OK_STATUS;
00183 }
00184 };
00185 unpacker.setUser(true);
00186 unpacker.schedule();
00187
00188 }
00189
00190
00191 private void requestData(int P0, int Pn, long t0, long tn, int vertRes,
00192 int horizRes) throws IOException {
00193 sender.writeInt(DATA);
00194 sender.writeInt(P0);
00195 sender.writeInt(Pn);
00196 sender.writeLong(t0);
00197 sender.writeLong(tn);
00198 sender.writeInt(vertRes);
00199 sender.writeInt(horizRes);
00200
00201 sender.flush();
00202 }
00203
00204
00205 static int waitAndReadInt(DataInputStream receiver)
00206 throws IOException {
00207 int nextCommand;
00208 int timeout = 0;
00209
00210
00211
00212 while (receiver.available() <= 4
00213 || ((nextCommand = receiver.readInt()) == 0)) {
00214
00215 if (timeout++ > TIME_OUT) {
00216 throw new IOException("Timeout: no response from the server.");
00217 }
00218 try {
00219 Thread.sleep(TIME_SLEEP);
00220 } catch (InterruptedException e) {
00221
00222 e.printStackTrace();
00223 }
00224 }
00225 if (receiver.available() < 4)
00226
00227
00228
00229
00230 {
00231 timeout = 0;
00232 if (timeout++ > TIME_OUT) {
00233 throw new IOException("Timeout while waiting for command: no response from the server.");
00234 }
00235
00236 receiver.read(new byte[receiver.available()]);
00237
00238 while (receiver.available() <= 0) {
00239
00240 try {
00241 Thread.sleep(TIME_SLEEP);
00242 } catch (InterruptedException e) {
00243 e.printStackTrace();
00244 }
00245 }
00246 nextCommand = receiver.readInt();
00247 }
00248 return nextCommand;
00249 }
00250 public void closeConnection() throws IOException {
00251 sender.writeInt(Constants.DONE);
00252 sender.flush();
00253 sender.close();
00254 receiver.close();
00255 socket.close();
00256 }
00257
00258
00259 static public int getTimeSleep() {
00260 return TIME_SLEEP;
00261 }
00262
00263 static public int getTimeOut() {
00264 return TIME_OUT;
00265 }
00266 }