1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.io;
19
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FileLink;

import com.google.common.annotations.VisibleForTesting;
29
30 /**
31 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums,
32 * as well as closing streams. Initialization is not thread-safe, but normal operation is;
33 * see method comments.
34 */
35 public class FSDataInputStreamWrapper {
36 private final HFileSystem hfs;
37 private final Path path;
38 private final FileLink link;
39 private final boolean doCloseStreams;
40
41 /** Two stream handles, one with and one without FS-level checksum.
42 * HDFS checksum setting is on FS level, not single read level, so you have to keep two
43 * FS objects and two handles open to interleave different reads freely, which is very sad.
44 * This is what we do:
45 * 1) First, we need to read the trailer of HFile to determine checksum parameters.
46 * We always use FS checksum to do that, so ctor opens {@link #stream}.
47 * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
48 * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum},
49 * and close {@link #stream}. User MUST call prepareForBlockReader for that to happen;
50 * if they don't, (2.1) will be the default.
51 * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
52 * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could
53 * return both in one call). This stream is guaranteed to be set.
54 * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
55 * That will take lock, and open {@link #stream}. While this is going on, others will
56 * continue to use the old stream; if they also want to fall back, they'll also call
57 * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
58 * 5) After some number of checksumOk() calls, we will go back to using HBase checksum.
59 * We will have 2 handles; however we presume checksums fail so rarely that we don't care.
60 */
61 private volatile FSDataInputStream stream = null;
62 private volatile FSDataInputStream streamNoFsChecksum = null;
63 private Object streamNoFsChecksumFirstCreateLock = new Object();
64
65 // The configuration states that we should validate hbase checksums
66 private boolean useHBaseChecksumConfigured;
67
68 // Record the current state of this reader with respect to
69 // validating checkums in HBase. This is originally set the same
70 // value as useHBaseChecksumConfigured, but can change state as and when
71 // we encounter checksum verification failures.
72 private volatile boolean useHBaseChecksum;
73
74 // In the case of a checksum failure, do these many succeeding
75 // reads without hbase checksum verification.
76 private volatile int hbaseChecksumOffCount = -1;
77
78 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
79 this(fs, null, path);
80 }
81
82 public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
83 this(fs, link, null);
84 }
85
86 private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
87 assert (path == null) != (link == null);
88 this.path = path;
89 this.link = link;
90 this.doCloseStreams = true;
91 // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
92 // that wraps over the specified fs. In this case, we will not be able to avoid
93 // checksumming inside the filesystem.
94 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
95
96 // Initially we are going to read the tail block. Open the reader w/FS checksum.
97 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
98 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
99 }
100
101 /**
102 * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
103 * reads finish and before any other reads start (what happens in reality is we read the
104 * tail, then call this based on what's in the tail, then read blocks).
105 * @param forceNoHBaseChecksum Force not using HBase checksum.
106 */
107 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
108 if (hfs == null) return;
109 assert this.stream != null && !this.useHBaseChecksumConfigured;
110 boolean useHBaseChecksum =
111 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
112
113 if (useHBaseChecksum) {
114 FileSystem fsNc = hfs.getNoChecksumFs();
115 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
116 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
117 // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
118 this.stream.close();
119 this.stream = null;
120 }
121 }
122
123 /** For use in tests. */
124 @VisibleForTesting
125 public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
126 this(fsdis, fsdis);
127 }
128
129 /** For use in tests. */
130 @VisibleForTesting
131 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
132 doCloseStreams = false;
133 stream = fsdis;
134 streamNoFsChecksum = noChecksum;
135 path = null;
136 link = null;
137 hfs = null;
138 useHBaseChecksumConfigured = useHBaseChecksum = false;
139 }
140
141 /**
142 * @return Whether we are presently using HBase checksum.
143 */
144 public boolean shouldUseHBaseChecksum() {
145 return this.useHBaseChecksum;
146 }
147
148 /**
149 * Get the stream to use. Thread-safe.
150 * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned
151 * at some point in the past, otherwise the result is undefined.
152 */
153 public FSDataInputStream getStream(boolean useHBaseChecksum) {
154 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
155 }
156
157 /**
158 * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
159 * @param offCount For how many checksumOk calls to turn off the HBase checksum.
160 */
161 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
162 // checksumOffCount is speculative, but let's try to reset it less.
163 boolean partOfConvoy = false;
164 if (this.stream == null) {
165 synchronized (streamNoFsChecksumFirstCreateLock) {
166 partOfConvoy = (this.stream != null);
167 if (!partOfConvoy) {
168 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
169 }
170 }
171 }
172 if (!partOfConvoy) {
173 this.useHBaseChecksum = false;
174 this.hbaseChecksumOffCount = offCount;
175 }
176 return this.stream;
177 }
178
179 /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
180 public void checksumOk() {
181 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
182 && (this.hbaseChecksumOffCount-- < 0)) {
183 // The stream we need is already open (because we were using HBase checksum in the past).
184 assert this.streamNoFsChecksum != null;
185 this.useHBaseChecksum = true;
186 }
187 }
188
189 /** Close stream(s) if necessary. */
190 public void close() throws IOException {
191 if (!doCloseStreams) return;
192 try {
193 if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
194 streamNoFsChecksum.close();
195 streamNoFsChecksum = null;
196 }
197 } finally {
198 if (stream != null) {
199 stream.close();
200 stream = null;
201 }
202 }
203 }
204
205 public HFileSystem getHfs() {
206 return this.hfs;
207 }
208 }