1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.nio.charset.StandardCharsets;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.KeyValue.Type;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
38  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
40  import org.apache.hadoop.hbase.testclassification.SmallTests;
41  import org.junit.After;
42  import org.junit.Before;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  import org.mockito.Mockito;
46  import org.mockito.invocation.InvocationOnMock;
47  import org.mockito.stubbing.Answer;
48  
49  
50  
51  
52  @Category(SmallTests.class)
53  public class TestClientSmallReversedScanner {
54  
55    Scan scan;
56    ExecutorService pool;
57    Configuration conf;
58  
59    ClusterConnection clusterConn;
60    RpcRetryingCallerFactory rpcFactory;
61    RpcControllerFactory controllerFactory;
62    RpcRetryingCaller<Result[]> caller;
63  
64    @Before
65    @SuppressWarnings({"deprecation", "unchecked"})
66    public void setup() throws IOException {
67      clusterConn = Mockito.mock(ClusterConnection.class);
68      rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
69      controllerFactory = Mockito.mock(RpcControllerFactory.class);
70      pool = Executors.newSingleThreadExecutor();
71      scan = new Scan();
72      conf = new Configuration();
73      Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
74      
75      caller = Mockito.mock(RpcRetryingCaller.class);
76      
77      Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
78    }
79  
80    @After
81    public void teardown() {
82      if (null != pool) {
83        pool.shutdownNow();
84      }
85    }
86  
87    
88  
89  
90    private Answer<Boolean> createTrueThenFalseAnswer() {
91      return new Answer<Boolean>() {
92        boolean first = true;
93  
94        @Override
95        public Boolean answer(InvocationOnMock invocation) {
96          if (first) {
97            first = false;
98            return true;
99          }
100         return false;
101       }
102     };
103   }
104 
105   private SmallScannerCallableFactory getFactory(
106       final ScannerCallableWithReplicas callableWithReplicas) {
107     return new SmallScannerCallableFactory() {
108       @Override
109       public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
110           Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
111           RpcControllerFactory controllerFactory, ExecutorService pool,
112           int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
113           RpcRetryingCaller<Result[]> caller) {
114         return callableWithReplicas;
115       }
116     };
117   }
118 
119   @Test
120   public void testContextPresent() throws Exception {
121     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
122         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
123         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
124         Type.Maximum);
125 
126     ScannerCallableWithReplicas callableWithReplicas = Mockito
127         .mock(ScannerCallableWithReplicas.class);
128 
129     
130     @SuppressWarnings("unchecked")
131     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
132     
133     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
134 
135     
136     
137 
138     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
139 
140     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
141         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
142         Integer.MAX_VALUE)) {
143 
144       csrs.setScannerCallableFactory(factory);
145 
146       
147       Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
148           .thenAnswer(new Answer<Result[]>() {
149             int count = 0;
150 
151             @Override
152             public Result[] answer(InvocationOnMock invocation) {
153               Result[] results;
154               if (0 == count) {
155                 results = new Result[] {Result.create(new Cell[] {kv3}),
156                     Result.create(new Cell[] {kv2})};
157               } else if (1 == count) {
158                 results = new Result[] {Result.create(new Cell[] {kv1})};
159               } else {
160                 results = new Result[0];
161               }
162               count++;
163               return results;
164             }
165           });
166 
167       
168       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
169       
170       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
171           createTrueThenFalseAnswer());
172 
173       
174       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
175       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
176       
177       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
178 
179       csrs.loadCache();
180 
181       List<Result> results = csrs.cache;
182       Iterator<Result> iter = results.iterator();
183       assertEquals(3, results.size());
184       for (int i = 3; i >= 1 && iter.hasNext(); i--) {
185         Result result = iter.next();
186         byte[] row = result.getRow();
187         assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
188         assertEquals(1, result.getMap().size());
189       }
190       assertTrue(csrs.closed);
191     }
192   }
193 
194   @Test
195   public void testNoContextFewerRecords() throws Exception {
196     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
197         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
198         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
199         Type.Maximum);
200 
201     ScannerCallableWithReplicas callableWithReplicas = Mockito
202         .mock(ScannerCallableWithReplicas.class);
203 
204     
205     scan.setCaching(2);
206 
207     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
208 
209     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
210         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
211         Integer.MAX_VALUE)) {
212 
213       csrs.setScannerCallableFactory(factory);
214 
215       
216       Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
217           .thenAnswer(new Answer<Result[]>() {
218             int count = 0;
219 
220             @Override
221             public Result[] answer(InvocationOnMock invocation) {
222               Result[] results;
223               if (0 == count) {
224                 results = new Result[] {Result.create(new Cell[] {kv3}),
225                     Result.create(new Cell[] {kv2})};
226               } else if (1 == count) {
227                 
228                 results = new Result[] {Result.create(new Cell[] {kv1})};
229               } else {
230                 throw new RuntimeException("Should not fetch a third batch from the server");
231               }
232               count++;
233               return results;
234             }
235           });
236 
237       
238       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
239       
240       Mockito.when(callableWithReplicas.getServerHasMoreResults())
241           .thenThrow(new RuntimeException("Should not be called"));
242 
243       
244       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
245       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
246       
247       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
248 
249       csrs.loadCache();
250 
251       List<Result> results = csrs.cache;
252       Iterator<Result> iter = results.iterator();
253       assertEquals(2, results.size());
254       for (int i = 3; i >= 2 && iter.hasNext(); i--) {
255         Result result = iter.next();
256         byte[] row = result.getRow();
257         assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
258         assertEquals(1, result.getMap().size());
259       }
260 
261       
262       results.clear();
263 
264       csrs.loadCache();
265 
266       assertEquals(1, results.size());
267       Result result = results.get(0);
268       assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8));
269       assertEquals(1, result.getMap().size());
270 
271       assertTrue(csrs.closed);
272     }
273   }
274 
275   @Test
276   public void testNoContextNoRecords() throws Exception {
277     ScannerCallableWithReplicas callableWithReplicas = Mockito
278         .mock(ScannerCallableWithReplicas.class);
279 
280     
281     scan.setCaching(2);
282 
283     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
284 
285     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
286         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
287         Integer.MAX_VALUE)) {
288 
289       csrs.setScannerCallableFactory(factory);
290 
291       
292       Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
293           .thenReturn(new Result[0]);
294 
295       
296       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
297       
298       Mockito.when(callableWithReplicas.getServerHasMoreResults())
299           .thenThrow(new RuntimeException("Should not be called"));
300 
301       
302       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
303       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
304       
305       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
306 
307       csrs.loadCache();
308 
309       assertEquals(0, csrs.cache.size());
310       assertTrue(csrs.closed);
311     }
312   }
313 
314   @Test
315   public void testContextNoRecords() throws Exception {
316     ScannerCallableWithReplicas callableWithReplicas = Mockito
317         .mock(ScannerCallableWithReplicas.class);
318 
319     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
320 
321     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
322         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
323         Integer.MAX_VALUE)) {
324 
325       csrs.setScannerCallableFactory(factory);
326 
327       
328       Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
329           .thenReturn(new Result[0]);
330 
331       
332       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
333       
334       Mockito.when(callableWithReplicas.getServerHasMoreResults())
335           .thenReturn(false);
336 
337       
338       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
339       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
340       
341       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
342 
343       csrs.loadCache();
344 
345       assertEquals(0, csrs.cache.size());
346       assertTrue(csrs.closed);
347     }
348   }
349 }