1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.Random;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.commons.math.random.RandomData;
27  import org.apache.commons.math.random.RandomDataImpl;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32  import org.apache.hadoop.io.MapFile;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.io.WritableComparable;
35  
36  
37  
38  
39  
40  
41  
42  @Deprecated
43  public class MapFilePerformanceEvaluation {
44    protected final Configuration conf;
45    private static final int ROW_LENGTH = 10;
46    private static final int ROW_COUNT = 100000;
47  
48    static final Log LOG =
49      LogFactory.getLog(MapFilePerformanceEvaluation.class.getName());
50  
51    
52  
53  
54    public MapFilePerformanceEvaluation(final Configuration c) {
55      super();
56      this.conf = c;
57    }
58  
59    static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
60      String v = Integer.toString(i);
61      w.set(Bytes.toBytes("0000000000".substring(v.length()) + v));
62      return w;
63    }
64  
65    private void runBenchmarks() throws Exception {
66      final FileSystem fs = FileSystem.get(this.conf);
67      final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
68      if (fs.exists(mf)) {
69        fs.delete(mf, true);
70      }
71      runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT),
72          ROW_COUNT);
73  
74      PerformanceEvaluationCommons.concurrentReads(new Runnable() {
75        @Override
76        public void run() {
77          try {
78            runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
79              ROW_COUNT);
80          } catch (Exception e) {
81            e.printStackTrace();
82          }
83        }
84      });
85      PerformanceEvaluationCommons.concurrentReads(new Runnable() {
86        @Override
87        public void run() {
88          try {
89            runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
90                ROW_COUNT);
91          } catch (Exception e) {
92            e.printStackTrace();
93          }
94        }
95      });
96      PerformanceEvaluationCommons.concurrentReads(new Runnable() {
97        @Override
98        public void run() {
99          try {
100           runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
101               ROW_COUNT);
102         } catch (Exception e) {
103           e.printStackTrace();
104         }
105       }
106     });
107     PerformanceEvaluationCommons.concurrentReads(new Runnable() {
108       @Override
109       public void run() {
110         try {
111           runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
112               ROW_COUNT);
113         } catch (Exception e) {
114           e.printStackTrace();
115         }
116       }
117     });
118   }
119 
120   protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount)
121     throws Exception {
122     LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
123         rowCount + " rows.");
124     long elapsedTime = benchmark.run();
125     LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
126         rowCount + " rows took " + elapsedTime + "ms.");
127   }
128 
129   static abstract class RowOrientedBenchmark {
130 
131     protected final Configuration conf;
132     protected final FileSystem fs;
133     protected final Path mf;
134     protected final int totalRows;
135 
136     public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
137         int totalRows) {
138       this.conf = conf;
139       this.fs = fs;
140       this.mf = mf;
141       this.totalRows = totalRows;
142     }
143 
144     void setUp() throws Exception {
145       
146     }
147 
148     abstract void doRow(int i) throws Exception;
149 
150     protected int getReportingPeriod() {
151       return this.totalRows / 10;
152     }
153 
154     void tearDown() throws Exception {
155       
156     }
157 
158     
159 
160 
161 
162 
163     long run() throws Exception {
164       long elapsedTime;
165       setUp();
166       long startTime = System.currentTimeMillis();
167       try {
168         for (int i = 0; i < totalRows; i++) {
169           if (i > 0 && i % getReportingPeriod() == 0) {
170             LOG.info("Processed " + i + " rows.");
171           }
172           doRow(i);
173         }
174         elapsedTime = System.currentTimeMillis() - startTime;
175       } finally {
176         tearDown();
177       }
178       return elapsedTime;
179     }
180 
181   }
182 
183   static class SequentialWriteBenchmark extends RowOrientedBenchmark {
184 
185     protected MapFile.Writer writer;
186     private Random random = new Random();
187     private byte[] bytes = new byte[ROW_LENGTH];
188     private ImmutableBytesWritable key = new ImmutableBytesWritable();
189     private ImmutableBytesWritable value = new ImmutableBytesWritable();
190 
191     public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
192         int totalRows) {
193       super(conf, fs, mf, totalRows);
194     }
195 
196     @Override
197     void setUp() throws Exception {
198       writer = new MapFile.Writer(conf, fs, mf.toString(),
199         ImmutableBytesWritable.class, ImmutableBytesWritable.class);
200     }
201 
202     @Override
203     void doRow(int i) throws Exception {
204       value.set(generateValue());
205       writer.append(format(i, key), value);
206     }
207 
208     private byte[] generateValue() {
209       random.nextBytes(bytes);
210       return bytes;
211     }
212 
213     @Override
214     protected int getReportingPeriod() {
215       return this.totalRows; 
216     }
217 
218     @Override
219     void tearDown() throws Exception {
220       writer.close();
221     }
222 
223   }
224 
225   static abstract class ReadBenchmark extends RowOrientedBenchmark {
226     ImmutableBytesWritable key = new ImmutableBytesWritable();
227     ImmutableBytesWritable value = new ImmutableBytesWritable();
228 
229     protected MapFile.Reader reader;
230 
231     public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
232         int totalRows) {
233       super(conf, fs, mf, totalRows);
234     }
235 
236     @Override
237     void setUp() throws Exception {
238       reader = new MapFile.Reader(fs, mf.toString(), conf);
239     }
240 
241     @Override
242     void tearDown() throws Exception {
243       reader.close();
244     }
245 
246   }
247 
248   static class SequentialReadBenchmark extends ReadBenchmark {
249     ImmutableBytesWritable verify = new ImmutableBytesWritable();
250 
251     public SequentialReadBenchmark(Configuration conf, FileSystem fs,
252         Path mf, int totalRows) {
253       super(conf, fs, mf, totalRows);
254     }
255 
256     @Override
257     void doRow(int i) throws Exception {
258       this.reader.next(key, value);
259       PerformanceEvaluationCommons.assertKey(this.key.get(),
260         format(i, this.verify).get());
261       PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, value.getLength());
262     }
263 
264     @Override
265     protected int getReportingPeriod() {
266       return this.totalRows; 
267     }
268 
269   }
270 
271   static class UniformRandomReadBenchmark extends ReadBenchmark {
272 
273     private Random random = new Random();
274 
275     public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
276         Path mf, int totalRows) {
277       super(conf, fs, mf, totalRows);
278     }
279 
280     @Override
281     void doRow(int i) throws Exception {
282       ImmutableBytesWritable k = getRandomRow();
283       ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
284       PerformanceEvaluationCommons.assertValueSize(r.getLength(), ROW_LENGTH);
285     }
286 
287     private ImmutableBytesWritable getRandomRow() {
288       return format(random.nextInt(totalRows), key);
289     }
290 
291   }
292 
293   static class UniformRandomSmallScan extends ReadBenchmark {
294     private Random random = new Random();
295 
296     public UniformRandomSmallScan(Configuration conf, FileSystem fs,
297         Path mf, int totalRows) {
298       super(conf, fs, mf, totalRows/10);
299     }
300 
301     @Override
302     void doRow(int i) throws Exception {
303       ImmutableBytesWritable ibw = getRandomRow();
304       WritableComparable<?> wc = this.reader.getClosest(ibw, this.value);
305       if (wc == null) {
306         throw new NullPointerException();
307       }
308       PerformanceEvaluationCommons.assertKey(ibw.get(),
309         ((ImmutableBytesWritable)wc).get());
310       
311       for (int ii = 0; ii < 29; ii++) {
312         this.reader.next(this.key, this.value);
313         PerformanceEvaluationCommons.assertValueSize(this.value.getLength(), ROW_LENGTH);
314       }
315     }
316 
317     private ImmutableBytesWritable getRandomRow() {
318       return format(random.nextInt(totalRows), key);
319     }
320   }
321 
322   static class GaussianRandomReadBenchmark extends ReadBenchmark {
323     private RandomData randomData = new RandomDataImpl();
324 
325     public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
326         Path mf, int totalRows) {
327       super(conf, fs, mf, totalRows);
328     }
329 
330     @Override
331     void doRow(int i) throws Exception {
332       ImmutableBytesWritable k = getGaussianRandomRow();
333       ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
334       PerformanceEvaluationCommons.assertValueSize(r.getLength(), ROW_LENGTH);
335     }
336 
337     private ImmutableBytesWritable getGaussianRandomRow() {
338       int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
339           (double)totalRows / 10.0);
340       return format(r, key);
341     }
342 
343   }
344 
345   
346 
347 
348 
349 
350   public static void main(String[] args) throws Exception {
351     new MapFilePerformanceEvaluation(HBaseConfiguration.create()).
352       runBenchmarks();
353   }
354 }