1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.wal.WAL;
37  import org.apache.hadoop.hbase.wal.WAL.Entry;
38  import org.apache.hadoop.hbase.wal.WAL.Reader;
39  import org.apache.hadoop.hbase.wal.WALFactory;
40  import org.apache.hadoop.hbase.wal.WALKey;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.mapreduce.InputFormat;
43  import org.apache.hadoop.mapreduce.InputSplit;
44  import org.apache.hadoop.mapreduce.JobContext;
45  import org.apache.hadoop.mapreduce.RecordReader;
46  import org.apache.hadoop.mapreduce.TaskAttemptContext;
47  import org.apache.hadoop.util.StringUtils;
48  
49  
50  
51  
52  @InterfaceAudience.Public
53  public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
54    private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
55  
56    public static final String START_TIME_KEY = "wal.start.time";
57    public static final String END_TIME_KEY = "wal.end.time";
58  
59    
60  
61  
62  
63    static class WALSplit extends InputSplit implements Writable {
64      private String logFileName;
65      private long fileSize;
66      private long startTime;
67      private long endTime;
68  
69      
70      public WALSplit() {}
71  
72      
73  
74  
75  
76  
77  
78  
79  
80  
81      public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
82        this.logFileName = logFileName;
83        this.fileSize = fileSize;
84        this.startTime = startTime;
85        this.endTime = endTime;
86      }
87  
88      @Override
89      public long getLength() throws IOException, InterruptedException {
90        return fileSize;
91      }
92  
93      @Override
94      public String[] getLocations() throws IOException, InterruptedException {
95        
96        return new String[] {};
97      }
98  
99      public String getLogFileName() {
100       return logFileName;
101     }
102 
103     public long getStartTime() {
104       return startTime;
105     }
106 
107     public long getEndTime() {
108       return endTime;
109     }
110 
111     @Override
112     public void readFields(DataInput in) throws IOException {
113       logFileName = in.readUTF();
114       fileSize = in.readLong();
115       startTime = in.readLong();
116       endTime = in.readLong();
117     }
118 
119     @Override
120     public void write(DataOutput out) throws IOException {
121       out.writeUTF(logFileName);
122       out.writeLong(fileSize);
123       out.writeLong(startTime);
124       out.writeLong(endTime);
125     }
126 
127     @Override
128     public String toString() {
129       return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
130     }
131   }
132 
133   
134 
135 
136 
137   static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
138     private Reader reader = null;
139     
140     Entry currentEntry = new Entry();
141     private long startTime;
142     private long endTime;
143 
144     @Override
145     public void initialize(InputSplit split, TaskAttemptContext context)
146         throws IOException, InterruptedException {
147       WALSplit hsplit = (WALSplit)split;
148       Path logFile = new Path(hsplit.getLogFileName());
149       Configuration conf = context.getConfiguration();
150       LOG.info("Opening reader for "+split);
151       try {
152         this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
153       } catch (EOFException x) {
154         LOG.info("Ignoring corrupted WAL file: " + logFile
155             + " (This is normal when a RegionServer crashed.)");
156         this.reader = null;
157       }
158       this.startTime = hsplit.getStartTime();
159       this.endTime = hsplit.getEndTime();
160     }
161 
162     @Override
163     public boolean nextKeyValue() throws IOException, InterruptedException {
164       if (reader == null) return false;
165 
166       Entry temp;
167       long i = -1;
168       do {
169         
170         try {
171           temp = reader.next(currentEntry);
172           i++;
173         } catch (EOFException x) {
174           LOG.info("Corrupted entry detected. Ignoring the rest of the file."
175               + " (This is normal when a RegionServer crashed.)");
176           return false;
177         }
178       }
179       while(temp != null && temp.getKey().getWriteTime() < startTime);
180 
181       if (temp == null) {
182         if (i > 0) LOG.info("Skipped " + i + " entries.");
183         LOG.info("Reached end of file.");
184         return false;
185       } else if (i > 0) {
186         LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
187       }
188       boolean res = temp.getKey().getWriteTime() <= endTime;
189       if (!res) {
190         LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
191       }
192       return res;
193     }
194 
195     @Override
196     public WALEdit getCurrentValue() throws IOException, InterruptedException {
197       return currentEntry.getEdit();
198     }
199 
200     @Override
201     public float getProgress() throws IOException, InterruptedException {
202       
203       return 0;
204     }
205 
206     @Override
207     public void close() throws IOException {
208       LOG.info("Closing reader");
209       if (reader != null) this.reader.close();
210     }
211   }
212 
213   
214 
215 
216 
217   static class WALKeyRecordReader extends WALRecordReader<WALKey> {
218     @Override
219     public WALKey getCurrentKey() throws IOException, InterruptedException {
220       return currentEntry.getKey();
221     }
222   }
223 
224   @Override
225   public List<InputSplit> getSplits(JobContext context) throws IOException,
226       InterruptedException {
227     return getSplits(context, START_TIME_KEY, END_TIME_KEY);
228   }
229 
230   
231 
232 
233   List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
234       throws IOException, InterruptedException {
235     Configuration conf = context.getConfiguration();
236     
237     Path[] inputPaths = getInputPaths(conf);
238 
239     long startTime = conf.getLong(startKey, Long.MIN_VALUE);
240     long endTime = conf.getLong(endKey, Long.MAX_VALUE);
241 
242     List<FileStatus> allFiles = new ArrayList<FileStatus>();
243     for(Path inputPath: inputPaths){
244       FileSystem fs = inputPath.getFileSystem(conf);
245       List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
246       allFiles.addAll(files);
247     }
248     List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
249     for (FileStatus file : allFiles) {
250       splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
251     }
252     return splits;
253   }
254 
255   private Path[] getInputPaths(Configuration conf) {
256     String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
257     return StringUtils.stringToPath(inpDirs.split(","));
258   }
259 
260   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
261       throws IOException {
262     List<FileStatus> result = new ArrayList<FileStatus>();
263     LOG.debug("Scanning " + dir.toString() + " for WAL files");
264 
265     FileStatus[] files = fs.listStatus(dir);
266     if (files == null) return Collections.emptyList();
267     for (FileStatus file : files) {
268       if (file.isDirectory()) {
269         
270         result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
271       } else {
272         String name = file.getPath().toString();
273         int idx = name.lastIndexOf('.');
274         if (idx > 0) {
275           try {
276             long fileStartTime = Long.parseLong(name.substring(idx+1));
277             if (fileStartTime <= endTime) {
278               LOG.info("Found: " + name);
279               result.add(file);
280             }
281           } catch (NumberFormatException x) {
282             idx = 0;
283           }
284         }
285         if (idx == 0) {
286           LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
287         }
288       }
289     }
290     return result;
291   }
292 
293   @Override
294   public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
295       TaskAttemptContext context) throws IOException, InterruptedException {
296     return new WALKeyRecordReader();
297   }
298 }