1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  package org.apache.hadoop.hbase.master.cleaner;
12  
13  import static org.junit.Assert.assertEquals;
14  import static org.junit.Assert.assertFalse;
15  import static org.junit.Assert.assertTrue;
16  import static org.junit.Assert.fail;
17  import static org.mockito.Mockito.doThrow;
18  import static org.mockito.Mockito.spy;
19  
20  import com.google.common.collect.Lists;
21  
22  import java.io.IOException;
23  import java.lang.reflect.Field;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Abortable;
35  import org.apache.hadoop.hbase.ChoreService;
36  import org.apache.hadoop.hbase.CoordinatedStateManager;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.Server;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
42  import org.apache.hadoop.hbase.client.ClusterConnection;
43  import org.apache.hadoop.hbase.replication.ReplicationException;
44  import org.apache.hadoop.hbase.replication.ReplicationFactory;
45  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
46  import org.apache.hadoop.hbase.replication.ReplicationPeers;
47  import org.apache.hadoop.hbase.replication.ReplicationQueues;
48  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
49  import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
50  import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
51  import org.apache.hadoop.hbase.replication.regionserver.Replication;
52  import org.apache.hadoop.hbase.testclassification.SmallTests;
53  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
54  import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56  import org.apache.zookeeper.KeeperException;
57  import org.apache.zookeeper.data.Stat;
58  import org.junit.After;
59  import org.junit.AfterClass;
60  import org.junit.Before;
61  import org.junit.BeforeClass;
62  import org.junit.Test;
63  import org.junit.experimental.categories.Category;
64  import org.mockito.Mockito;
65  
66  @Category({ SmallTests.class })
67  public class TestReplicationHFileCleaner {
68    private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
69    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
70    private static Server server;
71    private static ReplicationQueues rq;
72    private static ReplicationPeers rp;
73    private static final String peerId = "TestReplicationHFileCleaner";
74    private static Configuration conf = TEST_UTIL.getConfiguration();
75    static FileSystem fs = null;
76    Path root;
77  
78    
79  
80  
81    @BeforeClass
82    public static void setUpBeforeClass() throws Exception {
83      TEST_UTIL.startMiniZKCluster();
84      server = new DummyServer();
85      conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
86      Replication.decorateMasterConfiguration(conf);
87      rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
88      rp.init();
89  
90      rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
91      rq.init(server.getServerName().toString());
92      try {
93        fs = FileSystem.get(conf);
94      } finally {
95        if (fs != null) {
96          fs.close();
97        }
98      }
99    }
100 
101   
102 
103 
104   @AfterClass
105   public static void tearDownAfterClass() throws Exception {
106     TEST_UTIL.shutdownMiniZKCluster();
107   }
108 
109   @Before
110   public void setup() throws ReplicationException, IOException {
111     root = TEST_UTIL.getDataTestDirOnTestFS();
112     rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
113   }
114 
115   @After
116   public void cleanup() throws ReplicationException {
117     try {
118       fs.delete(root, true);
119     } catch (IOException e) {
120       LOG.warn("Failed to delete files recursively from path " + root);
121     }
122     rp.removePeer(peerId);
123   }
124 
125   @Test
126   public void testIsFileDeletable() throws IOException, ReplicationException {
127     
128     Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
129     fs.createNewFile(file);
130     
131     assertTrue("Test file not created!", fs.exists(file));
132     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
133     cleaner.setConf(conf);
134     
135     assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
136         + "for it in the queue.",
137       cleaner.isFileDeletable(fs.getFileStatus(file)));
138 
139     List<String> files = new ArrayList<String>(1);
140     files.add(file.getName());
141     
142     rq.addHFileRefs(peerId, files);
143     
144     assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
145         + "for it in the queue.",
146       cleaner.isFileDeletable(fs.getFileStatus(file)));
147   }
148 
149   @Test
150   public void testGetDeletableFiles() throws Exception {
151     
152     Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
153     fs.createNewFile(notDeletablefile);
154     assertTrue("Test file not created!", fs.exists(notDeletablefile));
155     Path deletablefile = new Path(root, "testGetDeletableFiles_2");
156     fs.createNewFile(deletablefile);
157     assertTrue("Test file not created!", fs.exists(deletablefile));
158 
159     List<FileStatus> files = new ArrayList<FileStatus>(2);
160     FileStatus f = new FileStatus();
161     f.setPath(deletablefile);
162     files.add(f);
163     f = new FileStatus();
164     f.setPath(notDeletablefile);
165     files.add(f);
166 
167     List<String> hfiles = new ArrayList<>(1);
168     hfiles.add(notDeletablefile.getName());
169     
170     rq.addHFileRefs(peerId, hfiles);
171 
172     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
173     cleaner.setConf(conf);
174     Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
175     int i = 0;
176     while (deletableFilesIterator.hasNext() && i < 2) {
177       i++;
178     }
179     
180     if (i > 2) {
181       fail("File " + notDeletablefile
182           + " should not be deletable as its hfile reference node is not added.");
183     }
184     assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
185   }
186 
187   
188 
189 
190 
191   @Test(timeout = 15000)
192   public void testForDifferntHFileRefsZnodeVersion() throws Exception {
193     
194     Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion");
195     fs.createNewFile(file);
196     
197     assertTrue("Test file not created!", fs.exists(file));
198     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
199     cleaner.setConf(conf);
200 
201     ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
202     
203     Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
204 
205     Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
206     Field rqc = cleanerClass.getDeclaredField("rqc");
207     rqc.setAccessible(true);
208     rqc.set(cleaner, replicationQueuesClient);
209 
210     cleaner.isFileDeletable(fs.getFileStatus(file));
211   }
212 
213   
214 
215 
216   @Test
217   public void testZooKeeperAbort() throws Exception {
218     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
219 
220     List<FileStatus> dummyFiles =
221         Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
222             "hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
223             "hfile2")));
224 
225     FaultyZooKeeperWatcher faultyZK =
226         new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
227     try {
228       faultyZK.init();
229       cleaner.setConf(conf, faultyZK);
230       
231       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
232       assertFalse(toDelete.iterator().hasNext());
233       assertFalse(cleaner.isStopped());
234     } finally {
235       faultyZK.close();
236     }
237 
238     
239     cleaner = new ReplicationHFileCleaner();
240     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
241     try {
242       cleaner.setConf(conf, zkw);
243       Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
244       Iterator<FileStatus> iter = filesToDelete.iterator();
245       assertTrue(iter.hasNext());
246       assertEquals(new Path("hfile1"), iter.next().getPath());
247       assertTrue(iter.hasNext());
248       assertEquals(new Path("hfile2"), iter.next().getPath());
249       assertFalse(iter.hasNext());
250     } finally {
251       zkw.close();
252     }
253   }
254 
255   static class DummyServer implements Server {
256 
257     @Override
258     public Configuration getConfiguration() {
259       return TEST_UTIL.getConfiguration();
260     }
261 
262     @Override
263     public ZooKeeperWatcher getZooKeeper() {
264       try {
265         return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
266       } catch (IOException e) {
267         e.printStackTrace();
268       }
269       return null;
270     }
271 
272     @Override
273     public CoordinatedStateManager getCoordinatedStateManager() {
274       return null;
275     }
276 
277     @Override
278     public ClusterConnection getConnection() {
279       return null;
280     }
281 
282     @Override
283     public MetaTableLocator getMetaTableLocator() {
284       return null;
285     }
286 
287     @Override
288     public ServerName getServerName() {
289       return ServerName.valueOf("regionserver,60020,000000");
290     }
291 
292     @Override
293     public void abort(String why, Throwable e) {
294     }
295 
296     @Override
297     public boolean isAborted() {
298       return false;
299     }
300 
301     @Override
302     public void stop(String why) {
303     }
304 
305     @Override
306     public boolean isStopped() {
307       return false;
308     }
309 
310     @Override
311     public ChoreService getChoreService() {
312       return null;
313     }
314   }
315 
316   static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
317     private RecoverableZooKeeper zk;
318     public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
319         throws ZooKeeperConnectionException, IOException {
320       super(conf, identifier, abortable);
321     }
322 
323     public void init() throws Exception {
324       this.zk = spy(super.getRecoverableZooKeeper());
325       doThrow(new KeeperException.ConnectionLossException())
326           .when(zk).getData("/hbase/replication/hfile-refs", null, new Stat());
327     }
328 
329     public RecoverableZooKeeper getRecoverableZooKeeper() {
330       return zk;
331     }
332   }
333 }