1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.wal.WAL.Reader;
28 import org.apache.hadoop.hbase.wal.WAL.Entry;
29 import org.apache.hadoop.hbase.wal.WALFactory;
30
31 import java.io.IOException;
32
33 /**
34 * Wrapper class around WAL to help manage the implementation details
35 * such as compression.
36 */
37 @InterfaceAudience.Private
38 public class ReplicationWALReaderManager {
39
40 private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
41 private final FileSystem fs;
42 private final Configuration conf;
43 private long position = 0;
44 private Reader reader;
45 private Path lastPath;
46
47 /**
48 * Creates the helper but doesn't open any file
49 * Use setInitialPosition after using the constructor if some content needs to be skipped
50 * @param fs
51 * @param conf
52 */
53 public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
54 this.fs = fs;
55 this.conf = conf;
56 }
57
58 /**
59 * Opens the file at the current position
60 * @param path
61 * @return an WAL reader.
62 * @throws IOException
63 */
64 public Reader openReader(Path path) throws IOException {
65 // Detect if this is a new file, if so get a new reader else
66 // reset the current reader so that we see the new data
67 if (this.reader == null || !this.lastPath.equals(path)) {
68 this.closeReader();
69 this.reader = WALFactory.createReader(this.fs, path, this.conf);
70 this.lastPath = path;
71 } else {
72 try {
73 this.reader.reset();
74 } catch (NullPointerException npe) {
75 throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
76 }
77 }
78 return this.reader;
79 }
80
81 /**
82 * Get the next entry, returned and also added in the array
83 * @return a new entry or null
84 * @throws IOException
85 */
86 public Entry readNextAndSetPosition() throws IOException {
87 Entry entry = this.reader.next();
88 // Store the position so that in the future the reader can start
89 // reading from here. If the above call to next() throws an
90 // exception, the position won't be changed and retry will happen
91 // from the last known good position
92 this.position = this.reader.getPosition();
93 // We need to set the CC to null else it will be compressed when sent to the sink
94 if (entry != null) {
95 entry.setCompressionContext(null);
96 }
97 return entry;
98 }
99
100 /**
101 * Advance the reader to the current position
102 * @throws IOException
103 */
104 public void seek() throws IOException {
105 if (this.position != 0) {
106 this.reader.seek(this.position);
107 }
108 }
109
110 /**
111 * Get the position that we stopped reading at
112 * @return current position, cannot be negative
113 */
114 public long getPosition() {
115 return this.position;
116 }
117
118 public void setPosition(long pos) {
119 this.position = pos;
120 }
121
122 /**
123 * Close the current reader
124 * @throws IOException
125 */
126 public void closeReader() throws IOException {
127 if (this.reader != null) {
128 this.reader.close();
129 this.reader = null;
130 }
131 }
132
133 /**
134 * Tell the helper to reset internal state
135 */
136 void finishCurrentFile() {
137 this.position = 0;
138 try {
139 this.closeReader();
140 } catch (IOException e) {
141 LOG.warn("Unable to close reader", e);
142 }
143 }
144
145 }