1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.ByteArrayInputStream;
28  import java.io.ByteArrayOutputStream;
29  import java.io.DataInputStream;
30  import java.io.DataOutputStream;
31  import java.io.IOException;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.NavigableSet;
35  import java.util.Set;
36  
37  import org.apache.hadoop.hbase.client.Get;
38  import org.apache.hadoop.hbase.client.Scan;
39  import org.apache.hadoop.hbase.exceptions.DeserializationException;
40  import org.apache.hadoop.hbase.filter.BinaryComparator;
41  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
42  import org.apache.hadoop.hbase.filter.Filter;
43  import org.apache.hadoop.hbase.filter.PrefixFilter;
44  import org.apache.hadoop.hbase.filter.RowFilter;
45  import org.apache.hadoop.hbase.io.TimeRange;
46  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
48  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
49  import org.apache.hadoop.hbase.testclassification.SmallTests;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.Writables;
52  import org.apache.hadoop.io.DataInputBuffer;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  /**
57   * Test HBase Writables serializations
58   */
59  @Category(SmallTests.class)
60  public class TestSerialization {
61    @Test public void testKeyValue() throws Exception {
62      final String name = "testKeyValue2";
63      byte[] row = name.getBytes();
64      byte[] fam = "fam".getBytes();
65      byte[] qf = "qf".getBytes();
66      long ts = System.currentTimeMillis();
67      byte[] val = "val".getBytes();
68      KeyValue kv = new KeyValue(row, fam, qf, ts, val);
69      ByteArrayOutputStream baos = new ByteArrayOutputStream();
70      DataOutputStream dos = new DataOutputStream(baos);
71      long l = KeyValue.write(kv, dos);
72      dos.close();
73      byte [] mb = baos.toByteArray();
74      ByteArrayInputStream bais = new ByteArrayInputStream(mb);
75      DataInputStream dis = new DataInputStream(bais);
76      KeyValue deserializedKv = KeyValue.create(dis);
77      assertTrue(Bytes.equals(kv.getBuffer(), deserializedKv.getBuffer()));
78      assertEquals(kv.getOffset(), deserializedKv.getOffset());
79      assertEquals(kv.getLength(), deserializedKv.getLength());
80    }
81  
82    @Test public void testCreateKeyValueInvalidNegativeLength() {
83  
84      KeyValue kv_0 = new KeyValue(Bytes.toBytes("myRow"), Bytes.toBytes("myCF"),       // 51 bytes
85                                   Bytes.toBytes("myQualifier"), 12345L, Bytes.toBytes("my12345"));
86  
87      KeyValue kv_1 = new KeyValue(Bytes.toBytes("myRow"), Bytes.toBytes("myCF"),       // 49 bytes
88                                   Bytes.toBytes("myQualifier"), 12345L, Bytes.toBytes("my123"));
89  
90      ByteArrayOutputStream baos = new ByteArrayOutputStream();
91      DataOutputStream dos = new DataOutputStream(baos);
92  
93      long l = 0;
94      try {
95        l  = KeyValue.oswrite(kv_0, dos, false);
96        l += KeyValue.oswrite(kv_1, dos, false);
97        assertEquals(100L, l);
98      } catch (IOException e) {
99        fail("Unexpected IOException" + e.getMessage());
100     }
101 
102     ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
103     DataInputStream dis = new DataInputStream(bais);
104 
105     try {
106       KeyValue.create(dis);
107       assertTrue(kv_0.equals(kv_1));
108     } catch (Exception e) {
109       fail("Unexpected Exception" + e.getMessage());
110     }
111 
112     // length -1
113     try {
114       // even if we have a good kv now in dis we will just pass length with -1 for simplicity
115       KeyValue.create(-1, dis);
116       fail("Expected corrupt stream");
117     } catch (Exception e) {
118       assertEquals("Failed read -1 bytes, stream corrupt?", e.getMessage());
119     }
120 
121   }
122 
123   @Test
124   public void testSplitLogTask() throws DeserializationException {
125     SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), 
126       RecoveryMode.LOG_REPLAY);
127     byte [] bytes = slt.toByteArray();
128     SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
129     assertTrue(slt.equals(sltDeserialized));
130   }
131 
132   @Test public void testCompareFilter() throws Exception {
133     Filter f = new RowFilter(CompareOp.EQUAL,
134       new BinaryComparator(Bytes.toBytes("testRowOne-2")));
135     byte [] bytes = f.toByteArray();
136     Filter ff = RowFilter.parseFrom(bytes);
137     assertNotNull(ff);
138   }
139 
140   @Test public void testTableDescriptor() throws Exception {
141     final String name = "testTableDescriptor";
142     HTableDescriptor htd = createTableDescriptor(name);
143     byte [] mb = Writables.getBytes(htd);
144     HTableDescriptor deserializedHtd =
145       (HTableDescriptor)Writables.getWritable(mb, new HTableDescriptor());
146     assertEquals(htd.getTableName(), deserializedHtd.getTableName());
147   }
148 
149   /**
150    * Test RegionInfo serialization
151    * @throws Exception
152    */
153   @Test public void testRegionInfo() throws Exception {
154     HRegionInfo hri = createRandomRegion("testRegionInfo");
155 
156     //test toByteArray()
157     byte [] hrib = hri.toByteArray();
158     HRegionInfo deserializedHri = HRegionInfo.parseFrom(hrib);
159     assertEquals(hri.getEncodedName(), deserializedHri.getEncodedName());
160     assertEquals(hri, deserializedHri);
161 
162     //test toDelimitedByteArray()
163     hrib = hri.toDelimitedByteArray();
164     DataInputBuffer buf = new DataInputBuffer();
165     try {
166       buf.reset(hrib, hrib.length);
167       deserializedHri = HRegionInfo.parseFrom(buf);
168       assertEquals(hri.getEncodedName(), deserializedHri.getEncodedName());
169       assertEquals(hri, deserializedHri);
170     } finally {
171       buf.close();
172     }
173   }
174 
175   @Test public void testRegionInfos() throws Exception {
176     HRegionInfo hri = createRandomRegion("testRegionInfos");
177     byte[] triple = HRegionInfo.toDelimitedByteArray(hri, hri, hri);
178     List<HRegionInfo> regions = HRegionInfo.parseDelimitedFrom(triple, 0, triple.length);
179     assertTrue(regions.size() == 3);
180     assertTrue(regions.get(0).equals(regions.get(1)));
181     assertTrue(regions.get(0).equals(regions.get(2)));
182   }
183 
184   private HRegionInfo createRandomRegion(final String name) {
185     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
186     String [] families = new String [] {"info", "anchor"};
187     for (int i = 0; i < families.length; i++) {
188       htd.addFamily(new HColumnDescriptor(families[i]));
189     }
190     return new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW,
191       HConstants.EMPTY_END_ROW);
192   }
193 
194   /*
195    * TODO
196   @Test public void testPut() throws Exception{
197     byte[] row = "row".getBytes();
198     byte[] fam = "fam".getBytes();
199     byte[] qf1 = "qf1".getBytes();
200     byte[] qf2 = "qf2".getBytes();
201     byte[] qf3 = "qf3".getBytes();
202     byte[] qf4 = "qf4".getBytes();
203     byte[] qf5 = "qf5".getBytes();
204     byte[] qf6 = "qf6".getBytes();
205     byte[] qf7 = "qf7".getBytes();
206     byte[] qf8 = "qf8".getBytes();
207 
208     long ts = System.currentTimeMillis();
209     byte[] val = "val".getBytes();
210 
211     Put put = new Put(row);
212     put.setWriteToWAL(false);
213     put.add(fam, qf1, ts, val);
214     put.add(fam, qf2, ts, val);
215     put.add(fam, qf3, ts, val);
216     put.add(fam, qf4, ts, val);
217     put.add(fam, qf5, ts, val);
218     put.add(fam, qf6, ts, val);
219     put.add(fam, qf7, ts, val);
220     put.add(fam, qf8, ts, val);
221 
222     byte[] sb = Writables.getBytes(put);
223     Put desPut = (Put)Writables.getWritable(sb, new Put());
224 
225     //Timing test
226 //    long start = System.nanoTime();
227 //    desPut = (Put)Writables.getWritable(sb, new Put());
228 //    long stop = System.nanoTime();
229 //    System.out.println("timer " +(stop-start));
230 
231     assertTrue(Bytes.equals(put.getRow(), desPut.getRow()));
232     List<KeyValue> list = null;
233     List<KeyValue> desList = null;
234     for(Map.Entry<byte[], List<KeyValue>> entry : put.getFamilyMap().entrySet()){
235       assertTrue(desPut.getFamilyMap().containsKey(entry.getKey()));
236       list = entry.getValue();
237       desList = desPut.getFamilyMap().get(entry.getKey());
238       for(int i=0; i<list.size(); i++){
239         assertTrue(list.get(i).equals(desList.get(i)));
240       }
241     }
242   }
243 
244 
245   @Test public void testPut2() throws Exception{
246     byte[] row = "testAbort,,1243116656250".getBytes();
247     byte[] fam = "historian".getBytes();
248     byte[] qf1 = "creation".getBytes();
249 
250     long ts = 9223372036854775807L;
251     byte[] val = "dont-care".getBytes();
252 
253     Put put = new Put(row);
254     put.add(fam, qf1, ts, val);
255 
256     byte[] sb = Writables.getBytes(put);
257     Put desPut = (Put)Writables.getWritable(sb, new Put());
258 
259     assertTrue(Bytes.equals(put.getRow(), desPut.getRow()));
260     List<KeyValue> list = null;
261     List<KeyValue> desList = null;
262     for(Map.Entry<byte[], List<KeyValue>> entry : put.getFamilyMap().entrySet()){
263       assertTrue(desPut.getFamilyMap().containsKey(entry.getKey()));
264       list = entry.getValue();
265       desList = desPut.getFamilyMap().get(entry.getKey());
266       for(int i=0; i<list.size(); i++){
267         assertTrue(list.get(i).equals(desList.get(i)));
268       }
269     }
270   }
271 
272 
273   @Test public void testDelete() throws Exception{
274     byte[] row = "row".getBytes();
275     byte[] fam = "fam".getBytes();
276     byte[] qf1 = "qf1".getBytes();
277 
278     long ts = System.currentTimeMillis();
279 
280     Delete delete = new Delete(row);
281     delete.deleteColumn(fam, qf1, ts);
282 
283     byte[] sb = Writables.getBytes(delete);
284     Delete desDelete = (Delete)Writables.getWritable(sb, new Delete());
285 
286     assertTrue(Bytes.equals(delete.getRow(), desDelete.getRow()));
287     List<KeyValue> list = null;
288     List<KeyValue> desList = null;
289     for(Map.Entry<byte[], List<KeyValue>> entry :
290         delete.getFamilyMap().entrySet()){
291       assertTrue(desDelete.getFamilyMap().containsKey(entry.getKey()));
292       list = entry.getValue();
293       desList = desDelete.getFamilyMap().get(entry.getKey());
294       for(int i=0; i<list.size(); i++){
295         assertTrue(list.get(i).equals(desList.get(i)));
296       }
297     }
298   }
299   */
300 
301   @Test public void testGet() throws Exception{
302     byte[] row = "row".getBytes();
303     byte[] fam = "fam".getBytes();
304     byte[] qf1 = "qf1".getBytes();
305 
306     long ts = System.currentTimeMillis();
307     int maxVersions = 2;
308 
309     Get get = new Get(row);
310     get.addColumn(fam, qf1);
311     get.setTimeRange(ts, ts+1);
312     get.setMaxVersions(maxVersions);
313 
314     ClientProtos.Get getProto = ProtobufUtil.toGet(get);
315     Get desGet = ProtobufUtil.toGet(getProto);
316 
317     assertTrue(Bytes.equals(get.getRow(), desGet.getRow()));
318     Set<byte[]> set = null;
319     Set<byte[]> desSet = null;
320 
321     for(Map.Entry<byte[], NavigableSet<byte[]>> entry :
322         get.getFamilyMap().entrySet()){
323       assertTrue(desGet.getFamilyMap().containsKey(entry.getKey()));
324       set = entry.getValue();
325       desSet = desGet.getFamilyMap().get(entry.getKey());
326       for(byte [] qualifier : set){
327         assertTrue(desSet.contains(qualifier));
328       }
329     }
330 
331     assertEquals(get.getMaxVersions(), desGet.getMaxVersions());
332     TimeRange tr = get.getTimeRange();
333     TimeRange desTr = desGet.getTimeRange();
334     assertEquals(tr.getMax(), desTr.getMax());
335     assertEquals(tr.getMin(), desTr.getMin());
336   }
337 
338 
339   @Test public void testScan() throws Exception {
340 
341     byte[] startRow = "startRow".getBytes();
342     byte[] stopRow  = "stopRow".getBytes();
343     byte[] fam = "fam".getBytes();
344     byte[] qf1 = "qf1".getBytes();
345 
346     long ts = System.currentTimeMillis();
347     int maxVersions = 2;
348 
349     Scan scan = new Scan(startRow, stopRow);
350     scan.addColumn(fam, qf1);
351     scan.setTimeRange(ts, ts+1);
352     scan.setMaxVersions(maxVersions);
353 
354     ClientProtos.Scan scanProto = ProtobufUtil.toScan(scan);
355     Scan desScan = ProtobufUtil.toScan(scanProto);
356 
357     assertTrue(Bytes.equals(scan.getStartRow(), desScan.getStartRow()));
358     assertTrue(Bytes.equals(scan.getStopRow(), desScan.getStopRow()));
359     assertEquals(scan.getCacheBlocks(), desScan.getCacheBlocks());
360     Set<byte[]> set = null;
361     Set<byte[]> desSet = null;
362 
363     for(Map.Entry<byte[], NavigableSet<byte[]>> entry :
364         scan.getFamilyMap().entrySet()){
365       assertTrue(desScan.getFamilyMap().containsKey(entry.getKey()));
366       set = entry.getValue();
367       desSet = desScan.getFamilyMap().get(entry.getKey());
368       for(byte[] column : set){
369         assertTrue(desSet.contains(column));
370       }
371 
372       // Test filters are serialized properly.
373       scan = new Scan(startRow);
374       final String name = "testScan";
375       byte [] prefix = Bytes.toBytes(name);
376       scan.setFilter(new PrefixFilter(prefix));
377       scanProto = ProtobufUtil.toScan(scan);
378       desScan = ProtobufUtil.toScan(scanProto);
379       Filter f = desScan.getFilter();
380       assertTrue(f instanceof PrefixFilter);
381     }
382 
383     assertEquals(scan.getMaxVersions(), desScan.getMaxVersions());
384     TimeRange tr = scan.getTimeRange();
385     TimeRange desTr = desScan.getTimeRange();
386     assertEquals(tr.getMax(), desTr.getMax());
387     assertEquals(tr.getMin(), desTr.getMin());
388   }
389 
390   /*
391    * TODO
392   @Test public void testResultEmpty() throws Exception {
393     List<KeyValue> keys = new ArrayList<KeyValue>();
394     Result r = Result.newResult(keys);
395     assertTrue(r.isEmpty());
396     byte [] rb = Writables.getBytes(r);
397     Result deserializedR = (Result)Writables.getWritable(rb, new Result());
398     assertTrue(deserializedR.isEmpty());
399   }
400 
401 
402   @Test public void testResult() throws Exception {
403     byte [] rowA = Bytes.toBytes("rowA");
404     byte [] famA = Bytes.toBytes("famA");
405     byte [] qfA = Bytes.toBytes("qfA");
406     byte [] valueA = Bytes.toBytes("valueA");
407 
408     byte [] rowB = Bytes.toBytes("rowB");
409     byte [] famB = Bytes.toBytes("famB");
410     byte [] qfB = Bytes.toBytes("qfB");
411     byte [] valueB = Bytes.toBytes("valueB");
412 
413     KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
414     KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
415 
416     Result result = Result.newResult(new KeyValue[]{kvA, kvB});
417 
418     byte [] rb = Writables.getBytes(result);
419     Result deResult = (Result)Writables.getWritable(rb, new Result());
420 
421     assertTrue("results are not equivalent, first key mismatch",
422         result.raw()[0].equals(deResult.raw()[0]));
423 
424     assertTrue("results are not equivalent, second key mismatch",
425         result.raw()[1].equals(deResult.raw()[1]));
426 
427     // Test empty Result
428     Result r = new Result();
429     byte [] b = Writables.getBytes(r);
430     Result deserialized = (Result)Writables.getWritable(b, new Result());
431     assertEquals(r.size(), deserialized.size());
432   }
433 
434   @Test public void testResultDynamicBuild() throws Exception {
435     byte [] rowA = Bytes.toBytes("rowA");
436     byte [] famA = Bytes.toBytes("famA");
437     byte [] qfA = Bytes.toBytes("qfA");
438     byte [] valueA = Bytes.toBytes("valueA");
439 
440     byte [] rowB = Bytes.toBytes("rowB");
441     byte [] famB = Bytes.toBytes("famB");
442     byte [] qfB = Bytes.toBytes("qfB");
443     byte [] valueB = Bytes.toBytes("valueB");
444 
445     KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
446     KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
447 
448     Result result = Result.newResult(new KeyValue[]{kvA, kvB});
449 
450     byte [] rb = Writables.getBytes(result);
451 
452 
453     // Call getRow() first
454     Result deResult = (Result)Writables.getWritable(rb, new Result());
455     byte [] row = deResult.getRow();
456     assertTrue(Bytes.equals(row, rowA));
457 
458     // Call sorted() first
459     deResult = (Result)Writables.getWritable(rb, new Result());
460     assertTrue("results are not equivalent, first key mismatch",
461         result.raw()[0].equals(deResult.raw()[0]));
462     assertTrue("results are not equivalent, second key mismatch",
463         result.raw()[1].equals(deResult.raw()[1]));
464 
465     // Call raw() first
466     deResult = (Result)Writables.getWritable(rb, new Result());
467     assertTrue("results are not equivalent, first key mismatch",
468         result.raw()[0].equals(deResult.raw()[0]));
469     assertTrue("results are not equivalent, second key mismatch",
470         result.raw()[1].equals(deResult.raw()[1]));
471 
472 
473   }
474 
475   @Test public void testResultArray() throws Exception {
476     byte [] rowA = Bytes.toBytes("rowA");
477     byte [] famA = Bytes.toBytes("famA");
478     byte [] qfA = Bytes.toBytes("qfA");
479     byte [] valueA = Bytes.toBytes("valueA");
480 
481     byte [] rowB = Bytes.toBytes("rowB");
482     byte [] famB = Bytes.toBytes("famB");
483     byte [] qfB = Bytes.toBytes("qfB");
484     byte [] valueB = Bytes.toBytes("valueB");
485 
486     KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
487     KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
488 
489 
490     Result result1 = Result.newResult(new KeyValue[]{kvA, kvB});
491     Result result2 = Result.newResult(new KeyValue[]{kvB});
492     Result result3 = Result.newResult(new KeyValue[]{kvB});
493 
494     Result [] results = new Result [] {result1, result2, result3};
495 
496     ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
497     DataOutputStream out = new DataOutputStream(byteStream);
498     Result.writeArray(out, results);
499 
500     byte [] rb = byteStream.toByteArray();
501 
502     DataInputBuffer in = new DataInputBuffer();
503     in.reset(rb, 0, rb.length);
504 
505     Result [] deResults = Result.readArray(in);
506 
507     assertTrue(results.length == deResults.length);
508 
509     for(int i=0;i<results.length;i++) {
510       KeyValue [] keysA = results[i].raw();
511       KeyValue [] keysB = deResults[i].raw();
512       assertTrue(keysA.length == keysB.length);
513       for(int j=0;j<keysA.length;j++) {
514         assertTrue("Expected equivalent keys but found:\n" +
515             "KeyA : " + keysA[j].toString() + "\n" +
516             "KeyB : " + keysB[j].toString() + "\n" +
517             keysA.length + " total keys, " + i + "th so far"
518             ,keysA[j].equals(keysB[j]));
519       }
520     }
521 
522   }
523 
524   @Test public void testResultArrayEmpty() throws Exception {
525     List<KeyValue> keys = new ArrayList<KeyValue>();
526     Result r = Result.newResult(keys);
527     Result [] results = new Result [] {r};
528 
529     ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
530     DataOutputStream out = new DataOutputStream(byteStream);
531 
532     Result.writeArray(out, results);
533 
534     results = null;
535 
536     byteStream = new ByteArrayOutputStream();
537     out = new DataOutputStream(byteStream);
538     Result.writeArray(out, results);
539 
540     byte [] rb = byteStream.toByteArray();
541 
542     DataInputBuffer in = new DataInputBuffer();
543     in.reset(rb, 0, rb.length);
544 
545     Result [] deResults = Result.readArray(in);
546 
547     assertTrue(deResults.length == 0);
548 
549     results = new Result[0];
550 
551     byteStream = new ByteArrayOutputStream();
552     out = new DataOutputStream(byteStream);
553     Result.writeArray(out, results);
554 
555     rb = byteStream.toByteArray();
556 
557     in = new DataInputBuffer();
558     in.reset(rb, 0, rb.length);
559 
560     deResults = Result.readArray(in);
561 
562     assertTrue(deResults.length == 0);
563 
564   }
565   */
566 
567   protected static final int MAXVERSIONS = 3;
568   protected final static byte [] fam1 = Bytes.toBytes("colfamily1");
569   protected final static byte [] fam2 = Bytes.toBytes("colfamily2");
570   protected final static byte [] fam3 = Bytes.toBytes("colfamily3");
571   protected static final byte [][] COLUMNS = {fam1, fam2, fam3};
572 
573   /**
574    * Create a table of name <code>name</code> with {@link COLUMNS} for
575    * families.
576    * @param name Name to give table.
577    * @return Column descriptor.
578    */
579   protected HTableDescriptor createTableDescriptor(final String name) {
580     return createTableDescriptor(name, MAXVERSIONS);
581   }
582 
583   /**
584    * Create a table of name <code>name</code> with {@link COLUMNS} for
585    * families.
586    * @param name Name to give table.
587    * @param versions How many versions to allow per column.
588    * @return Column descriptor.
589    */
590   protected HTableDescriptor createTableDescriptor(final String name,
591       final int versions) {
592     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
593     htd.addFamily(new HColumnDescriptor(fam1)
594         .setMaxVersions(versions)
595         .setBlockCacheEnabled(false)
596     );
597     htd.addFamily(new HColumnDescriptor(fam2)
598         .setMaxVersions(versions)
599         .setBlockCacheEnabled(false)
600     );
601     htd.addFamily(new HColumnDescriptor(fam3)
602         .setMaxVersions(versions)
603         .setBlockCacheEnabled(false)
604     );
605     return htd;
606   }
607 }