1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.lang.reflect.Method;
22  import java.util.Map;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.client.Result;
30  import org.apache.hadoop.hbase.client.ResultScanner;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.client.ScannerCallable;
33  import org.apache.hadoop.hbase.client.Table;
34  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35  import org.apache.hadoop.hbase.DoNotRetryIOException;
36  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.mapreduce.Counter;
39  import org.apache.hadoop.mapreduce.InputSplit;
40  import org.apache.hadoop.mapreduce.TaskAttemptContext;
41  import org.apache.hadoop.util.StringUtils;
42  
43  
44  
45  
46  
47  @InterfaceAudience.Public
48  @InterfaceStability.Stable
49  public class TableRecordReaderImpl {
50    public static final String LOG_PER_ROW_COUNT
51      = "hbase.mapreduce.log.scanner.rowcount";
52  
53    static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
54  
55    
56    private static final String HBASE_COUNTER_GROUP_NAME =
57      "HBase Counters";
58    private ResultScanner scanner = null;
59    private Scan scan = null;
60    private Scan currentScan = null;
61    private Table htable = null;
62    private byte[] lastSuccessfulRow = null;
63    private ImmutableBytesWritable key = null;
64    private Result value = null;
65    private TaskAttemptContext context = null;
66    private Method getCounter = null;
67    private long numRestarts = 0;
68    private long numStale = 0;
69    private long timestamp;
70    private int rowcount;
71    private boolean logScannerActivity = false;
72    private int logPerRowCount = 100;
73  
74    
75  
76  
77  
78  
79  
80    public void restart(byte[] firstRow) throws IOException {
81      currentScan = new Scan(scan);
82      currentScan.setStartRow(firstRow);
83      currentScan.setScanMetricsEnabled(true);
84      if (this.scanner != null) {
85        if (logScannerActivity) {
86          LOG.info("Closing the previously opened scanner object.");
87        }
88        this.scanner.close();
89      }
90      this.scanner = this.htable.getScanner(currentScan);
91      if (logScannerActivity) {
92        LOG.info("Current scan=" + currentScan.toString());
93        timestamp = System.currentTimeMillis();
94        rowcount = 0;
95      }
96    }
97  
98    
99  
100 
101 
102 
103 
104   protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
105   throws IOException {
106     Method m = null;
107     try {
108       m = context.getClass().getMethod("getCounter",
109         new Class [] {String.class, String.class});
110     } catch (SecurityException e) {
111       throw new IOException("Failed test for getCounter", e);
112     } catch (NoSuchMethodException e) {
113       
114     }
115     return m;
116   }
117 
118   
119 
120 
121 
122 
123   public void setHTable(Table htable) {
124     Configuration conf = htable.getConfiguration();
125     logScannerActivity = conf.getBoolean(
126       ScannerCallable.LOG_SCANNER_ACTIVITY, false);
127     logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
128     this.htable = htable;
129   }
130 
131   
132 
133 
134 
135 
136   public void setScan(Scan scan) {
137     this.scan = scan;
138   }
139 
140   
141 
142 
143 
144 
145   public void initialize(InputSplit inputsplit,
146       TaskAttemptContext context) throws IOException,
147       InterruptedException {
148     if (context != null) {
149       this.context = context;
150       getCounter = retrieveGetCounterWithStringsParams(context);
151     }
152     restart(scan.getStartRow());
153   }
154 
155   
156 
157 
158 
159 
160   public void close() {
161     this.scanner.close();
162     try {
163       this.htable.close();
164     } catch (IOException ioe) {
165       LOG.warn("Error closing table", ioe);
166     }
167   }
168 
169   
170 
171 
172 
173 
174 
175 
176   public ImmutableBytesWritable getCurrentKey() throws IOException,
177       InterruptedException {
178     return key;
179   }
180 
181   
182 
183 
184 
185 
186 
187 
188   public Result getCurrentValue() throws IOException, InterruptedException {
189     return value;
190   }
191 
192 
193   
194 
195 
196 
197 
198 
199 
200   public boolean nextKeyValue() throws IOException, InterruptedException {
201     if (key == null) key = new ImmutableBytesWritable();
202     if (value == null) value = new Result();
203     try {
204       try {
205         value = this.scanner.next();
206         if (value != null && value.isStale()) numStale++;
207         if (logScannerActivity) {
208           rowcount ++;
209           if (rowcount >= logPerRowCount) {
210             long now = System.currentTimeMillis();
211             LOG.info("Mapper took " + (now-timestamp)
212               + "ms to process " + rowcount + " rows");
213             timestamp = now;
214             rowcount = 0;
215           }
216         }
217       } catch (IOException e) {
218         
219         if (e instanceof DoNotRetryIOException) {
220           throw e;
221         }
222         
223         
224         LOG.info("recovered from " + StringUtils.stringifyException(e));
225         if (lastSuccessfulRow == null) {
226           LOG.warn("We are restarting the first next() invocation," +
227               " if your mapper has restarted a few other times like this" +
228               " then you should consider killing this job and investigate" +
229               " why it's taking so long.");
230         }
231         if (lastSuccessfulRow == null) {
232           restart(scan.getStartRow());
233         } else {
234           restart(lastSuccessfulRow);
235           scanner.next();    
236         }
237         value = scanner.next();
238         if (value != null && value.isStale()) numStale++;
239         numRestarts++;
240       }
241       if (value != null && value.size() > 0) {
242         key.set(value.getRow());
243         lastSuccessfulRow = key.get();
244         return true;
245       }
246 
247       updateCounters();
248       return false;
249     } catch (IOException ioe) {
250       if (logScannerActivity) {
251         long now = System.currentTimeMillis();
252         LOG.info("Mapper took " + (now-timestamp)
253           + "ms to process " + rowcount + " rows");
254         LOG.info(ioe);
255         String lastRow = lastSuccessfulRow == null ?
256           "null" : Bytes.toStringBinary(lastSuccessfulRow);
257         LOG.info("lastSuccessfulRow=" + lastRow);
258       }
259       throw ioe;
260     }
261   }
262 
263   
264 
265 
266 
267 
268 
269 
270   private void updateCounters() throws IOException {
271     ScanMetrics scanMetrics = this.scan.getScanMetrics();
272     if (scanMetrics == null) {
273       return;
274     }
275 
276     updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
277   }
278 
279   protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
280       Method getCounter, TaskAttemptContext context, long numStale) {
281     
282     if (getCounter == null) {
283       return;
284     }
285 
286     try {
287       for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
288         Counter ct = (Counter)getCounter.invoke(context,
289             HBASE_COUNTER_GROUP_NAME, entry.getKey());
290 
291         ct.increment(entry.getValue());
292       }
293       ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
294           "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
295       ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
296           "NUM_SCAN_RESULTS_STALE")).increment(numStale);
297     } catch (Exception e) {
298       LOG.debug("can't update counter." + StringUtils.stringifyException(e));
299     }
300   }
301 
302   
303 
304 
305 
306 
307   public float getProgress() {
308     
309     return 0;
310   }
311 
312 }