1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.util.Iterator;
26  import java.util.HashSet;
27  import java.util.Set;
28  import java.util.Random;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
37  import org.apache.hadoop.hbase.procedure2.Procedure;
38  import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
39  import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
40  import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
41  import org.apache.hadoop.hbase.testclassification.SmallTests;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.io.IOUtils;
44  
45  import org.junit.After;
46  import org.junit.Before;
47  import org.junit.Assert;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  
51  import static org.junit.Assert.assertEquals;
52  import static org.junit.Assert.assertFalse;
53  import static org.junit.Assert.assertTrue;
54  import static org.junit.Assert.fail;
55  
56  @Category(SmallTests.class)
57  public class TestWALProcedureStore {
58    private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class);
59  
60    private static final int PROCEDURE_STORE_SLOTS = 1;
61    private static final Procedure NULL_PROC = null;
62  
63    private WALProcedureStore procStore;
64  
65    private HBaseCommonTestingUtility htu;
66    private FileSystem fs;
67    private Path testDir;
68    private Path logDir;
69  
70    @Before
71    public void setUp() throws IOException {
72      htu = new HBaseCommonTestingUtility();
73      testDir = htu.getDataTestDir();
74      fs = testDir.getFileSystem(htu.getConfiguration());
75      assertTrue(testDir.depth() > 1);
76  
77      logDir = new Path(testDir, "proc-logs");
78      procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
79      procStore.start(PROCEDURE_STORE_SLOTS);
80      procStore.recoverLease();
81      procStore.load();
82    }
83  
84    @After
85    public void tearDown() throws IOException {
86      procStore.stop(false);
87      fs.delete(logDir, true);
88    }
89  
90    private Iterator<Procedure> storeRestart() throws Exception {
91      procStore.stop(false);
92      procStore.start(PROCEDURE_STORE_SLOTS);
93      procStore.recoverLease();
94      return procStore.load();
95    }
96  
97    @Test
98    public void testEmptyRoll() throws Exception {
99      for (int i = 0; i < 10; ++i) {
100       procStore.periodicRollForTesting();
101     }
102     FileStatus[] status = fs.listStatus(logDir);
103     assertEquals(1, status.length);
104   }
105 
106   @Test
107   public void testEmptyLogLoad() throws Exception {
108     Iterator<Procedure> loader = storeRestart();
109     assertEquals(0, countProcedures(loader));
110   }
111 
112   @Test
113   public void testLoad() throws Exception {
114     Set<Long> procIds = new HashSet<>();
115 
116     
117     Procedure proc1 = new TestSequentialProcedure();
118     procIds.add(proc1.getProcId());
119     procStore.insert(proc1, null);
120 
121     Procedure proc2 = new TestSequentialProcedure();
122     Procedure[] child2 = new Procedure[2];
123     child2[0] = new TestSequentialProcedure();
124     child2[1] = new TestSequentialProcedure();
125 
126     procIds.add(proc2.getProcId());
127     procIds.add(child2[0].getProcId());
128     procIds.add(child2[1].getProcId());
129     procStore.insert(proc2, child2);
130 
131     
132     verifyProcIdsOnRestart(procIds);
133 
134     
135     procStore.update(proc1);
136     procStore.update(child2[1]);
137     procStore.delete(child2[1].getProcId());
138     procIds.remove(child2[1].getProcId());
139 
140     
141     verifyProcIdsOnRestart(procIds);
142 
143     
144     procStore.stop(false);
145     FileStatus[] logs = fs.listStatus(logDir);
146     assertEquals(3, logs.length);
147     for (int i = 0; i < logs.length; ++i) {
148       corruptLog(logs[i], 4);
149     }
150     verifyProcIdsOnRestart(procIds);
151   }
152 
153   @Test
154   public void testNoTrailerDoubleRestart() throws Exception {
155     
156     Procedure proc0 = new TestSequentialProcedure();
157     procStore.insert(proc0, null);
158     Procedure proc1 = new TestSequentialProcedure();
159     procStore.insert(proc1, null);
160     Procedure proc2 = new TestSequentialProcedure();
161     procStore.insert(proc2, null);
162     procStore.rollWriterForTesting();
163 
164     
165     procStore.delete(proc1.getProcId());
166     procStore.rollWriterForTesting();
167 
168     
169     procStore.update(proc2);
170     procStore.rollWriterForTesting();
171 
172     
173     procStore.delete(proc2.getProcId());
174 
175     
176     procStore.stop(false);
177     FileStatus[] logs = fs.listStatus(logDir);
178     assertEquals(4, logs.length);
179     for (int i = 0; i < logs.length; ++i) {
180       corruptLog(logs[i], 4);
181     }
182 
183     
184     assertEquals(1, countProcedures(storeRestart()));
185 
186     
187     assertEquals(5, fs.listStatus(logDir).length);
188     assertEquals(1, countProcedures(storeRestart()));
189 
190     
191     procStore.delete(proc0.getProcId());
192     procStore.periodicRollForTesting();
193     assertEquals(1, fs.listStatus(logDir).length);
194     assertEquals(0, countProcedures(storeRestart()));
195   }
196 
197   @Test
198   public void testCorruptedTrailer() throws Exception {
199     
200     for (int i = 0; i < 100; ++i) {
201       procStore.insert(new TestSequentialProcedure(), null);
202     }
203 
204     
205     procStore.stop(false);
206 
207     
208     FileStatus[] logs = fs.listStatus(logDir);
209     assertEquals(1, logs.length);
210     corruptLog(logs[0], 4);
211 
212     int count = countProcedures(storeRestart());
213     assertEquals(100, count);
214   }
215 
216   @Test
217   public void testCorruptedEntries() throws Exception {
218     
219     for (int i = 0; i < 100; ++i) {
220       procStore.insert(new TestSequentialProcedure(), null);
221     }
222 
223     
224     procStore.stop(false);
225 
226     
227     
228     FileStatus[] logs = fs.listStatus(logDir);
229     assertEquals(1, logs.length);
230     corruptLog(logs[0], 1823);
231 
232     int count = countProcedures(storeRestart());
233     assertTrue(procStore.getCorruptedLogs() != null);
234     assertEquals(1, procStore.getCorruptedLogs().size());
235     assertEquals(85, count);
236   }
237 
238   @Test
239   public void testInsertUpdateDelete() throws Exception {
240     final int NTHREAD = 2;
241 
242     procStore.stop(false);
243     fs.delete(logDir, true);
244 
245     org.apache.hadoop.conf.Configuration conf =
246       new org.apache.hadoop.conf.Configuration(htu.getConfiguration());
247     conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
248     conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000);
249     conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
250 
251     fs.mkdirs(logDir);
252     procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
253     procStore.start(NTHREAD);
254     procStore.recoverLease();
255     assertEquals(0, countProcedures(procStore.load()));
256 
257     final long LAST_PROC_ID = 9999;
258     final Thread[] thread = new Thread[NTHREAD];
259     final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
260     for (int i = 0; i < thread.length; ++i) {
261       thread[i] = new Thread() {
262         @Override
263         public void run() {
264           Random rand = new Random();
265           TestProcedure proc;
266           do {
267             proc = new TestProcedure(procCounter.addAndGet(1));
268             
269             procStore.insert(proc, null);
270             
271             for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
272               try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {}
273               procStore.update(proc);
274             }
275             
276             procStore.delete(proc.getProcId());
277           } while (proc.getProcId() < LAST_PROC_ID);
278         }
279       };
280       thread[i].start();
281     }
282 
283     for (int i = 0; i < thread.length; ++i) {
284       thread[i].join();
285     }
286 
287     procStore.getStoreTracker().dump();
288     assertTrue(procCounter.get() >= LAST_PROC_ID);
289     assertTrue(procStore.getStoreTracker().isEmpty());
290     assertEquals(1, procStore.getActiveLogs().size());
291   }
292 
293   @Test
294   public void testRollAndRemove() throws IOException {
295     
296     Procedure proc1 = new TestSequentialProcedure();
297     procStore.insert(proc1, null);
298 
299     Procedure proc2 = new TestSequentialProcedure();
300     procStore.insert(proc2, null);
301 
302     
303     procStore.rollWriterForTesting();
304     assertEquals(2, procStore.getActiveLogs().size());
305 
306     
307     
308     procStore.update(proc1);
309     procStore.update(proc2);
310     assertEquals(1, procStore.getActiveLogs().size());
311 
312     
313     procStore.rollWriterForTesting();
314     assertEquals(2, procStore.getActiveLogs().size());
315 
316     
317     
318     procStore.delete(proc1.getProcId());
319     procStore.delete(proc2.getProcId());
320     assertEquals(1, procStore.getActiveLogs().size());
321   }
322 
323   @Test
324   public void testFileNotFoundDuringLeaseRecovery() throws IOException {
325     TestProcedure[] procs = new TestProcedure[3];
326     for (int i = 0; i < procs.length; ++i) {
327       procs[i] = new TestProcedure(i + 1, 0);
328       procStore.insert(procs[i], null);
329     }
330     procStore.rollWriterForTesting();
331     for (int i = 0; i < procs.length; ++i) {
332       procStore.update(procs[i]);
333       procStore.rollWriterForTesting();
334     }
335     procStore.stop(false);
336 
337     FileStatus[] status = fs.listStatus(logDir);
338     assertEquals(procs.length + 2, status.length);
339 
340     
341     procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
342         new WALProcedureStore.LeaseRecovery() {
343       private int count = 0;
344 
345       @Override
346       public void recoverFileLease(FileSystem fs, Path path) throws IOException {
347         if (++count <= 2) {
348           fs.delete(path, false);
349           LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
350           throw new FileNotFoundException("test file not found " + path);
351         }
352         LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
353       }
354     });
355 
356     procStore.start(PROCEDURE_STORE_SLOTS);
357     procStore.recoverLease();
358     int countProcs = countProcedures(procStore.load());
359     assertEquals(procs.length - 1, countProcs);
360     assertTrue(procStore.getCorruptedLogs() == null);
361   }
362 
363   private void corruptLog(final FileStatus logFile, final long dropBytes)
364       throws IOException {
365     assertTrue(logFile.getLen() > dropBytes);
366     LOG.debug("corrupt log " + logFile.getPath() +
367               " size=" + logFile.getLen() + " drop=" + dropBytes);
368     Path tmpPath = new Path(testDir, "corrupted.log");
369     InputStream in = fs.open(logFile.getPath());
370     OutputStream out =  fs.create(tmpPath);
371     IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
372     if (!fs.rename(tmpPath, logFile.getPath())) {
373       throw new IOException("Unable to rename");
374     }
375   }
376 
377   private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
378     int count = 0;
379     Iterator<Procedure> loader = storeRestart();
380     while (loader.hasNext()) {
381       Procedure proc = loader.next();
382       LOG.debug("loading procId=" + proc.getProcId());
383       assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
384       count++;
385     }
386     assertEquals(procIds.size(), count);
387   }
388 
389   private void assertIsEmpty(Iterator<Procedure> iterator) {
390     assertEquals(0, countProcedures(iterator));
391   }
392 
393   private int countProcedures(Iterator<Procedure> iterator) {
394     int count = 0;
395     while (iterator != null && iterator.hasNext()) {
396       Procedure proc = iterator.next();
397       LOG.trace("loading procId=" + proc.getProcId());
398       count++;
399     }
400     return count;
401   }
402 
403   private void assertEmptyLogDir() {
404     try {
405       FileStatus[] status = fs.listStatus(logDir);
406       assertTrue("expected empty state-log dir", status == null || status.length == 0);
407     } catch (FileNotFoundException e) {
408       fail("expected the state-log dir to be present: " + logDir);
409     } catch (IOException e) {
410       fail("got en exception on state-log dir list: " + e.getMessage());
411     }
412   }
413 
414   public static class TestSequentialProcedure extends SequentialProcedure<Void> {
415     private static long seqid = 0;
416 
417     public TestSequentialProcedure() {
418       setProcId(++seqid);
419     }
420 
421     @Override
422     protected Procedure[] execute(Void env) { return null; }
423 
424     @Override
425     protected void rollback(Void env) { }
426 
427     @Override
428     protected boolean abort(Void env) { return false; }
429 
430     @Override
431     protected void serializeStateData(final OutputStream stream) throws IOException {
432       long procId = getProcId();
433       if (procId % 2 == 0) {
434         stream.write(Bytes.toBytes(procId));
435       }
436     }
437 
438     @Override
439     protected void deserializeStateData(InputStream stream) throws IOException {
440       long procId = getProcId();
441       if (procId % 2 == 0) {
442         byte[] bProcId = new byte[8];
443         assertEquals(8, stream.read(bProcId));
444         assertEquals(procId, Bytes.toLong(bProcId));
445       } else {
446         assertEquals(0, stream.available());
447       }
448     }
449   }
450 }