1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.coprocessor;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.CellUtil;
28  import org.apache.hadoop.hbase.Coprocessor;
29  import org.apache.hadoop.hbase.CoprocessorEnvironment;
30  import org.apache.hadoop.hbase.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
34  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.SumRequest;
35  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.SumResponse;
36  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37  import org.apache.hadoop.hbase.regionserver.InternalScanner;
38  import org.apache.hadoop.hbase.regionserver.Region;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  import com.google.protobuf.RpcCallback;
42  import com.google.protobuf.RpcController;
43  import com.google.protobuf.Service;
44  
45  
46  
47  
48  
49  
50  public class ColumnAggregationEndpointWithErrors
51      extends
52      ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
53  implements Coprocessor, CoprocessorService  {
54    static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointWithErrors.class);
55    private RegionCoprocessorEnvironment env = null;
56    @Override
57    public Service getService() {
58      return this;
59    }
60  
61    @Override
62    public void start(CoprocessorEnvironment env) throws IOException {
63      if (env instanceof RegionCoprocessorEnvironment) {
64        this.env = (RegionCoprocessorEnvironment)env;
65        return;
66      }
67      throw new CoprocessorException("Must be loaded on a table region!");
68    }
69  
70    @Override
71    public void stop(CoprocessorEnvironment env) throws IOException {
72      
73    }
74  
75    @Override
76    public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
77      
78      Scan scan = new Scan();
79      
80      byte[] family = request.getFamily().toByteArray();
81      byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
82      if (request.hasQualifier()) {
83        scan.addColumn(family, qualifier);
84      } else {
85        scan.addFamily(family);
86      }
87      int sumResult = 0;
88      InternalScanner scanner = null;
89      try {
90        Region region = this.env.getRegion();
91        
92        if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
93          throw new DoNotRetryIOException("An expected exception");
94        }
95        scanner = region.getScanner(scan);
96        List<Cell> curVals = new ArrayList<Cell>();
97        boolean hasMore = false;
98        do {
99          curVals.clear();
100         hasMore = scanner.next(curVals);
101         for (Cell kv : curVals) {
102           if (CellUtil.matchingQualifier(kv, qualifier)) {
103             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
104           }
105         }
106       } while (hasMore);
107     } catch (IOException e) {
108       ResponseConverter.setControllerException(controller, e);
109       
110       sumResult = -1;
111       LOG.info("Setting sum result to -1 to indicate error", e);
112     } finally {
113       if (scanner != null) {
114         try {
115           scanner.close();
116         } catch (IOException e) {
117           ResponseConverter.setControllerException(controller, e);
118           sumResult = -1;
119           LOG.info("Setting sum result to -1 to indicate error", e);
120         }
121       }
122     }
123     done.run(SumResponse.newBuilder().setSum(sumResult).build());
124   }
125 }