SpaceTimeDataControllerRemote.java
Go to the documentation of this file.00001 package edu.rice.cs.hpc.traceviewer.db.remote;
00002
00003 import java.io.DataOutputStream;
00004 import java.io.IOException;
00005 import java.io.InputStream;
00006 import java.util.concurrent.ConcurrentLinkedQueue;
00007 import java.util.concurrent.atomic.AtomicInteger;
00008
00009 import org.eclipse.jface.action.IStatusLineManager;
00010 import org.eclipse.ui.IWorkbenchWindow;
00011
00012 import edu.rice.cs.hpc.data.experiment.InvalExperimentException;
00013 import edu.rice.cs.hpc.data.experiment.extdata.IFilteredData;
00014 import edu.rice.cs.hpc.remote.data.RemoteFilteredBaseData;
00015 import edu.rice.cs.hpc.data.experiment.extdata.TraceName;
00016 import edu.rice.cs.hpc.traceviewer.data.timeline.ProcessTimeline;
00017 import edu.rice.cs.hpc.traceviewer.data.util.Debugger;
00018 import edu.rice.cs.hpc.traceviewer.db.remote.DecompressionThread.DecompressionItemToDo;
00019 import edu.rice.cs.hpc.traceviewer.spaceTimeData.SpaceTimeDataController;
00020
00021
00022
00023
00024
00025
00026
00027
00028 public class SpaceTimeDataControllerRemote extends SpaceTimeDataController
00029 {
00030 final RemoteDataRetriever dataRetriever;
00031
00032 private final TraceName[] valuesX;
00033 private final DataOutputStream server;
00034
00035 private ConcurrentLinkedQueue<Integer> timelineToRender;
00036
00037 public SpaceTimeDataControllerRemote(RemoteDataRetriever _dataRet, IWorkbenchWindow _window,
00038 IStatusLineManager _statusMgr, InputStream expStream, String Name, int _numTraces, TraceName[] valuesX, DataOutputStream connectionToServer)
00039 throws InvalExperimentException, Exception
00040 {
00041 super(_window, expStream, Name);
00042 dataRetriever = _dataRet;
00043
00044 this.valuesX = valuesX;
00045 server = connectionToServer;
00046
00047 super.dataTrace = createFilteredBaseData();
00048 }
00049
00050
00051 @Override
00052 public IFilteredData createFilteredBaseData() {
00053 final int headerSize = exp.getTraceAttribute().dbHeaderSize;
00054 return new RemoteFilteredBaseData(valuesX, headerSize, server);
00055 }
00056
00061 @Override
00062 public void fillTracesWithData (boolean changedBounds, int numThreadsToLaunch)
00063 throws IOException {
00064 if (changedBounds) {
00065
00066 DecompressionThread[] workThreads = new DecompressionThread[numThreadsToLaunch];
00067 final int ranksExpected = Math.min(attributes.getProcessInterval(), attributes.numPixelsV);
00068
00069 final AtomicInteger ranksRemainingToDecompress = new AtomicInteger(ranksExpected);
00070 ptlService.setProcessTimeline(new ProcessTimeline[ranksExpected]);
00071
00072
00073
00074
00075 final ConcurrentLinkedQueue<DecompressionItemToDo> workToDo = new ConcurrentLinkedQueue<DecompressionItemToDo>();
00076 timelineToRender = new ConcurrentLinkedQueue<Integer>();
00077
00078 for (int i = 0; i < workThreads.length; i++) {
00079
00080 workThreads[i] = new DecompressionThread(ptlService, getScopeMap(),
00081 attributes, workToDo, timelineToRender, ranksRemainingToDecompress,
00082 new DecompressionThreadListener());
00083 workThreads[i].start();
00084 }
00085
00086
00087 dataRetriever.getData(attributes, getScopeMap(), workToDo);
00088 }
00089 }
00090
00091
00092
00093 @Override
00094 public void dispose() {
00095
00096 super.dispose();
00097
00098 }
00099
00100 @Override
00101 public void closeDB() {
00102 try {
00103 Debugger.printDebug(1, "Closing the connection");
00104 dataRetriever.closeConnection();
00105 } catch (IOException e) {
00106 System.out.println("Could not close the connection.");
00107 }
00108 }
00109
00110 @Override
00111 public ProcessTimeline getNextTrace(boolean changedBounds) {
00112 Integer nextIndex;
00113
00114 if (changedBounds) {
00115 int i = 0;
00116
00117
00118 while ((nextIndex = timelineToRender.poll()) == null) {
00119
00120
00121 if (lineNum.get() >= ptlService.getNumProcessTimeline())
00122 return null;
00123
00124
00125 if (i++ > RemoteDataRetriever.getTimeOut()) {
00126 throw new RuntimeException("Timeout in while waiting for data from decompression thread");
00127
00128 }
00129 try {
00130 Thread.sleep(RemoteDataRetriever.getTimeSleep());
00131
00132 } catch (InterruptedException e) {
00133 e.printStackTrace();
00134 throw new RuntimeException("Thread is interrupted");
00135 }
00136 }
00137 lineNum.getAndIncrement();
00138 }
00139 else{
00140 nextIndex = lineNum.getAndIncrement();
00141 if (nextIndex >= ptlService.getNumProcessTimeline())
00142 return null;
00143 }
00144 return ptlService.getProcessTimeline(nextIndex.intValue());
00145 }
00146
00147
00148 public int getHeaderSize() {
00149 final int headerSize = exp.getTraceAttribute().dbHeaderSize;
00150 return headerSize;
00151 }
00152
00153 private class DecompressionThreadListener implements IThreadListener
00154 {
00155
00156 @Override
00157 public void notify(String msg) {
00158 throw new RuntimeException(msg);
00159
00160 }
00161
00162 }
00163
00164 @Override
00165 public String getName() {
00166 return exp.getXMLExperimentFile().getAbsolutePath();
00167 }
00168 }