1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase;
20
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.io.ByteArrayInputStream;
28 import java.io.ByteArrayOutputStream;
29 import java.io.DataInputStream;
30 import java.io.DataOutputStream;
31 import java.io.IOException;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.NavigableSet;
35 import java.util.Set;
36
37 import org.apache.hadoop.hbase.client.Get;
38 import org.apache.hadoop.hbase.client.Scan;
39 import org.apache.hadoop.hbase.exceptions.DeserializationException;
40 import org.apache.hadoop.hbase.filter.BinaryComparator;
41 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
42 import org.apache.hadoop.hbase.filter.Filter;
43 import org.apache.hadoop.hbase.filter.PrefixFilter;
44 import org.apache.hadoop.hbase.filter.RowFilter;
45 import org.apache.hadoop.hbase.io.TimeRange;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
48 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
49 import org.apache.hadoop.hbase.testclassification.SmallTests;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.Writables;
52 import org.apache.hadoop.io.DataInputBuffer;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55
56 /**
57 * Test HBase Writables serializations
58 */
59 @Category(SmallTests.class)
60 public class TestSerialization {
61 @Test public void testKeyValue() throws Exception {
62 final String name = "testKeyValue2";
63 byte[] row = name.getBytes();
64 byte[] fam = "fam".getBytes();
65 byte[] qf = "qf".getBytes();
66 long ts = System.currentTimeMillis();
67 byte[] val = "val".getBytes();
68 KeyValue kv = new KeyValue(row, fam, qf, ts, val);
69 ByteArrayOutputStream baos = new ByteArrayOutputStream();
70 DataOutputStream dos = new DataOutputStream(baos);
71 long l = KeyValue.write(kv, dos);
72 dos.close();
73 byte [] mb = baos.toByteArray();
74 ByteArrayInputStream bais = new ByteArrayInputStream(mb);
75 DataInputStream dis = new DataInputStream(bais);
76 KeyValue deserializedKv = KeyValue.create(dis);
77 assertTrue(Bytes.equals(kv.getBuffer(), deserializedKv.getBuffer()));
78 assertEquals(kv.getOffset(), deserializedKv.getOffset());
79 assertEquals(kv.getLength(), deserializedKv.getLength());
80 }
81
82 @Test public void testCreateKeyValueInvalidNegativeLength() {
83
84 KeyValue kv_0 = new KeyValue(Bytes.toBytes("myRow"), Bytes.toBytes("myCF"), // 51 bytes
85 Bytes.toBytes("myQualifier"), 12345L, Bytes.toBytes("my12345"));
86
87 KeyValue kv_1 = new KeyValue(Bytes.toBytes("myRow"), Bytes.toBytes("myCF"), // 49 bytes
88 Bytes.toBytes("myQualifier"), 12345L, Bytes.toBytes("my123"));
89
90 ByteArrayOutputStream baos = new ByteArrayOutputStream();
91 DataOutputStream dos = new DataOutputStream(baos);
92
93 long l = 0;
94 try {
95 l = KeyValue.oswrite(kv_0, dos, false);
96 l += KeyValue.oswrite(kv_1, dos, false);
97 assertEquals(100L, l);
98 } catch (IOException e) {
99 fail("Unexpected IOException" + e.getMessage());
100 }
101
102 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
103 DataInputStream dis = new DataInputStream(bais);
104
105 try {
106 KeyValue.create(dis);
107 assertTrue(kv_0.equals(kv_1));
108 } catch (Exception e) {
109 fail("Unexpected Exception" + e.getMessage());
110 }
111
112 // length -1
113 try {
114 // even if we have a good kv now in dis we will just pass length with -1 for simplicity
115 KeyValue.create(-1, dis);
116 fail("Expected corrupt stream");
117 } catch (Exception e) {
118 assertEquals("Failed read -1 bytes, stream corrupt?", e.getMessage());
119 }
120
121 }
122
123 @Test
124 public void testSplitLogTask() throws DeserializationException {
125 SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
126 RecoveryMode.LOG_REPLAY);
127 byte [] bytes = slt.toByteArray();
128 SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
129 assertTrue(slt.equals(sltDeserialized));
130 }
131
132 @Test public void testCompareFilter() throws Exception {
133 Filter f = new RowFilter(CompareOp.EQUAL,
134 new BinaryComparator(Bytes.toBytes("testRowOne-2")));
135 byte [] bytes = f.toByteArray();
136 Filter ff = RowFilter.parseFrom(bytes);
137 assertNotNull(ff);
138 }
139
140 @Test public void testTableDescriptor() throws Exception {
141 final String name = "testTableDescriptor";
142 HTableDescriptor htd = createTableDescriptor(name);
143 byte [] mb = Writables.getBytes(htd);
144 HTableDescriptor deserializedHtd =
145 (HTableDescriptor)Writables.getWritable(mb, new HTableDescriptor());
146 assertEquals(htd.getTableName(), deserializedHtd.getTableName());
147 }
148
149 /**
150 * Test RegionInfo serialization
151 * @throws Exception
152 */
153 @Test public void testRegionInfo() throws Exception {
154 HRegionInfo hri = createRandomRegion("testRegionInfo");
155
156 //test toByteArray()
157 byte [] hrib = hri.toByteArray();
158 HRegionInfo deserializedHri = HRegionInfo.parseFrom(hrib);
159 assertEquals(hri.getEncodedName(), deserializedHri.getEncodedName());
160 assertEquals(hri, deserializedHri);
161
162 //test toDelimitedByteArray()
163 hrib = hri.toDelimitedByteArray();
164 DataInputBuffer buf = new DataInputBuffer();
165 try {
166 buf.reset(hrib, hrib.length);
167 deserializedHri = HRegionInfo.parseFrom(buf);
168 assertEquals(hri.getEncodedName(), deserializedHri.getEncodedName());
169 assertEquals(hri, deserializedHri);
170 } finally {
171 buf.close();
172 }
173 }
174
175 @Test public void testRegionInfos() throws Exception {
176 HRegionInfo hri = createRandomRegion("testRegionInfos");
177 byte[] triple = HRegionInfo.toDelimitedByteArray(hri, hri, hri);
178 List<HRegionInfo> regions = HRegionInfo.parseDelimitedFrom(triple, 0, triple.length);
179 assertTrue(regions.size() == 3);
180 assertTrue(regions.get(0).equals(regions.get(1)));
181 assertTrue(regions.get(0).equals(regions.get(2)));
182 }
183
184 private HRegionInfo createRandomRegion(final String name) {
185 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
186 String [] families = new String [] {"info", "anchor"};
187 for (int i = 0; i < families.length; i++) {
188 htd.addFamily(new HColumnDescriptor(families[i]));
189 }
190 return new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW,
191 HConstants.EMPTY_END_ROW);
192 }
193
194 /*
195 * TODO
196 @Test public void testPut() throws Exception{
197 byte[] row = "row".getBytes();
198 byte[] fam = "fam".getBytes();
199 byte[] qf1 = "qf1".getBytes();
200 byte[] qf2 = "qf2".getBytes();
201 byte[] qf3 = "qf3".getBytes();
202 byte[] qf4 = "qf4".getBytes();
203 byte[] qf5 = "qf5".getBytes();
204 byte[] qf6 = "qf6".getBytes();
205 byte[] qf7 = "qf7".getBytes();
206 byte[] qf8 = "qf8".getBytes();
207
208 long ts = System.currentTimeMillis();
209 byte[] val = "val".getBytes();
210
211 Put put = new Put(row);
212 put.setWriteToWAL(false);
213 put.add(fam, qf1, ts, val);
214 put.add(fam, qf2, ts, val);
215 put.add(fam, qf3, ts, val);
216 put.add(fam, qf4, ts, val);
217 put.add(fam, qf5, ts, val);
218 put.add(fam, qf6, ts, val);
219 put.add(fam, qf7, ts, val);
220 put.add(fam, qf8, ts, val);
221
222 byte[] sb = Writables.getBytes(put);
223 Put desPut = (Put)Writables.getWritable(sb, new Put());
224
225 //Timing test
226 // long start = System.nanoTime();
227 // desPut = (Put)Writables.getWritable(sb, new Put());
228 // long stop = System.nanoTime();
229 // System.out.println("timer " +(stop-start));
230
231 assertTrue(Bytes.equals(put.getRow(), desPut.getRow()));
232 List<KeyValue> list = null;
233 List<KeyValue> desList = null;
234 for(Map.Entry<byte[], List<KeyValue>> entry : put.getFamilyMap().entrySet()){
235 assertTrue(desPut.getFamilyMap().containsKey(entry.getKey()));
236 list = entry.getValue();
237 desList = desPut.getFamilyMap().get(entry.getKey());
238 for(int i=0; i<list.size(); i++){
239 assertTrue(list.get(i).equals(desList.get(i)));
240 }
241 }
242 }
243
244
245 @Test public void testPut2() throws Exception{
246 byte[] row = "testAbort,,1243116656250".getBytes();
247 byte[] fam = "historian".getBytes();
248 byte[] qf1 = "creation".getBytes();
249
250 long ts = 9223372036854775807L;
251 byte[] val = "dont-care".getBytes();
252
253 Put put = new Put(row);
254 put.add(fam, qf1, ts, val);
255
256 byte[] sb = Writables.getBytes(put);
257 Put desPut = (Put)Writables.getWritable(sb, new Put());
258
259 assertTrue(Bytes.equals(put.getRow(), desPut.getRow()));
260 List<KeyValue> list = null;
261 List<KeyValue> desList = null;
262 for(Map.Entry<byte[], List<KeyValue>> entry : put.getFamilyMap().entrySet()){
263 assertTrue(desPut.getFamilyMap().containsKey(entry.getKey()));
264 list = entry.getValue();
265 desList = desPut.getFamilyMap().get(entry.getKey());
266 for(int i=0; i<list.size(); i++){
267 assertTrue(list.get(i).equals(desList.get(i)));
268 }
269 }
270 }
271
272
273 @Test public void testDelete() throws Exception{
274 byte[] row = "row".getBytes();
275 byte[] fam = "fam".getBytes();
276 byte[] qf1 = "qf1".getBytes();
277
278 long ts = System.currentTimeMillis();
279
280 Delete delete = new Delete(row);
281 delete.deleteColumn(fam, qf1, ts);
282
283 byte[] sb = Writables.getBytes(delete);
284 Delete desDelete = (Delete)Writables.getWritable(sb, new Delete());
285
286 assertTrue(Bytes.equals(delete.getRow(), desDelete.getRow()));
287 List<KeyValue> list = null;
288 List<KeyValue> desList = null;
289 for(Map.Entry<byte[], List<KeyValue>> entry :
290 delete.getFamilyMap().entrySet()){
291 assertTrue(desDelete.getFamilyMap().containsKey(entry.getKey()));
292 list = entry.getValue();
293 desList = desDelete.getFamilyMap().get(entry.getKey());
294 for(int i=0; i<list.size(); i++){
295 assertTrue(list.get(i).equals(desList.get(i)));
296 }
297 }
298 }
299 */
300
301 @Test public void testGet() throws Exception{
302 byte[] row = "row".getBytes();
303 byte[] fam = "fam".getBytes();
304 byte[] qf1 = "qf1".getBytes();
305
306 long ts = System.currentTimeMillis();
307 int maxVersions = 2;
308
309 Get get = new Get(row);
310 get.addColumn(fam, qf1);
311 get.setTimeRange(ts, ts+1);
312 get.setMaxVersions(maxVersions);
313
314 ClientProtos.Get getProto = ProtobufUtil.toGet(get);
315 Get desGet = ProtobufUtil.toGet(getProto);
316
317 assertTrue(Bytes.equals(get.getRow(), desGet.getRow()));
318 Set<byte[]> set = null;
319 Set<byte[]> desSet = null;
320
321 for(Map.Entry<byte[], NavigableSet<byte[]>> entry :
322 get.getFamilyMap().entrySet()){
323 assertTrue(desGet.getFamilyMap().containsKey(entry.getKey()));
324 set = entry.getValue();
325 desSet = desGet.getFamilyMap().get(entry.getKey());
326 for(byte [] qualifier : set){
327 assertTrue(desSet.contains(qualifier));
328 }
329 }
330
331 assertEquals(get.getMaxVersions(), desGet.getMaxVersions());
332 TimeRange tr = get.getTimeRange();
333 TimeRange desTr = desGet.getTimeRange();
334 assertEquals(tr.getMax(), desTr.getMax());
335 assertEquals(tr.getMin(), desTr.getMin());
336 }
337
338
339 @Test public void testScan() throws Exception {
340
341 byte[] startRow = "startRow".getBytes();
342 byte[] stopRow = "stopRow".getBytes();
343 byte[] fam = "fam".getBytes();
344 byte[] qf1 = "qf1".getBytes();
345
346 long ts = System.currentTimeMillis();
347 int maxVersions = 2;
348
349 Scan scan = new Scan(startRow, stopRow);
350 scan.addColumn(fam, qf1);
351 scan.setTimeRange(ts, ts+1);
352 scan.setMaxVersions(maxVersions);
353
354 ClientProtos.Scan scanProto = ProtobufUtil.toScan(scan);
355 Scan desScan = ProtobufUtil.toScan(scanProto);
356
357 assertTrue(Bytes.equals(scan.getStartRow(), desScan.getStartRow()));
358 assertTrue(Bytes.equals(scan.getStopRow(), desScan.getStopRow()));
359 assertEquals(scan.getCacheBlocks(), desScan.getCacheBlocks());
360 Set<byte[]> set = null;
361 Set<byte[]> desSet = null;
362
363 for(Map.Entry<byte[], NavigableSet<byte[]>> entry :
364 scan.getFamilyMap().entrySet()){
365 assertTrue(desScan.getFamilyMap().containsKey(entry.getKey()));
366 set = entry.getValue();
367 desSet = desScan.getFamilyMap().get(entry.getKey());
368 for(byte[] column : set){
369 assertTrue(desSet.contains(column));
370 }
371
372 // Test filters are serialized properly.
373 scan = new Scan(startRow);
374 final String name = "testScan";
375 byte [] prefix = Bytes.toBytes(name);
376 scan.setFilter(new PrefixFilter(prefix));
377 scanProto = ProtobufUtil.toScan(scan);
378 desScan = ProtobufUtil.toScan(scanProto);
379 Filter f = desScan.getFilter();
380 assertTrue(f instanceof PrefixFilter);
381 }
382
383 assertEquals(scan.getMaxVersions(), desScan.getMaxVersions());
384 TimeRange tr = scan.getTimeRange();
385 TimeRange desTr = desScan.getTimeRange();
386 assertEquals(tr.getMax(), desTr.getMax());
387 assertEquals(tr.getMin(), desTr.getMin());
388 }
389
390 /*
391 * TODO
392 @Test public void testResultEmpty() throws Exception {
393 List<KeyValue> keys = new ArrayList<KeyValue>();
394 Result r = Result.newResult(keys);
395 assertTrue(r.isEmpty());
396 byte [] rb = Writables.getBytes(r);
397 Result deserializedR = (Result)Writables.getWritable(rb, new Result());
398 assertTrue(deserializedR.isEmpty());
399 }
400
401
402 @Test public void testResult() throws Exception {
403 byte [] rowA = Bytes.toBytes("rowA");
404 byte [] famA = Bytes.toBytes("famA");
405 byte [] qfA = Bytes.toBytes("qfA");
406 byte [] valueA = Bytes.toBytes("valueA");
407
408 byte [] rowB = Bytes.toBytes("rowB");
409 byte [] famB = Bytes.toBytes("famB");
410 byte [] qfB = Bytes.toBytes("qfB");
411 byte [] valueB = Bytes.toBytes("valueB");
412
413 KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
414 KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
415
416 Result result = Result.newResult(new KeyValue[]{kvA, kvB});
417
418 byte [] rb = Writables.getBytes(result);
419 Result deResult = (Result)Writables.getWritable(rb, new Result());
420
421 assertTrue("results are not equivalent, first key mismatch",
422 result.raw()[0].equals(deResult.raw()[0]));
423
424 assertTrue("results are not equivalent, second key mismatch",
425 result.raw()[1].equals(deResult.raw()[1]));
426
427 // Test empty Result
428 Result r = new Result();
429 byte [] b = Writables.getBytes(r);
430 Result deserialized = (Result)Writables.getWritable(b, new Result());
431 assertEquals(r.size(), deserialized.size());
432 }
433
434 @Test public void testResultDynamicBuild() throws Exception {
435 byte [] rowA = Bytes.toBytes("rowA");
436 byte [] famA = Bytes.toBytes("famA");
437 byte [] qfA = Bytes.toBytes("qfA");
438 byte [] valueA = Bytes.toBytes("valueA");
439
440 byte [] rowB = Bytes.toBytes("rowB");
441 byte [] famB = Bytes.toBytes("famB");
442 byte [] qfB = Bytes.toBytes("qfB");
443 byte [] valueB = Bytes.toBytes("valueB");
444
445 KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
446 KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
447
448 Result result = Result.newResult(new KeyValue[]{kvA, kvB});
449
450 byte [] rb = Writables.getBytes(result);
451
452
453 // Call getRow() first
454 Result deResult = (Result)Writables.getWritable(rb, new Result());
455 byte [] row = deResult.getRow();
456 assertTrue(Bytes.equals(row, rowA));
457
458 // Call sorted() first
459 deResult = (Result)Writables.getWritable(rb, new Result());
460 assertTrue("results are not equivalent, first key mismatch",
461 result.raw()[0].equals(deResult.raw()[0]));
462 assertTrue("results are not equivalent, second key mismatch",
463 result.raw()[1].equals(deResult.raw()[1]));
464
465 // Call raw() first
466 deResult = (Result)Writables.getWritable(rb, new Result());
467 assertTrue("results are not equivalent, first key mismatch",
468 result.raw()[0].equals(deResult.raw()[0]));
469 assertTrue("results are not equivalent, second key mismatch",
470 result.raw()[1].equals(deResult.raw()[1]));
471
472
473 }
474
475 @Test public void testResultArray() throws Exception {
476 byte [] rowA = Bytes.toBytes("rowA");
477 byte [] famA = Bytes.toBytes("famA");
478 byte [] qfA = Bytes.toBytes("qfA");
479 byte [] valueA = Bytes.toBytes("valueA");
480
481 byte [] rowB = Bytes.toBytes("rowB");
482 byte [] famB = Bytes.toBytes("famB");
483 byte [] qfB = Bytes.toBytes("qfB");
484 byte [] valueB = Bytes.toBytes("valueB");
485
486 KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
487 KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
488
489
490 Result result1 = Result.newResult(new KeyValue[]{kvA, kvB});
491 Result result2 = Result.newResult(new KeyValue[]{kvB});
492 Result result3 = Result.newResult(new KeyValue[]{kvB});
493
494 Result [] results = new Result [] {result1, result2, result3};
495
496 ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
497 DataOutputStream out = new DataOutputStream(byteStream);
498 Result.writeArray(out, results);
499
500 byte [] rb = byteStream.toByteArray();
501
502 DataInputBuffer in = new DataInputBuffer();
503 in.reset(rb, 0, rb.length);
504
505 Result [] deResults = Result.readArray(in);
506
507 assertTrue(results.length == deResults.length);
508
509 for(int i=0;i<results.length;i++) {
510 KeyValue [] keysA = results[i].raw();
511 KeyValue [] keysB = deResults[i].raw();
512 assertTrue(keysA.length == keysB.length);
513 for(int j=0;j<keysA.length;j++) {
514 assertTrue("Expected equivalent keys but found:\n" +
515 "KeyA : " + keysA[j].toString() + "\n" +
516 "KeyB : " + keysB[j].toString() + "\n" +
517 keysA.length + " total keys, " + i + "th so far"
518 ,keysA[j].equals(keysB[j]));
519 }
520 }
521
522 }
523
524 @Test public void testResultArrayEmpty() throws Exception {
525 List<KeyValue> keys = new ArrayList<KeyValue>();
526 Result r = Result.newResult(keys);
527 Result [] results = new Result [] {r};
528
529 ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
530 DataOutputStream out = new DataOutputStream(byteStream);
531
532 Result.writeArray(out, results);
533
534 results = null;
535
536 byteStream = new ByteArrayOutputStream();
537 out = new DataOutputStream(byteStream);
538 Result.writeArray(out, results);
539
540 byte [] rb = byteStream.toByteArray();
541
542 DataInputBuffer in = new DataInputBuffer();
543 in.reset(rb, 0, rb.length);
544
545 Result [] deResults = Result.readArray(in);
546
547 assertTrue(deResults.length == 0);
548
549 results = new Result[0];
550
551 byteStream = new ByteArrayOutputStream();
552 out = new DataOutputStream(byteStream);
553 Result.writeArray(out, results);
554
555 rb = byteStream.toByteArray();
556
557 in = new DataInputBuffer();
558 in.reset(rb, 0, rb.length);
559
560 deResults = Result.readArray(in);
561
562 assertTrue(deResults.length == 0);
563
564 }
565 */
566
567 protected static final int MAXVERSIONS = 3;
568 protected final static byte [] fam1 = Bytes.toBytes("colfamily1");
569 protected final static byte [] fam2 = Bytes.toBytes("colfamily2");
570 protected final static byte [] fam3 = Bytes.toBytes("colfamily3");
571 protected static final byte [][] COLUMNS = {fam1, fam2, fam3};
572
573 /**
574 * Create a table of name <code>name</code> with {@link COLUMNS} for
575 * families.
576 * @param name Name to give table.
577 * @return Column descriptor.
578 */
579 protected HTableDescriptor createTableDescriptor(final String name) {
580 return createTableDescriptor(name, MAXVERSIONS);
581 }
582
583 /**
584 * Create a table of name <code>name</code> with {@link COLUMNS} for
585 * families.
586 * @param name Name to give table.
587 * @param versions How many versions to allow per column.
588 * @return Column descriptor.
589 */
590 protected HTableDescriptor createTableDescriptor(final String name,
591 final int versions) {
592 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
593 htd.addFamily(new HColumnDescriptor(fam1)
594 .setMaxVersions(versions)
595 .setBlockCacheEnabled(false)
596 );
597 htd.addFamily(new HColumnDescriptor(fam2)
598 .setMaxVersions(versions)
599 .setBlockCacheEnabled(false)
600 );
601 htd.addFamily(new HColumnDescriptor(fam3)
602 .setMaxVersions(versions)
603 .setBlockCacheEnabled(false)
604 );
605 return htd;
606 }
607 }