1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  package org.apache.hadoop.hbase.replication;
21  
22  import static org.junit.Assert.assertArrayEquals;
23  import static org.junit.Assert.assertEquals;
24  import static org.junit.Assert.assertFalse;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.util.List;
30  import java.util.Map;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.HBaseConfiguration;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.testclassification.LargeTests;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.client.Admin;
43  import org.apache.hadoop.hbase.client.Connection;
44  import org.apache.hadoop.hbase.client.ConnectionFactory;
45  import org.apache.hadoop.hbase.client.Delete;
46  import org.apache.hadoop.hbase.client.Get;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.client.Result;
49  import org.apache.hadoop.hbase.client.Table;
50  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
51  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
54  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
55  import org.junit.AfterClass;
56  import org.junit.BeforeClass;
57  import org.junit.Test;
58  import org.junit.experimental.categories.Category;
59  
60  @Category(LargeTests.class)
61  public class TestPerTableCFReplication {
62  
63    private static final Log LOG = LogFactory.getLog(TestPerTableCFReplication.class);
64  
65    private static Configuration conf1;
66    private static Configuration conf2;
67    private static Configuration conf3;
68  
69    private static HBaseTestingUtility utility1;
70    private static HBaseTestingUtility utility2;
71    private static HBaseTestingUtility utility3;
72    private static final long SLEEP_TIME = 500;
73    private static final int NB_RETRIES = 100;
74  
75    private static final TableName tableName = TableName.valueOf("test");
76    private static final TableName tabAName = TableName.valueOf("TA");
77    private static final TableName tabBName = TableName.valueOf("TB");
78    private static final TableName tabCName = TableName.valueOf("TC");
79    private static final byte[] famName = Bytes.toBytes("f");
80    private static final byte[] f1Name = Bytes.toBytes("f1");
81    private static final byte[] f2Name = Bytes.toBytes("f2");
82    private static final byte[] f3Name = Bytes.toBytes("f3");
83    private static final byte[] row1 = Bytes.toBytes("row1");
84    private static final byte[] row2 = Bytes.toBytes("row2");
85    private static final byte[] noRepfamName = Bytes.toBytes("norep");
86    private static final byte[] val = Bytes.toBytes("myval");
87  
88    private static HTableDescriptor table;
89    private static HTableDescriptor tabA;
90    private static HTableDescriptor tabB;
91    private static HTableDescriptor tabC;
92  
93    @BeforeClass
94    public static void setUpBeforeClass() throws Exception {
95      conf1 = HBaseConfiguration.create();
96      conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
97      
98      
99      conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
100     conf1.setInt("replication.source.size.capacity", 1024);
101     conf1.setLong("replication.source.sleepforretries", 100);
102     conf1.setInt("hbase.regionserver.maxlogs", 10);
103     conf1.setLong("hbase.master.logcleaner.ttl", 10);
104     conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
105     conf1.setBoolean("dfs.support.append", true);
106     conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
107     conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
108         "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
109 
110     utility1 = new HBaseTestingUtility(conf1);
111     utility1.startMiniZKCluster();
112     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
113     new ZooKeeperWatcher(conf1, "cluster1", null, true);
114 
115     conf2 = new Configuration(conf1);
116     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
117 
118     conf3 = new Configuration(conf1);
119     conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
120 
121     utility2 = new HBaseTestingUtility(conf2);
122     utility2.setZkCluster(miniZK);
123     new ZooKeeperWatcher(conf2, "cluster3", null, true);
124 
125     utility3 = new HBaseTestingUtility(conf3);
126     utility3.setZkCluster(miniZK);
127     new ZooKeeperWatcher(conf3, "cluster3", null, true);
128 
129     table = new HTableDescriptor(tableName);
130     HColumnDescriptor fam = new HColumnDescriptor(famName);
131     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
132     table.addFamily(fam);
133     fam = new HColumnDescriptor(noRepfamName);
134     table.addFamily(fam);
135 
136     tabA = new HTableDescriptor(tabAName);
137     fam = new HColumnDescriptor(f1Name);
138     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
139     tabA.addFamily(fam);
140     fam = new HColumnDescriptor(f2Name);
141     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
142     tabA.addFamily(fam);
143     fam = new HColumnDescriptor(f3Name);
144     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
145     tabA.addFamily(fam);
146 
147     tabB = new HTableDescriptor(tabBName);
148     fam = new HColumnDescriptor(f1Name);
149     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
150     tabB.addFamily(fam);
151     fam = new HColumnDescriptor(f2Name);
152     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
153     tabB.addFamily(fam);
154     fam = new HColumnDescriptor(f3Name);
155     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
156     tabB.addFamily(fam);
157 
158     tabC = new HTableDescriptor(tabCName);
159     fam = new HColumnDescriptor(f1Name);
160     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
161     tabC.addFamily(fam);
162     fam = new HColumnDescriptor(f2Name);
163     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
164     tabC.addFamily(fam);
165     fam = new HColumnDescriptor(f3Name);
166     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
167     tabC.addFamily(fam);
168 
169     utility1.startMiniCluster();
170     utility2.startMiniCluster();
171     utility3.startMiniCluster();
172   }
173 
174   @AfterClass
175   public static void tearDownAfterClass() throws Exception {
176     utility3.shutdownMiniCluster();
177     utility2.shutdownMiniCluster();
178     utility1.shutdownMiniCluster();
179   }
180 
181   @Test
182   public void testParseTableCFsFromConfig() {
183     Map<TableName, List<String>> tabCFsMap = null;
184 
185     
186     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null);
187     assertEquals(null, tabCFsMap);
188 
189     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("");
190     assertEquals(null, tabCFsMap);
191 
192     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("   ");
193     assertEquals(null, tabCFsMap);
194 
195     TableName tab1 = TableName.valueOf("tab1");
196     TableName tab2 = TableName.valueOf("tab2");
197     TableName tab3 = TableName.valueOf("tab3");
198 
199     
200     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1");
201     assertEquals(1, tabCFsMap.size()); 
202     assertTrue(tabCFsMap.containsKey(tab1));   
203     assertFalse(tabCFsMap.containsKey(tab2));  
204     assertEquals(null, tabCFsMap.get(tab1));   
205 
206     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1");
207     assertEquals(1, tabCFsMap.size()); 
208     assertTrue(tabCFsMap.containsKey(tab2));   
209     assertFalse(tabCFsMap.containsKey(tab1));  
210     assertEquals(1, tabCFsMap.get(tab2).size());   
211     assertEquals("cf1", tabCFsMap.get(tab2).get(0));
212 
213     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3");
214     assertEquals(1, tabCFsMap.size()); 
215     assertTrue(tabCFsMap.containsKey(tab3));   
216     assertFalse(tabCFsMap.containsKey(tab1));  
217     assertEquals(2, tabCFsMap.get(tab3).size());   
218     assertTrue(tabCFsMap.get(tab3).contains("cf1"));
219     assertTrue(tabCFsMap.get(tab3).contains("cf3"));
220 
221     
222     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
223     
224     assertEquals(3, tabCFsMap.size());
225     assertTrue(tabCFsMap.containsKey(tab1));
226     assertTrue(tabCFsMap.containsKey(tab2));
227     assertTrue(tabCFsMap.containsKey(tab3));
228     
229     assertEquals(null, tabCFsMap.get(tab1));
230     
231     assertEquals(1, tabCFsMap.get(tab2).size());
232     assertEquals("cf1", tabCFsMap.get(tab2).get(0));
233     
234     assertEquals(2, tabCFsMap.get(tab3).size());
235     assertTrue(tabCFsMap.get(tab3).contains("cf1"));
236     assertTrue(tabCFsMap.get(tab3).contains("cf3"));
237 
238     
239     
240     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
241       "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
242     
243     assertEquals(3, tabCFsMap.size());
244     assertTrue(tabCFsMap.containsKey(tab1));
245     assertTrue(tabCFsMap.containsKey(tab2));
246     assertTrue(tabCFsMap.containsKey(tab3));
247     
248     assertEquals(null, tabCFsMap.get(tab1));
249     
250     assertEquals(1, tabCFsMap.get(tab2).size());
251     assertEquals("cf1", tabCFsMap.get(tab2).get(0));
252     
253     assertEquals(2, tabCFsMap.get(tab3).size());
254     assertTrue(tabCFsMap.get(tab3).contains("cf1"));
255     assertTrue(tabCFsMap.get(tab3).contains("cf3"));
256 
257     
258     
259     tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
260       "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
261     
262     assertEquals(1, tabCFsMap.size()); 
263     assertFalse(tabCFsMap.containsKey(tab1));
264     assertFalse(tabCFsMap.containsKey(tab2));
265     assertTrue(tabCFsMap.containsKey(tab3));
266    
267     assertEquals(2, tabCFsMap.get(tab3).size());
268     assertTrue(tabCFsMap.get(tab3).contains("cf1"));
269     assertTrue(tabCFsMap.get(tab3).contains("cf3"));
270  }
271 
272   @Test(timeout=300000)
273   public void testPerTableCFReplication() throws Exception {
274     LOG.info("testPerTableCFReplication");
275     ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
276     Connection connection1 = ConnectionFactory.createConnection(conf1);
277     Connection connection2 = ConnectionFactory.createConnection(conf2);
278     Connection connection3 = ConnectionFactory.createConnection(conf3);
279     try {
280       Admin admin1 = connection1.getAdmin();
281       Admin admin2 = connection2.getAdmin();
282       Admin admin3 = connection3.getAdmin();
283 
284       admin1.createTable(tabA);
285       admin1.createTable(tabB);
286       admin1.createTable(tabC);
287       admin2.createTable(tabA);
288       admin2.createTable(tabB);
289       admin2.createTable(tabC);
290       admin3.createTable(tabA);
291       admin3.createTable(tabB);
292       admin3.createTable(tabC);
293 
294       Table htab1A = connection1.getTable(tabAName);
295       Table htab2A = connection2.getTable(tabAName);
296       Table htab3A = connection3.getTable(tabAName);
297 
298       Table htab1B = connection1.getTable(tabBName);
299       Table htab2B = connection2.getTable(tabBName);
300       Table htab3B = connection3.getTable(tabBName);
301 
302       Table htab1C = connection1.getTable(tabCName);
303       Table htab2C = connection2.getTable(tabCName);
304       Table htab3C = connection3.getTable(tabCName);
305 
306       
307       replicationAdmin.addPeer("2", utility2.getClusterKey(), "TC;TB:f1,f3");
308       replicationAdmin.addPeer("3", utility3.getClusterKey(), "TA;TB:f1,f2");
309 
310       
311       putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
312       ensureRowNotReplicated(row1, f1Name, htab2A);
313       deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
314 
315       putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
316       ensureRowNotReplicated(row1, f2Name, htab2A);
317       deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
318 
319       putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
320       ensureRowNotReplicated(row1, f3Name, htab2A);
321       deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
322 
323       
324       putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
325       deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
326 
327       
328       putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
329       ensureRowNotReplicated(row1, f2Name, htab2B);
330       deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
331 
332       
333       putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
334       ensureRowNotReplicated(row1, f3Name, htab3B);
335       deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
336 
337       
338       putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
339       ensureRowNotReplicated(row1, f1Name, htab3C);
340       deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
341 
342       putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
343       ensureRowNotReplicated(row1, f2Name, htab3C);
344       deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
345 
346       putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
347       ensureRowNotReplicated(row1, f3Name, htab3C);
348       deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
349 
350       
351       replicationAdmin.setPeerTableCFs("2", "TA:f1,f2; TC:f2,f3");
352       replicationAdmin.setPeerTableCFs("3", "TB; TC:f3");
353 
354       
355       putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
356       ensureRowNotReplicated(row2, f1Name, htab3A);
357       deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
358       
359       putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
360       ensureRowNotReplicated(row2, f2Name, htab3A);
361       deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
362       
363       putAndWaitWithFamily(row2, f3Name, htab1A);
364       ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
365       deleteAndWaitWithFamily(row2, f3Name, htab1A);
366 
367       
368       putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
369       ensureRowNotReplicated(row2, f1Name, htab2B);
370       deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
371 
372       putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
373       ensureRowNotReplicated(row2, f2Name, htab2B);
374       deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
375 
376       putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
377       ensureRowNotReplicated(row2, f3Name, htab2B);
378       deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
379 
380       
381       putAndWaitWithFamily(row2, f1Name, htab1C);
382       ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
383       deleteAndWaitWithFamily(row2, f1Name, htab1C);
384       
385       putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
386       ensureRowNotReplicated(row2, f2Name, htab3C);
387       deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
388       
389       putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
390       deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
391     } finally {
392       connection1.close();
393       connection2.close();
394       connection3.close();
395     }
396  }
397 
398   private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
399     Get get = new Get(row);
400     get.addFamily(fam);
401     for (Table table : tables) {
402       Result res = table.get(get);
403       assertEquals(0, res.size());
404     }
405   }
406 
407   private void deleteAndWaitWithFamily(byte[] row, byte[] fam,
408       Table source, Table... targets)
409     throws Exception {
410     Delete del = new Delete(row);
411     del.deleteFamily(fam);
412     source.delete(del);
413 
414     Get get = new Get(row);
415     get.addFamily(fam);
416     for (int i = 0; i < NB_RETRIES; i++) {
417       if (i==NB_RETRIES-1) {
418         fail("Waited too much time for del replication");
419       }
420       boolean removedFromAll = true;
421       for (Table target : targets) {
422         Result res = target.get(get);
423         if (res.size() >= 1) {
424           LOG.info("Row not deleted");
425           removedFromAll = false;
426           break;
427         }
428       }
429       if (removedFromAll) {
430         break;
431       } else {
432         Thread.sleep(SLEEP_TIME);
433       }
434     }
435   }
436 
437   private void putAndWaitWithFamily(byte[] row, byte[] fam,
438       Table source, Table... targets)
439     throws Exception {
440     Put put = new Put(row);
441     put.add(fam, row, val);
442     source.put(put);
443 
444     Get get = new Get(row);
445     get.addFamily(fam);
446     for (int i = 0; i < NB_RETRIES; i++) {
447       if (i==NB_RETRIES-1) {
448         fail("Waited too much time for put replication");
449       }
450       boolean replicatedToAll = true;
451       for (Table target : targets) {
452         Result res = target.get(get);
453         if (res.size() == 0) {
454           LOG.info("Row not available");
455           replicatedToAll = false;
456           break;
457         } else {
458           assertEquals(res.size(), 1);
459           assertArrayEquals(res.value(), val);
460         }
461       }
462       if (replicatedToAll) {
463         break;
464       } else {
465         Thread.sleep(SLEEP_TIME);
466       }
467     }
468   }
469 }