1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableSet;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  import java.util.UUID;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.CompletionService;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.ExecutionException;
41  import java.util.concurrent.ExecutorCompletionService;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.ThreadFactory;
44  import java.util.concurrent.ThreadPoolExecutor;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.concurrent.atomic.AtomicLong;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.regex.Matcher;
50  import java.util.regex.Pattern;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.fs.FileAlreadyExistsException;
56  import org.apache.hadoop.fs.FileStatus;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.fs.PathFilter;
60  import org.apache.hadoop.hbase.Cell;
61  import org.apache.hadoop.hbase.CellScanner;
62  import org.apache.hadoop.hbase.CellUtil;
63  import org.apache.hadoop.hbase.CoordinatedStateException;
64  import org.apache.hadoop.hbase.CoordinatedStateManager;
65  import org.apache.hadoop.hbase.HBaseConfiguration;
66  import org.apache.hadoop.hbase.HConstants;
67  import org.apache.hadoop.hbase.HRegionInfo;
68  import org.apache.hadoop.hbase.HRegionLocation;
69  import org.apache.hadoop.hbase.RemoteExceptionHandler;
70  import org.apache.hadoop.hbase.ServerName;
71  import org.apache.hadoop.hbase.TableName;
72  import org.apache.hadoop.hbase.TableNotFoundException;
73  import org.apache.hadoop.hbase.TableStateManager;
74  import org.apache.hadoop.hbase.classification.InterfaceAudience;
75  import org.apache.hadoop.hbase.client.ConnectionUtils;
76  import org.apache.hadoop.hbase.client.Delete;
77  import org.apache.hadoop.hbase.client.Durability;
78  import org.apache.hadoop.hbase.client.HConnection;
79  import org.apache.hadoop.hbase.client.HConnectionManager;
80  import org.apache.hadoop.hbase.client.Mutation;
81  import org.apache.hadoop.hbase.client.Put;
82  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
83  import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
84  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
85  import org.apache.hadoop.hbase.io.HeapSize;
86  import org.apache.hadoop.hbase.master.SplitLogManager;
87  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
88  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
89  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90  import org.apache.hadoop.hbase.protobuf.RequestConverter;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
92  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
93  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
94  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
96  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
97  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
98  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
99  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
100 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
101 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
102 import org.apache.hadoop.hbase.regionserver.HRegion;
103 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
104 
105 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
106 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
107 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
110 import org.apache.hadoop.hbase.util.Bytes;
111 import org.apache.hadoop.hbase.util.CancelableProgressable;
112 import org.apache.hadoop.hbase.util.ClassSize;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.FSUtils;
115 import org.apache.hadoop.hbase.util.Pair;
116 import org.apache.hadoop.hbase.util.Threads;
117 import org.apache.hadoop.hbase.wal.WAL.Entry;
118 import org.apache.hadoop.hbase.wal.WAL.Reader;
119 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
120 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
121 import org.apache.hadoop.io.MultipleIOException;
122 
123 import com.google.common.annotations.VisibleForTesting;
124 import com.google.common.base.Preconditions;
125 import com.google.common.collect.Lists;
126 import com.google.protobuf.ServiceException;
127 
128 
129 
130 
131 
132 
133 @InterfaceAudience.Private
134 public class WALSplitter {
  static final Log LOG = LogFactory.getLog(WALSplitter.class);

  /** Default for "hbase.hlog.split.skip.errors" when the configuration does not set it. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // HBase root directory, filesystem, and this splitter's private copy of the configuration
  // (the constructor copies the caller's conf before overriding the RPC codec on it).
  protected final Path rootDir;
  protected final FileSystem fs;
  protected final Configuration conf;

  // Split pipeline: the reading thread appends entries to entryBuffers, the output sink's
  // writer threads drain them, and the controller carries errors and the shared wait/notify
  // monitor between the two sides.
  PipelineController controller;
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // Filled from the TableStateManager at the start of splitLogFile when csm is non-null.
  // NOTE(review): its consumers are outside this view.
  private Set<TableName> disablingOrDisabledTables =
      new HashSet<TableName>();
  // May be null (e.g. the static split() path passes null); null disables distributed replay.
  private BaseCoordinatedStateManager csm;
  // Creates the WAL readers and recovered-edits writers.
  private final WALFactory walFactory;

  // Task status for the single file this instance splits; set once in splitLogFile.
  private MonitoredTask status;

  // Source of last flushed sequence ids when not doing distributed log replay; may be null.
  protected final LastSequenceId sequenceIdChecker;

  // True only when LOG_REPLAY mode was requested AND a coordinated state manager was supplied.
  protected boolean distributedLogReplay;

  // Cache of last flushed sequence id, keyed by encoded region name (as a String).
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

  // Per region (encoded name): column family -> max sequence id already persisted in that store.
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
      new ConcurrentHashMap<String, Map<byte[], Long>>();

  // Server name parsed from the WAL path being split, or "" if it could not be determined.
  protected String failedServerName = "";

  // Number of output-sink writer threads ("hbase.regionserver.hlog.splitlog.writer.threads").
  private final int numWriterThreads;

  // "hbase.regionserver.wal.logreplay.batch.size"; presumably used by the log-replay sink
  // (its use is outside this view).
  private final int minBatchSize;
178 
179   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
180       FileSystem fs, LastSequenceId idChecker,
181       CoordinatedStateManager csm, RecoveryMode mode) {
182     this.conf = HBaseConfiguration.create(conf);
183     String codecClassName = conf
184         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
185     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
186     this.rootDir = rootDir;
187     this.fs = fs;
188     this.sequenceIdChecker = idChecker;
189     this.csm = (BaseCoordinatedStateManager)csm;
190     this.walFactory = factory;
191     this.controller = new PipelineController();
192 
193     entryBuffers = new EntryBuffers(controller,
194         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
195             128*1024*1024));
196 
197     
198     
199     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
200     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
201 
202     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
203     if (csm != null && this.distributedLogReplay) {
204       outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
205     } else {
206       if (this.distributedLogReplay) {
207         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
208       }
209       this.distributedLogReplay = false;
210       outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
211     }
212 
213   }
214 
215   
216 
217 
218 
219 
220 
221 
222 
223 
224 
225 
226 
227 
228 
229 
230 
231   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
232       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
233       CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
234     WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
235     return s.splitLogFile(logfile, reporter);
236   }
237 
238   
239   
240   
241   
242   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
243       FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
244     final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
245         Collections.singletonList(logDir), null);
246     List<Path> splits = new ArrayList<Path>();
247     if (logfiles != null && logfiles.length > 0) {
248       for (FileStatus logfile: logfiles) {
249         WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
250             RecoveryMode.LOG_SPLITTING);
251         if (s.splitLogFile(logfile, null)) {
252           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
253           if (s.outputSink.splits != null) {
254             splits.addAll(s.outputSink.splits);
255           }
256         }
257       }
258     }
259     if (!fs.delete(logDir, true)) {
260       throw new IOException("Unable to delete src dir: " + logDir);
261     }
262     return splits;
263   }
264 
265   
266 
267 
268 
269   boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
270     Preconditions.checkState(status == null);
271     Preconditions.checkArgument(logfile.isFile(),
272         "passed in file status is for something other than a regular file.");
273     boolean isCorrupted = false;
274     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
275       SPLIT_SKIP_ERRORS_DEFAULT);
276     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
277     Path logPath = logfile.getPath();
278     boolean outputSinkStarted = false;
279     boolean progress_failed = false;
280     int editsCount = 0;
281     int editsSkipped = 0;
282 
283     status =
284         TaskMonitor.get().createStatus(
285           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
286     Reader in = null;
287     try {
288       long logLength = logfile.getLen();
289       LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
290       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
291       status.setStatus("Opening log file");
292       if (reporter != null && !reporter.progress()) {
293         progress_failed = true;
294         return false;
295       }
296       try {
297         in = getReader(logfile, skipErrors, reporter);
298       } catch (CorruptedLogFileException e) {
299         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
300         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
301         isCorrupted = true;
302       }
303       if (in == null) {
304         LOG.warn("Nothing to split in log file " + logPath);
305         return true;
306       }
307       if (csm != null) {
308         try {
309           TableStateManager tsm = csm.getTableStateManager();
310           disablingOrDisabledTables = tsm.getTablesInStates(
311           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
312         } catch (CoordinatedStateException e) {
313           throw new IOException("Can't get disabling/disabled tables", e);
314         }
315       }
316       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
317       int numOpenedFilesLastCheck = 0;
318       outputSink.setReporter(reporter);
319       outputSink.startWriterThreads();
320       outputSinkStarted = true;
321       Entry entry;
322       Long lastFlushedSequenceId = -1L;
323       ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
324       failedServerName = (serverName == null) ? "" : serverName.getServerName();
325       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
326         byte[] region = entry.getKey().getEncodedRegionName();
327         String key = Bytes.toString(region);
328         lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
329         if (lastFlushedSequenceId == null) {
330           if (this.distributedLogReplay) {
331             RegionStoreSequenceIds ids =
332                 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
333                   key);
334             if (ids != null) {
335               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
336             }
337           } else if (sequenceIdChecker != null) {
338             RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
339             Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
340             for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
341               maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
342                 storeSeqId.getSequenceId());
343             }
344             regionMaxSeqIdInStores.put(key, maxSeqIdInStores);
345             lastFlushedSequenceId = ids.getLastFlushedSequenceId();
346           }
347           if (lastFlushedSequenceId == null) {
348             lastFlushedSequenceId = -1L;
349           }
350           lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
351         }
352         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
353           editsSkipped++;
354           continue;
355         }
356         entryBuffers.appendEntry(entry);
357         editsCount++;
358         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
359         
360         if (editsCount % interval == 0
361             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
362           numOpenedFilesLastCheck = this.getNumOpenWriters();
363           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
364               + " edits, skipped " + editsSkipped + " edits.";
365           status.setStatus("Split " + countsStr);
366           if (reporter != null && !reporter.progress()) {
367             progress_failed = true;
368             return false;
369           }
370         }
371       }
372     } catch (InterruptedException ie) {
373       IOException iie = new InterruptedIOException();
374       iie.initCause(ie);
375       throw iie;
376     } catch (CorruptedLogFileException e) {
377       LOG.warn("Could not parse, corrupted log file " + logPath, e);
378       csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
379         logfile.getPath().getName(), fs);
380       isCorrupted = true;
381     } catch (IOException e) {
382       e = RemoteExceptionHandler.checkIOException(e);
383       throw e;
384     } finally {
385       LOG.debug("Finishing writing output logs and closing down.");
386       try {
387         if (null != in) {
388           in.close();
389         }
390       } catch (IOException exception) {
391         LOG.warn("Could not close wal reader: " + exception.getMessage());
392         LOG.debug("exception details", exception);
393       }
394       try {
395         if (outputSinkStarted) {
396           
397           
398           progress_failed = true;
399           progress_failed = outputSink.finishWritingAndClose() == null;
400         }
401       } finally {
402         String msg =
403             "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
404                 + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
405                 ", length=" + logfile.getLen() + 
406                 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
407         LOG.info(msg);
408         status.markComplete(msg);
409       }
410     }
411     return !progress_failed;
412   }
413 
414   
415 
416 
417 
418 
419 
420 
421 
422 
423 
424 
425   public static void finishSplitLogFile(String logfile,
426       Configuration conf)  throws IOException {
427     Path rootdir = FSUtils.getRootDir(conf);
428     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
429     Path logPath;
430     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
431       logPath = new Path(logfile);
432     } else {
433       logPath = new Path(rootdir, logfile);
434     }
435     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
436   }
437 
438   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
439       Path logPath, Configuration conf) throws IOException {
440     List<Path> processedLogs = new ArrayList<Path>();
441     List<Path> corruptedLogs = new ArrayList<Path>();
442     FileSystem fs;
443     fs = rootdir.getFileSystem(conf);
444     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
445       corruptedLogs.add(logPath);
446     } else {
447       processedLogs.add(logPath);
448     }
449     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
450     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
451     fs.delete(stagingDir, true);
452   }
453 
454   
455 
456 
457 
458 
459 
460 
461 
462 
463 
464 
465 
466   private static void archiveLogs(
467       final List<Path> corruptedLogs,
468       final List<Path> processedLogs, final Path oldLogDir,
469       final FileSystem fs, final Configuration conf) throws IOException {
470     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
471         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
472 
473     if (!fs.mkdirs(corruptDir)) {
474       LOG.info("Unable to mkdir " + corruptDir);
475     }
476     fs.mkdirs(oldLogDir);
477 
478     
479     
480     for (Path corrupted : corruptedLogs) {
481       Path p = new Path(corruptDir, corrupted.getName());
482       if (fs.exists(corrupted)) {
483         if (!fs.rename(corrupted, p)) {
484           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
485         } else {
486           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
487         }
488       }
489     }
490 
491     for (Path p : processedLogs) {
492       Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
493       if (fs.exists(p)) {
494         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
495           LOG.warn("Unable to move  " + p + " to " + newPath);
496         } else {
497           LOG.info("Archived processed log " + p + " to " + newPath);
498         }
499       }
500     }
501   }
502 
503   
504 
505 
506 
507 
508 
509 
510 
511 
512 
513 
514 
515   @SuppressWarnings("deprecation")
516   static Path getRegionSplitEditsPath(final FileSystem fs,
517       final Entry logEntry, final Path rootDir, boolean isCreate)
518   throws IOException {
519     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
520     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
521     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
522     Path dir = getRegionDirRecoveredEditsDir(regiondir);
523 
524     if (!fs.exists(regiondir)) {
525       LOG.info("This region's directory doesn't exist: "
526           + regiondir.toString() + ". It is very likely that it was" +
527           " already split so it's safe to discard those edits.");
528       return null;
529     }
530     if (fs.exists(dir) && fs.isFile(dir)) {
531       Path tmp = new Path("/tmp");
532       if (!fs.exists(tmp)) {
533         fs.mkdirs(tmp);
534       }
535       tmp = new Path(tmp,
536         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
537       LOG.warn("Found existing old file: " + dir + ". It could be some "
538         + "leftover of an old installation. It should be a folder instead. "
539         + "So moving it to " + tmp);
540       if (!fs.rename(dir, tmp)) {
541         LOG.warn("Failed to sideline old file " + dir);
542       }
543     }
544 
545     if (isCreate && !fs.exists(dir)) {
546       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
547     }
548     
549     
550     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
551     fileName = getTmpRecoveredEditsFileName(fileName);
552     return new Path(dir, fileName);
553   }
554 
555   static String getTmpRecoveredEditsFileName(String fileName) {
556     return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
557   }
558 
559   
560 
561 
562 
563 
564 
565 
566 
567   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
568       Long maximumEditLogSeqNum) {
569     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
570     return new Path(srcPath.getParent(), fileName);
571   }
572 
573   static String formatRecoveredEditsFileName(final long seqid) {
574     return String.format("%019d", seqid);
575   }
576 
  /** Names accepted as completed recovered-edits files: an optional '-' followed by digits. */
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  /**
   * Suffix appended to temporary recovered-edits file names; such files are excluded from
   * getSplitEditFilesSorted.
   */
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
579 
580   
581 
582 
583 
584 
585 
586   public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
587     return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
588   }
589 
590   
591 
592 
593 
594 
595 
596 
597 
598 
599   public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
600       final Path regiondir) throws IOException {
601     NavigableSet<Path> filesSorted = new TreeSet<Path>();
602     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
603     if (!fs.exists(editsdir))
604       return filesSorted;
605     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
606       @Override
607       public boolean accept(Path p) {
608         boolean result = false;
609         try {
610           
611           
612           
613           
614           Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
615           result = fs.isFile(p) && m.matches();
616           
617           
618           if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
619             result = false;
620           }
621           
622           if (isSequenceIdFile(p)) {
623             result = false;
624           }
625         } catch (IOException e) {
626           LOG.warn("Failed isFile check on " + p);
627         }
628         return result;
629       }
630     });
631     if (files == null) {
632       return filesSorted;
633     }
634     for (FileStatus status : files) {
635       filesSorted.add(status.getPath());
636     }
637     return filesSorted;
638   }
639 
640   
641 
642 
643 
644 
645 
646 
647 
648 
649   public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
650       throws IOException {
651     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
652         + System.currentTimeMillis());
653     if (!fs.rename(edits, moveAsideName)) {
654       LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
655     }
656     return moveAsideName;
657   }
658 
  /** Suffix of the region sequence-id marker files written by writeRegionSequenceIdFile. */
  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  /** Alternate (older, per its name) marker suffix; still recognized by isSequenceIdFile. */
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  /**
   * Number of characters stripped from marker names before parsing the id. Both suffixes are
   * 6 characters long, so this works for either form.
   */
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
662 
663   
664 
665 
666   @VisibleForTesting
667   public static boolean isSequenceIdFile(final Path file) {
668     return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
669         || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
670   }
671 
672   
673 
674 
675 
676 
677 
678 
679 
680 
681   public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
682       long newSeqId, long saftyBumper) throws IOException {
683 
684     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
685     long maxSeqId = 0;
686     FileStatus[] files = null;
687     if (fs.exists(editsdir)) {
688       files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
689         @Override
690         public boolean accept(Path p) {
691           return isSequenceIdFile(p);
692         }
693       });
694       if (files != null) {
695         for (FileStatus status : files) {
696           String fileName = status.getPath().getName();
697           try {
698             Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
699                 - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
700             maxSeqId = Math.max(tmpSeqId, maxSeqId);
701           } catch (NumberFormatException ex) {
702             LOG.warn("Invalid SeqId File Name=" + fileName);
703           }
704         }
705       }
706     }
707     if (maxSeqId > newSeqId) {
708       newSeqId = maxSeqId;
709     }
710     newSeqId += saftyBumper; 
711 
712     
713     Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
714     if (newSeqId != maxSeqId) {
715       try {
716         if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
717           throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
718         }
719         if (LOG.isDebugEnabled()) {
720           LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
721               + ", maxSeqId=" + maxSeqId);
722         }
723       } catch (FileAlreadyExistsException ignored) {
724         
725       }
726     }
727     
728     if (files != null) {
729       for (FileStatus status : files) {
730         if (newSeqIdFile.equals(status.getPath())) {
731           continue;
732         }
733         fs.delete(status.getPath(), false);
734       }
735     }
736     return newSeqId;
737   }
738 
739   
740 
741 
742 
743 
744 
745 
746 
747   protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
748       throws IOException, CorruptedLogFileException {
749     Path path = file.getPath();
750     long length = file.getLen();
751     Reader in;
752 
753     
754     
755     
756     if (length <= 0) {
757       LOG.warn("File " + path + " might be still open, length is 0");
758     }
759 
760     try {
761       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
762       try {
763         in = getReader(path, reporter);
764       } catch (EOFException e) {
765         if (length <= 0) {
766           
767           
768           
769           
770           
771           LOG.warn("Could not open " + path + " for reading. File is empty", e);
772           return null;
773         } else {
774           
775           return null;
776         }
777       }
778     } catch (IOException e) {
779       if (e instanceof FileNotFoundException) {
780         
781         LOG.warn("File " + path + " doesn't exist anymore.", e);
782         return null;
783       }
784       if (!skipErrors || e instanceof InterruptedIOException) {
785         throw e; 
786       }
787       CorruptedLogFileException t =
788         new CorruptedLogFileException("skipErrors=true Could not open wal " +
789             path + " ignoring");
790       t.initCause(e);
791       throw t;
792     }
793     return in;
794   }
795 
796   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
797   throws CorruptedLogFileException, IOException {
798     try {
799       return in.next();
800     } catch (EOFException eof) {
801       
802       LOG.info("EOF from wal " + path + ".  continuing");
803       return null;
804     } catch (IOException e) {
805       
806       
807       if (e.getCause() != null &&
808           (e.getCause() instanceof ParseException ||
809            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
810         LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
811            + path + ".  continuing");
812         return null;
813       }
814       if (!skipErrors) {
815         throw e;
816       }
817       CorruptedLogFileException t =
818         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
819             " while parsing wal " + path + ". Marking as corrupted");
820       t.initCause(e);
821       throw t;
822     }
823   }
824 
825   
826 
827 
828 
829   protected Writer createWriter(Path logfile)
830       throws IOException {
831     return walFactory.createRecoveredEditsWriter(fs, logfile);
832   }
833 
834   
835 
836 
837 
838   protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
839     return walFactory.createReader(fs, curLogFile, reporter);
840   }
841 
842   
843 
844 
845   private int getNumOpenWriters() {
846     int result = 0;
847     if (this.outputSink != null) {
848       result += this.outputSink.getNumOpenWriters();
849     }
850     return result;
851   }
852 
853   
854 
855 
856   public static class PipelineController {
857     
858     
859     AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
860 
861     
862     
863     public final Object dataAvailable = new Object();
864 
865     void writerThreadError(Throwable t) {
866       thrown.compareAndSet(null, t);
867     }
868 
869     
870 
871 
872     void checkForErrors() throws IOException {
873       Throwable thrown = this.thrown.get();
874       if (thrown == null) return;
875       if (thrown instanceof IOException) {
876         throw new IOException(thrown);
877       } else {
878         throw new RuntimeException(thrown);
879       }
880     }
881   }
882 
883   
884 
885 
886 
887 
888 
889 
890   public static class EntryBuffers {
891     PipelineController controller;
892 
893     Map<byte[], RegionEntryBuffer> buffers =
894       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
895 
896     
897 
898 
899     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
900 
901     long totalBuffered = 0;
902     long maxHeapUsage;
903 
904     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
905       this.controller = controller;
906       this.maxHeapUsage = maxHeapUsage;
907     }
908 
909     
910 
911 
912 
913 
914 
915 
916     public void appendEntry(Entry entry) throws InterruptedException, IOException {
917       WALKey key = entry.getKey();
918 
919       RegionEntryBuffer buffer;
920       long incrHeap;
921       synchronized (this) {
922         buffer = buffers.get(key.getEncodedRegionName());
923         if (buffer == null) {
924           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
925           buffers.put(key.getEncodedRegionName(), buffer);
926         }
927         incrHeap= buffer.appendEntry(entry);
928       }
929 
930       
931       synchronized (controller.dataAvailable) {
932         totalBuffered += incrHeap;
933         while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
934           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
935           controller.dataAvailable.wait(2000);
936         }
937         controller.dataAvailable.notifyAll();
938       }
939       controller.checkForErrors();
940     }
941 
942     
943 
944 
945     synchronized RegionEntryBuffer getChunkToWrite() {
946       long biggestSize = 0;
947       byte[] biggestBufferKey = null;
948 
949       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
950         long size = entry.getValue().heapSize();
951         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
952           biggestSize = size;
953           biggestBufferKey = entry.getKey();
954         }
955       }
956       if (biggestBufferKey == null) {
957         return null;
958       }
959 
960       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
961       currentlyWriting.add(biggestBufferKey);
962       return buffer;
963     }
964 
965     void doneWriting(RegionEntryBuffer buffer) {
966       synchronized (this) {
967         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
968         assert removed;
969       }
970       long size = buffer.heapSize();
971 
972       synchronized (controller.dataAvailable) {
973         totalBuffered -= size;
974         
975         controller.dataAvailable.notifyAll();
976       }
977     }
978 
979     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
980       return currentlyWriting.contains(region);
981     }
982 
983     public void waitUntilDrained() {
984       synchronized (controller.dataAvailable) {
985         while (totalBuffered > 0) {
986           try {
987             controller.dataAvailable.wait(2000);
988           } catch (InterruptedException e) {
989             LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
990             Thread.interrupted();
991             break;
992           }
993         }
994       }
995     }
996   }
997 
998   
999 
1000 
1001 
1002 
1003 
1004   public static class RegionEntryBuffer implements HeapSize {
1005     long heapInBuffer = 0;
1006     List<Entry> entryBuffer;
1007     TableName tableName;
1008     byte[] encodedRegionName;
1009 
1010     RegionEntryBuffer(TableName tableName, byte[] region) {
1011       this.tableName = tableName;
1012       this.encodedRegionName = region;
1013       this.entryBuffer = new LinkedList<Entry>();
1014     }
1015 
1016     long appendEntry(Entry entry) {
1017       internify(entry);
1018       entryBuffer.add(entry);
1019       long incrHeap = entry.getEdit().heapSize() +
1020         ClassSize.align(2 * ClassSize.REFERENCE) + 
1021         0; 
1022       heapInBuffer += incrHeap;
1023       return incrHeap;
1024     }
1025 
1026     private void internify(Entry entry) {
1027       WALKey k = entry.getKey();
1028       k.internTableName(this.tableName);
1029       k.internEncodedRegionName(this.encodedRegionName);
1030     }
1031 
1032     @Override
1033     public long heapSize() {
1034       return heapInBuffer;
1035     }
1036 
1037     public byte[] getEncodedRegionName() {
1038       return encodedRegionName;
1039     }
1040 
1041     public List<Entry> getEntryBuffer() {
1042       return entryBuffer;
1043     }
1044 
1045     public TableName getTableName() {
1046       return tableName;
1047     }
1048   }
1049 
1050   public static class WriterThread extends Thread {
1051     private volatile boolean shouldStop = false;
1052     private PipelineController controller;
1053     private EntryBuffers entryBuffers;
1054     private OutputSink outputSink = null;
1055 
1056     WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1057       super(Thread.currentThread().getName() + "-Writer-" + i);
1058       this.controller = controller;
1059       this.entryBuffers = entryBuffers;
1060       outputSink = sink;
1061     }
1062 
1063     @Override
1064     public void run()  {
1065       try {
1066         doRun();
1067       } catch (Throwable t) {
1068         LOG.error("Exiting thread", t);
1069         controller.writerThreadError(t);
1070       }
1071     }
1072 
1073     private void doRun() throws IOException {
1074       LOG.debug("Writer thread " + this + ": starting");
1075       while (true) {
1076         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1077         if (buffer == null) {
1078           
1079           synchronized (controller.dataAvailable) {
1080             if (shouldStop && !this.outputSink.flush()) {
1081               return;
1082             }
1083             try {
1084               controller.dataAvailable.wait(500);
1085             } catch (InterruptedException ie) {
1086               if (!shouldStop) {
1087                 throw new RuntimeException(ie);
1088               }
1089             }
1090           }
1091           continue;
1092         }
1093 
1094         assert buffer != null;
1095         try {
1096           writeBuffer(buffer);
1097         } finally {
1098           entryBuffers.doneWriting(buffer);
1099         }
1100       }
1101     }
1102 
1103     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1104       outputSink.append(buffer);
1105     }
1106 
1107     void finish() {
1108       synchronized (controller.dataAvailable) {
1109         shouldStop = true;
1110         controller.dataAvailable.notifyAll();
1111       }
1112     }
1113   }
1114 
1115   
1116 
1117 
1118 
1119   public static abstract class OutputSink {
1120 
1121     protected PipelineController controller;
1122     protected EntryBuffers entryBuffers;
1123 
1124     protected Map<byte[], SinkWriter> writers = Collections
1125         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1126 
1127     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1128         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1129 
1130     protected final List<WriterThread> writerThreads = Lists.newArrayList();
1131 
1132     
1133     protected final Set<byte[]> blacklistedRegions = Collections
1134         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1135 
1136     protected boolean closeAndCleanCompleted = false;
1137 
1138     protected boolean writersClosed = false;
1139 
1140     protected final int numThreads;
1141 
1142     protected CancelableProgressable reporter = null;
1143 
1144     protected AtomicLong skippedEdits = new AtomicLong();
1145 
1146     protected List<Path> splits = null;
1147 
1148     public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1149       numThreads = numWriters;
1150       this.controller = controller;
1151       this.entryBuffers = entryBuffers;
1152     }
1153 
1154     void setReporter(CancelableProgressable reporter) {
1155       this.reporter = reporter;
1156     }
1157 
1158     
1159 
1160 
1161     public synchronized void startWriterThreads() {
1162       for (int i = 0; i < numThreads; i++) {
1163         WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1164         t.start();
1165         writerThreads.add(t);
1166       }
1167     }
1168 
1169     
1170 
1171 
1172 
1173     void updateRegionMaximumEditLogSeqNum(Entry entry) {
1174       synchronized (regionMaximumEditLogSeqNum) {
1175         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1176             .getEncodedRegionName());
1177         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1178           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1179               .getLogSeqNum());
1180         }
1181       }
1182     }
1183 
1184     Long getRegionMaximumEditLogSeqNum(byte[] region) {
1185       return regionMaximumEditLogSeqNum.get(region);
1186     }
1187 
1188     
1189 
1190 
1191     int getNumOpenWriters() {
1192       return this.writers.size();
1193     }
1194 
1195     long getSkippedEdits() {
1196       return this.skippedEdits.get();
1197     }
1198 
1199     
1200 
1201 
1202 
1203 
1204     protected boolean finishWriting(boolean interrupt) throws IOException {
1205       LOG.debug("Waiting for split writer threads to finish");
1206       boolean progress_failed = false;
1207       for (WriterThread t : writerThreads) {
1208         t.finish();
1209       }
1210       if (interrupt) {
1211         for (WriterThread t : writerThreads) {
1212           t.interrupt(); 
1213         }
1214       }
1215 
1216       for (WriterThread t : writerThreads) {
1217         if (!progress_failed && reporter != null && !reporter.progress()) {
1218           progress_failed = true;
1219         }
1220         try {
1221           t.join();
1222         } catch (InterruptedException ie) {
1223           IOException iie = new InterruptedIOException();
1224           iie.initCause(ie);
1225           throw iie;
1226         }
1227       }
1228       controller.checkForErrors();
1229       LOG.info("Split writers finished");
1230       return (!progress_failed);
1231     }
1232 
1233     public abstract List<Path> finishWritingAndClose() throws IOException;
1234 
1235     
1236 
1237 
1238     public abstract Map<byte[], Long> getOutputCounts();
1239 
1240     
1241 
1242 
1243     public abstract int getNumberOfRecoveredRegions();
1244 
1245     
1246 
1247 
1248 
1249     public abstract void append(RegionEntryBuffer buffer) throws IOException;
1250 
1251     
1252 
1253 
1254 
1255     public boolean flush() throws IOException {
1256       return false;
1257     }
1258   }
1259 
1260   
1261 
1262 
1263   class LogRecoveredEditsOutputSink extends OutputSink {
1264 
1265     public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1266         int numWriters) {
1267       
1268       
1269       
1270       
1271       
1272       super(controller, entryBuffers, numWriters);
1273     }
1274 
1275     
1276 
1277 
1278 
1279     @Override
1280     public List<Path> finishWritingAndClose() throws IOException {
1281       boolean isSuccessful = false;
1282       List<Path> result = null;
1283       try {
1284         isSuccessful = finishWriting(false);
1285       } finally {
1286         result = close();
1287         List<IOException> thrown = closeLogWriters(null);
1288         if (thrown != null && !thrown.isEmpty()) {
1289           throw MultipleIOException.createIOException(thrown);
1290         }
1291       }
1292       if (isSuccessful) {
1293         splits = result;
1294       }
1295       return splits;
1296     }
1297 
1298     
1299 
1300 
1301 
1302     private List<Path> close() throws IOException {
1303       Preconditions.checkState(!closeAndCleanCompleted);
1304 
1305       final List<Path> paths = new ArrayList<Path>();
1306       final List<IOException> thrown = Lists.newArrayList();
1307       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1308         TimeUnit.SECONDS, new ThreadFactory() {
1309           private int count = 1;
1310 
1311           @Override
1312           public Thread newThread(Runnable r) {
1313             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1314             return t;
1315           }
1316         });
1317       CompletionService<Void> completionService =
1318         new ExecutorCompletionService<Void>(closeThreadPool);
1319       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1320         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1321         completionService.submit(new Callable<Void>() {
1322           @Override
1323           public Void call() throws Exception {
1324             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1325             LOG.debug("Closing " + wap.p);
1326             try {
1327               wap.w.close();
1328             } catch (IOException ioe) {
1329               LOG.error("Couldn't close log at " + wap.p, ioe);
1330               thrown.add(ioe);
1331               return null;
1332             }
1333             if (LOG.isDebugEnabled()) {
1334               LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1335                 + " edits, skipped " + wap.editsSkipped + " edits in "
1336                 + (wap.nanosSpent / 1000 / 1000) + "ms");
1337             }
1338             if (wap.editsWritten == 0) {
1339               
1340               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1341                 LOG.warn("Failed deleting empty " + wap.p);
1342                 throw new IOException("Failed deleting empty  " + wap.p);
1343               }
1344               return null;
1345             }
1346 
1347             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1348               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1349             try {
1350               if (!dst.equals(wap.p) && fs.exists(dst)) {
1351                 LOG.warn("Found existing old edits file. It could be the "
1352                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1353                     + fs.getFileStatus(dst).getLen());
1354                 if (!fs.delete(dst, false)) {
1355                   LOG.warn("Failed deleting of old " + dst);
1356                   throw new IOException("Failed deleting of old " + dst);
1357                 }
1358               }
1359               
1360               
1361               
1362               if (fs.exists(wap.p)) {
1363                 if (!fs.rename(wap.p, dst)) {
1364                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1365                 }
1366                 LOG.info("Rename " + wap.p + " to " + dst);
1367               }
1368             } catch (IOException ioe) {
1369               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1370               thrown.add(ioe);
1371               return null;
1372             }
1373             paths.add(dst);
1374             return null;
1375           }
1376         });
1377       }
1378 
1379       boolean progress_failed = false;
1380       try {
1381         for (int i = 0, n = this.writers.size(); i < n; i++) {
1382           Future<Void> future = completionService.take();
1383           future.get();
1384           if (!progress_failed && reporter != null && !reporter.progress()) {
1385             progress_failed = true;
1386           }
1387         }
1388       } catch (InterruptedException e) {
1389         IOException iie = new InterruptedIOException();
1390         iie.initCause(e);
1391         throw iie;
1392       } catch (ExecutionException e) {
1393         throw new IOException(e.getCause());
1394       } finally {
1395         closeThreadPool.shutdownNow();
1396       }
1397 
1398       if (!thrown.isEmpty()) {
1399         throw MultipleIOException.createIOException(thrown);
1400       }
1401       writersClosed = true;
1402       closeAndCleanCompleted = true;
1403       if (progress_failed) {
1404         return null;
1405       }
1406       return paths;
1407     }
1408 
1409     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1410       if (writersClosed) {
1411         return thrown;
1412       }
1413 
1414       if (thrown == null) {
1415         thrown = Lists.newArrayList();
1416       }
1417       try {
1418         for (WriterThread t : writerThreads) {
1419           while (t.isAlive()) {
1420             t.shouldStop = true;
1421             t.interrupt();
1422             try {
1423               t.join(10);
1424             } catch (InterruptedException e) {
1425               IOException iie = new InterruptedIOException();
1426               iie.initCause(e);
1427               throw iie;
1428             }
1429           }
1430         }
1431       } finally {
1432         synchronized (writers) {
1433           WriterAndPath wap = null;
1434           for (SinkWriter tmpWAP : writers.values()) {
1435             try {
1436               wap = (WriterAndPath) tmpWAP;
1437               wap.w.close();
1438             } catch (IOException ioe) {
1439               LOG.error("Couldn't close log at " + wap.p, ioe);
1440               thrown.add(ioe);
1441               continue;
1442             }
1443             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1444                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1445           }
1446         }
1447         writersClosed = true;
1448       }
1449 
1450       return thrown;
1451     }
1452 
1453     
1454 
1455 
1456 
1457 
1458     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1459       byte region[] = entry.getKey().getEncodedRegionName();
1460       WriterAndPath ret = (WriterAndPath) writers.get(region);
1461       if (ret != null) {
1462         return ret;
1463       }
1464       
1465       
1466       if (blacklistedRegions.contains(region)) {
1467         return null;
1468       }
1469       ret = createWAP(region, entry, rootDir);
1470       if (ret == null) {
1471         blacklistedRegions.add(region);
1472         return null;
1473       }
1474       writers.put(region, ret);
1475       return ret;
1476     }
1477 
1478     
1479 
1480 
1481     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
1482       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1483       if (regionedits == null) {
1484         return null;
1485       }
1486       if (fs.exists(regionedits)) {
1487         LOG.warn("Found old edits file. It could be the "
1488             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1489             + fs.getFileStatus(regionedits).getLen());
1490         if (!fs.delete(regionedits, false)) {
1491           LOG.warn("Failed delete of old " + regionedits);
1492         }
1493       }
1494       Writer w = createWriter(regionedits);
1495       LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
1496       return (new WriterAndPath(regionedits, w));
1497     }
1498 
1499     private void filterCellByStore(Entry logEntry) {
1500       Map<byte[], Long> maxSeqIdInStores =
1501           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1502       if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
1503         return;
1504       }
1505       
1506       
1507       ArrayList<Cell> keptCells = new ArrayList<Cell>(logEntry.getEdit().getCells().size());
1508       for (Cell cell : logEntry.getEdit().getCells()) {
1509         if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1510           keptCells.add(cell);
1511         } else {
1512           byte[] family = CellUtil.cloneFamily(cell);
1513           Long maxSeqId = maxSeqIdInStores.get(family);
1514           
1515           
1516           if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) {
1517             keptCells.add(cell);
1518           }
1519         }
1520       }
1521 
1522       
1523       
1524       
1525       logEntry.getEdit().setCells(keptCells);
1526     }
1527 
1528     @Override
1529     public void append(RegionEntryBuffer buffer) throws IOException {
1530       List<Entry> entries = buffer.entryBuffer;
1531       if (entries.isEmpty()) {
1532         LOG.warn("got an empty buffer, skipping");
1533         return;
1534       }
1535 
1536       WriterAndPath wap = null;
1537 
1538       long startTime = System.nanoTime();
1539       try {
1540         int editsCount = 0;
1541 
1542         for (Entry logEntry : entries) {
1543           if (wap == null) {
1544             wap = getWriterAndPath(logEntry);
1545             if (wap == null) {
1546               if (LOG.isDebugEnabled()) {
1547                 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
1548               }
1549               return;
1550             }
1551           }
1552           filterCellByStore(logEntry);
1553           if (!logEntry.getEdit().isEmpty()) {
1554             wap.w.append(logEntry);
1555             this.updateRegionMaximumEditLogSeqNum(logEntry);
1556             editsCount++;
1557           } else {
1558             wap.incrementSkippedEdits(1);
1559           }
1560         }
1561         
1562         wap.incrementEdits(editsCount);
1563         wap.incrementNanoTime(System.nanoTime() - startTime);
1564       } catch (IOException e) {
1565         e = RemoteExceptionHandler.checkIOException(e);
1566         LOG.fatal(" Got while writing log entry to log", e);
1567         throw e;
1568       }
1569     }
1570 
1571     
1572 
1573 
1574     @Override
1575     public Map<byte[], Long> getOutputCounts() {
1576       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1577       synchronized (writers) {
1578         for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1579           ret.put(entry.getKey(), entry.getValue().editsWritten);
1580         }
1581       }
1582       return ret;
1583     }
1584 
1585     @Override
1586     public int getNumberOfRecoveredRegions() {
1587       return writers.size();
1588     }
1589   }
1590 
1591   
1592 
1593 
1594   public abstract static class SinkWriter {
1595     
1596     long editsWritten = 0;
1597     
1598     long editsSkipped = 0;
1599     
1600     long nanosSpent = 0;
1601 
1602     void incrementEdits(int edits) {
1603       editsWritten += edits;
1604     }
1605 
1606     void incrementSkippedEdits(int skipped) {
1607       editsSkipped += skipped;
1608     }
1609 
1610     void incrementNanoTime(long nanos) {
1611       nanosSpent += nanos;
1612     }
1613   }
1614 
1615   
1616 
1617 
1618 
1619   private final static class WriterAndPath extends SinkWriter {
1620     final Path p;
1621     final Writer w;
1622 
1623     WriterAndPath(final Path p, final Writer w) {
1624       this.p = p;
1625       this.w = w;
1626     }
1627   }
1628 
1629   
1630 
1631 
1632   class LogReplayOutputSink extends OutputSink {
1633     private static final double BUFFER_THRESHOLD = 0.35;
1634     private static final String KEY_DELIMITER = "#";
1635 
1636     private long waitRegionOnlineTimeOut;
1637     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1638     private final Map<String, RegionServerWriter> writers =
1639         new ConcurrentHashMap<String, RegionServerWriter>();
1640     
1641     private final Map<String, HRegionLocation> onlineRegions =
1642         new ConcurrentHashMap<String, HRegionLocation>();
1643 
1644     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1645         .synchronizedMap(new TreeMap<TableName, HConnection>());
1646     
1647 
1648 
1649 
1650     private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
1651         new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
1652     private List<Throwable> thrown = new ArrayList<Throwable>();
1653 
1654     
1655     
1656     
1657     
1658     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1659     private boolean hasEditsInDisablingOrDisabledTables = false;
1660 
1661     public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1662         int numWriters) {
1663       super(controller, entryBuffers, numWriters);
1664       this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
1665           ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
1666       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
1667         entryBuffers, numWriters);
1668       this.logRecoveredEditsOutputSink.setReporter(reporter);
1669     }
1670 
1671     @Override
1672     public void append(RegionEntryBuffer buffer) throws IOException {
1673       List<Entry> entries = buffer.entryBuffer;
1674       if (entries.isEmpty()) {
1675         LOG.warn("got an empty buffer, skipping");
1676         return;
1677       }
1678 
1679       
1680       if (disablingOrDisabledTables.contains(buffer.tableName)) {
1681         
1682         logRecoveredEditsOutputSink.append(buffer);
1683         hasEditsInDisablingOrDisabledTables = true;
1684         
1685         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1686         return;
1687       }
1688 
1689       
1690       groupEditsByServer(entries);
1691 
1692       
1693       String maxLocKey = null;
1694       int maxSize = 0;
1695       List<Pair<HRegionLocation, Entry>> maxQueue = null;
1696       synchronized (this.serverToBufferQueueMap) {
1697         for (String key : this.serverToBufferQueueMap.keySet()) {
1698           List<Pair<HRegionLocation, Entry>> curQueue = this.serverToBufferQueueMap.get(key);
1699           if (curQueue.size() > maxSize) {
1700             maxSize = curQueue.size();
1701             maxQueue = curQueue;
1702             maxLocKey = key;
1703           }
1704         }
1705         if (maxSize < minBatchSize
1706             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1707           
1708           return;
1709         } else if (maxSize > 0) {
1710           this.serverToBufferQueueMap.remove(maxLocKey);
1711         }
1712       }
1713 
1714       if (maxSize > 0) {
1715         processWorkItems(maxLocKey, maxQueue);
1716       }
1717     }
1718 
1719     private void addToRecoveredRegions(String encodedRegionName) {
1720       if (!recoveredRegions.contains(encodedRegionName)) {
1721         recoveredRegions.add(encodedRegionName);
1722       }
1723     }
1724 
1725     
1726 
1727 
1728 
    /**
     * Groups entries into per-server queues in {@code serverToBufferQueueMap} for later replay.
     * Queues are keyed by {@code loc.getHostnamePort() + KEY_DELIMITER + table}.
     * <p>
     * A whole entry is skipped in any of these cases:
     * <ul>
     * <li>its table does not exist (TableNotFoundException; counted in {@code skippedEdits});</li>
     * <li>the located region's cached last flushed sequence id is at least the entry's sequence
     * number (counted in {@code skippedEdits});</li>
     * <li>it carries a compaction marker for a region other than the one currently serving the
     * row.</li>
     * </ul>
     * Individual cells are dropped when they are meta-family cells that are not parseable
     * compaction markers. They are also dropped when store ids are known for the region and
     * the cell's family has no recorded id, or has one at least the entry's sequence number.
     *
     * @param entries WAL entries to group
     * @throws IOException propagated from region location lookup (TableNotFoundException is
     *         handled here)
     */
    private void groupEditsByServer(List<Entry> entries) throws IOException {
      Set<TableName> nonExistentTables = null;
      Long cachedLastFlushedSequenceId = -1l;
      for (Entry entry : entries) {
        WALEdit edit = entry.getEdit();
        TableName table = entry.getKey().getTablename();
        // clear the key's scopes before the entry is queued for replay
        entry.getKey().setScopes(null);
        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
        // skip edits of tables already found not to exist
        if (nonExistentTables != null && nonExistentTables.contains(table)) {
          this.skippedEdits.incrementAndGet();
          continue;
        }

        Map<byte[], Long> maxStoreSequenceIds = null;
        boolean needSkip = false;
        HRegionLocation loc = null;
        String locKey = null;
        List<Cell> cells = edit.getCells();
        List<Cell> skippedCells = new ArrayList<Cell>();
        HConnection hconn = this.getConnectionByTableName(table);

        for (Cell cell : cells) {
          byte[] row = CellUtil.cloneRow(cell);
          byte[] family = CellUtil.cloneFamily(cell);
          boolean isCompactionEntry = false;
          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            CompactionDescriptor compaction = WALEdit.getCompaction(cell);
            if (compaction != null && compaction.hasRegionName()) {
              try {
                byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
                  .toByteArray());
                row = regionName[1]; // presumably the region's start key; used to locate it
                family = compaction.getFamilyName().toByteArray();
                isCompactionEntry = true;
              } catch (Exception ex) {
                LOG.warn("Unexpected exception received, ignoring " + ex);
                skippedCells.add(cell);
                continue;
              }
            } else {
              // meta cells other than compaction markers are never replayed
              skippedCells.add(cell);
              continue;
            }
          }

          try {
            loc =
                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
                  encodeRegionNameStr);
            // don't replay a compaction marker if the row is now served by a different region
            if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
              loc.getRegionInfo().getEncodedName())) {
              LOG.info("Not replaying a compaction marker for an older region: "
                  + encodeRegionNameStr);
              needSkip = true;
            }
          } catch (TableNotFoundException ex) {
            // table has been deleted: skip this entry and every later entry of the table
            LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
                + encodeRegionNameStr);
            lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
            if (nonExistentTables == null) {
              nonExistentTables = new TreeSet<TableName>();
            }
            nonExistentTables.add(table);
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          }

          cachedLastFlushedSequenceId =
              lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
          if (cachedLastFlushedSequenceId != null
              && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
            // the region has already flushed past this entry: skip the whole WAL entry
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          } else {
            if (maxStoreSequenceIds == null) {
              maxStoreSequenceIds =
                  regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
            }
            if (maxStoreSequenceIds != null) {
              Long maxStoreSeqId = maxStoreSequenceIds.get(family);
              if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
                // skip this cell: its store already flushed past it, or (presumably) the
                // family no longer exists in the region
                skippedCells.add(cell);
                continue;
              }
            }
          }
        }

        // nothing located, or the whole entry must be skipped
        if (loc == null || needSkip) continue;

        // NOTE(review): removeAll matches by equals(), so any other cell equal to a skipped
        // cell is removed as well — confirm Cell equality semantics.
        if (!skippedCells.isEmpty()) {
          cells.removeAll(skippedCells);
        }

        synchronized (serverToBufferQueueMap) {
          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
          List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
          if (queue == null) {
            queue =
                Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
            serverToBufferQueueMap.put(locKey, queue);
          }
          queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
        }
        // remember which regions have received edits so far
        addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
      }
    }
1846 
1847     
1848 
1849 
1850 
1851 
    /**
     * Locates the region serving {@code row} of {@code table} and refreshes the cached flushed
     * sequence ids for it.
     * <p>
     * The method returns straight from the {@code onlineRegions} cache when either the original
     * region or the region found by lookup is already cached. If the located region differs
     * from the original one, the original region's last flushed id is set to Long.MAX_VALUE so
     * its remaining edits are skipped. Otherwise the method calls {@code waitUntilRegionOnline}:
     * <ul>
     * <li>If the region is not recovering, its last flushed id is set to Long.MAX_VALUE.</li>
     * <li>If it is recovering, the flushed ids recorded for {@code failedServerName} are cached
     * in {@code lastFlushedSequenceIds} (only when larger) and
     * {@code regionMaxSeqIdInStores}.</li>
     * </ul>
     *
     * @param hconn connection used for the location lookup
     * @param table table of the row
     * @param row row key to locate
     * @param originalEncodedRegionName encoded name of the region the WAL entry was written for
     * @return the region location, which is also cached in {@code onlineRegions}
     * @throws IOException if the lookup fails or returns no location
     */
    private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
        TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
      // fast path: the original region is already known to be online
      HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
      if(loc != null) return loc;
      // look up the current location (third argument presumably forces a reload — confirm
      // HConnection#getRegionLocation)
      loc = hconn.getRegionLocation(table, row, true);
      if (loc == null) {
        throw new IOException("Can't locate location for row:" + Bytes.toString(row)
            + " of table:" + table);
      }
      // the row is now served by a different region than the one in the WAL entry
      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
        // treat the original region as fully flushed so its remaining edits are skipped
        lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
        HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
        if (tmpLoc != null) return tmpLoc;
      }

      Long lastFlushedSequenceId = -1l;
      AtomicBoolean isRecovering = new AtomicBoolean(true);
      loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
      if (!isRecovering.get()) {
        // the region is not in recovering state: treat it as fully flushed so all of its
        // edits are skipped
        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
        LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
            + " because it's not in recovering.");
      } else {
        Long cachedLastFlushedSequenceId =
            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());

        // fetch the flushed sequence ids recorded for failedServerName through the split-log
        // worker coordination, and cache the per-store ids
        RegionStoreSequenceIds ids =
            csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
              loc.getRegionInfo().getEncodedName());
        if (ids != null) {
          lastFlushedSequenceId = ids.getLastFlushedSequenceId();
          Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
          List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
          for (StoreSequenceId id : maxSeqIdInStores) {
            storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
          }
          regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
        }

        // only ever move the cached last flushed id forward
        if (cachedLastFlushedSequenceId == null
            || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
          lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
        }
      }

      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
      return loc;
    }
1908 
1909     private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
1910         throws IOException {
1911       RegionServerWriter rsw = null;
1912 
1913       long startTime = System.nanoTime();
1914       try {
1915         rsw = getRegionServerWriter(key);
1916         rsw.sink.replayEntries(actions);
1917 
1918         
1919         rsw.incrementEdits(actions.size());
1920         rsw.incrementNanoTime(System.nanoTime() - startTime);
1921       } catch (IOException e) {
1922         e = RemoteExceptionHandler.checkIOException(e);
1923         LOG.fatal(" Got while writing log entry to log", e);
1924         throw e;
1925       }
1926     }
1927 
1928     
1929 
1930 
1931 
1932 
1933 
1934 
1935 
1936 
1937     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1938         final long timeout, AtomicBoolean isRecovering)
1939         throws IOException {
1940       final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1941       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1942         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1943       boolean reloadLocation = false;
1944       TableName tableName = loc.getRegionInfo().getTable();
1945       int tries = 0;
1946       Throwable cause = null;
1947       while (endTime > EnvironmentEdgeManager.currentTime()) {
1948         try {
1949           
1950           HConnection hconn = getConnectionByTableName(tableName);
1951           if(reloadLocation) {
1952             loc = hconn.getRegionLocation(tableName, row, true);
1953           }
1954           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1955           HRegionInfo region = loc.getRegionInfo();
1956           try {
1957             GetRegionInfoRequest request =
1958                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1959             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1960             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1961               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1962               return loc;
1963             }
1964           } catch (ServiceException se) {
1965             throw ProtobufUtil.getRemoteException(se);
1966           }
1967         } catch (IOException e) {
1968           cause = e.getCause();
1969           if(!(cause instanceof RegionOpeningException)) {
1970             reloadLocation = true;
1971           }
1972         }
1973         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1974         try {
1975           Thread.sleep(expectedSleep);
1976         } catch (InterruptedException e) {
1977           throw new IOException("Interrupted when waiting region " +
1978               loc.getRegionInfo().getEncodedName() + " online.", e);
1979         }
1980         tries++;
1981       }
1982 
1983       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1984         " online for " + timeout + " milliseconds.", cause);
1985     }
1986 
1987     @Override
1988     public boolean flush() throws IOException {
1989       String curLoc = null;
1990       int curSize = 0;
1991       List<Pair<HRegionLocation, Entry>> curQueue = null;
1992       synchronized (this.serverToBufferQueueMap) {
1993         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
1994           curQueue = this.serverToBufferQueueMap.get(locationKey);
1995           if (!curQueue.isEmpty()) {
1996             curSize = curQueue.size();
1997             curLoc = locationKey;
1998             break;
1999           }
2000         }
2001         if (curSize > 0) {
2002           this.serverToBufferQueueMap.remove(curLoc);
2003         }
2004       }
2005 
2006       if (curSize > 0) {
2007         this.processWorkItems(curLoc, curQueue);
2008         
2009         synchronized(controller.dataAvailable) {
2010           controller.dataAvailable.notifyAll();
2011         }
2012         return true;
2013       }
2014       return false;
2015     }
2016 
2017     void addWriterError(Throwable t) {
2018       thrown.add(t);
2019     }
2020 
2021     @Override
2022     public List<Path> finishWritingAndClose() throws IOException {
2023       try {
2024         if (!finishWriting(false)) {
2025           return null;
2026         }
2027         if (hasEditsInDisablingOrDisabledTables) {
2028           splits = logRecoveredEditsOutputSink.finishWritingAndClose();
2029         } else {
2030           splits = new ArrayList<Path>();
2031         }
2032         
2033         return splits;
2034       } finally {
2035         List<IOException> thrown = closeRegionServerWriters();
2036         if (thrown != null && !thrown.isEmpty()) {
2037           throw MultipleIOException.createIOException(thrown);
2038         }
2039       }
2040     }
2041 
2042     @Override
2043     int getNumOpenWriters() {
2044       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
2045     }
2046 
2047     private List<IOException> closeRegionServerWriters() throws IOException {
2048       List<IOException> result = null;
2049       if (!writersClosed) {
2050         result = Lists.newArrayList();
2051         try {
2052           for (WriterThread t : writerThreads) {
2053             while (t.isAlive()) {
2054               t.shouldStop = true;
2055               t.interrupt();
2056               try {
2057                 t.join(10);
2058               } catch (InterruptedException e) {
2059                 IOException iie = new InterruptedIOException();
2060                 iie.initCause(e);
2061                 throw iie;
2062               }
2063             }
2064           }
2065         } finally {
2066           synchronized (writers) {
2067             for (String locationKey : writers.keySet()) {
2068               RegionServerWriter tmpW = writers.get(locationKey);
2069               try {
2070                 tmpW.close();
2071               } catch (IOException ioe) {
2072                 LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
2073                 result.add(ioe);
2074               }
2075             }
2076           }
2077 
2078           
2079           synchronized (this.tableNameToHConnectionMap) {
2080             for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
2081               HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2082               try {
2083                 hconn.clearRegionCache();
2084                 hconn.close();
2085               } catch (IOException ioe) {
2086                 result.add(ioe);
2087               }
2088             }
2089           }
2090           writersClosed = true;
2091         }
2092       }
2093       return result;
2094     }
2095 
2096     @Override
2097     public Map<byte[], Long> getOutputCounts() {
2098       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2099       synchronized (writers) {
2100         for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2101           ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
2102         }
2103       }
2104       return ret;
2105     }
2106 
2107     @Override
2108     public int getNumberOfRecoveredRegions() {
2109       return this.recoveredRegions.size();
2110     }
2111 
2112     
2113 
2114 
2115 
2116 
2117     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
2118       RegionServerWriter ret = writers.get(loc);
2119       if (ret != null) {
2120         return ret;
2121       }
2122 
2123       TableName tableName = getTableFromLocationStr(loc);
2124       if(tableName == null){
2125         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
2126       }
2127 
2128       HConnection hconn = getConnectionByTableName(tableName);
2129       synchronized (writers) {
2130         ret = writers.get(loc);
2131         if (ret == null) {
2132           ret = new RegionServerWriter(conf, tableName, hconn);
2133           writers.put(loc, ret);
2134         }
2135       }
2136       return ret;
2137     }
2138 
2139     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
2140       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2141       if (hconn == null) {
2142         synchronized (this.tableNameToHConnectionMap) {
2143           hconn = this.tableNameToHConnectionMap.get(tableName);
2144           if (hconn == null) {
2145             hconn = HConnectionManager.getConnection(conf);
2146             this.tableNameToHConnectionMap.put(tableName, hconn);
2147           }
2148         }
2149       }
2150       return hconn;
2151     }
2152     private TableName getTableFromLocationStr(String loc) {
2153       
2154 
2155 
2156       String[] splits = loc.split(KEY_DELIMITER);
2157       if (splits.length != 2) {
2158         return null;
2159       }
2160       return TableName.valueOf(splits[1]);
2161     }
2162   }
2163 
2164   
2165 
2166 
2167 
2168   private final static class RegionServerWriter extends SinkWriter {
2169     final WALEditsReplaySink sink;
2170 
2171     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
2172         throws IOException {
2173       this.sink = new WALEditsReplaySink(conf, tableName, conn);
2174     }
2175 
2176     void close() throws IOException {
2177     }
2178   }
2179 
2180   static class CorruptedLogFileException extends Exception {
2181     private static final long serialVersionUID = 1L;
2182 
2183     CorruptedLogFileException(String s) {
2184       super(s);
2185     }
2186   }
2187 
2188   
2189   public static class MutationReplay {
2190     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
2191       this.type = type;
2192       this.mutation = mutation;
2193       if(this.mutation.getDurability() != Durability.SKIP_WAL) {
2194         
2195         this.mutation.setDurability(Durability.ASYNC_WAL);
2196       }
2197       this.nonceGroup = nonceGroup;
2198       this.nonce = nonce;
2199     }
2200 
2201     public final MutationType type;
2202     public final Mutation mutation;
2203     public final long nonceGroup;
2204     public final long nonce;
2205   }
2206 
2207   
2208 
2209 
2210 
2211 
2212 
2213 
2214 
2215 
2216 
2217 
2218   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
2219       Pair<WALKey, WALEdit> logEntry, Durability durability)
2220           throws IOException {
2221     if (entry == null) {
2222       
2223       return new ArrayList<MutationReplay>();
2224     }
2225 
2226     long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2227       entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2228     int count = entry.getAssociatedCellCount();
2229     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
2230     Cell previousCell = null;
2231     Mutation m = null;
2232     WALKey key = null;
2233     WALEdit val = null;
2234     if (logEntry != null) val = new WALEdit();
2235 
2236     for (int i = 0; i < count; i++) {
2237       
2238       if (!cells.advance()) {
2239         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
2240       }
2241       Cell cell = cells.current();
2242       if (val != null) val.add(cell);
2243 
2244       boolean isNewRowOrType =
2245           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
2246               || !CellUtil.matchingRow(previousCell, cell);
2247       if (isNewRowOrType) {
2248         
2249         if (CellUtil.isDelete(cell)) {
2250           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2251           
2252           mutations.add(new MutationReplay(
2253               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
2254         } else {
2255           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2256           
2257           long nonceGroup = entry.getKey().hasNonceGroup()
2258               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2259           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2260           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
2261         }
2262       }
2263       if (CellUtil.isDelete(cell)) {
2264         ((Delete) m).addDeleteMarker(cell);
2265       } else {
2266         ((Put) m).add(cell);
2267       }
2268       if (m != null) {
2269         m.setDurability(durability);
2270       }
2271       previousCell = cell;
2272     }
2273 
2274     
2275     if (logEntry != null) {
2276       org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
2277       List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
2278       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
2279         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
2280       }
2281       
2282       key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
2283               walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
2284               clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
2285       logEntry.setFirst(key);
2286       logEntry.setSecond(val);
2287     }
2288 
2289     return mutations;
2290   }
2291 }