1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.Collection;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.FileNotFoundException;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.fs.PositionedReadable;
37  import org.apache.hadoop.fs.Seekable;
38  import org.apache.hadoop.hbase.util.FSUtils;
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  
88  
89  
90  @InterfaceAudience.Private
91  public class FileLink {
92    private static final Log LOG = LogFactory.getLog(FileLink.class);
93  
94    
95    public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
96  
97    
98  
99  
100 
101   private static class FileLinkInputStream extends InputStream
102       implements Seekable, PositionedReadable {
103     private FSDataInputStream in = null;
104     private Path currentPath = null;
105     private long pos = 0;
106 
107     private final FileLink fileLink;
108     private final int bufferSize;
109     private final FileSystem fs;
110 
111     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
112         throws IOException {
113       this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
114     }
115 
116     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
117         throws IOException {
118       this.bufferSize = bufferSize;
119       this.fileLink = fileLink;
120       this.fs = fs;
121 
122       this.in = tryOpen();
123     }
124 
125     @Override
126     public int read() throws IOException {
127       int res;
128       try {
129         res = in.read();
130       } catch (FileNotFoundException e) {
131         res = tryOpen().read();
132       } catch (NullPointerException e) { 
133         res = tryOpen().read();
134       } catch (AssertionError e) { 
135         res = tryOpen().read();
136       }
137       if (res > 0) pos += 1;
138       return res;
139     }
140 
141     @Override
142     public int read(byte[] b) throws IOException {
143        return read(b, 0, b.length);
144     }
145 
146     @Override
147     public int read(byte[] b, int off, int len) throws IOException {
148       int n;
149       try {
150         n = in.read(b, off, len);
151       } catch (FileNotFoundException e) {
152         n = tryOpen().read(b, off, len);
153       } catch (NullPointerException e) { 
154         n = tryOpen().read(b, off, len);
155       } catch (AssertionError e) { 
156         n = tryOpen().read(b, off, len);
157       }
158       if (n > 0) pos += n;
159       assert(in.getPos() == pos);
160       return n;
161     }
162 
163     @Override
164     public int read(long position, byte[] buffer, int offset, int length) throws IOException {
165       int n;
166       try {
167         n = in.read(position, buffer, offset, length);
168       } catch (FileNotFoundException e) {
169         n = tryOpen().read(position, buffer, offset, length);
170       } catch (NullPointerException e) { 
171         n = tryOpen().read(position, buffer, offset, length);
172       } catch (AssertionError e) { 
173         n = tryOpen().read(position, buffer, offset, length);
174       }
175       return n;
176     }
177 
178     @Override
179     public void readFully(long position, byte[] buffer) throws IOException {
180       readFully(position, buffer, 0, buffer.length);
181     }
182 
183     @Override
184     public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
185       try {
186         in.readFully(position, buffer, offset, length);
187       } catch (FileNotFoundException e) {
188         tryOpen().readFully(position, buffer, offset, length);
189       } catch (NullPointerException e) { 
190         tryOpen().readFully(position, buffer, offset, length);
191       } catch (AssertionError e) { 
192         tryOpen().readFully(position, buffer, offset, length);
193       }
194     }
195 
196     @Override
197     public long skip(long n) throws IOException {
198       long skipped;
199 
200       try {
201         skipped = in.skip(n);
202       } catch (FileNotFoundException e) {
203         skipped = tryOpen().skip(n);
204       } catch (NullPointerException e) { 
205         skipped = tryOpen().skip(n);
206       } catch (AssertionError e) { 
207         skipped = tryOpen().skip(n);
208       }
209 
210       if (skipped > 0) pos += skipped;
211       return skipped;
212     }
213 
214     @Override
215     public int available() throws IOException {
216       try {
217         return in.available();
218       } catch (FileNotFoundException e) {
219         return tryOpen().available();
220       } catch (NullPointerException e) { 
221         return tryOpen().available();
222       } catch (AssertionError e) { 
223         return tryOpen().available();
224       }
225     }
226 
227     @Override
228     public void seek(long pos) throws IOException {
229       try {
230         in.seek(pos);
231       } catch (FileNotFoundException e) {
232         tryOpen().seek(pos);
233       } catch (NullPointerException e) { 
234         tryOpen().seek(pos);
235       } catch (AssertionError e) { 
236         tryOpen().seek(pos);
237       }
238       this.pos = pos;
239     }
240 
241     @Override
242     public long getPos() throws IOException {
243       return pos;
244     }
245 
246     @Override
247     public boolean seekToNewSource(long targetPos) throws IOException {
248       boolean res;
249       try {
250         res = in.seekToNewSource(targetPos);
251       } catch (FileNotFoundException e) {
252         res = tryOpen().seekToNewSource(targetPos);
253       } catch (NullPointerException e) { 
254         res = tryOpen().seekToNewSource(targetPos);
255       } catch (AssertionError e) { 
256         res = tryOpen().seekToNewSource(targetPos);
257       }
258       if (res) pos = targetPos;
259       return res;
260     }
261 
262     @Override
263     public void close() throws IOException {
264       in.close();
265     }
266 
267     @Override
268     public synchronized void mark(int readlimit) {
269     }
270 
271     @Override
272     public synchronized void reset() throws IOException {
273       throw new IOException("mark/reset not supported");
274     }
275 
276     @Override
277     public boolean markSupported() {
278       return false;
279     }
280 
281     
282 
283 
284 
285 
286 
287     private FSDataInputStream tryOpen() throws IOException {
288       for (Path path: fileLink.getLocations()) {
289         if (path.equals(currentPath)) continue;
290         try {
291           in = fs.open(path, bufferSize);
292           if (pos != 0) in.seek(pos);
293           assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
294           if (LOG.isTraceEnabled()) {
295             if (currentPath == null) {
296               LOG.debug("link open path=" + path);
297             } else {
298               LOG.trace("link switch from path=" + currentPath + " to path=" + path);
299             }
300           }
301           currentPath = path;
302           return(in);
303         } catch (FileNotFoundException e) {
304           
305         }
306       }
307       throw new FileNotFoundException("Unable to open link: " + fileLink);
308     }
309   }
310 
311   private Path[] locations = null;
312 
313   protected FileLink() {
314     this.locations = null;
315   }
316 
317   
318 
319 
320 
321   public FileLink(Path originPath, Path... alternativePaths) {
322     setLocations(originPath, alternativePaths);
323   }
324 
325   
326 
327 
328   public FileLink(final Collection<Path> locations) {
329     this.locations = locations.toArray(new Path[locations.size()]);
330   }
331 
332   
333 
334 
335   public Path[] getLocations() {
336     return locations;
337   }
338 
339   @Override
340   public String toString() {
341     StringBuilder str = new StringBuilder(getClass().getName());
342     str.append(" locations=[");
343     for (int i = 0; i < locations.length; ++i) {
344       if (i > 0) str.append(", ");
345       str.append(locations[i].toString());
346     }
347     str.append("]");
348     return str.toString();
349   }
350 
351   
352 
353 
354   public boolean exists(final FileSystem fs) throws IOException {
355     for (int i = 0; i < locations.length; ++i) {
356       if (fs.exists(locations[i])) {
357         return true;
358       }
359     }
360     return false;
361   }
362 
363   
364 
365 
366   public Path getAvailablePath(FileSystem fs) throws IOException {
367     for (int i = 0; i < locations.length; ++i) {
368       if (fs.exists(locations[i])) {
369         return locations[i];
370       }
371     }
372     throw new FileNotFoundException("Unable to open link: " + this);
373   }
374 
375   
376 
377 
378 
379 
380 
381 
382   public FileStatus getFileStatus(FileSystem fs) throws IOException {
383     for (int i = 0; i < locations.length; ++i) {
384       try {
385         return fs.getFileStatus(locations[i]);
386       } catch (FileNotFoundException e) {
387         
388       }
389     }
390     throw new FileNotFoundException("Unable to open link: " + this);
391   }
392 
393   
394 
395 
396 
397 
398 
399 
400 
401 
402 
403   public FSDataInputStream open(final FileSystem fs) throws IOException {
404     return new FSDataInputStream(new FileLinkInputStream(fs, this));
405   }
406 
407   
408 
409 
410 
411 
412 
413 
414 
415 
416 
417 
418   public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
419     return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
420   }
421 
422   
423 
424 
425 
426   protected void setLocations(Path originPath, Path... alternativePaths) {
427     assert this.locations == null : "Link locations already set";
428 
429     List<Path> paths = new ArrayList<Path>(alternativePaths.length +1);
430     if (originPath != null) {
431       paths.add(originPath);
432     }
433 
434     for (int i = 0; i < alternativePaths.length; i++) {
435       if (alternativePaths[i] != null) {
436         paths.add(alternativePaths[i]);
437       }
438     }
439     this.locations = paths.toArray(new Path[0]);
440   }
441 
442   
443 
444 
445 
446 
447 
448 
449 
450 
451 
452   public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
453     return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
454   }
455 
456   
457 
458 
459 
460 
461 
462   public static String getBackReferenceFileName(final Path dirPath) {
463     return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
464   }
465 
466   
467 
468 
469 
470 
471 
472   public static boolean isBackReferencesDir(final Path dirPath) {
473     if (dirPath == null) return false;
474     return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
475   }
476 
477   @Override
478   public boolean equals(Object obj) {
479     if (obj == null) {
480       return false;
481     }
482     
483     
484     
485     if (this.getClass().equals(obj.getClass())) {
486       return Arrays.equals(this.locations, ((FileLink) obj).locations);
487     }
488 
489     return false;
490   }
491 
492   @Override
493   public int hashCode() {
494     return Arrays.hashCode(locations);
495   }
496 }
497