1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  import static org.mockito.Matchers.any;
25  import static org.mockito.Mockito.doAnswer;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.when;
28  
29  import java.io.ByteArrayOutputStream;
30  import java.io.IOException;
31  import java.io.PrintStream;
32  import java.net.URL;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.List;
36  
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.testclassification.MediumTests;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.Delete;
49  import org.apache.hadoop.hbase.client.Durability;
50  import org.apache.hadoop.hbase.client.Get;
51  import org.apache.hadoop.hbase.client.HTable;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.client.Table;
57  import org.apache.hadoop.hbase.filter.Filter;
58  import org.apache.hadoop.hbase.filter.FilterBase;
59  import org.apache.hadoop.hbase.filter.PrefixFilter;
60  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
61  import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
62  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
63  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
64  import org.apache.hadoop.hbase.wal.WAL;
65  import org.apache.hadoop.hbase.wal.WALKey;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
68  import org.apache.hadoop.mapreduce.Job;
69  import org.apache.hadoop.mapreduce.Mapper.Context;
70  import org.apache.hadoop.util.GenericOptionsParser;
71  import org.junit.After;
72  import org.junit.AfterClass;
73  import org.junit.Assert;
74  import org.junit.Before;
75  import org.junit.BeforeClass;
76  import org.junit.Test;
77  import org.junit.experimental.categories.Category;
78  import org.mockito.invocation.InvocationOnMock;
79  import org.mockito.stubbing.Answer;
80  
81  
82  
83  
84  @Category(MediumTests.class)
85  public class TestImportExport {
86    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
87    private static final byte[] ROW1 = Bytes.toBytes("row1");
88    private static final byte[] ROW2 = Bytes.toBytes("row2");
89    private static final String FAMILYA_STRING = "a";
90    private static final String FAMILYB_STRING = "b";
91    private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
92    private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
93    private static final byte[] QUAL = Bytes.toBytes("q");
94    private static final String OUTPUT_DIR = "outputdir";
95    private static String FQ_OUTPUT_DIR;
96    private static final String EXPORT_BATCH_SIZE = "100";
97  
98    private static long now = System.currentTimeMillis();
99  
100   @BeforeClass
101   public static void beforeClass() throws Exception {
102     UTIL.startMiniCluster();
103     UTIL.startMiniMapReduceCluster();
104     FQ_OUTPUT_DIR =  new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
105   }
106 
107   @AfterClass
108   public static void afterClass() throws Exception {
109     UTIL.shutdownMiniMapReduceCluster();
110     UTIL.shutdownMiniCluster();
111   }
112 
113   @Before
114   @After
115   public void cleanup() throws Exception {
116     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
117     fs.delete(new Path(OUTPUT_DIR), true);
118   }
119 
120   
121 
122 
123 
124 
125 
126 
127 
128   boolean runExport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
129     
130     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
131     Configuration conf = opts.getConfiguration();
132     args = opts.getRemainingArgs();
133     Job job = Export.createSubmittableJob(conf, args);
134     job.waitForCompletion(false);
135     return job.isSuccessful();
136   }
137 
138   
139 
140 
141 
142 
143 
144 
145 
146   boolean runImport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
147     
148     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
149     Configuration conf = opts.getConfiguration();
150     args = opts.getRemainingArgs();
151     Job job = Import.createSubmittableJob(conf, args);
152     job.waitForCompletion(false);
153     return job.isSuccessful();
154   }
155 
156   
157 
158 
159 
160   @Test
161   public void testSimpleCase() throws Exception {
162     String EXPORT_TABLE = "exportSimpleCase";
163     Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);
164     Put p = new Put(ROW1);
165     p.add(FAMILYA, QUAL, now, QUAL);
166     p.add(FAMILYA, QUAL, now+1, QUAL);
167     p.add(FAMILYA, QUAL, now+2, QUAL);
168     t.put(p);
169     p = new Put(ROW2);
170     p.add(FAMILYA, QUAL, now, QUAL);
171     p.add(FAMILYA, QUAL, now+1, QUAL);
172     p.add(FAMILYA, QUAL, now+2, QUAL);
173     t.put(p);
174 
175     String[] args = new String[] {
176         EXPORT_TABLE,
177         FQ_OUTPUT_DIR,
178         "1000", 
179     };
180     assertTrue(runExport(args));
181 
182     String IMPORT_TABLE = "importTableSimpleCase";
183     t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);
184     args = new String[] {
185         "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
186         IMPORT_TABLE,
187         FQ_OUTPUT_DIR
188     };
189     assertTrue(runImport(args));
190 
191     Get g = new Get(ROW1);
192     g.setMaxVersions();
193     Result r = t.get(g);
194     assertEquals(3, r.size());
195     g = new Get(ROW2);
196     g.setMaxVersions();
197     r = t.get(g);
198     assertEquals(3, r.size());
199   }
200 
201   
202 
203 
204 
205 
206   @Test
207   public void testMetaExport() throws Exception {
208     String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
209     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
210     assertTrue(runExport(args));
211   }
212 
213   
214 
215 
216 
217   @Test
218   public void testImport94Table() throws Exception {
219     URL url = TestImportExport.class.getResource(
220         "exportedTableIn94Format");
221     Path importPath = new Path(url.getPath());
222     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
223     fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR
224         + "exportedTableIn94Format"));
225     String IMPORT_TABLE = "importTableExportedFrom94";
226     Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
227     String[] args = new String[] {
228         "-Dhbase.import.version=0.94" ,
229         IMPORT_TABLE, FQ_OUTPUT_DIR
230     };
231     assertTrue(runImport(args));
232 
233     
234 
235 
236 
237 
238 
239 
240 
241     assertEquals(5, UTIL.countRows(t));
242     t.close();
243   }
244 
245   
246 
247 
248    @Test
249    public void testExportScannerBatching() throws Exception {
250     String BATCH_TABLE = "exportWithBatch";
251     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE));
252     desc.addFamily(new HColumnDescriptor(FAMILYA)
253         .setMaxVersions(1)
254     );
255     UTIL.getHBaseAdmin().createTable(desc);
256     Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());
257 
258     Put p = new Put(ROW1);
259     p.add(FAMILYA, QUAL, now, QUAL);
260     p.add(FAMILYA, QUAL, now+1, QUAL);
261     p.add(FAMILYA, QUAL, now+2, QUAL);
262     p.add(FAMILYA, QUAL, now+3, QUAL);
263     p.add(FAMILYA, QUAL, now+4, QUAL);
264     t.put(p);
265 
266     String[] args = new String[] {
267         "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  
268         BATCH_TABLE,
269         FQ_OUTPUT_DIR
270     };
271     assertTrue(runExport(args));
272 
273     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
274     fs.delete(new Path(FQ_OUTPUT_DIR), true);
275     t.close();
276   }
277 
278   @Test
279   public void testWithDeletes() throws Exception {
280     String EXPORT_TABLE = "exportWithDeletes";
281     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
282     desc.addFamily(new HColumnDescriptor(FAMILYA)
283         .setMaxVersions(5)
284         .setKeepDeletedCells(true)
285     );
286     UTIL.getHBaseAdmin().createTable(desc);
287     Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());
288 
289     Put p = new Put(ROW1);
290     p.add(FAMILYA, QUAL, now, QUAL);
291     p.add(FAMILYA, QUAL, now+1, QUAL);
292     p.add(FAMILYA, QUAL, now+2, QUAL);
293     p.add(FAMILYA, QUAL, now+3, QUAL);
294     p.add(FAMILYA, QUAL, now+4, QUAL);
295     t.put(p);
296 
297     Delete d = new Delete(ROW1, now+3);
298     t.delete(d);
299     d = new Delete(ROW1);
300     d.deleteColumns(FAMILYA, QUAL, now+2);
301     t.delete(d);
302 
303     String[] args = new String[] {
304         "-D" + Export.RAW_SCAN + "=true",
305         EXPORT_TABLE,
306         FQ_OUTPUT_DIR,
307         "1000", 
308     };
309     assertTrue(runExport(args));
310 
311     String IMPORT_TABLE = "importWithDeletes";
312     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
313     desc.addFamily(new HColumnDescriptor(FAMILYA)
314         .setMaxVersions(5)
315         .setKeepDeletedCells(true)
316     );
317     UTIL.getHBaseAdmin().createTable(desc);
318     t.close();
319     t = new HTable(UTIL.getConfiguration(), desc.getTableName());
320     args = new String[] {
321         IMPORT_TABLE,
322         FQ_OUTPUT_DIR
323     };
324     assertTrue(runImport(args));
325 
326     Scan s = new Scan();
327     s.setMaxVersions();
328     s.setRaw(true);
329     ResultScanner scanner = t.getScanner(s);
330     Result r = scanner.next();
331     Cell[] res = r.rawCells();
332     assertTrue(CellUtil.isDeleteFamily(res[0]));
333     assertEquals(now+4, res[1].getTimestamp());
334     assertEquals(now+3, res[2].getTimestamp());
335     assertTrue(CellUtil.isDelete(res[3]));
336     assertEquals(now+2, res[4].getTimestamp());
337     assertEquals(now+1, res[5].getTimestamp());
338     assertEquals(now, res[6].getTimestamp());
339     t.close();
340   }
341   
342   
343   @Test
344   public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
345     String EXPORT_TABLE = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
346     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
347     desc.addFamily(new HColumnDescriptor(FAMILYA)
348         .setMaxVersions(5)
349         .setKeepDeletedCells(true)
350     );
351     UTIL.getHBaseAdmin().createTable(desc);
352     HTable exportT = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
353 
354     
355     Put p = new Put(ROW1);
356     p.add(FAMILYA, QUAL, now, QUAL);
357     exportT.put(p);
358 
359     
360     Delete d = new Delete(ROW1, now+3);
361     exportT.delete(d);
362 
363     
364     p = new Put(ROW1);
365     p.add(FAMILYA, QUAL, now+5, "s".getBytes());
366     exportT.put(p);
367 
368     
369     d = new Delete(ROW1, now+7);
370     exportT.delete(d);
371     
372     
373     String[] args = new String[] {
374         "-D" + Export.RAW_SCAN + "=true",
375         EXPORT_TABLE,
376         FQ_OUTPUT_DIR,
377         "1000", 
378     };
379     assertTrue(runExport(args));
380 
381     String IMPORT_TABLE = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
382     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
383     desc.addFamily(new HColumnDescriptor(FAMILYA)
384         .setMaxVersions(5)
385         .setKeepDeletedCells(true)
386     );
387     UTIL.getHBaseAdmin().createTable(desc);
388     
389     HTable importT = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
390     args = new String[] {
391         IMPORT_TABLE,
392         FQ_OUTPUT_DIR
393     };
394     assertTrue(runImport(args));
395 
396     Scan s = new Scan();
397     s.setMaxVersions();
398     s.setRaw(true);
399     
400     ResultScanner importedTScanner = importT.getScanner(s);
401     Result importedTResult = importedTScanner.next();
402     
403     ResultScanner exportedTScanner = exportT.getScanner(s);
404     Result  exportedTResult =  exportedTScanner.next();
405     try
406     {
407       Result.compareResults(exportedTResult, importedTResult);
408     }
409     catch (Exception e) {
410       fail("Original and imported tables data comparision failed with error:"+e.getMessage());
411     }
412     finally
413     {
414       exportT.close();
415       importT.close();
416     }
417   }
418 
419   
420 
421 
422 
423   @Test
424   public void testWithFilter() throws Exception {
425     
426     String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
427     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
428     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
429     UTIL.getHBaseAdmin().createTable(desc);
430     Table exportTable = new HTable(UTIL.getConfiguration(), desc.getTableName());
431 
432     Put p1 = new Put(ROW1);
433     p1.add(FAMILYA, QUAL, now, QUAL);
434     p1.add(FAMILYA, QUAL, now + 1, QUAL);
435     p1.add(FAMILYA, QUAL, now + 2, QUAL);
436     p1.add(FAMILYA, QUAL, now + 3, QUAL);
437     p1.add(FAMILYA, QUAL, now + 4, QUAL);
438 
439     
440     Put p2 = new Put(ROW2);
441     p2.add(FAMILYA, QUAL, now, QUAL);
442 
443     exportTable.put(Arrays.asList(p1, p2));
444 
445     
446     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
447     assertTrue(runExport(args));
448 
449     
450     String IMPORT_TABLE = "importWithFilter";
451     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
452     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
453     UTIL.getHBaseAdmin().createTable(desc);
454 
455     Table importTable = new HTable(UTIL.getConfiguration(), desc.getTableName());
456     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
457         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
458         "1000" };
459     assertTrue(runImport(args));
460 
461     
462     PrefixFilter filter = new PrefixFilter(ROW1);
463     int count = getCount(exportTable, filter);
464 
465     Assert.assertEquals("Unexpected row count between export and import tables", count,
466       getCount(importTable, null));
467 
468     
469     
470 
471     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
472         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
473         FQ_OUTPUT_DIR, "1000" };
474     assertFalse(runImport(args));
475 
476     
477     exportTable.close();
478     importTable.close();
479   }
480 
481   
482 
483 
484 
485 
486 
487 
488 
489   private int getCount(Table table, Filter filter) throws IOException {
490     Scan scan = new Scan();
491     scan.setFilter(filter);
492     ResultScanner results = table.getScanner(scan);
493     int count = 0;
494     for (Result res : results) {
495       count += res.size();
496     }
497     results.close();
498     return count;
499   }
500   
501   
502 
503 
504   @Test
505   public void testImportMain() throws Exception {
506     PrintStream oldPrintStream = System.err;
507     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
508     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
509     System.setSecurityManager(newSecurityManager);
510     ByteArrayOutputStream data = new ByteArrayOutputStream();
511     String[] args = {};
512     System.setErr(new PrintStream(data));
513     try {
514       System.setErr(new PrintStream(data));
515       Import.main(args);
516       fail("should be SecurityException");
517     } catch (SecurityException e) {
518       assertEquals(-1, newSecurityManager.getExitCode());
519       assertTrue(data.toString().contains("Wrong number of arguments:"));
520       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
521       assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
522       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
523       assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
524     } finally {
525       System.setErr(oldPrintStream);
526       System.setSecurityManager(SECURITY_MANAGER);
527     }
528   }
529 
530   
531 
532 
533   @Test
534   public void testExportMain() throws Exception {
535     PrintStream oldPrintStream = System.err;
536     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
537     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
538     System.setSecurityManager(newSecurityManager);
539     ByteArrayOutputStream data = new ByteArrayOutputStream();
540     String[] args = {};
541     System.setErr(new PrintStream(data));
542     try {
543       System.setErr(new PrintStream(data));
544       Export.main(args);
545       fail("should be SecurityException");
546     } catch (SecurityException e) {
547       assertEquals(-1, newSecurityManager.getExitCode());
548       assertTrue(data.toString().contains("Wrong number of arguments:"));
549       assertTrue(data.toString().contains(
550               "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
551               "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
552       assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
553       assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
554       assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
555       assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
556       assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
557       assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
558     } finally {
559       System.setErr(oldPrintStream);
560       System.setSecurityManager(SECURITY_MANAGER);
561     }
562   }
563 
564   
565 
566 
567   @SuppressWarnings({ "unchecked", "rawtypes" })
568   @Test
569   public void testKeyValueImporter() throws Exception {
570     KeyValueImporter importer = new KeyValueImporter();
571     Configuration configuration = new Configuration();
572     Context ctx = mock(Context.class);
573     when(ctx.getConfiguration()).thenReturn(configuration);
574 
575     doAnswer(new Answer<Void>() {
576 
577       @Override
578       public Void answer(InvocationOnMock invocation) throws Throwable {
579         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
580         KeyValue key = (KeyValue) invocation.getArguments()[1];
581         assertEquals("Key", Bytes.toString(writer.get()));
582         assertEquals("row", Bytes.toString(key.getRow()));
583         return null;
584       }
585     }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
586 
587     importer.setup(ctx);
588     Result value = mock(Result.class);
589     KeyValue[] keys = {
590         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
591             Bytes.toBytes("value")),
592         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
593             Bytes.toBytes("value1")) };
594     when(value.rawCells()).thenReturn(keys);
595     importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
596 
597   }
598 
599   
600 
601 
602 
603   @Test
604   public void testAddFilterAndArguments() throws IOException {
605     Configuration configuration = new Configuration();
606 
607     List<String> args = new ArrayList<String>();
608     args.add("param1");
609     args.add("param2");
610 
611     Import.addFilterAndArguments(configuration, FilterBase.class, args);
612     assertEquals("org.apache.hadoop.hbase.filter.FilterBase", 
613         configuration.get(Import.FILTER_CLASS_CONF_KEY));
614     assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
615   }
616 
617   @Test
618   public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
619     
620     String exportTableName = "exporttestDurability";
621     Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);
622 
623     
624     Put put = new Put(ROW1);
625     put.add(FAMILYA, QUAL, now, QUAL);
626     put.add(FAMILYA, QUAL, now + 1, QUAL);
627     put.add(FAMILYA, QUAL, now + 2, QUAL);
628     exportTable.put(put);
629 
630     put = new Put(ROW2);
631     put.add(FAMILYA, QUAL, now, QUAL);
632     put.add(FAMILYA, QUAL, now + 1, QUAL);
633     put.add(FAMILYA, QUAL, now + 2, QUAL);
634     exportTable.put(put);
635 
636     
637     String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
638     assertTrue(runExport(args));
639 
640     
641     String importTableName = "importTestDurability1";
642     Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
643 
644     
645     TableWALActionListener walListener = new TableWALActionListener(importTableName);
646     WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(null);
647     wal.registerWALActionsListener(walListener);
648 
649     
650     args =
651         new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
652             importTableName, FQ_OUTPUT_DIR };
653     assertTrue(runImport(args));
654     
655     assertTrue(!walListener.isWALVisited());
656     
657     assertTrue(getCount(importTable, null) == 2);
658 
659     
660     importTableName = "importTestDurability2";
661     importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
662     wal.unregisterWALActionsListener(walListener);
663     walListener = new TableWALActionListener(importTableName);
664     wal.registerWALActionsListener(walListener);
665     args = new String[] { importTableName, FQ_OUTPUT_DIR };
666     assertTrue(runImport(args));
667     
668     assertTrue(walListener.isWALVisited());
669     
670     assertTrue(getCount(importTable, null) == 2);
671   }
672 
673   
674 
675 
676 
677   private static class TableWALActionListener extends WALActionsListener.Base {
678 
679     private String tableName;
680     private boolean isVisited = false;
681 
682     public TableWALActionListener(String tableName) {
683       this.tableName = tableName;
684     }
685 
686     @Override
687     public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
688       if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
689         isVisited = true;
690       }
691     }
692 
693     public boolean isWALVisited() {
694       return isVisited;
695     }
696   }  
697 }