1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.io.OutputStream;
26  import java.lang.reflect.InvocationTargetException;
27  import java.lang.reflect.Method;
28  import java.net.URLEncoder;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableMap;
37  import java.util.Set;
38  import java.util.TreeMap;
39  import java.util.UUID;
40  import java.util.concurrent.BlockingQueue;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.ConcurrentMap;
43  import java.util.concurrent.ConcurrentSkipListMap;
44  import java.util.concurrent.CopyOnWriteArrayList;
45  import java.util.concurrent.CountDownLatch;
46  import java.util.concurrent.ExecutionException;
47  import java.util.concurrent.ExecutorService;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.LinkedBlockingQueue;
50  import java.util.concurrent.TimeUnit;
51  import java.util.concurrent.atomic.AtomicBoolean;
52  import java.util.concurrent.atomic.AtomicInteger;
53  import java.util.concurrent.atomic.AtomicLong;
54  import java.util.concurrent.locks.ReentrantLock;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.FSDataOutputStream;
60  import org.apache.hadoop.fs.FileStatus;
61  import org.apache.hadoop.fs.FileSystem;
62  import org.apache.hadoop.fs.Path;
63  import org.apache.hadoop.fs.PathFilter;
64  import org.apache.hadoop.hbase.Cell;
65  import org.apache.hadoop.hbase.CellUtil;
66  import org.apache.hadoop.hbase.HBaseConfiguration;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HRegionInfo;
69  import org.apache.hadoop.hbase.HTableDescriptor;
70  import org.apache.hadoop.hbase.KeyValue;
71  import org.apache.hadoop.hbase.TableName;
72  import org.apache.hadoop.hbase.classification.InterfaceAudience;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.ClassSize;
75  import org.apache.hadoop.hbase.util.DrainBarrier;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.hbase.util.FSUtils;
78  import org.apache.hadoop.hbase.util.HasThread;
79  import org.apache.hadoop.hbase.util.Threads;
80  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
81  import org.apache.hadoop.hbase.wal.WAL;
82  import org.apache.hadoop.hbase.wal.WALFactory;
83  import org.apache.hadoop.hbase.wal.WALKey;
84  import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
85  import org.apache.hadoop.hbase.wal.WALProvider.Writer;
86  import org.apache.hadoop.hbase.wal.WALSplitter;
87  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
88  import org.apache.hadoop.util.StringUtils;
89  import org.apache.htrace.NullScope;
90  import org.apache.htrace.Span;
91  import org.apache.htrace.Trace;
92  import org.apache.htrace.TraceScope;
93  
94  import com.google.common.annotations.VisibleForTesting;
95  import com.google.common.collect.Maps;
96  import com.lmax.disruptor.BlockingWaitStrategy;
97  import com.lmax.disruptor.EventHandler;
98  import com.lmax.disruptor.ExceptionHandler;
99  import com.lmax.disruptor.LifecycleAware;
100 import com.lmax.disruptor.TimeoutException;
101 import com.lmax.disruptor.dsl.Disruptor;
102 import com.lmax.disruptor.dsl.ProducerType;
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133 @InterfaceAudience.Private
134 public class FSHLog implements WAL {
135   
136   
137   
138   
139   
140   
141   
142   
143   
144   
145   
146   
147   
148   
149   
150   
151   
152   
153   
154   
155   
156   
157   
158   
159   
160   
161   
162   
163   
164   
165   
166   
167   
168 
  /** Logger for this class; also used by the nested RingBufferExceptionHandler. */
  static final Log LOG = LogFactory.getLog(FSHLog.class);

  /** Default for "hbase.regionserver.hlog.slowsync.ms"; the constructor converts it to nanos. */
  private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100;

  /**
   * Ring buffer that RingBufferTruck events are published onto. Created in the constructor with
   * ProducerType.MULTI and a BlockingWaitStrategy; consumed by {@link #ringBufferEventHandler}.
   */
  private final Disruptor<RingBufferTruck> disruptor;

  /**
   * Single-thread executor (thread named after the constructing thread plus ".append") that the
   * disruptor runs its consumer on. Shut down in shutdown().
   */
  private final ExecutorService appendExecutor;

  /**
   * The only event handler registered on the disruptor. replaceWriter() also asks it for a safe
   * point before swapping writers.
   */
  private final RingBufferEventHandler ringBufferEventHandler;

  /**
   * SyncFutures keyed by thread, presized to the configured region server handler count.
   * NOTE(review): the code filling this map is outside this chunk; presumably one cached
   * SyncFuture per handler thread -- confirm.
   */
  private final Map<Thread, SyncFuture> syncFuturesByHandler;

  /**
   * Per its name, the highest sequence not yet known to be synced (updated outside this chunk).
   * Starts at -1. getUnflushedEntriesCount() measures how far it is ahead of
   * {@link #highestSyncedSequence}.
   */
  private volatile long highestUnsyncedSequence = -1;

  /** Highest sequence known to be synced; starts at 0. */
  private final AtomicLong highestSyncedSequence = new AtomicLong(0);

  /** Filesystem the WAL files live on. */
  protected final FileSystem fs;

  /** rootDir/logDir: holds the live WAL files; created by the constructor if missing. */
  private final Path fullPathLogDir;

  /** rootDir/archiveDir: archived WAL files are renamed into this directory. */
  private final Path fullPathArchiveDir;

  /** Accepts only WAL file paths belonging to this instance (matching prefix and suffix). */
  private final PathFilter ourFiles;

  /** URL-encoded file name prefix; "wal" when no prefix was given. */
  private final String logFilePrefix;

  /** URL-encoded file name suffix; the empty string when no suffix was given. */
  private final String logFileSuffix;

  /** String form of fullPathLogDir/(logFilePrefix + WAL_FILE_NAME_DELIMITER). */
  private final String prefixPathStr;

  /** Coprocessor host created for this WAL in the constructor. */
  private final WALCoprocessorHost coprocessorHost;

  /** Configuration this WAL was created with. */
  protected final Configuration conf;

  /**
   * Registered listeners. A CopyOnWriteArrayList, so iteration sees a snapshot and listeners may
   * be (un)registered while events are being delivered.
   */
  private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
259 
260   @Override
261   public void registerWALActionsListener(final WALActionsListener listener) {
262     this.listeners.add(listener);
263   }
264   
265   @Override
266   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
267     return this.listeners.remove(listener);
268   }
269 
270   @Override
271   public WALCoprocessorHost getCoprocessorHost() {
272     return coprocessorHost;
273   }
274   
275 
276 
  /**
   * Underlying stream of the current writer when that writer is a ProtobufLogWriter, otherwise
   * null; installed by replaceWriter(). Not volatile.
   */
  private FSDataOutputStream hdfs_out;

  /**
   * From "hbase.regionserver.hlog.tolerable.lowreplication", defaulting to the filesystem's
   * default replication for the log dir. NOTE(review): read outside this chunk; presumably the
   * replication level below which a roll is wanted -- confirm.
   */
  private final int minTolerableReplication;

  /**
   * Methods looked up in the constructor for the first output stream (by helpers not shown in
   * this chunk); presumably invoked reflectively to inspect the HDFS write pipeline.
   */
  private final Method getNumCurrentReplicas;
  private final Method getPipeLine;
  /** Slow-sync threshold in nanoseconds, derived from "hbase.regionserver.hlog.slowsync.ms". */
  private final int slowSyncNs;

  /** Empty argument array. NOTE(review): unused in this chunk; presumably for Method.invoke. */
  private final static Object [] NO_ARGS = new Object []{};

  // NOTE(review): the next three fields are only initialized in this chunk. By their names and
  // the config key they appear to track low-replication rolls: a count of consecutive rolls, the
  // limit from "hbase.regionserver.hlog.lowreplication.rolllimit" (default 5), and an on/off
  // switch. Confirm against the code that reads them.
  private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);

  private final int lowReplicationRollLimit;

  private volatile boolean lowReplicationRollEnabled = true;

  /** Current WAL writer; swapped by replaceWriter(), closed and nulled by shutdown(). */
  volatile Writer writer;

  /**
   * rollWriter() wraps each roll in beginOp()/endOp() and skips rolling when beginOp() returns
   * false; shutdown() calls stopAndDrainOps() on it before tearing down the disruptor.
   */
  private final DrainBarrier closeBarrier = new DrainBarrier();

  /** Fair lock that serializes rollWriter(boolean) calls. */
  private final ReentrantLock rollWriterLock = new ReentrantLock(true);

  /** Set by shutdown() after the disruptor is stopped; rollWriter() does nothing once set. */
  private volatile boolean closed = false;
  /** Flipped by the first shutdown() call so the shutdown sequence runs at most once. */
  private final AtomicBoolean shutdown = new AtomicBoolean(false);

  /**
   * Number of the current WAL file; -1 until the first roll. getNewPath() seeds it from
   * System.currentTimeMillis() and increments it until the resulting file name is unused.
   */
  private final AtomicLong filenum = new AtomicLong(-1);

  /** Entries in the current WAL; reset on roll. rollWriter(false) is a no-op while it is <= 0. */
  private final AtomicInteger numEntries = new AtomicInteger(0);

  /** Roll threshold: block size times "hbase.regionserver.logroll.multiplier" (default 0.95). */
  private final long logrollsize;

  /**
   * Total length of rolled, not-yet-archived WAL files: increased by the old file's length on each
   * roll and decreased when a file is archived. NOTE(review): not final, though this chunk only
   * mutates it in place.
   */
  private AtomicLong totalLogSize = new AtomicLong(0);

  /**
   * "hbase.regionserver.maxlogs" (default 32). When more rolled WALs than this exist,
   * findRegionsToForceFlush() picks regions holding back the oldest one.
   */
  private final int maxLogs;

  /** "hbase.regionserver.logroll.errors.tolerated" (default 0): tolerated failed writer closes. */
  private final int closeErrorsTolerated;

  /** Consecutive failed writer closes; reset to 0 after a successful close in replaceWriter(). */
  private final AtomicInteger closeErrorCount = new AtomicInteger();

  /**
   * Monitor held while cleanOldLogs() snapshots the two per-store sequence id maps below and while
   * findEligibleMemstoresToFlush() reads oldestUnflushedStoreSequenceIds. NOTE(review): the
   * writers of those maps are outside this chunk.
   */
  private final Object regionSequenceIdLock = new Object();

  /**
   * Oldest unflushed sequence id per region, broken down by an inner byte[] key (presumably the
   * store/column family name, per the field name -- confirm). Outer keys are compared by content
   * via Bytes.BYTES_COMPARATOR.
   */
  private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
    = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
      Bytes.BYTES_COMPARATOR);

  /**
   * Lowest sequence ids of stores currently being flushed, with the same shape as the map above.
   * A plain TreeMap, so presumably only touched under regionSequenceIdLock -- confirm.
   */
  private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
    new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);

  /**
   * Highest sequence id written per region to the current WAL. On roll, replaceWriter() files
   * this map in byWalRegionSequenceIds under the old WAL's path and starts a fresh one.
   * NOTE(review): a HashMap keyed by byte[] matches keys by array identity, not by content.
   */
  private Map<byte[], Long> highestRegionSequenceIds = new HashMap<byte[], Long>();
396 
397   
398 
399 
400 
401   final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
402     @Override
403     public int compare(Path o1, Path o2) {
404       long t1 = getFileNumFromFileName(o1);
405       long t2 = getFileNumFromFileName(o2);
406       if (t1 == t2) {
407         return 0;
408       }
409       return (t1 > t2) ? 1 : -1;
410     }
411   };
412 
413   
414 
415 
416 
  /**
   * For each rolled WAL path, ordered by file number via LOG_NAME_COMPARATOR, the highest sequence
   * id each region wrote to it. Filled by replaceWriter(), trimmed by cleanOldLogs(); its first
   * entry is consulted by findRegionsToForceFlush().
   */
  private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
    new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
419 
420   
421 
422 
423 
424   static class RingBufferExceptionHandler implements ExceptionHandler {
425     @Override
426     public void handleEventException(Throwable ex, long sequence, Object event) {
427       LOG.error("Sequence=" + sequence + ", event=" + event, ex);
428       throw new RuntimeException(ex);
429     }
430 
431     @Override
432     public void handleOnStartException(Throwable ex) {
433       LOG.error(ex);
434       throw new RuntimeException(ex);
435     }
436 
437     @Override
438     public void handleOnShutdownException(Throwable ex) {
439       LOG.error(ex);
440       throw new RuntimeException(ex);
441     }
442   }
443 
444   
445 
446 
447 
448 
449 
450 
451 
452 
453   public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
454       throws IOException {
455     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
456   }
457 
458   
459 
460 
461 
462 
463 
464 
465 
466 
467 
468 
469 
470 
471 
472 
473 
474 
475 
476 
477 
478 
479 
480 
481 
482   public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
483       final String archiveDir, final Configuration conf,
484       final List<WALActionsListener> listeners,
485       final boolean failIfWALExists, final String prefix, final String suffix)
486       throws IOException {
487     this.fs = fs;
488     this.fullPathLogDir = new Path(rootDir, logDir);
489     this.fullPathArchiveDir = new Path(rootDir, archiveDir);
490     this.conf = conf;
491 
492     if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
493       throw new IOException("Unable to mkdir " + fullPathLogDir);
494     }
495 
496     if (!fs.exists(this.fullPathArchiveDir)) {
497       if (!fs.mkdirs(this.fullPathArchiveDir)) {
498         throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
499       }
500     }
501 
502     
503     this.logFilePrefix =
504       prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
505     
506     if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
507       throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
508           "' but instead was '" + suffix + "'");
509     }
510     
511     
512     FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY,
513       HConstants.DEFAULT_WAL_STORAGE_POLICY);
514     this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
515     this.prefixPathStr = new Path(fullPathLogDir,
516         logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
517 
518     this.ourFiles = new PathFilter() {
519       @Override
520       public boolean accept(final Path fileName) {
521         
522         final String fileNameString = fileName.toString();
523         if (!fileNameString.startsWith(prefixPathStr)) {
524           return false;
525         }
526         if (logFileSuffix.isEmpty()) {
527           
528           return org.apache.commons.lang.StringUtils.isNumeric(
529               fileNameString.substring(prefixPathStr.length()));
530         } else if (!fileNameString.endsWith(logFileSuffix)) {
531           return false;
532         }
533         return true;
534       }
535     };
536 
537     if (failIfWALExists) {
538       final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
539       if (null != walFiles && 0 != walFiles.length) {
540         throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
541       }
542     }
543 
544     
545     if (listeners != null) {
546       for (WALActionsListener i: listeners) {
547         registerWALActionsListener(i);
548       }
549     }
550     this.coprocessorHost = new WALCoprocessorHost(this, conf);
551 
552     
553     
554     final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
555         FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
556     this.logrollsize =
557       (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
558 
559     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
560     this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication",
561         FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
562     this.lowReplicationRollLimit =
563       conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
564     this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
565     int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
566 
567     LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
568       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
569       ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
570       this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
571 
572     
573     rollWriter();
574 
575     this.slowSyncNs =
576         1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
577           DEFAULT_SLOW_SYNC_TIME_MS);
578     
579     
580     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
581     this.getPipeLine = getGetPipeline(this.hdfs_out);
582 
583     
584     
585     String hostingThreadName = Thread.currentThread().getName();
586     this.appendExecutor = Executors.
587       newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
588     
589     
590     
591     
592     final int preallocatedEventCount =
593       this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
594     
595     
596     this.disruptor =
597       new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
598         this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
599     
600     
601     this.disruptor.getRingBuffer().next();
602     this.ringBufferEventHandler =
603       new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
604         maxHandlersCount);
605     this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
606     this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
607     
608     this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
609     
610     this.disruptor.start();
611   }
612 
613   
614 
615 
616 
617   protected FileStatus[] getFiles() throws IOException {
618     return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
619   }
620 
621   
622 
623 
624 
625 
626 
627 
628 
629   @VisibleForTesting
630   OutputStream getOutputStream() {
631     FSDataOutputStream fsdos = this.hdfs_out;
632     if (fsdos == null) {
633       return null;
634     }
635     return fsdos.getWrappedStream();
636   }
637 
638   @Override
639   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
640     return rollWriter(false);
641   }
642 
643   
644 
645 
646 
647   private Path getNewPath() throws IOException {
648     this.filenum.set(System.currentTimeMillis());
649     Path newPath = getCurrentFileName();
650     while (fs.exists(newPath)) {
651       this.filenum.incrementAndGet();
652       newPath = getCurrentFileName();
653     }
654     return newPath;
655   }
656 
657   Path getOldPath() {
658     long currentFilenum = this.filenum.get();
659     Path oldPath = null;
660     if (currentFilenum > 0) {
661       
662       oldPath = computeFilename(currentFilenum);
663     } 
664     return oldPath;
665   }
666 
667   
668 
669 
670 
671   private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
672   throws IOException {
673     if (!this.listeners.isEmpty()) {
674       for (WALActionsListener i : this.listeners) {
675         i.preLogRoll(oldPath, newPath);
676       }
677     }
678   }
679 
680   
681 
682 
683 
684   private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
685   throws IOException {
686     if (!this.listeners.isEmpty()) {
687       for (WALActionsListener i : this.listeners) {
688         i.postLogRoll(oldPath, newPath);
689       }
690     }
691   }
692 
693   
694 
695 
696 
697   private void preemptiveSync(final ProtobufLogWriter nextWriter) {
698     long startTimeNanos = System.nanoTime();
699     try {
700       nextWriter.sync();
701       postSync(System.nanoTime() - startTimeNanos, 0);
702     } catch (IOException e) {
703       
704       LOG.warn("pre-sync failed but an optimization so keep going", e);
705     }
706   }
707 
  /**
   * Rolls the WAL. It creates a writer on a new file, swaps it in via replaceWriter(), and then,
   * if rolled WALs exist, archives those whose edits are all flushed and computes regions to
   * force-flush.
   *
   * @param force if false, the roll is skipped when a writer exists and has no entries
   * @return regions that should be flushed so older WALs can be archived; null when none qualify
   *     or when the roll is skipped (no entries, closed, or closing)
   * @throws FailedLogCloseException if the old writer could not be closed (from replaceWriter)
   * @throws IOException on other filesystem errors
   */
  @Override
  public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
    rollWriterLock.lock();
    try {
      // Nothing appended to the current WAL since the last roll: skip unless forced.
      if (!force && (this.writer != null && this.numEntries.get() <= 0)) {
        return null;
      }
      byte [][] regionsToFlush = null;
      if (this.closed) {
        LOG.debug("WAL closed. Skipping rolling of writer");
        return regionsToFlush;
      }
      // beginOp() returns false once shutdown() has started draining the barrier.
      if (!closeBarrier.beginOp()) {
        LOG.debug("WAL closing. Skipping rolling of writer");
        return regionsToFlush;
      }
      TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
      try {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Create the new writer before touching the current one.
        // NOTE(review): if anything below throws before replaceWriter() installs nextWriter,
        // nothing in this method closes it -- confirm whether that can leak a file handle.
        Writer nextWriter = this.createWriterInstance(newPath);
        FSDataOutputStream nextHdfsOut = null;
        if (nextWriter instanceof ProtobufLogWriter) {
          nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
          // Best-effort sync of the new writer before swapping it in; failures are only logged
          // (see preemptiveSync).
          preemptiveSync((ProtobufLogWriter)nextWriter);
        }
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // Swap writers; replaceWriter returns the path that is now current.
        newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        // With rolled WALs present, archive fully flushed ones and find regions pinning the rest.
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } finally {
        closeBarrier.endOp();
        assert scope == NullScope.INSTANCE || !scope.isDetached();
        scope.close();
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }
757 
758   
759 
760 
761 
762 
763 
764   protected Writer createWriterInstance(final Path path) throws IOException {
765     return DefaultWALProvider.createWriter(conf, fs, path, false);
766   }
767 
768   private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
769     long result = HConstants.NO_SEQNUM;
770     for (Long seqNum: seqIdMap.values()) {
771       if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
772         result = seqNum.longValue();
773       }
774     }
775     return result;
776   }
777 
778   private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
779       Map<byte[], T> mapToCopy) {
780     Map<byte[], Long> copied = Maps.newHashMap();
781     for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
782       long lowestSeqId = getLowestSeqId(entry.getValue());
783       if (lowestSeqId != HConstants.NO_SEQNUM) {
784         copied.put(entry.getKey(), lowestSeqId);
785       }
786     }
787     return copied;
788   }
789 
790   
791 
792 
793 
794 
795 
796 
797 
798 
799 
800 
801   private void cleanOldLogs() throws IOException {
802     Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
803     Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
804     List<Path> logsToArchive = new ArrayList<Path>();
805     
806     synchronized (regionSequenceIdLock) {
807       lowestFlushingRegionSequenceIdsLocal =
808           copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
809       oldestUnflushedRegionSequenceIdsLocal =
810           copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
811     }
812     for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
813       
814       Path log = e.getKey();
815       Map<byte[], Long> sequenceNums = e.getValue();
816       
817       if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
818           oldestUnflushedRegionSequenceIdsLocal)) {
819         logsToArchive.add(log);
820         LOG.debug("WAL file ready for archiving " + log);
821       }
822     }
823     for (Path p : logsToArchive) {
824       this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
825       archiveLogFile(p);
826       this.byWalRegionSequenceIds.remove(p);
827     }
828   }
829 
830   
831 
832 
833 
834 
835 
836 
837 
838 
839 
840 
841    static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
842       Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
843     for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
844       
845       
846       long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
847           oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
848       long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
849           oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
850           
851       long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
852       if (minSeqNum <= regionSeqIdEntry.getValue()) return false;
853     }
854     return true;
855   }
856 
857   
858 
859 
860 
861 
862 
863 
864 
865 
866 
867 
868   private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
869     List<byte[]> regionsToFlush = null;
870     
871     synchronized (regionSequenceIdLock) {
872       for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
873         ConcurrentMap<byte[], Long> m =
874             this.oldestUnflushedStoreSequenceIds.get(e.getKey());
875         if (m == null) {
876           continue;
877         }
878         long unFlushedVal = Collections.min(m.values());
879         if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
880           if (regionsToFlush == null)
881             regionsToFlush = new ArrayList<byte[]>();
882           regionsToFlush.add(e.getKey());
883         }
884       }
885     }
886     return regionsToFlush == null ? null : regionsToFlush
887         .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
888   }
889 
890   
891 
892 
893 
894 
895 
896 
897   byte[][] findRegionsToForceFlush() throws IOException {
898     byte [][] regions = null;
899     int logCount = getNumRolledLogFiles();
900     if (logCount > this.maxLogs && logCount > 0) {
901       Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
902         this.byWalRegionSequenceIds.firstEntry();
903       regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
904     }
905     if (regions != null) {
906       StringBuilder sb = new StringBuilder();
907       for (int i = 0; i < regions.length; i++) {
908         if (i > 0) {
909           sb.append(", ");
910         }
911         sb.append(Bytes.toStringBinary(regions[i]));
912       }
913       LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
914          this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
915          sb.toString());
916     }
917     return regions;
918   }
919 
920   
921 
922 
923 
924   @VisibleForTesting
925   protected void afterCreatingZigZagLatch() {}
926 
927   
928 
929 
930   @VisibleForTesting
931   protected void beforeWaitOnSafePoint() {};
932 
933   
934 
935 
936 
937 
938 
939 
940 
941 
942 
943 
944 
945 
946 
947 
948 
949 
950 
  /**
   * Swaps in {@code nextWriter}. It waits for the ring buffer handler to reach a safe point,
   * closes the current writer, installs the new writer and stream, and resets the entry count.
   * When {@code oldPath} is non-null, it also files the old WAL's per-region sequence ids and
   * adds its length to totalLogSize.
   *
   * @param oldPath path of the WAL being replaced; null on the first roll
   * @param newPath path of the new WAL
   * @param nextWriter writer to install
   * @param nextHdfsOut stream of nextWriter, or null
   * @return newPath
   * @throws IOException any IOException raised in the swap is rethrown as
   *     FailedLogCloseException, e.g. a writer close failure that cannot be ridden over, or a
   *     failure to read the old file's length
   */
  Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
      final FSDataOutputStream nextHdfsOut)
  throws IOException {
    // Ask the ring buffer handler for a safe point before touching the writer. The handler is
    // null only during the constructor's first roll, because it is assigned after rollWriter().
    // NOTE(review): the latch's exact semantics live in SafePointZigZagLatch (not in this chunk);
    // presumably the consumer parks at the safe point until releaseSafePoint() is called.
    SyncFuture syncFuture = null;
    SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
      null: this.ringBufferEventHandler.attainSafePoint();
    afterCreatingZigZagLatch();
    TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
    try {
      // Publish a sync on the ring buffer and wait for the safe point. A failed sync is tolerated
      // only when there are no unsynced entries (isUnflushedEntries() is false).
      try {
        if (zigzagLatch != null) {
          Trace.addTimelineAnnotation("awaiting safepoint");
          syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
        }
      } catch (FailedSyncBeforeLogCloseException e) {
        // Unsynced entries on close are a reason to abort the roll.
        if (isUnflushedEntries()) {
          throw e;
        }
        LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
          e.getMessage());
      }

      // Close the current writer. A failed close is ridden over only when nothing is unsynced
      // and the consecutive close-error count is within closeErrorsTolerated.
      try {
        if (this.writer != null) {
          Trace.addTimelineAnnotation("closing writer");
          this.writer.close();
          Trace.addTimelineAnnotation("writer closed");
        }
        this.closeErrorCount.set(0);
      } catch (IOException ioe) {
        int errors = closeErrorCount.incrementAndGet();
        if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
          LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
            ioe.getMessage() + "\", errors=" + errors +
            "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
        } else {
          throw ioe;
        }
      }
      // Install the new writer and reset per-WAL state.
      this.writer = nextWriter;
      this.hdfs_out = nextHdfsOut;
      int oldNumEntries = this.numEntries.get();
      this.numEntries.set(0);
      final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
      if (oldPath != null) {
        // Record how far each region wrote into the retired WAL, then start a fresh map.
        this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
        this.highestRegionSequenceIds = new HashMap<byte[], Long>();
        long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
        this.totalLogSize.addAndGet(oldFileLen);
        LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
          ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
          newPathString);
      } else {
        LOG.info("New WAL " + newPathString);
      }
    } catch (InterruptedException ie) {
      // Perpetuate the interrupt.
      // NOTE(review): when interrupted here, the old writer is neither closed nor replaced and
      // nextWriter is not installed, yet newPath is still returned to the caller. Confirm that
      // nextWriter is not leaked in that case.
      Thread.currentThread().interrupt();
    } catch (IOException e) {
      long count = getUnflushedEntriesCount();
      LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
      throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
    } finally {
      try {
        // Let the ring buffer handler resume.
        if (zigzagLatch != null) {
          zigzagLatch.releaseSafePoint();
          // Also wait on the sync that was published to reach the safe point. Its failure is
          // only logged at trace level as a "Stale sync exception".
          if (syncFuture != null) {
            try {
              blockOnSync(syncFuture);
            } catch (IOException ioe) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Stale sync exception", ioe);
              }
            }
          }
        }
      } finally {
        scope.close();
      }
    }
    return newPath;
  }
1053 
1054   long getUnflushedEntriesCount() {
1055     long highestSynced = this.highestSyncedSequence.get();
1056     return highestSynced > this.highestUnsyncedSequence?
1057       0: this.highestUnsyncedSequence - highestSynced;
1058   }
1059 
1060   boolean isUnflushedEntries() {
1061     return getUnflushedEntriesCount() > 0;
1062   }
1063 
1064   
1065 
1066 
1067 
1068   public static Path getWALArchivePath(Path archiveDir, Path p) {
1069     return new Path(archiveDir, p.getName());
1070   }
1071 
1072   private void archiveLogFile(final Path p) throws IOException {
1073     Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
1074     
1075     if (!this.listeners.isEmpty()) {
1076       for (WALActionsListener i : this.listeners) {
1077         i.preLogArchive(p, newPath);
1078       }
1079     }
1080     LOG.info("Archiving " + p + " to " + newPath);
1081     if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
1082       throw new IOException("Unable to rename " + p + " to " + newPath);
1083     }
1084     
1085     if (!this.listeners.isEmpty()) {
1086       for (WALActionsListener i : this.listeners) {
1087         i.postLogArchive(p, newPath);
1088       }
1089     }
1090   }
1091 
1092   
1093 
1094 
1095 
1096 
1097 
1098   protected Path computeFilename(final long filenum) {
1099     if (filenum < 0) {
1100       throw new RuntimeException("WAL file number can't be < 0");
1101     }
1102     String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
1103     return new Path(fullPathLogDir, child);
1104   }
1105 
1106   
1107 
1108 
1109 
1110 
1111   public Path getCurrentFileName() {
1112     return computeFilename(this.filenum.get());
1113   }
1114   
1115   
1116 
1117 
1118 
1119   public long getFilenum() {
1120     return filenum.get();
1121   }
1122   
1123   @Override
1124   public String toString() {
1125     return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
1126   }
1127 
1128 
1129 
1130 
1131 
1132 
1133 
1134 
1135 
1136   protected long getFileNumFromFileName(Path fileName) {
1137     if (fileName == null) throw new IllegalArgumentException("file name can't be null");
1138     if (!ourFiles.accept(fileName)) {
1139       throw new IllegalArgumentException("The log file " + fileName +
1140           " doesn't belong to this WAL. (" + toString() + ")");
1141     }
1142     final String fileNameString = fileName.toString();
1143     String chompedPath = fileNameString.substring(prefixPathStr.length(),
1144         (fileNameString.length() - logFileSuffix.length()));
1145     return Long.parseLong(chompedPath);
1146   }
1147 
1148   @Override
1149   public void close() throws IOException {
1150     shutdown();
1151     final FileStatus[] files = getFiles();
1152     if (null != files && 0 != files.length) {
1153       for (FileStatus file : files) {
1154         Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
1155         
1156         if (!this.listeners.isEmpty()) {
1157           for (WALActionsListener i : this.listeners) {
1158             i.preLogArchive(file.getPath(), p);
1159           }
1160         }
1161 
1162         if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1163           throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1164         }
1165         
1166         if (!this.listeners.isEmpty()) {
1167           for (WALActionsListener i : this.listeners) {
1168             i.postLogArchive(file.getPath(), p);
1169           }
1170         }
1171       }
1172       LOG.debug("Moved " + files.length + " WAL file(s) to " +
1173         FSUtils.getPath(this.fullPathArchiveDir));
1174     }
1175     LOG.info("Closed WAL: " + toString() );
1176   }
1177 
  /**
   * Shuts this WAL down. Only the first call does any work: it is guarded by a compare-and-set
   * on {@code shutdown}, so later calls return immediately.
   *
   * <p>Order of operations:
   * <ol>
   * <li>drain operations registered on {@code closeBarrier};</li>
   * <li>shut down the disruptor, halting it if it does not stop within the configured timeout;</li>
   * <li>shut down the append executor;</li>
   * <li>notify listeners via {@code logCloseRequested()};</li>
   * <li>mark the WAL closed;</li>
   * <li>close and drop the current writer.</li>
   * </ol>
   *
   * @throws IOException if closing the writer fails
   */
  @Override
  public void shutdown() throws IOException {
    if (shutdown.compareAndSet(false, true)) {
      try {
        // Stop admitting new closeBarrier operations and wait for in-flight ones (for example
        // the one startCacheFlush begins) to end. An interrupt is logged and re-asserted, and
        // shutdown continues anyway.
        closeBarrier.stopAndDrainOps();
      } catch (InterruptedException e) {
        LOG.error("Exception while waiting for cache flushes and log rolls", e);
        Thread.currentThread().interrupt();
      }

      // Stop the ring buffer consumer. Wait up to hbase.wal.disruptor.shutdown.timeout.ms
      // (default 60000 ms). On timeout the disruptor is halted forcibly, and events still queued
      // may be lost, as the warning below says.
      if (this.disruptor != null) {
        long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
        try {
          this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
            "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
          this.disruptor.halt();
          this.disruptor.shutdown();
        }
      }
      // Shut down the append executor, if one was created.
      if (this.appendExecutor !=  null) this.appendExecutor.shutdown();

      // Tell listeners the log is about to be closed.
      if (!this.listeners.isEmpty()) {
        for (WALActionsListener i : this.listeners) {
          i.logCloseRequested();
        }
      }
      // From here on append() rejects new edits ("Cannot append; log is closed").
      this.closed = true;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
      }
      if (this.writer != null) {
        this.writer.close();
        this.writer = null;
      }
    }
  }
1222 
1223   
1224 
1225 
1226 
1227 
1228 
1229 
1230 
1231   @SuppressWarnings("deprecation")
1232   protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
1233       long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
1234     
1235     return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
1236   }
1237   
  /**
   * Appends an edit by publishing it onto the ring buffer. This method does not wait for the
   * edit to be written or synced; the ring buffer consumer writes it later.
   *
   * <p>The returned value is the ring buffer sequence of the published entry. SyncRunner
   * advances {@code highestSyncedSequence} in the same ring-buffer-sequence space, so this value
   * is presumably what callers pass to {@code sync(long)}.
   *
   * @param htd descriptor of the table the edit belongs to
   * @param hri region the edit belongs to
   * @param key key of the WAL entry
   * @param edits the edit to append; may be null, in which case no cells are taken from it
   * @param sequenceId passed through to the FSWALEntry. NOTE(review): presumably the region's
   *     sequence id source, used when the consumer calls {@code stampRegionSequenceId()}; confirm.
   * @param inMemstore whether the edit is also in the memstore (controls whether the consumer
   *     records oldest-unflushed sequence ids for it)
   * @param memstoreCells cells to attach to the entry; if null, {@code edits.getCells()} is used
   * @return the ring buffer sequence at which the entry was published
   * @throws IOException if this WAL is already closed
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
      justification="Will never be null")
  @Override
  public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
      final List<Cell> memstoreCells) throws IOException {
    if (this.closed) throw new IOException("Cannot append; log is closed");
    // Start a trace span. It is detached below and carried in the truck, so the ring buffer
    // consumer can continue it (onEvent calls Trace.continueSpan on it).
    TraceScope scope = Trace.startSpan("FSHLog.append");

    // Claim a slot, fill it and always publish it (finally), even if filling fails.
    // NOTE(review): if building the FSWALEntry throws, the scope is neither handed to the truck
    // nor closed, and the published truck presumably carries no payload (onEvent then takes its
    // "Neither append nor sync" branch). Confirm this is intended.
    FSWALEntry entry = null;
    long sequence = this.disruptor.getRingBuffer().next();
    try {
      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
      // The entry records the ring buffer sequence it was published at.
      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri,
        // Prefer explicit memstoreCells, else the edit's cells, else null when there is no edit.
        (memstoreCells != null)? memstoreCells: edits == null? null: edits.getCells());
      truck.loadPayload(entry, scope.detach());
    } finally {
      this.disruptor.getRingBuffer().publish(sequence);
    }
    return sequence;
  }
1268 
1269   
1270 
1271 
1272 
1273 
1274 
1275 
1276 
1277 
1278 
1279 
1280 
1281 
1282 
1283 
1284   private class SyncRunner extends HasThread {
1285     private volatile long sequence;
1286     
1287     private final BlockingQueue<SyncFuture> syncFutures;
1288  
1289     
1290 
1291 
1292 
1293 
1294 
1295 
1296 
1297 
1298 
1299     SyncRunner(final String name, final int maxHandlersCount) {
1300       super(name);
1301       
1302       
1303       
1304       
1305       
1306       
1307       
1308       
1309       
1310       
1311       
1312       
1313       
1314       this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
1315     }
1316 
1317     void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
1318       
1319       this.sequence = sequence;
1320       for (int i = 0; i < syncFutureCount; ++i) {
1321         this.syncFutures.add(syncFutures[i]);
1322       }
1323     }
1324 
1325     
1326 
1327 
1328 
1329 
1330 
1331 
1332     private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
1333         final Throwable t) {
1334       if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
1335       
1336       return 1;
1337     }
1338  
1339     
1340 
1341 
1342 
1343 
1344 
1345     private int releaseSyncFutures(final long currentSequence, final Throwable t) {
1346       int syncCount = 0;
1347       for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
1348         if (syncFuture.getRingBufferSequence() > currentSequence) break;
1349         releaseSyncFuture(syncFuture, currentSequence, t);
1350         if (!this.syncFutures.remove(syncFuture)) {
1351           throw new IllegalStateException(syncFuture.toString());
1352         }
1353         syncCount++;
1354       }
1355       return syncCount;
1356     }
1357 
1358     
1359 
1360 
1361 
1362     private long updateHighestSyncedSequence(long sequence) {
1363       long currentHighestSyncedSequence;
1364       
1365       do {
1366         currentHighestSyncedSequence = highestSyncedSequence.get();
1367         if (currentHighestSyncedSequence >= sequence) {
1368           
1369           
1370           sequence = currentHighestSyncedSequence;
1371           break;
1372         }
1373       } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
1374       return sequence;
1375     }
1376 
1377     public void run() {
1378       long currentSequence;
1379       while (!isInterrupted()) {
1380         int syncCount = 0;
1381         SyncFuture takeSyncFuture;
1382         try {
1383           while (true) {
1384             
1385             takeSyncFuture = this.syncFutures.take();
1386             currentSequence = this.sequence;
1387             long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
1388             if (syncFutureSequence > currentSequence) {
1389               throw new IllegalStateException("currentSequence=" + syncFutureSequence +
1390                 ", syncFutureSequence=" + syncFutureSequence);
1391             }
1392             
1393             long currentHighestSyncedSequence = highestSyncedSequence.get();
1394             if (currentSequence < currentHighestSyncedSequence) {
1395               syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
1396               
1397               continue;
1398             }
1399             break;
1400           }
1401           
1402           
1403           TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
1404           long start = System.nanoTime();
1405           Throwable lastException = null;
1406           try {
1407             Trace.addTimelineAnnotation("syncing writer");
1408             writer.sync();
1409             Trace.addTimelineAnnotation("writer synced");
1410             currentSequence = updateHighestSyncedSequence(currentSequence);
1411           } catch (IOException e) {
1412             LOG.error("Error syncing, request close of WAL", e);
1413             lastException = e;
1414           } catch (Exception e) {
1415             LOG.warn("UNEXPECTED", e);
1416             lastException = e;
1417           } finally {
1418             
1419             takeSyncFuture.setSpan(scope.detach());
1420             
1421             syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
1422             
1423             syncCount += releaseSyncFutures(currentSequence, lastException);
1424             if (lastException != null) requestLogRoll();
1425             else checkLogRoll();
1426           }
1427           postSync(System.nanoTime() - start, syncCount);
1428         } catch (InterruptedException e) {
1429           
1430           Thread.currentThread().interrupt();
1431         } catch (Throwable t) {
1432           LOG.warn("UNEXPECTED, continuing", t);
1433         }
1434       }
1435     }
1436   }
1437 
1438   
1439 
1440 
1441   void checkLogRoll() {
1442     
1443     if (!rollWriterLock.tryLock()) return;
1444     boolean lowReplication;
1445     try {
1446       lowReplication = checkLowReplication();
1447     } finally {
1448       rollWriterLock.unlock();
1449     }
1450     try {
1451       if (lowReplication || writer != null && writer.getLength() > logrollsize) {
1452         requestLogRoll(lowReplication);
1453       }
1454     } catch (IOException e) {
1455       LOG.warn("Writer.getLength() failed; continuing", e);
1456     }
1457   }
1458 
1459   
1460 
1461 
  /**
   * Decides whether the current pipeline's replication is low enough to warrant a log roll.
   *
   * <p>A replica count of 0 never triggers a roll; {@code getLogReplication()} returns 0 when
   * the count cannot be read. When the count is below {@code minTolerableReplication} and
   * low-replication rolling is enabled, a roll is requested and {@code consecutiveLogRolls} is
   * incremented. Once {@code lowReplicationRollLimit} consecutive requests have been made,
   * low-replication rolling is disabled and the counter reset.
   *
   * <p>When replication is back to at least {@code minTolerableReplication}, rolling is
   * re-enabled, but only once {@code numEntries} exceeds 1 (presumably entries in the current
   * file — confirm).
   *
   * <p>Any exception raised while checking is logged and swallowed.
   *
   * @return true if a log roll should be requested because of low replication
   */
  private boolean checkLowReplication() {
    boolean logRollNeeded = false;
    // getLogReplication() may throw reflection exceptions; the catch below logs and swallows
    // them.
    try {
      int numCurrentReplicas = getLogReplication();
      if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
        if (this.lowReplicationRollEnabled) {
          if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. " + "Found "
                + numCurrentReplicas + " replicas but expecting no less than "
                + this.minTolerableReplication + " replicas. "
                + " Requesting close of WAL. current pipeline: "
                + Arrays.toString(getPipeLine()));
            logRollNeeded = true;
            // Count consecutive low-replication roll requests so we can stop asking when the
            // cluster simply has too few live datanodes (see the else branch).
            this.consecutiveLogRolls.getAndIncrement();
          } else {
            LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
                + "the total number of live datanodes is lower than the tolerable replicas.");
            this.consecutiveLogRolls.set(0);
            this.lowReplicationRollEnabled = false;
          }
        }
      } else if (numCurrentReplicas >= this.minTolerableReplication) {
        if (!this.lowReplicationRollEnabled) {
          // Replication has recovered. Don't re-enable until more than one entry has been
          // written (numEntries > 1).
          if (this.numEntries.get() <= 1) {
            return logRollNeeded;
          }
          // Re-enable low-replication rolling now that the pipeline has enough replicas again.
          this.lowReplicationRollEnabled = true;
          LOG.info("LowReplication-Roller was enabled.");
        }
      }
    } catch (Exception e) {
      LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
        ", continuing...");
    }
    return logRollNeeded;
  }
1508 
1509   private SyncFuture publishSyncOnRingBuffer() {
1510     return publishSyncOnRingBuffer(null);
1511   }
1512 
1513   private SyncFuture publishSyncOnRingBuffer(Span span) {
1514     long sequence = this.disruptor.getRingBuffer().next();
1515     SyncFuture syncFuture = getSyncFuture(sequence, span);
1516     try {
1517       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1518       truck.loadPayload(syncFuture);
1519     } finally {
1520       this.disruptor.getRingBuffer().publish(sequence);
1521     }
1522     return syncFuture;
1523   }
1524 
1525   
1526   private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
1527     return blockOnSync(publishSyncOnRingBuffer(span));
1528   }
1529 
1530   private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
1531     
1532     try {
1533       syncFuture.get();
1534       return syncFuture.getSpan();
1535     } catch (InterruptedException ie) {
1536       LOG.warn("Interrupted", ie);
1537       throw convertInterruptedExceptionToIOException(ie);
1538     } catch (ExecutionException e) {
1539       throw ensureIOException(e.getCause());
1540     }
1541   }
1542 
1543   private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1544     Thread.currentThread().interrupt();
1545     IOException ioe = new InterruptedIOException();
1546     ioe.initCause(ie);
1547     return ioe;
1548   }
1549 
1550   private SyncFuture getSyncFuture(final long sequence, Span span) {
1551     SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
1552     if (syncFuture == null) {
1553       syncFuture = new SyncFuture();
1554       this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
1555     }
1556     return syncFuture.reset(sequence, span);
1557   }
1558 
1559   private void postSync(final long timeInNanos, final int handlerSyncs) {
1560     if (timeInNanos > this.slowSyncNs) {
1561       String msg =
1562           new StringBuilder().append("Slow sync cost: ")
1563               .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
1564               .append(Arrays.toString(getPipeLine())).toString();
1565       Trace.addTimelineAnnotation(msg);
1566       LOG.info(msg);
1567     }
1568     if (!listeners.isEmpty()) {
1569       for (WALActionsListener listener : listeners) {
1570         listener.postSync(timeInNanos, handlerSyncs);
1571       }
1572     }
1573   }
1574 
1575   private long postAppend(final Entry e, final long elapsedTime) throws IOException {
1576     long len = 0;
1577     if (!listeners.isEmpty()) {
1578       for (Cell cell : e.getEdit().getCells()) {
1579         len += CellUtil.estimatedSerializedSizeOf(cell);
1580       }
1581       for (WALActionsListener listener : listeners) {
1582         listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
1583       }
1584     }
1585     return len;
1586   }
1587 
1588   
1589 
1590 
1591 
1592 
1593   private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
1594     
1595     
1596     Method m = null;
1597     if (os != null) {
1598       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
1599       try {
1600         m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
1601         m.setAccessible(true);
1602       } catch (NoSuchMethodException e) {
1603         LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
1604          "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
1605       } catch (SecurityException e) {
1606         LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
1607           "not available; fsOut=" + wrappedStreamClass.getName(), e);
1608         m = null; 
1609       }
1610     }
1611     if (m != null) {
1612       if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
1613     }
1614     return m;
1615   }
1616 
1617   
1618 
1619 
1620 
1621 
1622 
1623 
1624 
1625 
1626 
1627 
1628 
1629   @VisibleForTesting
1630   int getLogReplication()
1631   throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1632     final OutputStream stream = getOutputStream();
1633     if (this.getNumCurrentReplicas != null && stream != null) {
1634       Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
1635       if (repl instanceof Integer) {
1636         return ((Integer)repl).intValue();
1637       }
1638     }
1639     return 0;
1640   }
1641 
1642   @Override
1643   public void sync() throws IOException {
1644     TraceScope scope = Trace.startSpan("FSHLog.sync");
1645     try {
1646       scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1647     } finally {
1648       assert scope == NullScope.INSTANCE || !scope.isDetached();
1649       scope.close();
1650     }
1651   }
1652 
1653   @Override
1654   public void sync(long txid) throws IOException {
1655     if (this.highestSyncedSequence.get() >= txid){
1656       
1657       return;
1658     }
1659     TraceScope scope = Trace.startSpan("FSHLog.sync");
1660     try {
1661       scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1662     } finally {
1663       assert scope == NullScope.INSTANCE || !scope.isDetached();
1664       scope.close();
1665     }
1666   }
1667 
1668   
1669   public void requestLogRoll() {
1670     requestLogRoll(false);
1671   }
1672 
1673   private void requestLogRoll(boolean tooFewReplicas) {
1674     if (!this.listeners.isEmpty()) {
1675       for (WALActionsListener i: this.listeners) {
1676         i.logRollRequested(tooFewReplicas);
1677       }
1678     }
1679   }
1680 
1681   
1682   
1683   public int getNumRolledLogFiles() {
1684     return byWalRegionSequenceIds.size();
1685   }
1686 
1687   
1688   
1689   public int getNumLogFiles() {
1690     
1691     return getNumRolledLogFiles() + 1;
1692   }
1693 
1694   
1695   
1696   public long getLogFileSize() {
1697     return this.totalLogSize.get();
1698   }
1699 
  /**
   * Marks the start of a memstore flush for the given families of a region.
   *
   * <p>Begins an operation on {@code closeBarrier}. If the WAL is closing this fails and
   * {@code null} is returned. Otherwise, under {@code regionSequenceIdLock}, the oldest
   * unflushed sequence ids of the families being flushed are moved from
   * {@code oldestUnflushedStoreSequenceIds} into {@code lowestFlushingStoreSequenceIds}. From
   * there, {@code abortCacheFlush} can restore them or {@code completeCacheFlush} can discard
   * them. Both of those call {@code closeBarrier.endOp()}, balancing the {@code beginOp()} here.
   *
   * @param encodedRegionName region being flushed
   * @param flushedFamilyNames families included in this flush
   * @return {@code null} if the WAL is closing. Otherwise the lowest sequence id still unflushed
   *     among the region's families that are NOT part of this flush, or
   *     {@link HConstants#NO_SEQNUM} if there are none or the region has no recorded entries.
   */
  @Override
  public Long startCacheFlush(final byte[] encodedRegionName,
      Set<byte[]> flushedFamilyNames) {
    Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    if (!closeBarrier.beginOp()) {
      LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
        " - because the server is closing.");
      return null;
    }
    long oldestUnflushedSequenceId = HConstants.NO_SEQNUM;
    synchronized (regionSequenceIdLock) {
      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
          oldestUnflushedStoreSequenceIds.get(encodedRegionName);
      if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
        // Move the flushed families' ids out of the "unflushed" map...
        for (byte[] familyName: flushedFamilyNames) {
          Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
          if (seqId != null) {
            oldStoreSeqNum.put(familyName, seqId);
          }
        }
        // ...and park them as "flushing" so they can be restored if the flush is aborted.
        if (!oldStoreSeqNum.isEmpty()) {
          Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
              encodedRegionName, oldStoreSeqNum);
          assert oldValue == null: "Flushing map not cleaned up for "
              + Bytes.toString(encodedRegionName);
        }
        if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
          // Every family is being flushed: drop the region's now-empty map. New appends
          // recreate it (see getOrCreateOldestUnflushedStoreSequenceIdsOfRegion).
          oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
        } else {
          oldestUnflushedSequenceId =
              Collections.min(oldestUnflushedStoreSequenceIdsOfRegion.values());
        }
      }
    }
    if (oldStoreSeqNum.isEmpty()) {
      // None of the flushed families had a recorded unflushed sequence id (or the region had
      // none at all). The flush still proceeds; only warn.
      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
        + Bytes.toString(encodedRegionName) + "]");
    }
    return oldestUnflushedSequenceId;
  }
1749 
1750   @Override
1751   public void completeCacheFlush(final byte [] encodedRegionName) {
1752     synchronized (regionSequenceIdLock) {
1753       this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
1754     }
1755     closeBarrier.endOp();
1756   }
1757 
1758   private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
1759       byte[] encodedRegionName) {
1760     ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1761         oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1762     if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1763       return oldestUnflushedStoreSequenceIdsOfRegion;
1764     }
1765     oldestUnflushedStoreSequenceIdsOfRegion =
1766         new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1767     ConcurrentMap<byte[], Long> alreadyPut =
1768         oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName,
1769           oldestUnflushedStoreSequenceIdsOfRegion);
1770     return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
1771   }
1772 
  /**
   * Undoes {@code startCacheFlush} for a region whose flush failed. The sequence ids parked in
   * {@code lowestFlushingStoreSequenceIds} are put back into
   * {@code oldestUnflushedStoreSequenceIds}, and the {@code closeBarrier} operation is ended
   * (even if nothing was parked).
   *
   * <p>While the flush was running, new appends may have recorded fresh "oldest unflushed" ids
   * for the same families (updateOldestUnflushedSequenceIds uses putIfAbsent after
   * startCacheFlush removed them). The restored, older ids overwrite those. If a replaced id is
   * not strictly greater than the restored one, edits arrived out of order: the error is logged
   * and the JVM is halted with {@code Runtime.halt(1)}.
   *
   * @param encodedRegionName region whose flush is being aborted
   */
  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
    // Family -> id that was in the unflushed map just before we restored over it (may be null).
    Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    synchronized (regionSequenceIdLock) {
      storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
        encodedRegionName);
      if (storeSeqNumsBeforeFlushStarts != null) {
        ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
            getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
        for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
            .entrySet()) {
          currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
            oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
              familyNameAndSeqId.getValue()));
        }
      }
    }
    closeBarrier.endOp();
    // Sanity check outside the lock: any id recorded during the flush must be newer than the
    // one we restored.
    if (storeSeqNumsBeforeFlushStarts != null) {
      for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
        Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
        if (currentSeqNum != null
            && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
          String errorStr =
              "Region " + Bytes.toString(encodedRegionName) + " family "
                  + Bytes.toString(familyNameAndSeqId.getKey())
                  + " acquired edits out of order current memstore seq=" + currentSeqNum
                  + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
          LOG.error(errorStr);
          Runtime.getRuntime().halt(1);
        }
      }
    }
  }
1808 
1809   @VisibleForTesting
1810   boolean isLowReplicationRollEnabled() {
1811       return lowReplicationRollEnabled;
1812   }
1813 
  /**
   * Fixed per-instance size estimate, aligned with {@code ClassSize.align}. It sums the object
   * header, 5 references, one AtomicInteger, one int and 3 longs. NOTE(review): presumably used
   * for heap accounting; these counts are not checked here against the class's actual fields.
   */
  public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1817 
1818   private static void split(final Configuration conf, final Path p)
1819   throws IOException {
1820     FileSystem fs = FileSystem.get(conf);
1821     if (!fs.exists(p)) {
1822       throw new FileNotFoundException(p.toString());
1823     }
1824     if (!fs.getFileStatus(p).isDirectory()) {
1825       throw new IOException(p + " is not a directory");
1826     }
1827 
1828     final Path baseDir = FSUtils.getRootDir(conf);
1829     final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1830     WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1831   }
1832 
1833   @Override
1834   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1835     ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1836         this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1837     return oldestUnflushedStoreSequenceIdsOfRegion != null ?
1838         getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
1839   }
1840 
1841   @Override
1842   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
1843       byte[] familyName) {
1844     synchronized (regionSequenceIdLock) {
1845       Map<byte[], Long> m = this.lowestFlushingStoreSequenceIds.get(encodedRegionName);
1846       if (m != null) {
1847         Long earlist = m.get(familyName);
1848         if (earlist != null) {
1849           return earlist;
1850         }
1851       }
1852       m = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1853       if (m != null) {
1854         Long earlist = m.get(familyName);
1855         if (earlist != null) {
1856           return earlist;
1857         }
1858       }
1859     }
1860     return HConstants.NO_SEQNUM;
1861   }
1862 
1863   
1864 
1865 
1866 
1867 
1868 
1869 
1870 
1871 
1872 
1873 
1874 
1875 
1876 
1877 
1878 
1879 
1880 
1881 
1882 
1883 
1884 
1885 
1886 
1887 
1888 
1889   static class SafePointZigZagLatch {
1890     
1891 
1892 
1893     private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
1894     
1895 
1896 
1897     private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
1898  
1899     
1900 
1901 
1902 
1903 
1904 
1905 
1906 
1907 
1908 
1909     SyncFuture waitSafePoint(final SyncFuture syncFuture)
1910     throws InterruptedException, FailedSyncBeforeLogCloseException {
1911       while (true) {
1912         if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break;
1913         if (syncFuture.isThrowable()) {
1914           throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
1915         }
1916       }
1917       return syncFuture;
1918     }
1919  
1920     
1921 
1922 
1923 
1924 
1925 
1926     void safePointAttained() throws InterruptedException {
1927       this.safePointAttainedLatch.countDown();
1928       this.safePointReleasedLatch.await();
1929     }
1930 
1931     
1932 
1933 
1934 
1935     void releaseSafePoint() {
1936       this.safePointReleasedLatch.countDown();
1937     }
1938 
1939     
1940 
1941 
1942     boolean isCocked() {
1943       return this.safePointAttainedLatch.getCount() > 0 &&
1944         this.safePointReleasedLatch.getCount() > 0;
1945     }
1946   }
1947 
1948   
1949 
1950 
1951 
1952 
1953 
1954 
1955 
1956 
1957 
1958 
1959 
1960 
1961 
1962 
1963 
1964 
1965 
1966 
1967 
1968 
1969 
1970 
1971   class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
    // SyncRunner threads; batches of syncs are offered to them round-robin (see onEvent).
    private final SyncRunner [] syncRunners;
    // Sync futures collected since the last hand-off; its length is maxHandlersCount (see the
    // constructor). Only the first syncFuturesCount entries are valid.
    private final SyncFuture [] syncFutures;
    // Number of valid entries in syncFutures.
    private volatile int syncFuturesCount = 0;
    // Latch armed by attainSafePoint(); checked at the end of each batch in
    // attainSafePoint(long).
    private volatile SafePointZigZagLatch zigzagLatch;
    // First append/offer failure recorded by onEvent. While set, appends are skipped and
    // batched syncs are failed. It is cleared in attainSafePoint(long).
    private Exception exception = null;
    // Monitor used for the short timed wait in attainSafePoint(long).
    private final Object safePointWaiter = new Object();
    // When true, attainSafePoint(long) stops waiting for outstanding syncs. NOTE(review): it is
    // set outside this view.
    private volatile boolean shutdown = false;
    // Index of the SyncRunner that received the last batch; advanced modulo
    // syncRunners.length in onEvent.
    private int syncRunnerIndex;
1993 
1994     RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
1995       this.syncFutures = new SyncFuture[maxHandlersCount];
1996       this.syncRunners = new SyncRunner[syncRunnerCount];
1997       for (int i = 0; i < syncRunnerCount; i++) {
1998         this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
1999       }
2000     }
2001 
2002     private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
2003       
2004       for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
2005       this.syncFuturesCount = 0;
2006     }
2007 
2008     
2009 
2010 
2011     private boolean isOutstandingSyncs() {
2012       for (int i = 0; i < this.syncFuturesCount; i++) {
2013         if (!this.syncFutures[i].isDone()) return true;
2014       }
2015       return false;
2016     }
2017 
    /**
     * Consumes one ring buffer event.
     *
     * <p>A sync event adds its SyncFuture to the current batch. An append event is written with
     * {@code append(entry)}. At the end of a batch, or when the batch array is full, the batched
     * syncs are offered to the next SyncRunner (round-robin). If an append or the offer has
     * failed, the batch is instead failed with a DamagedWALException. The safe point is then
     * checked and the batch reset.
     *
     * <p>Any Throwable is logged rather than propagated.
     *
     * @param truck payload holder of the ring buffer slot
     * @param sequence ring buffer sequence of this event
     * @param endOfBatch whether the disruptor reports this as the last event of its batch
     */
    @Override

    public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
    throws Exception {
      // Events arrive in ring buffer order. Appends are written as they arrive; syncs are
      // batched and handed to a SyncRunner at the end of a batch.
      //
      // Once an append fails, this.exception is set. Later appends are then skipped (only their
      // region sequence id is stamped) and batched syncs are failed, until attainSafePoint
      // clears the exception.

      try {
        if (truck.hasSyncFuturePayload()) {
          this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
          // Batch array full: hand it off now instead of waiting for the disruptor's endOfBatch.
          if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
        } else if (truck.hasFSWALEntryPayload()) {
          TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
          try {
            FSWALEntry entry = truck.unloadFSWALEntryPayload();
            if (this.exception != null) {
              // An earlier append failed, so don't write this one, but still stamp its region
              // sequence id. NOTE(review): presumably this releases code waiting for the
              // sequence id to be assigned (append() below also stamps it); confirm.
              entry.stampRegionSequenceId();
              // Skip to the next event.
              return;
            }
            append(entry);
          } catch (Exception e) {
            // Remember the failure; later appends are skipped and syncs failed because of it.
            this.exception = e;
            // Skip to the next event.
            return;
          } finally {
            assert scope == NullScope.INSTANCE || !scope.isDetached();
            scope.close(); 
          }
        } else {
          // Neither an append nor a sync: fail every batched sync.
          cleanupOutstandingSyncsOnException(sequence,
            new IllegalStateException("Neither append nor sync"));
          // Keep processing subsequent events.
          return;
        }

        // Successful appends and all syncs reach here. Keep accumulating until the end of a
        // batch; an end-of-batch with no pending syncs also returns here (no safe-point check).
        if (!endOfBatch || this.syncFuturesCount <= 0) return;

        if (LOG.isTraceEnabled()) {
          LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
        }

        if (this.exception == null) {
          // Round-robin over the sync runners; the modulo keeps the index in
          // [0, syncRunners.length).
          this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
          try {
            this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
              this.syncFuturesCount);
          } catch (Exception e) {
            // offer() adds to a bounded queue and throws if it is full. Treat that as a damaged
            // WAL and ask for a roll.
            requestLogRoll();
            this.exception = new DamagedWALException("Failed offering sync", e);
          }
        }
        // An append or the offer failed: fail every batched sync.
        if (this.exception != null) {
          cleanupOutstandingSyncsOnException(sequence,
            new DamagedWALException("On sync", this.exception));
        }
        attainSafePoint(sequence);
        this.syncFuturesCount = 0;
      } catch (Throwable t) {
        LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
      }
    }
2103 
2104     SafePointZigZagLatch attainSafePoint() {
2105       this.zigzagLatch = new SafePointZigZagLatch();
2106       return this.zigzagLatch;
2107     }
2108 
2109     
2110 
2111 
2112 
    /**
     * If a safe point has been requested (the zig-zag latch is armed), waits for the syncs
     * handed off for this batch, clears any recorded append/sync exception, then signals the
     * safe point. It then blocks until the requester releases it. Returns immediately if no
     * latch is armed.
     *
     * @param currentSequence ring buffer sequence of the event that ended the current batch
     */
    private void attainSafePoint(final long currentSequence) {
      if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
      // A safe point is pending. Call the pre-wait hook (defined outside this view).
      beforeWaitOnSafePoint();
      try {
        // Keep waiting while ALL of these hold: not shut down, latch still armed, syncs have not
        // reached currentSequence, and some batched sync future is still outstanding.
        while (!this.shutdown && this.zigzagLatch.isCocked() &&
            highestSyncedSequence.get() < currentSequence &&
            isOutstandingSyncs()) {
          // Short timed wait (Object.wait(0, 1)), then re-check the conditions above.
          synchronized (this.safePointWaiter) {
            this.safePointWaiter.wait(0, 1);
          }
        }
        // Clear any recorded failure so events after the safe point start clean.
        // NOTE(review): presumably the requester replaces the writer while we are parked;
        // confirm with the roll code.
        this.exception = null;
        // Signal arrival, then block until releaseSafePoint() is called.
        this.zigzagLatch.safePointAttained();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted ", e);
        Thread.currentThread().interrupt();
      }
    }
2140 
2141     private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
2142         Set<byte[]> familyNameSet, Long lRegionSequenceId) {
2143       ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
2144           getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
2145       for (byte[] familyName : familyNameSet) {
2146         oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
2147       }
2148     }
2149 
2150     
2151 
2152 
2153 
2154 
    /**
     * Appends one WAL entry to the current {@code writer} and updates the in-memory
     * sequence-id bookkeeping.
     *
     * <p>Order of operations: stamp the region sequence id (even for empty edits), return early
     * for empty edits, run the coprocessor pre-write hook and every WALActionsListener, write
     * the entry, record sequence ids, then run the post-write hooks.
     *
     * @param entry the entry to append
     * @throws Exception any Exception raised inside the main try block is logged, a log roll is
     *     requested, and it is rethrown wrapped in a {@link DamagedWALException}
     */
    void append(final FSWALEntry entry) throws Exception {
      // Test hook; its body in this class is empty.
      atHeadOfRingBufferEventHandlerAppend();

      long start = EnvironmentEdgeManager.currentTime();
      byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
      long regionSequenceId = WALKey.NO_SEQUENCE_ID;
      try {
        // Stamp the region sequence id FIRST, before the empty-edit check below, so that even
        // an empty edit is assigned a sequence id.
        // Keep this ordering.
        regionSequenceId = entry.stampRegionSequenceId();
        // Empty edits have nothing to write, so return right after stamping.
        // Because this return is inside the try, numEntries.incrementAndGet() at the end of
        // the method is skipped for empty edits.
        //
        if (entry.getEdit().isEmpty()) {
          return;
        }
        // Coprocessor pre-write hook. The write below happens regardless of its return value.
        //
        if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
            entry.getEdit())) {
          if (entry.getEdit().isReplay()) {
            // Replayed edit: null out the key's scopes (presumably replication scopes, so the
            // replay is not replicated again; confirm).
            entry.getKey().setScopes(null);
          }
        }
        if (!listeners.isEmpty()) {
          for (WALActionsListener i: listeners) {
            // Let each registered listener see the entry before it is written.
            i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
              entry.getEdit());
          }
        }

        // Write first; the bookkeeping below only runs if the append succeeded.
        writer.append(entry);
        // Ring buffer sequences must reach this method in strictly increasing order.
        assert highestUnsyncedSequence < entry.getSequence();
        highestUnsyncedSequence = entry.getSequence();
        Long lRegionSequenceId = Long.valueOf(regionSequenceId);
        highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
        if (entry.isInMemstore()) {
          // Only edits that also went to the memstore count toward "oldest unflushed" ids.
          updateOldestUnflushedSequenceIds(encodedRegionName,
              entry.getFamilyNames(), lRegionSequenceId);
        }

        coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
        // Report elapsed append time (units of EnvironmentEdgeManager.currentTime(), presumably ms).
        postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
      } catch (Exception e) {
        // Treat any failure as damaging the current WAL: log it, ask for a roll, and surface
        // it to the caller as a DamagedWALException that keeps the original cause.
        String msg = "Failed appending " + regionSequenceId + ", requesting roll of WAL";
        LOG.warn(msg, e);
        requestLogRoll();
        throw new DamagedWALException(msg, e);
      }
      // Count only entries that were actually written (empty edits returned earlier).
      numEntries.incrementAndGet();
    }
2212 
2213     @Override
2214     public void onStart() {
2215       for (SyncRunner syncRunner: this.syncRunners) syncRunner.start();
2216     }
2217 
2218     @Override
2219     public void onShutdown() {
2220       for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt();
2221     }
2222   }
2223 
2224   
2225 
2226 
  /**
   * Hook called as the very first statement of the ring buffer event handler's
   * {@code append(FSWALEntry)}. It does nothing here. It is package-private and marked
   * {@code @VisibleForTesting}, presumably so tests can override it to inject behaviour at
   * that point (confirm against the tests).
   */
  @VisibleForTesting
  void atHeadOfRingBufferEventHandlerAppend() {
    // Intentionally empty in production code.
  }
2231 
2232   private static IOException ensureIOException(final Throwable t) {
2233     return (t instanceof IOException)? (IOException)t: new IOException(t);
2234   }
2235 
2236   private static void usage() {
2237     System.err.println("Usage: FSHLog <ARGS>");
2238     System.err.println("Arguments:");
2239     System.err.println(" --dump  Dump textual representation of passed one or more files");
2240     System.err.println("         For example: " +
2241       "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
2242     System.err.println(" --split Split the passed directory of WAL logs");
2243     System.err.println("         For example: " +
2244       "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
2245   }
2246 
2247   
2248 
2249 
2250 
2251 
2252 
2253 
2254   public static void main(String[] args) throws IOException {
2255     if (args.length < 2) {
2256       usage();
2257       System.exit(-1);
2258     }
2259     
2260     if (args[0].compareTo("--dump") == 0) {
2261       WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2262     } else if (args[0].compareTo("--perf") == 0) {
2263       LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
2264       LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
2265           args[1]);
2266       System.exit(-1);
2267     } else if (args[0].compareTo("--split") == 0) {
2268       Configuration conf = HBaseConfiguration.create();
2269       for (int i = 1; i < args.length; i++) {
2270         try {
2271           Path logPath = new Path(args[i]);
2272           FSUtils.setFsDefault(conf, logPath);
2273           split(conf, logPath);
2274         } catch (IOException t) {
2275           t.printStackTrace(System.err);
2276           System.exit(-1);
2277         }
2278       }
2279     } else {
2280       usage();
2281       System.exit(-1);
2282     }
2283   }
2284   
2285   
2286 
2287 
2288 
2289   private Method getGetPipeline(final FSDataOutputStream os) {
2290     Method m = null;
2291     if (os != null) {
2292       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
2293           .getClass();
2294       try {
2295         m = wrappedStreamClass.getDeclaredMethod("getPipeline",
2296           new Class<?>[] {});
2297         m.setAccessible(true);
2298       } catch (NoSuchMethodException e) {
2299         LOG.info("FileSystem's output stream doesn't support"
2300             + " getPipeline; not available; fsOut="
2301             + wrappedStreamClass.getName());
2302       } catch (SecurityException e) {
2303         LOG.info(
2304           "Doesn't have access to getPipeline on "
2305               + "FileSystems's output stream ; fsOut="
2306               + wrappedStreamClass.getName(), e);
2307         m = null; 
2308       }
2309     }
2310     return m;
2311   }
2312 
2313   
2314 
2315 
2316   @VisibleForTesting
2317   DatanodeInfo[] getPipeLine() {
2318     if (this.getPipeLine != null && this.hdfs_out != null) {
2319       Object repl;
2320       try {
2321         repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
2322         if (repl instanceof DatanodeInfo[]) {
2323           return ((DatanodeInfo[]) repl);
2324         }
2325       } catch (Exception e) {
2326         LOG.info("Get pipeline failed", e);
2327       }
2328     }
2329     return new DatanodeInfo[0];
2330   }
2331 }