1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.Set;
23  
24  import org.apache.commons.cli.CommandLine;
25  import org.apache.commons.lang.StringUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
31  import org.apache.hadoop.hbase.regionserver.HStore;
32  import org.apache.hadoop.hbase.regionserver.StoreEngine;
33  import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
34  import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
35  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.MultiThreadedAction;
38  import org.apache.hadoop.hbase.util.MultiThreadedReader;
39  import org.apache.hadoop.hbase.util.MultiThreadedWriter;
40  import org.apache.hadoop.hbase.util.RegionSplitter;
41  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
42  import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
43  import org.junit.Assert;
44  
45  
46  
47  
48  
49  @InterfaceAudience.Private
50  public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
51    private static final Log LOG = LogFactory.getLog(StripeCompactionsPerformanceEvaluation.class);
52    private static final TableName TABLE_NAME =
53      TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
54    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
55    private static final int MIN_NUM_SERVERS = 1;
56  
57    
58    private static final String DATAGEN_KEY = "datagen";
59    private static final String ITERATIONS_KEY = "iters";
60    private static final String PRELOAD_COUNT_KEY = "pwk";
61    private static final String WRITE_COUNT_KEY = "wk";
62    private static final String WRITE_THREADS_KEY = "wt";
63    private static final String READ_THREADS_KEY = "rt";
64    private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
65    private static final String SPLIT_SIZE_KEY = "splitsize";
66    private static final String SPLIT_PARTS_KEY = "splitparts";
67    private static final String VALUE_SIZE_KEY = "valsize";
68    private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";
69  
70    
71    private LoadTestDataGenerator dataGen;
72    private int iterationCount;
73    private long preloadKeys;
74    private long writeKeys;
75    private int writeThreads;
76    private int readThreads;
77    private Long initialStripeCount;
78    private Long splitSize;
79    private Long splitParts;
80  
81    private static final String VALUE_SIZE_DEFAULT = "512:4096";
82  
83    protected IntegrationTestingUtility util = new IntegrationTestingUtility();
84  
85    @Override
86    protected void addOptions() {
87      addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
88      addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
89          + " sequences. The number of such shards per server is specified (default is 1).");
90      addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
91      addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
92      addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
93      addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
94      addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
95      addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
96      addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
97      addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
98      addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
99          + " default " + VALUE_SIZE_DEFAULT);
100   }
101 
102   @Override
103   protected void processOptions(CommandLine cmd) {
104     int minValueSize = 0, maxValueSize = 0;
105     String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
106     if (valueSize.contains(":")) {
107       String[] valueSizes = valueSize.split(":");
108       if (valueSize.length() != 2) throw new RuntimeException("Invalid value size: " + valueSize);
109       minValueSize = Integer.parseInt(valueSizes[0]);
110       maxValueSize = Integer.parseInt(valueSizes[1]);
111     } else {
112       minValueSize = maxValueSize = Integer.parseInt(valueSize);
113     }
114     String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase();
115     if ("default".equals(datagen)) {
116       dataGen = new MultiThreadedAction.DefaultDataGenerator(
117           minValueSize, maxValueSize, 1, 1, new byte[][] { COLUMN_FAMILY });
118     } else if ("sequential".equals(datagen)) {
119       int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
120       dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
121     } else {
122       throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
123     }
124     iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
125     preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
126     writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
127     writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
128     readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
129     initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
130     splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
131     splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
132   }
133 
134   private Long getLongOrNull(CommandLine cmd, String option) {
135     if (!cmd.hasOption(option)) return null;
136     return Long.parseLong(cmd.getOptionValue(option));
137   }
138 
139   @Override
140   public Configuration getConf() {
141     Configuration c = super.getConf();
142     if (c == null && util != null) {
143       conf = util.getConfiguration();
144       c = conf;
145     }
146     return c;
147   }
148 
149   @Override
150   protected int doWork() throws Exception {
151     setUp();
152     try {
153       boolean isStripe = true;
154       for (int i = 0; i < iterationCount * 2; ++i) {
155         createTable(isStripe);
156         runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
157         isStripe = !isStripe;
158       }
159       return 0;
160     } finally {
161       tearDown();
162     }
163   }
164 
165 
166   private void setUp() throws Exception {
167     this.util = new IntegrationTestingUtility();
168     LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
169     util.initializeCluster(MIN_NUM_SERVERS);
170     LOG.debug("Done initializing/checking cluster");
171   }
172 
173   protected void deleteTable() throws Exception {
174     if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
175       LOG.info("Deleting table");
176       if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) {
177         util.getHBaseAdmin().disableTable(TABLE_NAME);
178       }
179       util.getHBaseAdmin().deleteTable(TABLE_NAME);
180       LOG.info("Deleted table");
181     }
182   }
183 
184   private void createTable(boolean isStripe) throws Exception {
185     createTable(createHtd(isStripe));
186   }
187 
188   private void tearDown() throws Exception {
189     deleteTable();
190     LOG.info("Restoring the cluster");
191     util.restoreCluster();
192     LOG.info("Done restoring the cluster");
193   }
194 
195   private void runOneTest(String description, Configuration conf) throws Exception {
196     int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
197     long startKey = (long)preloadKeys * numServers;
198     long endKey = startKey + (long)writeKeys * numServers;
199     status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
200         description, numServers, startKey, endKey));
201 
202     if (preloadKeys > 0) {
203       MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
204       long time = System.currentTimeMillis();
205       preloader.start(0, startKey, writeThreads);
206       preloader.waitForFinish();
207       if (preloader.getNumWriteFailures() > 0) {
208         throw new IOException("Preload failed");
209       }
210       int waitTime = (int)Math.min(preloadKeys / 100, 30000); 
211       status(description + " preload took " + (System.currentTimeMillis()-time)/1000
212           + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
213       Thread.sleep(waitTime);
214     }
215 
216     MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
217     MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
218     
219     reader.linkToWriter(writer);
220 
221     long testStartTime = System.currentTimeMillis();
222     writer.start(startKey, endKey, writeThreads);
223     reader.start(startKey, endKey, readThreads);
224     writer.waitForFinish();
225     reader.waitForFinish();
226     
227     
228     status("Readers and writers stopped for test " + description);
229 
230     boolean success = writer.getNumWriteFailures() == 0;
231     if (!success) {
232       LOG.error("Write failed");
233     } else {
234       success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
235       if (!success) {
236         LOG.error("Read failed");
237       }
238     }
239 
240     
241     
242 
243 
244 
245 
246 
247 
248 
249 
250 
251 
252 
253 
254 
255     status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
256     Assert.assertTrue(success);
257   }
258 
259   private static void status(String s) {
260     LOG.info("STATUS " + s);
261     System.out.println(s);
262   }
263 
264   private HTableDescriptor createHtd(boolean isStripe) throws Exception {
265     HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
266     htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
267     String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
268     htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
269     if (isStripe) {
270       htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
271       if (initialStripeCount != null) {
272         htd.setConfiguration(
273             StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
274         htd.setConfiguration(
275             HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
276       } else {
277         htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
278       }
279       if (splitSize != null) {
280         htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
281       }
282       if (splitParts != null) {
283         htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
284       }
285     } else {
286       htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); 
287     }
288     return htd;
289   }
290 
291   protected void createTable(HTableDescriptor htd) throws Exception {
292     deleteTable();
293     if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
294       LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
295       htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
296     }
297     byte[][] splits = new RegionSplitter.HexStringSplit().split(
298         util.getHBaseClusterInterface().getClusterStatus().getServersSize());
299     util.getHBaseAdmin().createTable(htd, splits);
300   }
301 
302   public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
303     private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
304     private static final int PAD_TO = 10;
305     private static final int PREFIX_PAD_TO = 7;
306 
307     private final int numPartitions;
308 
309     public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
310       super(minValueSize, maxValueSize);
311       this.numPartitions = numPartitions;
312     }
313 
314     @Override
315     public byte[] getDeterministicUniqueKey(long keyBase) {
316       String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
317       return Bytes.toBytes(getPrefix(keyBase) + num);
318     }
319 
320     private String getPrefix(long i) {
321       return StringUtils.leftPad(String.valueOf((int)(i % numPartitions)), PREFIX_PAD_TO, "0");
322     }
323 
324     @Override
325     public byte[][] getColumnFamilies() {
326       return new byte[][] { COLUMN_FAMILY };
327     }
328 
329     @Override
330     public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
331       return COLUMN_NAMES;
332     }
333 
334     @Override
335     public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
336       return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
337     }
338 
339     @Override
340     public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
341       return LoadTestKVGenerator.verify(value, rowKey, cf, column);
342     }
343 
344     @Override
345     public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
346       return true;
347     }
348   };
349 }