1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.util.Collections;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configurable;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.ConnectionFactory;
35  import org.apache.hadoop.hbase.client.RegionLocator;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.mapreduce.InputSplit;
39  import org.apache.hadoop.mapreduce.JobContext;
40  import org.apache.hadoop.hbase.util.Pair;
41  import org.apache.hadoop.mapreduce.Job;
42  import org.apache.hadoop.util.StringUtils;
43  
44  
45  
46  
47  @InterfaceAudience.Public
48  @InterfaceStability.Stable
49  public class TableInputFormat extends TableInputFormatBase
50  implements Configurable {
51  
52    @SuppressWarnings("hiding")
53    private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
54  
55    
56    public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
57    
58  
59  
60  
61    private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
62    
63  
64  
65    public static final String SCAN = "hbase.mapreduce.scan";
66    
67    public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
68    
69    public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
70    
71    public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
72    
73    public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
74    
75    public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
76    
77    public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
78    
79    public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
80    
81    public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
82    
83    public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
84    
85    public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
86    
87    public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
88    
89    public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
90  
91    
92    private Configuration conf = null;
93  
94    
95  
96  
97  
98  
99  
100   @Override
101   public Configuration getConf() {
102     return conf;
103   }
104 
105   
106 
107 
108 
109 
110 
111 
112 
113   @Override
114   public void setConf(Configuration configuration) {
115     this.conf = configuration;
116 
117     Scan scan = null;
118 
119     if (conf.get(SCAN) != null) {
120       try {
121         scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
122       } catch (IOException e) {
123         LOG.error("An error occurred.", e);
124       }
125     } else {
126       try {
127         scan = new Scan();
128 
129         if (conf.get(SCAN_ROW_START) != null) {
130           scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
131         }
132 
133         if (conf.get(SCAN_ROW_STOP) != null) {
134           scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
135         }
136 
137         if (conf.get(SCAN_COLUMNS) != null) {
138           addColumns(scan, conf.get(SCAN_COLUMNS));
139         }
140 
141         if (conf.get(SCAN_COLUMN_FAMILY) != null) {
142           scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
143         }
144 
145         if (conf.get(SCAN_TIMESTAMP) != null) {
146           scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
147         }
148 
149         if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
150           scan.setTimeRange(
151               Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
152               Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
153         }
154 
155         if (conf.get(SCAN_MAXVERSIONS) != null) {
156           scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
157         }
158 
159         if (conf.get(SCAN_CACHEDROWS) != null) {
160           scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
161         }
162 
163         if (conf.get(SCAN_BATCHSIZE) != null) {
164           scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
165         }
166 
167         
168         scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
169       } catch (Exception e) {
170           LOG.error(StringUtils.stringifyException(e));
171       }
172     }
173 
174     setScan(scan);
175   }
176 
177   @Override
178   protected void initialize(JobContext context) throws IOException {
179     
180     
181     TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
182     try {
183       initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
184     } catch (Exception e) {
185       LOG.error(StringUtils.stringifyException(e));
186     }
187   }
188 
189   
190 
191 
192 
193 
194 
195 
196 
197 
198   private static void addColumn(Scan scan, byte[] familyAndQualifier) {
199     byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
200     if (fq.length == 1) {
201       scan.addFamily(fq[0]);
202     } else if (fq.length == 2) {
203       scan.addColumn(fq[0], fq[1]);
204     } else {
205       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
206     }
207   }
208 
209   
210 
211 
212 
213 
214 
215 
216 
217 
218 
219   public static void addColumns(Scan scan, byte [][] columns) {
220     for (byte[] column : columns) {
221       addColumn(scan, column);
222     }
223   }
224 
225   
226 
227 
228 
229 
230 
231 
232 
233 
234 
235   @Override
236   public List<InputSplit> getSplits(JobContext context) throws IOException {
237     List<InputSplit> splits = super.getSplits(context);
238     if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
239       Collections.shuffle(splits);
240     }
241     return splits;
242   }
243 
244   
245 
246 
247 
248 
249 
250   private static void addColumns(Scan scan, String columns) {
251     String[] cols = columns.split(" ");
252     for (String col : cols) {
253       addColumn(scan, Bytes.toBytes(col));
254     }
255   }
256 
257   @Override
258   protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
259     if (conf.get(SPLIT_TABLE) != null) {
260       TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
261       try (Connection conn = ConnectionFactory.createConnection(getConf())) {
262         try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
263           return rl.getStartEndKeys();
264         }
265       }
266     }
267 
268     return super.getStartEndKeys();
269   }
270 
271   
272 
273 
274   public static void configureSplitTable(Job job, TableName tableName) {
275     job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
276   }
277 }