DecompressionThread.java
Go to the documentation of this file.00001 package edu.rice.cs.hpc.traceviewer.db.remote;
00002
00003 import java.io.ByteArrayInputStream;
00004 import java.io.DataInputStream;
00005 import java.io.IOException;
00006 import java.util.HashMap;
00007 import java.util.concurrent.ConcurrentLinkedQueue;
00008 import java.util.concurrent.atomic.AtomicInteger;
00009 import java.util.zip.InflaterInputStream;
00010
00011 import edu.rice.cs.hpc.traceviewer.data.db.TraceDataByRank;
00012 import edu.rice.cs.hpc.traceviewer.data.db.DataRecord;
00013 import edu.rice.cs.hpc.traceviewer.painter.ImageTraceAttributes;
00014 import edu.rice.cs.hpc.traceviewer.services.ProcessTimelineService;
00015 import edu.rice.cs.hpc.traceviewer.data.graph.CallPath;
00016 import edu.rice.cs.hpc.traceviewer.data.timeline.ProcessTimeline;
00017 import edu.rice.cs.hpc.traceviewer.data.util.Constants;
00018 import edu.rice.cs.hpc.traceviewer.data.util.Debugger;
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 public class DecompressionThread extends Thread {
00033
00034 final private ConcurrentLinkedQueue<DecompressionItemToDo> workToDo;
00035 final private ConcurrentLinkedQueue<Integer> timelinesAvailableForRendering;
00036
00037 final ProcessTimelineService timelineServ;
00038 final HashMap<Integer, CallPath> scopeMap;
00039
00040 final ImageTraceAttributes attributes;
00041
00042 private final IThreadListener listener;
00043
00044 public final static int COMPRESSION_TYPE_MASK = 0xFFFF;
00045 public final static short ZLIB_COMPRESSSED = 1;
00046
00047 static boolean first = true;
00048
00049 final private AtomicInteger ranksRemainingToDecompress;
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063 public DecompressionThread(
00064 ProcessTimelineService ptlService,
00065 HashMap<Integer, CallPath> _scopeMap,
00066 ImageTraceAttributes attributes,
00067 ConcurrentLinkedQueue<DecompressionItemToDo> workToDo,
00068 ConcurrentLinkedQueue<Integer> timelinesAvailableForRendering,
00069 AtomicInteger ranksRemainingToDecompress,
00070 IThreadListener listener) {
00071 timelineServ = ptlService;
00072 scopeMap = _scopeMap;
00073
00074 this.attributes = attributes;
00075 this.workToDo = workToDo;
00076 this.timelinesAvailableForRendering = timelinesAvailableForRendering;
00077 this.ranksRemainingToDecompress = ranksRemainingToDecompress;
00078 this.listener = listener;
00079 }
00080
00081
00082
00091
00092
00093
00094
00095 @Override
00096 public void run() {
00097 int i = 0;
00098 while (ranksRemainingToDecompress.get() > 0)
00099 {
00100 DecompressionItemToDo wi = workToDo.poll();
00101 if (wi == null)
00102 {
00103 if ( i++ > RemoteDataRetriever.getTimeOut() ) {
00104
00105 break;
00106 }
00107
00108
00109 try {
00110 Thread.sleep( RemoteDataRetriever.getTimeSleep() );
00111
00112 } catch (InterruptedException e) {
00113
00114 e.printStackTrace();
00115 break;
00116 }
00117 } else {
00118 i = 0;
00119 if (first){
00120 first = false;
00121 Debugger.printTimestampDebug("First decompression beginning.");
00122 }
00123 ranksRemainingToDecompress.getAndDecrement();
00124 DecompressionItemToDo toDecomp = (DecompressionItemToDo)wi;
00125 try {
00126 decompress(toDecomp);
00127 } catch (IOException e) {
00128
00129 Debugger.printDebug(1, "IO Exception in decompression algorithm.");
00130 e.printStackTrace();
00131 break;
00132 }
00133 }
00134 }
00135 if (ranksRemainingToDecompress.get() > 0) {
00136 listener.notify("Decompression error due to time out");
00137 }
00138 }
00139
00140 private void decompress(DecompressionItemToDo toDecomp) throws IOException
00141 {
00142 DataRecord[] ranksData = readTimeCPIDArray(toDecomp.packet, toDecomp.itemCount, toDecomp.startTime, toDecomp.endTime, toDecomp.compressed);
00143 TraceDataByRank dataAsTraceDBR = new TraceDataByRank(ranksData);
00144
00145 int lineNumber = toDecomp.rankNumber;
00146
00147
00148
00149
00150
00151 ProcessTimeline ptl = new ProcessTimeline(dataAsTraceDBR, scopeMap, lineNumber,
00152 attributes.numPixelsH, attributes.getTimeInterval(), attributes.getTimeBegin());
00153
00154 timelineServ.setProcessTimeline(lineNumber, ptl);
00155 timelinesAvailableForRendering.add(lineNumber);
00156 }
00157
00168 private DataRecord[] readTimeCPIDArray(byte[] packedTraceLine, int length, long t0, long tn, int compressed) throws IOException {
00169
00170 DataInputStream decompressor;
00171 if ((compressed & COMPRESSION_TYPE_MASK) == ZLIB_COMPRESSSED)
00172 decompressor= new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(packedTraceLine)));
00173 else
00174 decompressor = new DataInputStream(new ByteArrayInputStream(packedTraceLine));
00175 DataRecord[] toReturn = new DataRecord[length];
00176 long currentTime = t0;
00177 for (int i = 0; i < toReturn.length; i++) {
00178
00179
00180
00181
00182
00183
00184
00185
00186 int deltaT = decompressor.readInt();
00187 currentTime += deltaT;
00188 int CPID = decompressor.readInt();
00189
00190
00191 toReturn[i] = new DataRecord(currentTime, CPID, Constants.dataIdxNULL);
00192 }
00193 decompressor.close();
00194 return toReturn;
00195 }
00196
00197
00198
00199
00200 public static class DecompressionItemToDo {
00201 final byte[] packet;
00202 final int itemCount;
00203 final long startTime, endTime;
00204 final int rankNumber;
00205 final int compressed;
00206 public DecompressionItemToDo(byte[] _packet, int _itemCount, long _startTime, long _endTime, int _rankNumber, int _compressionType) {
00207 packet = _packet;
00208 itemCount = _itemCount;
00209 startTime = _startTime;
00210 endTime = _endTime;
00211 rankNumber = _rankNumber;
00212 compressed = _compressionType;
00213 }
00214 }
00215 }