1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.UUID;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.testclassification.MediumTests;
33  import org.apache.hadoop.hbase.Waiter;
34  import org.apache.hadoop.hbase.client.Connection;
35  import org.apache.hadoop.hbase.client.ConnectionFactory;
36  import org.apache.hadoop.hbase.client.Put;
37  import org.apache.hadoop.hbase.client.Table;
38  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
39  import org.apache.hadoop.hbase.wal.WAL.Entry;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
42  import org.apache.hadoop.hbase.util.Threads;
43  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
44  import org.junit.AfterClass;
45  import org.junit.Assert;
46  import org.junit.Before;
47  import org.junit.BeforeClass;
48  import org.junit.Ignore;
49  import org.junit.Test;
50  import org.junit.experimental.categories.Category;
51  
52  
53  
54  
55  @Category(MediumTests.class)
56  public class TestReplicationEndpoint extends TestReplicationBase {
57    static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class);
58  
59    static int numRegionServers;
60  
61    @BeforeClass
62    public static void setUpBeforeClass() throws Exception {
63      TestReplicationBase.setUpBeforeClass();
64      utility2.shutdownMiniCluster(); 
65      admin.removePeer("2");
66      numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
67    }
68  
69    @AfterClass
70    public static void tearDownAfterClass() throws Exception {
71      TestReplicationBase.tearDownAfterClass();
72      
73      Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
74    }
75  
76    @Before
77    public void setup() throws FailedLogCloseException, IOException {
78      ReplicationEndpointForTest.contructedCount.set(0);
79      ReplicationEndpointForTest.startedCount.set(0);
80      ReplicationEndpointForTest.replicateCount.set(0);
81      ReplicationEndpointReturningFalse.replicated.set(false);
82      ReplicationEndpointForTest.lastEntries = null;
83      for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
84        utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
85      }
86    }
87  
88    @Test (timeout=120000)
89    public void testCustomReplicationEndpoint() throws Exception {
90      
91      admin.addPeer("testCustomReplicationEndpoint",
92        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
93          .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
94  
95      
96      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
97        @Override
98        public boolean evaluate() throws Exception {
99          return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
100       }
101     });
102 
103     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
104       @Override
105       public boolean evaluate() throws Exception {
106         return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
107       }
108     });
109 
110     Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
111 
112     
113     doPut(Bytes.toBytes("row42"));
114 
115     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
116       @Override
117       public boolean evaluate() throws Exception {
118         return ReplicationEndpointForTest.replicateCount.get() >= 1;
119       }
120     });
121 
122     doAssert(Bytes.toBytes("row42"));
123 
124     admin.removePeer("testCustomReplicationEndpoint");
125   }
126 
127   @Ignore @Test (timeout=120000)
128   public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
129     Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
130     Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
131     int peerCount = admin.getPeersCount();
132     final String id = "testReplicationEndpointReturnsFalseOnReplicate";
133     admin.addPeer(id,
134       new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
135         .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
136     
137     
138     
139     if (admin.getPeersCount() <= peerCount) {
140       LOG.info("Waiting on peercount to go up from " + peerCount);
141       Threads.sleep(100);
142     }
143     
144     doPut(row);
145 
146     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
147       @Override
148       public boolean evaluate() throws Exception {
149         
150         
151         int count = ReplicationEndpointForTest.replicateCount.get();
152         LOG.info("count=" + count);
153         return ReplicationEndpointReturningFalse.replicated.get();
154       }
155     });
156     if (ReplicationEndpointReturningFalse.ex.get() != null) {
157       throw ReplicationEndpointReturningFalse.ex.get();
158     }
159 
160     admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
161   }
162 
163   @Test (timeout=120000)
164   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
165     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
166       new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
167         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
168     
169     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
170       doPut(connection, Bytes.toBytes("row1"));
171       doPut(connection, row);
172       doPut(connection, Bytes.toBytes("row2"));
173     }
174 
175     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
176       @Override
177       public boolean evaluate() throws Exception {
178         return ReplicationEndpointForTest.replicateCount.get() >= 1;
179       }
180     });
181 
182     Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
183     admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
184   }
185 
186 
187   private void doPut(byte[] row) throws IOException {
188     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
189       doPut(connection, row);
190     }
191   }
192 
193   private void doPut(final Connection connection, final byte [] row) throws IOException {
194     try (Table t = connection.getTable(tableName)) {
195       Put put = new Put(row);
196       put.add(famName, row, row);
197       t.put(put);
198     }
199   }
200 
201   private static void doAssert(byte[] row) throws Exception {
202     if (ReplicationEndpointForTest.lastEntries == null) {
203       return; 
204     }
205     Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
206     List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
207     Assert.assertEquals(1, cells.size());
208     Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
209       cells.get(0).getRowLength(), row, 0, row.length));
210   }
211 
212   public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
213     static UUID uuid = UUID.randomUUID();
214     static AtomicInteger contructedCount = new AtomicInteger();
215     static AtomicInteger startedCount = new AtomicInteger();
216     static AtomicInteger stoppedCount = new AtomicInteger();
217     static AtomicInteger replicateCount = new AtomicInteger();
218     static volatile List<Entry> lastEntries = null;
219 
220     public ReplicationEndpointForTest() {
221       contructedCount.incrementAndGet();
222     }
223 
224     @Override
225     public UUID getPeerUUID() {
226       return uuid;
227     }
228 
229     @Override
230     public boolean replicate(ReplicateContext replicateContext) {
231       replicateCount.incrementAndGet();
232       lastEntries = replicateContext.entries;
233       return true;
234     }
235 
236     @Override
237     protected void doStart() {
238       startedCount.incrementAndGet();
239       notifyStarted();
240     }
241 
242     @Override
243     protected void doStop() {
244       stoppedCount.incrementAndGet();
245       notifyStopped();
246     }
247   }
248 
249   public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
250     static int COUNT = 10;
251     static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
252     static AtomicBoolean replicated = new AtomicBoolean(false);
253     @Override
254     public boolean replicate(ReplicateContext replicateContext) {
255       try {
256         
257         doAssert(row);
258       } catch (Exception e) {
259         ex.set(e);
260       }
261 
262       super.replicate(replicateContext);
263       LOG.info("Replicated " + row + ", count=" + replicateCount.get());
264 
265       replicated.set(replicateCount.get() > COUNT); 
266       return replicated.get();
267     }
268   }
269 
270   
271   public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
272     static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
273 
274     @Override
275     public boolean replicate(ReplicateContext replicateContext) {
276       try {
277         super.replicate(replicateContext);
278         doAssert(row);
279       } catch (Exception e) {
280         ex.set(e);
281       }
282       return true;
283     }
284 
285     @Override
286     public WALEntryFilter getWALEntryfilter() {
287       return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
288         @Override
289         public Entry filter(Entry entry) {
290           ArrayList<Cell> cells = entry.getEdit().getCells();
291           int size = cells.size();
292           for (int i = size-1; i >= 0; i--) {
293             Cell cell = cells.get(i);
294             if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
295               row, 0, row.length)) {
296               cells.remove(i);
297             }
298           }
299           return entry;
300         }
301       });
302     }
303   }
304 }