1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.*;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.*;
31  import org.apache.hadoop.hbase.filter.Filter;
32  import org.apache.hadoop.hbase.filter.TimestampsFilter;
33  import org.apache.hadoop.hbase.testclassification.MediumTests;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.junit.After;
36  import org.junit.AfterClass;
37  import org.junit.Before;
38  import org.junit.BeforeClass;
39  import org.junit.Test;
40  import org.junit.experimental.categories.Category;
41  
42  
43  
44  
45  
46  
47  @Category(MediumTests.class)
48  public class TestTimestampsFilter {
49    final Log LOG = LogFactory.getLog(getClass());
50    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
51  
52    
53  
54  
55    @BeforeClass
56    public static void setUpBeforeClass() throws Exception {
57      TEST_UTIL.startMiniCluster();
58    }
59  
60    
61  
62  
63    @AfterClass
64    public static void tearDownAfterClass() throws Exception {
65      TEST_UTIL.shutdownMiniCluster();
66    }
67  
68    
69  
70  
71    @Before
72    public void setUp() throws Exception {
73      
74    }
75  
76    
77  
78  
79    @After
80    public void tearDown() throws Exception {
81      
82    }
83  
84    
85  
86  
87  
88  
89  
90  
91  
92    @Test
93    public void testTimestampsFilter() throws Exception {
94      byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
95      byte [] FAMILY = Bytes.toBytes("event_log");
96      byte [][] FAMILIES = new byte[][] { FAMILY };
97      Cell kvs[];
98  
99      
100     Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
101 
102     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
103       for (int colIdx = 0; colIdx < 5; colIdx++) {
104         
105         putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
106         
107         putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
108       }
109     }
110 
111     
112     verifyInsertedValues(ht, FAMILY);
113 
114     TEST_UTIL.flush();
115 
116     
117     verifyInsertedValues(ht, FAMILY);
118 
119     
120     
121     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
122       for (int colIdx = 0; colIdx < 5; colIdx++) {
123         putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
124         putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
125       }
126     }
127 
128     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
129       for (int colIdx = 0; colIdx < 5; colIdx++) {
130         kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
131                            Arrays.asList(505L, 5L, 105L, 305L, 205L));
132         assertEquals(4, kvs.length);
133         checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
134         checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
135         checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
136         checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
137       }
138     }
139 
140     
141     
142     kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
143     assertEquals(0, kvs == null? 0: kvs.length);
144 
145     
146     
147     
148     
149     Result[] results = scanNVersions(ht, FAMILY, 0, 4,
150                                      Arrays.asList(6L, 106L, 306L));
151     assertEquals("# of rows returned from scan", 5, results.length);
152     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
153       kvs = results[rowIdx].rawCells();
154       
155       
156       assertEquals("Number of KeyValues in result for row:" + rowIdx,
157                    3*5, kvs.length);
158       for (int colIdx = 0; colIdx < 5; colIdx++) {
159         int offset = colIdx * 3;
160         checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
161         checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
162         checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
163       }
164     }
165     ht.close();
166   }
167 
168   @Test
169   public void testMultiColumns() throws Exception {
170     byte [] TABLE = Bytes.toBytes("testTimestampsFilterMultiColumns");
171     byte [] FAMILY = Bytes.toBytes("event_log");
172     byte [][] FAMILIES = new byte[][] { FAMILY };
173 
174     
175     Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
176 
177     Put p = new Put(Bytes.toBytes("row"));
178     p.add(FAMILY, Bytes.toBytes("column0"), 3, Bytes.toBytes("value0-3"));
179     p.add(FAMILY, Bytes.toBytes("column1"), 3, Bytes.toBytes("value1-3"));
180     p.add(FAMILY, Bytes.toBytes("column2"), 1, Bytes.toBytes("value2-1"));
181     p.add(FAMILY, Bytes.toBytes("column2"), 2, Bytes.toBytes("value2-2"));
182     p.add(FAMILY, Bytes.toBytes("column2"), 3, Bytes.toBytes("value2-3"));
183     p.add(FAMILY, Bytes.toBytes("column3"), 2, Bytes.toBytes("value3-2"));
184     p.add(FAMILY, Bytes.toBytes("column4"), 1, Bytes.toBytes("value4-1"));
185     p.add(FAMILY, Bytes.toBytes("column4"), 2, Bytes.toBytes("value4-2"));
186     p.add(FAMILY, Bytes.toBytes("column4"), 3, Bytes.toBytes("value4-3"));
187     ht.put(p);
188 
189     ArrayList<Long> timestamps = new ArrayList<Long>();
190     timestamps.add(new Long(3));
191     TimestampsFilter filter = new TimestampsFilter(timestamps);
192 
193     Get g = new Get(Bytes.toBytes("row"));
194     g.setFilter(filter);
195     g.setMaxVersions();
196     g.addColumn(FAMILY, Bytes.toBytes("column2"));
197     g.addColumn(FAMILY, Bytes.toBytes("column4"));
198 
199     Result result = ht.get(g);
200     for (Cell kv : result.listCells()) {
201       System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) +
202           ", column " + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value "
203           + Bytes.toString(CellUtil.cloneValue(kv)));
204     }
205 
206     assertEquals(result.listCells().size(), 2);
207     assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3")));
208     assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3")));
209 
210     ht.close();
211   }
212 
213   
214 
215 
216 
217 
218   @Test
219   public void testWithVersionDeletes() throws Exception {
220 
221     
222     testWithVersionDeletes(false);
223 
224     
225     testWithVersionDeletes(true);
226   }
227 
228   private void testWithVersionDeletes(boolean flushTables) throws IOException {
229     byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
230                                    (flushTables ? "flush" : "noflush")); 
231     byte [] FAMILY = Bytes.toBytes("event_log");
232     byte [][] FAMILIES = new byte[][] { FAMILY };
233 
234     
235     Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
236 
237     
238     putNVersions(ht, FAMILY, 0, 0, 1, 5);
239 
240     
241     deleteOneVersion(ht, FAMILY, 0, 0, 4);
242 
243     if (flushTables) {
244       TEST_UTIL.flush();
245     }
246 
247     
248     
249     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
250     assertEquals(3, kvs.length);
251     checkOneCell(kvs[0], FAMILY, 0, 0, 5);
252     checkOneCell(kvs[1], FAMILY, 0, 0, 3);
253     checkOneCell(kvs[2], FAMILY, 0, 0, 2);
254 
255     ht.close();
256   }
257 
258   private void verifyInsertedValues(Table ht, byte[] cf) throws IOException {
259     for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
260       for (int colIdx = 0; colIdx < 5; colIdx++) {
261         
262         Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
263                                       Arrays.asList(5L, 300L, 6L, 80L));
264         assertEquals(4, kvs.length);
265         checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
266         checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
267         checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
268         checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
269 
270         
271         kvs = getNVersions(ht, cf, rowIdx, colIdx,
272                            Arrays.asList(101L, 102L));
273         assertEquals(0, kvs == null? 0: kvs.length);
274 
275         
276         kvs = getNVersions(ht, cf, rowIdx, colIdx,
277                            Arrays.asList(1L, 300L, 105L, 70L, 115L));
278         assertEquals(3, kvs.length);
279         checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
280         checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
281         checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
282       }
283     }
284   }
285 
286   
287 
288 
289 
290   private void checkOneCell(Cell kv, byte[] cf,
291                              int rowIdx, int colIdx, long ts) {
292 
293     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
294 
295     assertEquals("Row mismatch which checking: " + ctx,
296                  "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
297 
298     assertEquals("ColumnFamily mismatch while checking: " + ctx,
299                  Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
300 
301     assertEquals("Column qualifier mismatch while checking: " + ctx,
302                  "column:" + colIdx,
303                   Bytes.toString(CellUtil.cloneQualifier(kv)));
304 
305     assertEquals("Timestamp mismatch while checking: " + ctx,
306                  ts, kv.getTimestamp());
307 
308     assertEquals("Value mismatch while checking: " + ctx,
309                  "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
310   }
311 
312   
313 
314 
315 
316 
317   private  Cell[] getNVersions(Table ht, byte[] cf, int rowIdx,
318                                    int colIdx, List<Long> versions)
319     throws IOException {
320     byte row[] = Bytes.toBytes("row:" + rowIdx);
321     byte column[] = Bytes.toBytes("column:" + colIdx);
322     Filter filter = new TimestampsFilter(versions);
323     Get get = new Get(row);
324     get.addColumn(cf, column);
325     get.setFilter(filter);
326     get.setMaxVersions();
327     Result result = ht.get(get);
328 
329     return result.rawCells();
330   }
331 
332   
333 
334 
335 
336   private Result[] scanNVersions(Table ht, byte[] cf, int startRowIdx,
337                                  int endRowIdx, List<Long> versions)
338     throws IOException {
339     byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
340     byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); 
341     Filter filter = new TimestampsFilter(versions);
342     Scan scan = new Scan(startRow, endRow);
343     scan.setFilter(filter);
344     scan.setMaxVersions();
345     ResultScanner scanner = ht.getScanner(scan);
346     return scanner.next(endRowIdx - startRowIdx + 1);
347   }
348 
349   
350 
351 
352 
353   private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx,
354                             long versionStart, long versionEnd)
355       throws IOException {
356     byte row[] = Bytes.toBytes("row:" + rowIdx);
357     byte column[] = Bytes.toBytes("column:" + colIdx);
358     Put put = new Put(row);
359     put.setDurability(Durability.SKIP_WAL);
360 
361     for (long idx = versionStart; idx <= versionEnd; idx++) {
362       put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
363     }
364 
365     ht.put(put);
366   }
367 
368   
369 
370 
371 
372   private void deleteOneVersion(Table ht, byte[] cf, int rowIdx,
373                                 int colIdx, long version)
374     throws IOException {
375     byte row[] = Bytes.toBytes("row:" + rowIdx);
376     byte column[] = Bytes.toBytes("column:" + colIdx);
377     Delete del = new Delete(row);
378     del.deleteColumn(cf, column, version);
379     ht.delete(del);
380   }
381 
382 }
383 
384