SpaceTimeDataControllerRemote.java

Go to the documentation of this file.
00001 package edu.rice.cs.hpc.traceviewer.db.remote;
00002 
00003 import java.io.DataOutputStream;
00004 import java.io.IOException;
00005 import java.io.InputStream;
00006 import java.util.concurrent.ConcurrentLinkedQueue;
00007 import java.util.concurrent.atomic.AtomicInteger;
00008 
00009 import org.eclipse.jface.action.IStatusLineManager;
00010 import org.eclipse.ui.IWorkbenchWindow;
00011 
00012 import edu.rice.cs.hpc.data.experiment.InvalExperimentException;
00013 import edu.rice.cs.hpc.data.experiment.extdata.IFilteredData;
00014 import edu.rice.cs.hpc.remote.data.RemoteFilteredBaseData;
00015 import edu.rice.cs.hpc.data.experiment.extdata.TraceName;
00016 import edu.rice.cs.hpc.traceviewer.data.timeline.ProcessTimeline;
00017 import edu.rice.cs.hpc.traceviewer.data.util.Debugger;
00018 import edu.rice.cs.hpc.traceviewer.db.remote.DecompressionThread.DecompressionItemToDo;
00019 import edu.rice.cs.hpc.traceviewer.spaceTimeData.SpaceTimeDataController;
00020 
00021 
00022 /**************************************************
00023  * The remote data version of the Data controller
00024  * 
00025  * @author Philip Taffet
00026  * 
00027  *************************************************/
00028 public class SpaceTimeDataControllerRemote extends SpaceTimeDataController 
00029 {   
00030     final RemoteDataRetriever dataRetriever;
00031 
00032     private final TraceName[]  valuesX;
00033     private final DataOutputStream server;
00034     
00035     private ConcurrentLinkedQueue<Integer> timelineToRender;
00036 
00037     public SpaceTimeDataControllerRemote(RemoteDataRetriever _dataRet, IWorkbenchWindow _window,
00038             IStatusLineManager _statusMgr, InputStream expStream, String Name, int _numTraces, TraceName[] valuesX, DataOutputStream connectionToServer) 
00039                     throws InvalExperimentException, Exception 
00040     {
00041         super(_window, expStream, Name);
00042         dataRetriever = _dataRet;
00043 
00044         this.valuesX = valuesX;
00045         server = connectionToServer;
00046 
00047         super.dataTrace = createFilteredBaseData();
00048     }
00049 
00050     
00051     @Override
00052     public IFilteredData createFilteredBaseData() {
00053         final int headerSize = exp.getTraceAttribute().dbHeaderSize;
00054         return new RemoteFilteredBaseData(valuesX, headerSize, server);
00055     }
00056 
00061     @Override
00062     public void fillTracesWithData (boolean changedBounds, int numThreadsToLaunch) 
00063         throws IOException {
00064         if (changedBounds) {
00065             
00066             DecompressionThread[] workThreads = new DecompressionThread[numThreadsToLaunch];
00067             final int ranksExpected = Math.min(attributes.getProcessInterval(), attributes.numPixelsV);
00068             
00069             final AtomicInteger ranksRemainingToDecompress = new AtomicInteger(ranksExpected);
00070             ptlService.setProcessTimeline(new ProcessTimeline[ranksExpected]);
00071             
00072             // The variable workToDo needs to be accessible across different objects:
00073             // RemoteDataRetriever: producer
00074             // DecompressionThread: consumer
00075             final ConcurrentLinkedQueue<DecompressionItemToDo> workToDo = new ConcurrentLinkedQueue<DecompressionItemToDo>();
00076             timelineToRender  = new ConcurrentLinkedQueue<Integer>();
00077 
00078             for (int i = 0; i < workThreads.length; i++) {
00079 
00080                 workThreads[i] = new DecompressionThread(ptlService, getScopeMap(),
00081                         attributes, workToDo, timelineToRender, ranksRemainingToDecompress,
00082                         new DecompressionThreadListener());
00083                 workThreads[i].start();
00084             }
00085             
00086 
00087             dataRetriever.getData(attributes, getScopeMap(), workToDo);
00088         }
00089     }
00090 
00091     
00092     
00093     @Override
00094     public void dispose() {
00095         //closeDB();
00096         super.dispose();
00097 
00098     }
00099     
00100     @Override
00101     public void closeDB() {
00102         try {
00103             Debugger.printDebug(1, "Closing the connection");
00104             dataRetriever.closeConnection();
00105         } catch (IOException e) {
00106             System.out.println("Could not close the connection.");
00107         }
00108     }
00109 
00110     @Override
00111     public ProcessTimeline getNextTrace(boolean changedBounds) {
00112         Integer nextIndex;
00113 
00114         if (changedBounds) {
00115             int i = 0;
00116             
00117             // TODO: Should this be implemented with real locking?
00118             while ((nextIndex = timelineToRender.poll()) == null) {
00119                 //Make sure a different thread didn't get the last one while 
00120                 //this thread was waiting:
00121                 if (lineNum.get() >= ptlService.getNumProcessTimeline())
00122                     return null;
00123                 
00124                 // check for the timeout
00125                 if (i++ > RemoteDataRetriever.getTimeOut()) {
00126                     throw new RuntimeException("Timeout in while waiting for data from decompression thread");
00127                     //return null;
00128                 }
00129                 try {
00130                     Thread.sleep(RemoteDataRetriever.getTimeSleep());
00131 
00132                 } catch (InterruptedException e) {
00133                     e.printStackTrace();
00134                     throw new RuntimeException("Thread is interrupted");
00135                 }
00136             }
00137             lineNum.getAndIncrement();
00138         }
00139         else{
00140             nextIndex = lineNum.getAndIncrement();
00141             if (nextIndex >= ptlService.getNumProcessTimeline())
00142                 return null;
00143         }
00144         return ptlService.getProcessTimeline(nextIndex.intValue());
00145     }
00146 
00147 
00148     public int getHeaderSize() {
00149         final int headerSize = exp.getTraceAttribute().dbHeaderSize;
00150         return headerSize;
00151     }
00152     
00153     private class DecompressionThreadListener implements IThreadListener
00154     {
00155 
00156         @Override
00157         public void notify(String msg) {
00158             throw new RuntimeException(msg);
00159             //System.err.println("Error in Decompression: " + msg);
00160         }
00161         
00162     }
00163 
00164     @Override
00165     public String getName() {
00166         return exp.getXMLExperimentFile().getAbsolutePath();
00167     }
00168 }

Generated on 5 May 2015 for HPCVIEWER by  doxygen 1.6.1