1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.FileNotFoundException;
27  import java.io.IOException;
28  import java.lang.reflect.Method;
29  import java.security.PrivilegedExceptionAction;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableSet;
37  import java.util.Set;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.fs.FSDataInputStream;
46  import org.apache.hadoop.fs.FSDataOutputStream;
47  import org.apache.hadoop.fs.FileStatus;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.FileUtil;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.fs.PathFilter;
52  import org.apache.hadoop.hbase.Cell;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseTestingUtility;
55  import org.apache.hadoop.hbase.HConstants;
56  import org.apache.hadoop.hbase.HRegionInfo;
57  import org.apache.hadoop.hbase.KeyValue;
58  import org.apache.hadoop.hbase.testclassification.LargeTests;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
61  import org.apache.hadoop.hbase.regionserver.HRegion;
62  import org.apache.hadoop.hbase.wal.WAL.Entry;
63  import org.apache.hadoop.hbase.wal.WAL.Reader;
64  import org.apache.hadoop.hbase.wal.WALProvider.Writer;
65  import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
66  import org.apache.hadoop.hbase.security.User;
67  import org.apache.hadoop.hbase.util.Bytes;
68  import org.apache.hadoop.hbase.util.CancelableProgressable;
69  import org.apache.hadoop.hbase.util.FSUtils;
70  import org.apache.hadoop.hbase.util.Threads;
71  import org.apache.hadoop.hdfs.DFSTestUtil;
72  import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
73  import org.apache.hadoop.ipc.RemoteException;
74  import org.junit.After;
75  import org.junit.AfterClass;
76  import org.junit.Before;
77  import org.junit.BeforeClass;
78  import org.junit.Rule;
79  import org.junit.rules.TestName;
80  import org.junit.Test;
81  import org.junit.experimental.categories.Category;
82  import org.mockito.Mockito;
83  import org.mockito.invocation.InvocationOnMock;
84  import org.mockito.stubbing.Answer;
85  
86  import com.google.common.base.Joiner;
87  import com.google.common.collect.ImmutableList;
88  
89  
90  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
91  import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
92  import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
93  import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
94  
95  
96  
97  
98  @Category(LargeTests.class)
99  public class TestWALSplit {
100   {
101     
102     
103     
104     
105     
106   }
107   private final static Log LOG = LogFactory.getLog(TestWALSplit.class);
108 
109   private static Configuration conf;
110   private FileSystem fs;
111 
112   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
113 
114   private Path HBASEDIR;
115   private Path WALDIR;
116   private Path OLDLOGDIR;
117   private Path CORRUPTDIR;
118   private Path TABLEDIR;
119 
120   private static final int NUM_WRITERS = 10;
121   private static final int ENTRIES = 10; 
122 
123   private static final TableName TABLE_NAME =
124       TableName.valueOf("t1");
125   private static final byte[] FAMILY = "f1".getBytes();
126   private static final byte[] QUALIFIER = "q1".getBytes();
127   private static final byte[] VALUE = "v1".getBytes();
128   private static final String WAL_FILE_PREFIX = "wal.dat.";
129   private static List<String> REGIONS = new ArrayList<String>();
130   private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
131   private static String ROBBER;
132   private static String ZOMBIE;
133   private static String [] GROUP = new String [] {"supergroup"};
134   private RecoveryMode mode;
135 
136   static enum Corruptions {
137     INSERT_GARBAGE_ON_FIRST_LINE,
138     INSERT_GARBAGE_IN_THE_MIDDLE,
139     APPEND_GARBAGE,
140     TRUNCATE,
141     TRUNCATE_TRAILER
142   }
143 
144   @BeforeClass
145   public static void setUpBeforeClass() throws Exception {
146     conf = TEST_UTIL.getConfiguration();
147     conf.setClass("hbase.regionserver.hlog.writer.impl",
148       InstrumentedLogWriter.class, Writer.class);
149     conf.setBoolean("dfs.support.broken.append", true);
150     conf.setBoolean("dfs.support.append", true);
151     
152     System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
153     
154     Map<String, String []> u2g_map = new HashMap<String, String []>(2);
155     ROBBER = User.getCurrent().getName() + "-robber";
156     ZOMBIE = User.getCurrent().getName() + "-zombie";
157     u2g_map.put(ROBBER, GROUP);
158     u2g_map.put(ZOMBIE, GROUP);
159     DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
160     conf.setInt("dfs.heartbeat.interval", 1);
161     TEST_UTIL.startMiniDFSCluster(2);
162   }
163 
164   @AfterClass
165   public static void tearDownAfterClass() throws Exception {
166     TEST_UTIL.shutdownMiniDFSCluster();
167   }
168 
169   @Rule
170   public TestName name = new TestName();
171   private WALFactory wals = null;
172 
173   @Before
174   public void setUp() throws Exception {
175     LOG.info("Cleaning up cluster for new test.");
176     fs = TEST_UTIL.getDFSCluster().getFileSystem();
177     HBASEDIR = TEST_UTIL.createRootDir();
178     OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
179     CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);
180     TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
181     REGIONS.clear();
182     Collections.addAll(REGIONS, "bbb", "ccc");
183     InstrumentedLogWriter.activateFailure = false;
184     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
185         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
186     wals = new WALFactory(conf, null, name.getMethodName());
187     WALDIR = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
188     
189   }
190 
191   @After
192   public void tearDown() throws Exception {
193     try {
194       wals.close();
195     } catch(IOException exception) {
196       
197       LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
198          " you see a failure look here.");
199       LOG.debug("exception details", exception);
200     } finally {
201       wals = null;
202       fs.delete(HBASEDIR, true);
203     }
204   }
205 
206   
207 
208 
209 
210 
211 
212   @Test (timeout=300000)
213   public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
214     final AtomicLong counter = new AtomicLong(0);
215     AtomicBoolean stop = new AtomicBoolean(false);
216     
217     final String region = REGIONS.get(0);
218     final int numWriters = 3;
219     Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
220     try {
221       long startCount = counter.get();
222       zombie.start();
223       
224       while (startCount == counter.get()) Threads.sleep(1);
225       
226       Threads.sleep(1000);
227       final Configuration conf2 = HBaseConfiguration.create(this.conf);
228       final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
229       int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
230         @Override
231         public Integer run() throws Exception {
232           StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
233               .append("):\n");
234           for (FileStatus status : fs.listStatus(WALDIR)) {
235             ls.append("\t").append(status.toString()).append("\n");
236           }
237           LOG.debug(ls);
238           LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
239           WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
240           LOG.info("Finished splitting out from under zombie.");
241           Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
242           assertEquals("wrong number of split files for region", numWriters, logfiles.length);
243           int count = 0;
244           for (Path logfile: logfiles) {
245             count += countWAL(logfile);
246           }
247           return count;
248         }
249       });
250       LOG.info("zombie=" + counter.get() + ", robber=" + count);
251       assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
252         "Zombie could write " + counter.get() + " and logfile had only " + count,
253         counter.get() == count || counter.get() + 1 == count);
254     } finally {
255       stop.set(true);
256       zombie.interrupt();
257       Threads.threadDumpingIsAlive(zombie);
258     }
259   }
260 
261   
262 
263 
264 
265 
266 
267   class ZombieLastLogWriterRegionServer extends Thread {
268     final AtomicLong editsCount;
269     final AtomicBoolean stop;
270     final int numOfWriters;
271     
272 
273 
274     final String region;
275     final User user;
276 
277     public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
278         final String region, final int writers)
279         throws IOException, InterruptedException {
280       super("ZombieLastLogWriterRegionServer");
281       setDaemon(true);
282       this.stop = stop;
283       this.editsCount = counter;
284       this.region = region;
285       this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
286       numOfWriters = writers;
287     }
288 
289     @Override
290     public void run() {
291       try {
292         doWriting();
293       } catch (IOException e) {
294         LOG.warn(getName() + " Writer exiting " + e);
295       } catch (InterruptedException e) {
296         LOG.warn(getName() + " Writer exiting " + e);
297       }
298     }
299 
300     private void doWriting() throws IOException, InterruptedException {
301       this.user.runAs(new PrivilegedExceptionAction<Object>() {
302         @Override
303         public Object run() throws Exception {
304           
305           
306           int walToKeepOpen = numOfWriters - 1;
307           
308           
309           Writer writer = null;
310           try {
311             writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
312           } catch (IOException e1) {
313             throw new RuntimeException("Failed", e1);
314           }
315           
316           editsCount.addAndGet(numOfWriters * ENTRIES);
317           loop(writer);
318           
319           
320           try {
321             writer.close();
322             fail("Writing closing after parsing should give an error.");
323           } catch (IOException exception) {
324             LOG.debug("ignoring error when closing final writer.", exception);
325           }
326           return null;
327         }
328       });
329     }
330 
331     private void loop(final Writer writer) {
332       byte [] regionBytes = Bytes.toBytes(this.region);
333       while (!stop.get()) {
334         try {
335           long seq = appendEntry(writer, TABLE_NAME, regionBytes,
336               ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0);
337           long count = editsCount.incrementAndGet();
338           LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
339           try {
340             Thread.sleep(1);
341           } catch (InterruptedException e) {
342             
343           }
344         } catch (IOException ex) {
345           LOG.error(getName() + " ex " + ex.toString());
346           if (ex instanceof RemoteException) {
347             LOG.error("Juliet: got RemoteException " + ex.getMessage() +
348               " while writing " + (editsCount.get() + 1));
349           } else {
350             LOG.error(getName() + " failed to write....at " + editsCount.get());
351             fail("Failed to write " + editsCount.get());
352           }
353           break;
354         } catch (Throwable t) {
355           LOG.error(getName() + " HOW? " + t);
356           LOG.debug("exception details", t);
357           break;
358         }
359       }
360       LOG.info(getName() + " Writer exiting");
361     }
362   }
363 
364   
365 
366 
367 
368   @Test (timeout=300000)
369   public void testRecoveredEditsPathForMeta() throws IOException {
370     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
371     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
372     Path regiondir = new Path(tdir,
373         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
374     fs.mkdirs(regiondir);
375     long now = System.currentTimeMillis();
376     Entry entry =
377         new Entry(new WALKey(encoded,
378             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
379       new WALEdit());
380     Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
381     String parentOfParent = p.getParent().getParent().getName();
382     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
383   }
384 
385   
386 
387 
388 
389   @Test (timeout=300000)
390   public void testOldRecoveredEditsFileSidelined() throws IOException {
391     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
392     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
393     Path regiondir = new Path(tdir,
394         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
395     fs.mkdirs(regiondir);
396     long now = System.currentTimeMillis();
397     Entry entry =
398         new Entry(new WALKey(encoded,
399             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
400       new WALEdit());
401     Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
402     assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR);
403     fs.createNewFile(parent); 
404 
405     Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
406     String parentOfParent = p.getParent().getParent().getName();
407     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
408     WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
409   }
410 
411   private void useDifferentDFSClient() throws IOException {
412     
413     
414     fs.initialize(fs.getUri(), conf);
415   }
416 
417   @Test (timeout=300000)
418   public void testSplitPreservesEdits() throws IOException{
419     final String REGION = "region__1";
420     REGIONS.clear();
421     REGIONS.add(REGION);
422 
423     generateWALs(1, 10, -1);
424     useDifferentDFSClient();
425     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
426     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
427     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
428     assertEquals(1, splitLog.length);
429 
430     assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
431   }
432 
433   
434 
435 
436 
437   private int splitAndCount(final int expectedFiles, final int expectedEntries)
438       throws IOException {
439     useDifferentDFSClient();
440     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
441     int result = 0;
442     for (String region : REGIONS) {
443       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
444       assertEquals(expectedFiles, logfiles.length);
445       int count = 0;
446       for (Path logfile: logfiles) {
447         count += countWAL(logfile);
448       }
449       if (-1 != expectedEntries) {
450         assertEquals(expectedEntries, count);
451       }
452       result += count;
453     }
454     return result;
455   }
456 
457   @Test (timeout=300000)
458   public void testEmptyLogFiles() throws IOException {
459     testEmptyLogFiles(true);
460   }
461 
462   @Test (timeout=300000)
463   public void testEmptyOpenLogFiles() throws IOException {
464     testEmptyLogFiles(false);
465   }
466 
467   private void testEmptyLogFiles(final boolean close) throws IOException {
468     injectEmptyFile(".empty", close);
469     generateWALs(Integer.MAX_VALUE);
470     injectEmptyFile("empty", close);
471     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 
472   }
473 
474   @Test (timeout=300000)
475   public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
476     
477     generateWALs(5);
478     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
479   }
480 
481   @Test (timeout=300000)
482   public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
483     conf.setBoolean(HBASE_SKIP_ERRORS, true);
484     generateWALs(Integer.MAX_VALUE);
485     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
486             Corruptions.APPEND_GARBAGE, true);
487     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
488   }
489 
490   @Test (timeout=300000)
491   public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
492     conf.setBoolean(HBASE_SKIP_ERRORS, true);
493     generateWALs(Integer.MAX_VALUE);
494     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
495             Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
496     splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); 
497   }
498 
499   @Test (timeout=300000)
500   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
501     conf.setBoolean(HBASE_SKIP_ERRORS, true);
502     generateWALs(Integer.MAX_VALUE);
503     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
504             Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
505     
506     
507     
508     int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
509     int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
510     int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
511     assertTrue("The file up to the corrupted area hasn't been parsed",
512         REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
513   }
514 
515   @Test (timeout=300000)
516   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
517     conf.setBoolean(HBASE_SKIP_ERRORS, true);
518     for (FaultySequenceFileLogReader.FailureType  failureType :
519         FaultySequenceFileLogReader.FailureType.values()) {
520       final Set<String> walDirContents = splitCorruptWALs(failureType);
521       final Set<String> archivedLogs = new HashSet<String>();
522       final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
523       for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
524         archived.append("\n\t").append(log.toString());
525         archivedLogs.add(log.getPath().getName());
526       }
527       LOG.debug(archived.toString());
528       assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
529           walDirContents, archivedLogs);
530     }
531   }
532 
533   
534 
535 
536 
537   private Set<String> splitCorruptWALs(final FaultySequenceFileLogReader.FailureType failureType)
538       throws IOException {
539     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
540         Reader.class);
541     InstrumentedLogWriter.activateFailure = false;
542 
543     try {
544       conf.setClass("hbase.regionserver.hlog.reader.impl",
545           FaultySequenceFileLogReader.class, Reader.class);
546       conf.set("faultysequencefilelogreader.failuretype", failureType.name());
547       
548       try {
549         wals.shutdown();
550       } catch (IOException exception) {
551         
552         LOG.debug("Ignoring problem closing WALFactory.", exception);
553       }
554       wals.close();
555       try {
556         for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
557           fs.delete(log.getPath(), true);
558         }
559       } catch (FileNotFoundException exception) {
560         LOG.debug("no previous CORRUPTDIR to clean.");
561       }
562       
563       wals = new WALFactory(conf, null, name.getMethodName());
564       generateWALs(-1);
565       
566       final Set<String> walDirContents = new HashSet<String>();
567       for (FileStatus status : fs.listStatus(WALDIR)) {
568         walDirContents.add(status.getPath().getName());
569       }
570       useDifferentDFSClient();
571       WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
572       return walDirContents;
573     } finally {
574       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
575           Reader.class);
576     }
577   }
578 
579   @Test (timeout=300000, expected = IOException.class)
580   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
581       throws IOException {
582     conf.setBoolean(HBASE_SKIP_ERRORS, false);
583     splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
584   }
585 
586   @Test (timeout=300000)
587   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
588       throws IOException {
589     conf.setBoolean(HBASE_SKIP_ERRORS, false);
590     try {
591       splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
592     } catch (IOException e) {
593       LOG.debug("split with 'skip errors' set to 'false' correctly threw");
594     }
595     assertEquals("if skip.errors is false all files should remain in place",
596         NUM_WRITERS + 1 
597   }
598 
599   private void ignoreCorruption(final Corruptions corruption, final int entryCount,
600       final int expectedCount) throws IOException {
601     conf.setBoolean(HBASE_SKIP_ERRORS, false);
602 
603     final String REGION = "region__1";
604     REGIONS.clear();
605     REGIONS.add(REGION);
606 
607     Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
608     generateWALs(1, entryCount, -1);
609     corruptWAL(c1, corruption, true);
610 
611     useDifferentDFSClient();
612     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
613 
614     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
615     assertEquals(1, splitLog.length);
616 
617     int actualCount = 0;
618     Reader in = wals.createReader(fs, splitLog[0]);
619     @SuppressWarnings("unused")
620     Entry entry;
621     while ((entry = in.next()) != null) ++actualCount;
622     assertEquals(expectedCount, actualCount);
623     in.close();
624 
625     
626     FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
627     assertEquals(archivedLogs.length, 0);
628 
629   }
630 
631   @Test (timeout=300000)
632   public void testEOFisIgnored() throws IOException {
633     int entryCount = 10;
634     ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1);
635   }
636 
637   @Test (timeout=300000)
638   public void testCorruptWALTrailer() throws IOException {
639     int entryCount = 10;
640     ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
641   }
642 
643   @Test (timeout=300000)
644   public void testLogsGetArchivedAfterSplit() throws IOException {
645     conf.setBoolean(HBASE_SKIP_ERRORS, false);
646     generateWALs(-1);
647     useDifferentDFSClient();
648     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
649     FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
650     assertEquals("wrong number of files in the archive log", NUM_WRITERS + 1 
651         archivedLogs.length);
652   }
653 
654   @Test (timeout=300000)
655   public void testSplit() throws IOException {
656     generateWALs(-1);
657     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
658   }
659 
660   @Test (timeout=300000)
661   public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
662   throws IOException {
663     generateWALs(-1);
664     useDifferentDFSClient();
665     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
666     FileStatus [] statuses = null;
667     try {
668       statuses = fs.listStatus(WALDIR);
669       if (statuses != null) {
670         fail("Files left in log dir: " +
671             Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
672       }
673     } catch (FileNotFoundException e) {
674       
675     }
676   }
677 
678   @Test(timeout=300000, expected = IOException.class)
679   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
680     
681     Writer writer = generateWALs(4);
682     useDifferentDFSClient();
683 
684     String region = "break";
685     Path regiondir = new Path(TABLEDIR, region);
686     fs.mkdirs(regiondir);
687 
688     InstrumentedLogWriter.activateFailure = false;
689     appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
690         ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
691     writer.close();
692 
693     try {
694       InstrumentedLogWriter.activateFailure = true;
695       WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
696     } catch (IOException e) {
697       assertTrue(e.getMessage().
698         contains("This exception is instrumented and should only be thrown for testing"));
699       throw e;
700     } finally {
701       InstrumentedLogWriter.activateFailure = false;
702     }
703   }
704 
705   @Test (timeout=300000)
706   public void testSplitDeletedRegion() throws IOException {
707     REGIONS.clear();
708     String region = "region_that_splits";
709     REGIONS.add(region);
710 
711     generateWALs(1);
712     useDifferentDFSClient();
713 
714     Path regiondir = new Path(TABLEDIR, region);
715     fs.delete(regiondir, true);
716     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
717     assertFalse(fs.exists(regiondir));
718   }
719 
720   @Test (timeout=300000)
721   public void testIOEOnOutputThread() throws Exception {
722     conf.setBoolean(HBASE_SKIP_ERRORS, false);
723 
724     generateWALs(-1);
725     useDifferentDFSClient();
726     FileStatus[] logfiles = fs.listStatus(WALDIR);
727     assertTrue("There should be some log file",
728       logfiles != null && logfiles.length > 0);
729     
730     
731     
732     int largestLogFile = 0;
733     long largestSize = 0;
734     for (int i = 0; i < logfiles.length; i++) {
735       if (logfiles[i].getLen() > largestSize) {
736         largestLogFile = i;
737         largestSize = logfiles[i].getLen();
738       }
739     }
740     assertTrue("There should be some log greater than size 0.", 0 < largestSize);
741     
742     WALSplitter logSplitter = new WALSplitter(wals,
743         conf, HBASEDIR, fs, null, null, this.mode) {
744       @Override
745       protected Writer createWriter(Path logfile) throws IOException {
746         Writer mockWriter = Mockito.mock(Writer.class);
747         Mockito.doThrow(new IOException("Injected")).when(
748           mockWriter).append(Mockito.<Entry>any());
749         return mockWriter;
750       }
751     };
752     
753     
754     final AtomicBoolean stop = new AtomicBoolean(false);
755     final Thread someOldThread = new Thread("Some-old-thread") {
756       @Override
757       public void run() {
758         while(!stop.get()) Threads.sleep(10);
759       }
760     };
761     someOldThread.setDaemon(true);
762     someOldThread.start();
763     final Thread t = new Thread("Background-thread-dumper") {
764       public void run() {
765         try {
766           Threads.threadDumpingIsAlive(someOldThread);
767         } catch (InterruptedException e) {
768           e.printStackTrace();
769         }
770       }
771     };
772     t.setDaemon(true);
773     t.start();
774     try {
775       logSplitter.splitLogFile(logfiles[largestLogFile], null);
776       fail("Didn't throw!");
777     } catch (IOException ioe) {
778       assertTrue(ioe.toString().contains("Injected"));
779     } finally {
780       
781       stop.set(true);
782     }
783   }
784 
785   
786 
787 
788   private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
789     generateWALs(-1);
790     useDifferentDFSClient();
791 
792     try {
793       WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
794       assertEquals(NUM_WRITERS + 1 
795       assertFalse(fs.exists(WALDIR));
796     } catch (IOException e) {
797       fail("There shouldn't be any exception but: " + e.toString());
798     }
799   }
800 
801   
802   @Test (timeout=300000)
803   public void testMovedWALDuringRecovery() throws Exception {
804     
805     
806     FileSystem spiedFs = Mockito.spy(fs);
807     
808     
809     Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
810         when(spiedFs).append(Mockito.<Path>any());
811     retryOverHdfsProblem(spiedFs);
812   }
813 
814   @Test (timeout=300000)
815   public void testRetryOpenDuringRecovery() throws Exception {
816     FileSystem spiedFs = Mockito.spy(fs);
817     
818     
819     
820     
821     
822     
823     
824     
825     
826     Mockito.doAnswer(new Answer<FSDataInputStream>() {
827       private final String[] errors = new String[] {
828         "Cannot obtain block length", "Could not obtain the last block",
829         "Blocklist for " + OLDLOGDIR + " has changed"};
830       private int count = 0;
831 
832       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
833             if (count < 3) {
834                 throw new IOException(errors[count++]);
835             }
836             return (FSDataInputStream)invocation.callRealMethod();
837         }
838     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
839     retryOverHdfsProblem(spiedFs);
840   }
841 
842   @Test (timeout=300000)
843   public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
844     generateWALs(1, 10, -1);
845     FileStatus logfile = fs.listStatus(WALDIR)[0];
846     useDifferentDFSClient();
847 
848     final AtomicInteger count = new AtomicInteger();
849 
850     CancelableProgressable localReporter
851       = new CancelableProgressable() {
852         @Override
853         public boolean progress() {
854           count.getAndIncrement();
855           return false;
856         }
857       };
858 
859     FileSystem spiedFs = Mockito.spy(fs);
860     Mockito.doAnswer(new Answer<FSDataInputStream>() {
861       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
862         Thread.sleep(1500); 
863         return (FSDataInputStream)invocation.callRealMethod();
864       }
865     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
866 
867     try {
868       conf.setInt("hbase.splitlog.report.period", 1000);
869       boolean ret = WALSplitter.splitLogFile(
870         HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals);
871       assertFalse("Log splitting should failed", ret);
872       assertTrue(count.get() > 0);
873     } catch (IOException e) {
874       fail("There shouldn't be any exception but: " + e.toString());
875     } finally {
876       
877       conf.setInt("hbase.splitlog.report.period", 59000);
878     }
879   }
880 
881   
882 
883 
884 
885   @Test (timeout=300000)
886   public void testThreading() throws Exception {
887     doTestThreading(20000, 128*1024*1024, 0);
888   }
889 
890   
891 
892 
893 
894   @Test (timeout=300000)
895   public void testThreadingSlowWriterSmallBuffer() throws Exception {
896     doTestThreading(200, 1024, 50);
897   }
898 
899   
900 
901 
902 
903 
904 
905 
906 
907 
908 
909 
910 
911   private void doTestThreading(final int numFakeEdits,
912       final int bufferSize,
913       final int writerSlowness) throws Exception {
914 
915     Configuration localConf = new Configuration(conf);
916     localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
917 
918     
919     Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
920     FSDataOutputStream out = fs.create(logPath);
921     out.close();
922 
923     
924     final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
925     makeRegionDirs(regions);
926 
927     
928     WALSplitter logSplitter = new WALSplitter(wals,
929         localConf, HBASEDIR, fs, null, null, this.mode) {
930 
931       
932       @Override
933       protected Writer createWriter(Path logfile) throws IOException {
934         Writer mockWriter = Mockito.mock(Writer.class);
935         Mockito.doAnswer(new Answer<Void>() {
936           int expectedIndex = 0;
937 
938           @Override
939           public Void answer(InvocationOnMock invocation) {
940             if (writerSlowness > 0) {
941               try {
942                 Thread.sleep(writerSlowness);
943               } catch (InterruptedException ie) {
944                 Thread.currentThread().interrupt();
945               }
946             }
947             Entry entry = (Entry) invocation.getArguments()[0];
948             WALEdit edit = entry.getEdit();
949             List<Cell> cells = edit.getCells();
950             assertEquals(1, cells.size());
951             Cell cell = cells.get(0);
952 
953             
954             assertEquals(expectedIndex, Bytes.toInt(cell.getRow()));
955             expectedIndex++;
956             return null;
957           }
958         }).when(mockWriter).append(Mockito.<Entry>any());
959         return mockWriter;
960       }
961 
962       
963       @Override
964       protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
965           throws IOException {
966         Reader mockReader = Mockito.mock(Reader.class);
967         Mockito.doAnswer(new Answer<Entry>() {
968           int index = 0;
969 
970           @Override
971           public Entry answer(InvocationOnMock invocation) throws Throwable {
972             if (index >= numFakeEdits) return null;
973 
974             
975             int regionIdx = index % regions.size();
976             byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
977 
978             Entry ret = createTestEntry(TABLE_NAME, region,
979                 Bytes.toBytes((int)(index / regions.size())),
980                 FAMILY, QUALIFIER, VALUE, index);
981             index++;
982             return ret;
983           }
984         }).when(mockReader).next();
985         return mockReader;
986       }
987     };
988 
989     logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
990 
991     
992     Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
993     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
994       LOG.info("Got " + entry.getValue() + " output edits for region " +
995           Bytes.toString(entry.getKey()));
996       assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
997     }
998     assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
999   }
1000 
1001   
1002   @Test (timeout=300000)
1003   public void testSplitLogFileDeletedRegionDir() throws IOException {
1004     LOG.info("testSplitLogFileDeletedRegionDir");
1005     final String REGION = "region__1";
1006     REGIONS.clear();
1007     REGIONS.add(REGION);
1008 
1009     generateWALs(1, 10, -1);
1010     useDifferentDFSClient();
1011 
1012     Path regiondir = new Path(TABLEDIR, REGION);
1013     LOG.info("Region directory is" + regiondir);
1014     fs.delete(regiondir, true);
1015     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1016     assertFalse(fs.exists(regiondir));
1017   }
1018 
1019   @Test (timeout=300000)
1020   public void testSplitLogFileEmpty() throws IOException {
1021     LOG.info("testSplitLogFileEmpty");
1022     injectEmptyFile(".empty", true);
1023     useDifferentDFSClient();
1024 
1025     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1026     Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1027     assertFalse(fs.exists(tdir));
1028 
1029     assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
1030   }
1031 
1032   @Test (timeout=300000)
1033   public void testSplitLogFileMultipleRegions() throws IOException {
1034     LOG.info("testSplitLogFileMultipleRegions");
1035     generateWALs(1, 10, -1);
1036     splitAndCount(1, 10);
1037   }
1038 
1039   @Test (timeout=300000)
1040   public void testSplitLogFileFirstLineCorruptionLog()
1041   throws IOException {
1042     conf.setBoolean(HBASE_SKIP_ERRORS, true);
1043     generateWALs(1, 10, -1);
1044     FileStatus logfile = fs.listStatus(WALDIR)[0];
1045 
1046     corruptWAL(logfile.getPath(),
1047         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
1048 
1049     useDifferentDFSClient();
1050     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1051 
1052     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
1053         "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
1054     assertEquals(1, fs.listStatus(corruptDir).length);
1055   }
1056 
1057   
1058 
1059 
1060 
1061   @Test (timeout=300000)
1062   public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1063     LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1064     
1065     String regionName = "r0";
1066     final Path regiondir = new Path(TABLEDIR, regionName);
1067     REGIONS.clear();
1068     REGIONS.add(regionName);
1069     generateWALs(-1);
1070 
1071     wals.getWAL(Bytes.toBytes(regionName));
1072     FileStatus[] logfiles = fs.listStatus(WALDIR);
1073     assertTrue("There should be some log file",
1074       logfiles != null && logfiles.length > 0);
1075 
1076     WALSplitter logSplitter = new WALSplitter(wals,
1077         conf, HBASEDIR, fs, null, null, this.mode) {
1078       @Override
1079       protected Writer createWriter(Path logfile)
1080       throws IOException {
1081         Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
1082         
1083         
1084         
1085         NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
1086         if (files != null && !files.isEmpty()) {
1087           for (Path file : files) {
1088             if (!this.fs.delete(file, false)) {
1089               LOG.error("Failed delete of " + file);
1090             } else {
1091               LOG.debug("Deleted recovered.edits file=" + file);
1092             }
1093           }
1094         }
1095         return writer;
1096       }
1097     };
1098     try{
1099       logSplitter.splitLogFile(logfiles[0], null);
1100     } catch (IOException e) {
1101       LOG.info(e);
1102       fail("Throws IOException when spliting "
1103           + "log, it is most likely because writing file does not "
1104           + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1105     }
1106     if (fs.exists(CORRUPTDIR)) {
1107       if (fs.listStatus(CORRUPTDIR).length > 0) {
1108         fail("There are some corrupt logs, "
1109                 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1110       }
1111     }
1112   }
1113 
1114   private Writer generateWALs(int leaveOpen) throws IOException {
1115     return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
1116   }
1117 
1118   private void makeRegionDirs(List<String> regions) throws IOException {
1119     for (String region : regions) {
1120       LOG.debug("Creating dir for region " + region);
1121       fs.mkdirs(new Path(TABLEDIR, region));
1122     }
1123   }
1124 
1125   
1126 
1127 
1128 
1129   private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
1130     makeRegionDirs(REGIONS);
1131     fs.mkdirs(WALDIR);
1132     Writer [] ws = new Writer[writers];
1133     int seq = 0;
1134     for (int i = 0; i < writers; i++) {
1135       ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
1136       for (int j = 0; j < entries; j++) {
1137         int prefix = 0;
1138         for (String region : REGIONS) {
1139           String row_key = region + prefix++ + i + j;
1140           appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
1141               VALUE, seq++);
1142         }
1143       }
1144       if (i != leaveOpen) {
1145         ws[i].close();
1146         LOG.info("Closing writer " + i);
1147       }
1148     }
1149     if (leaveOpen < 0 || leaveOpen >= writers) {
1150       return null;
1151     }
1152     return ws[leaveOpen];
1153   }
1154 
1155   private Path[] getLogForRegion(Path rootdir, TableName table, String region)
1156   throws IOException {
1157     Path tdir = FSUtils.getTableDir(rootdir, table);
1158     @SuppressWarnings("deprecation")
1159     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
1160       Bytes.toString(region.getBytes())));
1161     FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1162       @Override
1163       public boolean accept(Path p) {
1164         if (WALSplitter.isSequenceIdFile(p)) {
1165           return false;
1166         }
1167         return true;
1168       }
1169     });
1170     Path[] paths = new Path[files.length];
1171     for (int i = 0; i < files.length; i++) {
1172       paths[i] = files[i].getPath();
1173     }
1174     return paths;
1175   }
1176 
1177   private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
1178     FSDataOutputStream out;
1179     int fileSize = (int) fs.listStatus(path)[0].getLen();
1180 
1181     FSDataInputStream in = fs.open(path);
1182     byte[] corrupted_bytes = new byte[fileSize];
1183     in.readFully(0, corrupted_bytes, 0, fileSize);
1184     in.close();
1185 
1186     switch (corruption) {
1187       case APPEND_GARBAGE:
1188         fs.delete(path, false);
1189         out = fs.create(path);
1190         out.write(corrupted_bytes);
1191         out.write("-----".getBytes());
1192         closeOrFlush(close, out);
1193         break;
1194 
1195       case INSERT_GARBAGE_ON_FIRST_LINE:
1196         fs.delete(path, false);
1197         out = fs.create(path);
1198         out.write(0);
1199         out.write(corrupted_bytes);
1200         closeOrFlush(close, out);
1201         break;
1202 
1203       case INSERT_GARBAGE_IN_THE_MIDDLE:
1204         fs.delete(path, false);
1205         out = fs.create(path);
1206         int middle = (int) Math.floor(corrupted_bytes.length / 2);
1207         out.write(corrupted_bytes, 0, middle);
1208         out.write(0);
1209         out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1210         closeOrFlush(close, out);
1211         break;
1212 
1213       case TRUNCATE:
1214         fs.delete(path, false);
1215         out = fs.create(path);
1216         out.write(corrupted_bytes, 0, fileSize
1217           - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1218         closeOrFlush(close, out);
1219         break;
1220 
1221       case TRUNCATE_TRAILER:
1222         fs.delete(path, false);
1223         out = fs.create(path);
1224         out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);
1225         closeOrFlush(close, out);
1226         break;
1227     }
1228   }
1229 
1230   private void closeOrFlush(boolean close, FSDataOutputStream out)
1231   throws IOException {
1232     if (close) {
1233       out.close();
1234     } else {
1235       Method syncMethod = null;
1236       try {
1237         syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
1238       } catch (NoSuchMethodException e) {
1239         try {
1240           syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
1241         } catch (NoSuchMethodException ex) {
1242           throw new IOException("This version of Hadoop supports " +
1243               "neither Syncable.sync() nor Syncable.hflush().");
1244         }
1245       }
1246       try {
1247         syncMethod.invoke(out, new Object[]{});
1248       } catch (Exception e) {
1249         throw new IOException(e);
1250       }
1251       
1252     }
1253   }
1254 
1255   private int countWAL(Path log) throws IOException {
1256     int count = 0;
1257     Reader in = wals.createReader(fs, log);
1258     while (in.next() != null) {
1259       count++;
1260     }
1261     in.close();
1262     return count;
1263   }
1264 
1265   public static long appendEntry(Writer writer, TableName table, byte[] region,
1266                           byte[] row, byte[] family, byte[] qualifier,
1267                           byte[] value, long seq)
1268           throws IOException {
1269     LOG.info(Thread.currentThread().getName() + " append");
1270     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1271     LOG.info(Thread.currentThread().getName() + " sync");
1272     writer.sync();
1273     return seq;
1274   }
1275 
1276   private static Entry createTestEntry(
1277       TableName table, byte[] region,
1278       byte[] row, byte[] family, byte[] qualifier,
1279       byte[] value, long seq) {
1280     long time = System.nanoTime();
1281     WALEdit edit = new WALEdit();
1282     seq++;
1283     edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
1284     return new Entry(new WALKey(region, table, seq, time,
1285         HConstants.DEFAULT_CLUSTER_ID), edit);
1286   }
1287 
1288   private void injectEmptyFile(String suffix, boolean closeFile)
1289           throws IOException {
1290     Writer writer = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix),
1291         conf);
1292     if (closeFile) writer.close();
1293   }
1294 
1295   private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1296     Reader in1, in2;
1297     in1 = wals.createReader(fs, p1);
1298     in2 = wals.createReader(fs, p2);
1299     Entry entry1;
1300     Entry entry2;
1301     while ((entry1 = in1.next()) != null) {
1302       entry2 = in2.next();
1303       if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
1304               (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
1305         return false;
1306       }
1307     }
1308     in1.close();
1309     in2.close();
1310     return true;
1311   }
1312 }