1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
28  import org.apache.hadoop.hbase.util.ConstantDelayQueue;
29  import org.apache.hadoop.hbase.util.LoadTestTool;
30  import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
31  import org.apache.hadoop.hbase.util.MultiThreadedWriter;
32  import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
33  import org.apache.hadoop.hbase.util.Threads;
34  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
35  import org.apache.hadoop.util.StringUtils;
36  import org.apache.hadoop.util.ToolRunner;
37  import org.junit.Assert;
38  import org.junit.Test;
39  import org.junit.experimental.categories.Category;
40  
41  import com.google.common.collect.Lists;
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  @Category(IntegrationTests.class)
71  public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest {
72  
73    private static final String TEST_NAME
74      = IntegrationTestRegionReplicaReplication.class.getSimpleName();
75  
76    private static final String OPT_READ_DELAY_MS = "read_delay_ms";
77  
78    private static final int DEFAULT_REGION_REPLICATION = 2;
79    private static final int SERVER_COUNT = 1; 
80    private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"};
81  
82    @Override
83    protected int getMinServerCount() {
84      return SERVER_COUNT;
85    }
86  
87    @Override
88    public void setConf(Configuration conf) {
89      conf.setIfUnset(
90        String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION),
91        String.valueOf(DEFAULT_REGION_REPLICATION));
92  
93      conf.setIfUnset(
94        String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES),
95        StringUtils.join(",", DEFAULT_COLUMN_FAMILIES));
96  
97      conf.setBoolean("hbase.table.sanity.checks", true);
98  
99      
100     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
101     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
102 
103     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); 
104     conf.setInt("hbase.hstore.blockingStoreFiles", 100);
105 
106     super.setConf(conf);
107   }
108 
109   @Override
110   @Test
111   public void testIngest() throws Exception {
112     runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
113   }
114 
115   
116 
117 
118 
119 
120   public static class DelayingMultiThreadedWriter extends MultiThreadedWriter {
121     private long delayMs;
122     public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
123         TableName tableName) throws IOException {
124       super(dataGen, conf, tableName);
125     }
126     @Override
127     protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
128       this.delayMs = conf.getLong(String.format("%s.%s",
129         IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
130       return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
131     }
132   }
133 
134   
135 
136 
137 
138 
139   public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater {
140     private long delayMs;
141     public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
142         TableName tableName, double updatePercent) throws IOException {
143       super(dataGen, conf, tableName, updatePercent);
144     }
145     @Override
146     protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
147       this.delayMs = conf.getLong(String.format("%s.%s",
148         IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
149       return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
150     }
151   }
152 
153   @Override
154   protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
155       int recordSize, int writeThreads, int readThreads) throws Exception {
156 
157     LOG.info("Running ingest");
158     LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
159 
160     
161     Threads.sleep(
162       getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
163         5000) + 1000);
164 
165     long start = System.currentTimeMillis();
166     String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
167     long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
168     long startKey = 0;
169 
170     long numKeys = getNumKeys(keysPerServerPerIter);
171     while (System.currentTimeMillis() - start < 0.9 * runtime) {
172       LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
173           ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
174 
175       int verifyPercent = 100;
176       int updatePercent = 20;
177       int ret = -1;
178       int regionReplicaId = conf.getInt(String.format("%s.%s"
179         , TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
180 
181       
182       List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
183       args.add("-write");
184       args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads));
185       args.add("-" + LoadTestTool.OPT_MULTIPUT);
186       args.add("-writer");
187       args.add(DelayingMultiThreadedWriter.class.getName()); 
188       args.add("-read");
189       args.add(String.format("%d:%d", verifyPercent, readThreads));
190       args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
191       args.add(String.valueOf(regionReplicaId));
192 
193       ret = loadTool.run(args.toArray(new String[args.size()]));
194       if (0 != ret) {
195         String errorMsg = "Load failed with error code " + ret;
196         LOG.error(errorMsg);
197         Assert.fail(errorMsg);
198       }
199 
200       args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
201       args.add("-update");
202       args.add(String.format("%s:%s:1", updatePercent, writeThreads));
203       args.add("-updater");
204       args.add(DelayingMultiThreadedUpdater.class.getName()); 
205       args.add("-read");
206       args.add(String.format("%d:%d", verifyPercent, readThreads));
207       args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
208       args.add(String.valueOf(regionReplicaId));
209 
210       ret = loadTool.run(args.toArray(new String[args.size()]));
211       if (0 != ret) {
212         String errorMsg = "Load failed with error code " + ret;
213         LOG.error(errorMsg);
214         Assert.fail(errorMsg);
215       }
216       startKey += numKeys;
217     }
218   }
219 
220   public static void main(String[] args) throws Exception {
221     Configuration conf = HBaseConfiguration.create();
222     IntegrationTestingUtility.setUseDistributedCluster(conf);
223     int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args);
224     System.exit(ret);
225   }
226 }