1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  package org.apache.hadoop.hbase.io.encoding;
18  
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
42  
43  
44  
45  
46  
47  
48  @InterfaceAudience.Private
49  @VisibleForTesting
50  public class EncodedDataBlock {
51    private byte[] rawKVs;
52    private ByteBuffer rawBuffer;
53    private DataBlockEncoder dataBlockEncoder;
54  
55    private byte[] cachedEncodedData;
56  
57    private final HFileBlockEncodingContext encodingCtx;
58    private HFileContext meta;
59  
60    
61  
62  
63  
64  
65  
66  
67    public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
68        byte[] rawKVs, HFileContext meta) {
69      Preconditions.checkNotNull(encoding,
70          "Cannot create encoded data block with null encoder");
71      this.dataBlockEncoder = dataBlockEncoder;
72      encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
73          HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
74      this.rawKVs = rawKVs;
75      this.meta = meta;
76    }
77  
78    
79  
80  
81  
82  
83    public Iterator<Cell> getIterator(int headerSize) {
84      final int rawSize = rawKVs.length;
85      byte[] encodedDataWithHeader = getEncodedData();
86      int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
87      ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
88          bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
89      final DataInputStream dis = new DataInputStream(bais);
90  
91      return new Iterator<Cell>() {
92        private ByteBuffer decompressedData = null;
93  
94        @Override
95        public boolean hasNext() {
96          if (decompressedData == null) {
97            return rawSize > 0;
98          }
99          return decompressedData.hasRemaining();
100       }
101 
102       @Override
103       public Cell next() {
104         if (decompressedData == null) {
105           try {
106             decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
107                 .newDataBlockDecodingContext(meta));
108           } catch (IOException e) {
109             throw new RuntimeException("Problem with data block encoder, " +
110                 "most likely it requested more bytes than are available.", e);
111           }
112           decompressedData.rewind();
113         }
114         int offset = decompressedData.position();
115         int klen = decompressedData.getInt();
116         int vlen = decompressedData.getInt();
117         int tagsLen = 0;
118         ByteBufferUtils.skip(decompressedData, klen + vlen);
119         
120         if (meta.isIncludesTags()) {
121           tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
122           ByteBufferUtils.skip(decompressedData, tagsLen);
123         }
124         KeyValue kv = new KeyValue(decompressedData.array(), offset,
125             (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
126         if (meta.isIncludesMvcc()) {
127           long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
128           kv.setSequenceId(mvccVersion);
129         }
130         return kv;
131       }
132 
133       @Override
134       public void remove() {
135         throw new NotImplementedException("remove() is not supported!");
136       }
137 
138       @Override
139       public String toString() {
140         return "Iterator of: " + dataBlockEncoder.getClass().getName();
141       }
142 
143     };
144   }
145 
146   
147 
148 
149 
150   public int getSize() {
151     return getEncodedData().length;
152   }
153 
154   
155 
156 
157 
158 
159 
160 
161 
162 
163 
164 
165   public static int getCompressedSize(Algorithm algo, Compressor compressor,
166       byte[] inputBuffer, int offset, int length) throws IOException {
167     DataOutputStream compressedStream = new DataOutputStream(
168         new IOUtils.NullOutputStream());
169     if (compressor != null) {
170       compressor.reset();
171     }
172     OutputStream compressingStream = null;
173 
174     try {
175       compressingStream = algo.createCompressionStream(
176           compressedStream, compressor, 0);
177 
178       compressingStream.write(inputBuffer, offset, length);
179       compressingStream.flush();
180 
181       return compressedStream.size();
182     } finally {
183       if (compressingStream != null) compressingStream.close();
184     }
185   }
186 
187   
188 
189 
190 
191 
192 
193 
194   public int getEncodedCompressedSize(Algorithm comprAlgo,
195       Compressor compressor) throws IOException {
196     byte[] compressedBytes = getEncodedData();
197     return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
198         compressedBytes.length);
199   }
200 
201   
202   private byte[] getEncodedData() {
203     if (cachedEncodedData != null) {
204       return cachedEncodedData;
205     }
206     cachedEncodedData = encodeData();
207     return cachedEncodedData;
208   }
209 
210   private ByteBuffer getUncompressedBuffer() {
211     if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
212       rawBuffer = ByteBuffer.wrap(rawKVs);
213     }
214     return rawBuffer;
215   }
216 
217   
218 
219 
220 
221   public byte[] encodeData() {
222     ByteArrayOutputStream baos = new ByteArrayOutputStream();
223     try {
224       baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
225       DataOutputStream out = new DataOutputStream(baos);
226       this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
227       ByteBuffer in = getUncompressedBuffer();
228       in.rewind();
229       int klength, vlength;
230       int tagsLength = 0;
231       long memstoreTS = 0L;
232       KeyValue kv = null;
233       while (in.hasRemaining()) {
234         int kvOffset = in.position();
235         klength = in.getInt();
236         vlength = in.getInt();
237         ByteBufferUtils.skip(in, klength + vlength);
238         if (this.meta.isIncludesTags()) {
239           tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
240           ByteBufferUtils.skip(in, tagsLength);
241         }
242         if (this.meta.isIncludesMvcc()) {
243           memstoreTS = ByteBufferUtils.readVLong(in);
244         }
245         kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
246             klength, vlength, tagsLength));
247         kv.setSequenceId(memstoreTS);
248         this.dataBlockEncoder.encode(kv, encodingCtx, out);
249       }
250       BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
251       baos.writeTo(stream);
252       this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, stream.buf);
253     } catch (IOException e) {
254       throw new RuntimeException(String.format(
255           "Bug in encoding part of algorithm %s. " +
256           "Probably it requested more bytes than are available.",
257           toString()), e);
258     }
259     return baos.toByteArray();
260   }
261 
262   private static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
263     private byte[] buf;
264 
265     @Override
266     public void write(byte[] b, int off, int len) {
267       this.buf = b;
268     }
269   }
270 
271   @Override
272   public String toString() {
273     return dataBlockEncoder.toString();
274   }
275 }