1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  package org.apache.hadoop.hbase.util;
18  
19  import java.io.IOException;
20  import java.util.Arrays;
21  import java.util.HashSet;
22  import java.util.Set;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  import org.apache.commons.lang.math.RandomUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HRegionLocation;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.client.Get;
32  
33  import org.apache.hadoop.hbase.client.Consistency;
34  import org.apache.hadoop.hbase.client.HTableInterface;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.Table;
37  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38  
39  
40  public class MultiThreadedReader extends MultiThreadedAction
41  {
42    private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
43  
44    protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
45    private final double verifyPercent;
46    protected volatile boolean aborted;
47  
48    protected MultiThreadedWriterBase writer = null;
49  
50    
51  
52  
53  
54  
55    private final AtomicLong numUniqueKeysVerified = new AtomicLong();
56  
57    
58  
59  
60  
61    public static final int DEFAULT_MAX_ERRORS = 10;
62  
63    
64  
65  
66  
67  
68  
69    public static final int DEFAULT_KEY_WINDOW = 0;
70  
71    
72  
73  
74    public static final int DEFAULT_BATCH_SIZE = 1; 
75  
76    protected AtomicLong numKeysVerified = new AtomicLong(0);
77    protected AtomicLong numReadErrors = new AtomicLong(0);
78    protected AtomicLong numReadFailures = new AtomicLong(0);
79    protected AtomicLong nullResult = new AtomicLong(0);
80    private int maxErrors = DEFAULT_MAX_ERRORS;
81    private int keyWindow = DEFAULT_KEY_WINDOW;
82    private int batchSize = DEFAULT_BATCH_SIZE;
83    private int regionReplicaId = -1; 
84  
85    public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
86        TableName tableName, double verifyPercent) throws IOException {
87      super(dataGen, conf, tableName, "R");
88      this.verifyPercent = verifyPercent;
89    }
90  
91    public void linkToWriter(MultiThreadedWriterBase writer) {
92      this.writer = writer;
93      writer.setTrackWroteKeys(true);
94    }
95  
96    public void setMaxErrors(int maxErrors) {
97      this.maxErrors = maxErrors;
98    }
99  
100   public void setKeyWindow(int keyWindow) {
101     this.keyWindow = keyWindow;
102   }
103 
104   public void setMultiGetBatchSize(int batchSize) {
105     this.batchSize = batchSize;
106   }
107 
108   public void setRegionReplicaId(int regionReplicaId) {
109     this.regionReplicaId = regionReplicaId;
110   }
111 
112   @Override
113   public void start(long startKey, long endKey, int numThreads) throws IOException {
114     super.start(startKey, endKey, numThreads);
115     if (verbose) {
116       LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
117     }
118 
119     addReaderThreads(numThreads);
120     startThreads(readers);
121   }
122 
123   protected void addReaderThreads(int numThreads) throws IOException {
124     for (int i = 0; i < numThreads; ++i) {
125       HBaseReaderThread reader = createReaderThread(i);
126       readers.add(reader);
127     }
128   }
129 
130   protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
131     HBaseReaderThread reader = new HBaseReaderThread(readerId);
132     Threads.setLoggingUncaughtExceptionHandler(reader);
133     return reader;
134   }
135 
136   public class HBaseReaderThread extends Thread {
137     protected final int readerId;
138     protected final Table table;
139 
140     
141     private long curKey;
142 
143     
144     protected long startTimeMs;
145 
146     
147     private boolean readingRandomKey;
148 
149     private boolean printExceptionTrace = true;
150 
151     
152 
153 
154 
155     public HBaseReaderThread(int readerId) throws IOException {
156       this.readerId = readerId;
157       table = createTable();
158       setName(getClass().getSimpleName() + "_" + readerId);
159     }
160 
161     protected HTableInterface createTable() throws IOException {
162       return connection.getTable(tableName);
163     }
164 
165     @Override
166     public void run() {
167       try {
168         runReader();
169       } finally {
170         closeTable();
171         numThreadsWorking.decrementAndGet();
172       }
173     }
174 
175     protected void closeTable() {
176       try {
177         if (table != null) {
178           table.close();
179         }
180       } catch (IOException e) {
181         LOG.error("Error closing table", e);
182       }
183     }
184 
185     private void runReader() {
186       if (verbose) {
187         LOG.info("Started thread #" + readerId + " for reads...");
188       }
189 
190       startTimeMs = System.currentTimeMillis();
191       curKey = startKey;
192       long [] keysForThisReader = new long[batchSize];
193       while (curKey < endKey && !aborted) {
194         int readingRandomKeyStartIndex = -1;
195         int numKeys = 0;
196         
197         do {
198           long k = getNextKeyToRead();
199           if (k < startKey || k >= endKey) {
200             numReadErrors.incrementAndGet();
201             throw new AssertionError("Load tester logic error: proposed key " +
202                 "to read " + k + " is out of range (startKey=" + startKey +
203                 ", endKey=" + endKey + ")");
204           }
205           if (k % numThreads != readerId ||
206               writer != null && writer.failedToWriteKey(k)) {
207             
208             
209             continue;
210           }
211           keysForThisReader[numKeys] = k;
212           if (readingRandomKey && readingRandomKeyStartIndex == -1) {
213             
214             readingRandomKeyStartIndex = numKeys;
215           }
216           numKeys++;
217         } while (numKeys < batchSize && curKey < endKey && !aborted);
218 
219         if (numKeys > 0) { 
220           readKey(keysForThisReader);
221           
222           numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
223               numKeys : readingRandomKeyStartIndex);
224         }
225       }
226     }
227 
228     
229 
230 
231 
232 
233     private long maxKeyWeCanRead() {
234       long insertedUpToKey = writer.wroteUpToKey();
235       if (insertedUpToKey >= endKey - 1) {
236         
237         
238         return endKey - 1;
239       }
240       return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
241     }
242 
243     protected long getNextKeyToRead() {
244       readingRandomKey = false;
245       if (writer == null || curKey <= maxKeyWeCanRead()) {
246         return curKey++;
247       }
248 
249       
250       long maxKeyToRead;
251       while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
252         
253         
254         
255         Threads.sleepWithoutInterrupt(50);
256       }
257 
258       if (curKey <= maxKeyToRead) {
259         
260         
261         return curKey++;
262       }
263 
264       
265       
266       
267       
268       readingRandomKey = true;
269       return startKey + Math.abs(RandomUtils.nextLong())
270           % (maxKeyToRead - startKey + 1);
271     }
272 
273     private Get[] readKey(long[] keysToRead) {
274       Get [] gets = new Get[keysToRead.length];
275       int i = 0;
276       for (long keyToRead : keysToRead) {
277         try {
278           gets[i] = createGet(keyToRead);
279           if (keysToRead.length == 1) {
280             queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead);
281           }
282           i++;
283         } catch (IOException e) {
284           numReadFailures.addAndGet(1);
285           LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
286               + ", time from start: "
287               + (System.currentTimeMillis() - startTimeMs) + " ms");
288           if (printExceptionTrace) {
289             LOG.warn(e);
290             printExceptionTrace = false;
291           }
292         }
293       }
294       if (keysToRead.length > 1) {
295         try {
296           queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead);
297         } catch (IOException e) {
298           numReadFailures.addAndGet(gets.length);
299           for (long keyToRead : keysToRead) {
300             LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
301                 + ", time from start: "
302                 + (System.currentTimeMillis() - startTimeMs) + " ms");
303           }
304           if (printExceptionTrace) {
305             LOG.warn(e);
306             printExceptionTrace = false;
307           }
308         }
309       }
310       return gets;
311     }
312 
313     protected Get createGet(long keyToRead) throws IOException {
314       Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
315       String cfsString = "";
316       byte[][] columnFamilies = dataGenerator.getColumnFamilies();
317       for (byte[] cf : columnFamilies) {
318         get.addFamily(cf);
319         if (verbose) {
320           if (cfsString.length() > 0) {
321             cfsString += ", ";
322           }
323           cfsString += "[" + Bytes.toStringBinary(cf) + "]";
324         }
325       }
326       get = dataGenerator.beforeGet(keyToRead, get);
327       if (regionReplicaId > 0) {
328         get.setReplicaId(regionReplicaId);
329         get.setConsistency(Consistency.TIMELINE);
330       }
331       if (verbose) {
332         LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
333       }
334       return get;
335     }
336 
337     public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
338       
339       long start = System.nanoTime();
340       
341       Result[] results = table.get(Arrays.asList(gets));
342       long end = System.nanoTime();
343       verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
344     }
345 
346     public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
347       
348 
349       long start = System.nanoTime();
350       
351       Result result = table.get(get);
352       long end = System.nanoTime();
353       verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
354     }
355 
356     protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
357         Result[] results, Table table, boolean isNullExpected)
358         throws IOException {
359       totalOpTimeMs.addAndGet(elapsedNano / 1000000);
360       numKeys.addAndGet(gets.length);
361       int i = 0;
362       for (Result result : results) {
363         verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
364             isNullExpected);
365       }
366     }
367 
368     protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
369         Result result, Table table, boolean isNullExpected)
370         throws IOException {
371       verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
372           new Result[]{result}, table, isNullExpected);
373     }
374 
375     private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
376         Result result, Table table, boolean isNullExpected) throws IOException {
377       if (!result.isEmpty()) {
378         if (verify) {
379           numKeysVerified.incrementAndGet();
380         }
381       } else {
382         HRegionLocation hloc = connection.getRegionLocation(tableName,
383           get.getRow(), false);
384         String rowKey = Bytes.toString(get.getRow());
385         LOG.info("Key = " + rowKey + ", Region location: " + hloc);
386         if(isNullExpected) {
387           nullResult.incrementAndGet();
388           LOG.debug("Null result obtained for the key ="+rowKey);
389           return;
390         }
391       }
392       boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
393       long numErrorsAfterThis = 0;
394       if (isOk) {
395         long cols = 0;
396         
397         for (byte[] cf : result.getMap().keySet()) {
398           cols += result.getFamilyMap(cf).size();
399         }
400         numCols.addAndGet(cols);
401       } else {
402         if (writer != null) {
403           LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
404         }
405         numErrorsAfterThis = numReadErrors.incrementAndGet();
406       }
407 
408       if (numErrorsAfterThis > maxErrors) {
409         LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
410         aborted = true;
411       }
412     }
413   }
414 
415   public long getNumReadFailures() {
416     return numReadFailures.get();
417   }
418 
419   public long getNumReadErrors() {
420     return numReadErrors.get();
421   }
422 
423   public long getNumKeysVerified() {
424     return numKeysVerified.get();
425   }
426 
427   public long getNumUniqueKeysVerified() {
428     return numUniqueKeysVerified.get();
429   }
430 
431   public long getNullResultsCount() {
432     return nullResult.get();
433   }
434 
435   @Override
436   protected String progressInfo() {
437     StringBuilder sb = new StringBuilder();
438     appendToStatus(sb, "verified", numKeysVerified.get());
439     appendToStatus(sb, "READ FAILURES", numReadFailures.get());
440     appendToStatus(sb, "READ ERRORS", numReadErrors.get());
441     appendToStatus(sb, "NULL RESULT", nullResult.get());
442     return sb.toString();
443   }
444 }