1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.KeyValueUtil;
31  import org.apache.hadoop.hbase.codec.BaseDecoder;
32  import org.apache.hadoop.hbase.codec.BaseEncoder;
33  import org.apache.hadoop.hbase.codec.Codec;
34  import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
35  import org.apache.hadoop.hbase.io.util.Dictionary;
36  import org.apache.hadoop.hbase.io.util.StreamUtils;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.ReflectionUtils;
39  import org.apache.hadoop.io.IOUtils;
40  
41  import com.google.protobuf.ByteString;
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
52  public class WALCellCodec implements Codec {
53    
  /** Configuration key naming the WAL cell codec class; read by {@link #getWALCellCodecClass}. */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  /**
   * Compression state handed to the compressed encoder/decoder. When {@code null},
   * {@link #getEncoder} and {@link #getDecoder} return the uncompressed implementations instead.
   */
  protected final CompressionContext compression;
  /**
   * Uncompressor that holds no state of its own: each call is forwarded to the static
   * {@code uncompressByteString} helper together with the caller-supplied dictionary.
   */
  protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
    @Override
    public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
      return WALCellCodec.uncompressByteString(data, dict);
    }
  };
63  
64    
65  
66  
67    public WALCellCodec() {
68      this.compression = null;
69    }
70  
71    
72  
73  
74  
75  
76  
77  
  /**
   * Creates a codec that uses the given compression context. This is the constructor signature
   * that {@code create(...)} looks up reflectively ({@code Configuration, CompressionContext}).
   *
   * @param conf configuration; not read by this constructor
   * @param compression compression context to use, or {@code null} for no compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }
81  
82    public static String getWALCellCodecClass(Configuration conf) {
83      return conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
84    }
85  
86    
87  
88  
89  
90  
91  
92  
93  
94  
95  
96  
97  
98    public static WALCellCodec create(Configuration conf, String cellCodecClsName,
99        CompressionContext compression) throws UnsupportedOperationException {
100     if (cellCodecClsName == null) {
101       cellCodecClsName = getWALCellCodecClass(conf);
102     }
103     return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
104         { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
105   }
106 
107   
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118   public static WALCellCodec create(Configuration conf,
119       CompressionContext compression) throws UnsupportedOperationException {
120     String cellCodecClsName = getWALCellCodecClass(conf);
121     return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
122         { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
123   }
124 
  /** Encodes a byte array into a {@link ByteString}, using a dictionary. */
  public interface ByteStringCompressor {
    /**
     * @param data bytes to encode
     * @param dict dictionary used for the encoding
     * @return the encoded form of {@code data}
     * @throws IOException if encoding fails
     */
    ByteString compress(byte[] data, Dictionary dict) throws IOException;
  }
128 
  /** Decodes a {@link ByteString} back into a byte array, using a dictionary. */
  public interface ByteStringUncompressor {
    /**
     * @param data encoded bytes
     * @param dict dictionary used for the decoding
     * @return the decoded bytes
     * @throws IOException if decoding fails
     */
    byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
  }
132 
133   
134   
135   
136   static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
137     public ByteString toByteString() {
138       return ByteString.copyFrom(this.buf, 0, this.count);
139     }
140 
141     @Override
142     public ByteString compress(byte[] data, Dictionary dict) throws IOException {
143       writeCompressed(data, dict);
144       ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
145       reset(); 
146       return result;
147     }
148 
149     private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
150       assert dict != null;
151       short dictIdx = dict.findEntry(data, 0, data.length);
152       if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
153         write(Dictionary.NOT_IN_DICTIONARY);
154         StreamUtils.writeRawVInt32(this, data.length);
155         write(data, 0, data.length);
156       } else {
157         StreamUtils.writeShort(this, dictIdx);
158       }
159     }
160   }
161 
162   private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
163     InputStream in = bs.newInput();
164     byte status = (byte)in.read();
165     if (status == Dictionary.NOT_IN_DICTIONARY) {
166       byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
167       int bytesRead = in.read(arr);
168       if (bytesRead != arr.length) {
169         throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
170       }
171       if (dict != null) dict.addEntry(arr, 0, arr.length);
172       return arr;
173     } else {
174       
175       short dictIdx = StreamUtils.toShort(status, (byte)in.read());
176       byte[] entry = dict.getEntry(dictIdx);
177       if (entry == null) {
178         throw new IOException("Missing dictionary entry for index " + dictIdx);
179       }
180       return entry;
181     }
182   }
183 
184   static class CompressedKvEncoder extends BaseEncoder {
185     private final CompressionContext compression;
186     public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
187       super(out);
188       this.compression = compression;
189     }
190 
191     @Override
192     public void write(Cell cell) throws IOException {
193       
194       StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
195       StreamUtils.writeRawVInt32(out, cell.getValueLength());
196       
197       int tagsLength = cell.getTagsLength();
198       StreamUtils.writeRawVInt32(out, tagsLength);
199 
200       
201       
202       write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
203       write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
204           compression.familyDict);
205       write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
206           compression.qualifierDict);
207 
208       
209       StreamUtils.writeLong(out, cell.getTimestamp());
210       out.write(cell.getTypeByte());
211       out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
212       if (tagsLength > 0) {
213         if (compression.tagCompressionContext != null) {
214           
215           compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
216               cell.getTagsOffset(), tagsLength);
217         } else {
218           
219           
220           out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
221         }
222       }
223     }
224 
225     private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
226       short dictIdx = Dictionary.NOT_IN_DICTIONARY;
227       if (dict != null) {
228         dictIdx = dict.findEntry(data, offset, length);
229       }
230       if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
231         out.write(Dictionary.NOT_IN_DICTIONARY);
232         StreamUtils.writeRawVInt32(out, length);
233         out.write(data, offset, length);
234       } else {
235         StreamUtils.writeShort(out, dictIdx);
236       }
237     }
238   }
239 
  /**
   * Decoder for the layout written by {@code CompressedKvEncoder}. Each cell is rebuilt into a
   * single byte array, which is then wrapped in a {@link KeyValue}.
   */
  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
    }

    /**
     * Reads one cell from the stream and returns it as a {@link KeyValue} backed by a freshly
     * allocated array.
     *
     * @throws IOException if the stream cannot be read, a dictionary entry is missing, or the
     *   row/family length is out of range
     */
    @Override
    protected Cell parseCell() throws IOException {
      // Header: key length, value length and tags length, each stored as a raw varint.
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);

      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      // Total size of the backing array; the tags variant adds room for the tags and their
      // length field.
      if(tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // Row: the bytes are read first, leaving a two-byte gap, because the row length is only
      // known once readIntoArray returns. The length is then written into the gap.
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short)elemLen);
      pos += elemLen;

      // Family: same approach, with a one-byte length prefix.
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
      pos += elemLen;

      // Qualifier: written directly at pos, with no length prefix.
      elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
      pos += elemLen;

      // Everything left up to the tags section (timestamp, type and value, per the variable
      // name and the encoder's write order) is copied straight from the stream.
      int tsTypeValLen = length - pos;
      if (tagsLength > 0) {
        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
      pos += tsTypeValLen;

      // Tags: write the tags length, then either uncompress the tags through the tag
      // compression context or copy them verbatim when no such context is set.
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (compression.tagCompressionContext != null) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    /**
     * Reads one dictionary-encoded element from the stream into {@code to} at {@code offset}.
     *
     * @param to destination array
     * @param offset position in {@code to} where the element's bytes start
     * @param dict dictionary used to resolve an index, or to register a newly read literal
     * @return the number of bytes written into {@code to}
     * @throws IOException if the stream cannot be read or the dictionary index has no entry
     */
    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte)in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // Literal element: a raw varint length followed by the bytes. The bytes are read
        // directly into the destination and then registered with the dictionary.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // Dictionary reference: the status byte and the next byte are combined into the
        // entry's index.
        short dictIdx = StreamUtils.toShort(status, (byte)in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // Copy the dictionary entry into the destination array.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    /**
     * Rejects a length that is negative or greater than {@code max}.
     *
     * @throws IOException if {@code len} is outside {@code [0, max]}
     */
    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
      }
    }
  }
329 
330   public static class EnsureKvEncoder extends BaseEncoder {
331     public EnsureKvEncoder(OutputStream out) {
332       super(out);
333     }
334     @Override
335     public void write(Cell cell) throws IOException {
336       checkFlushed();
337       
338       KeyValueUtil.oswrite(cell, this.out, true);
339     }
340   }
341 
342   @Override
343   public Decoder getDecoder(InputStream is) {
344     return (compression == null)
345         ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
346   }
347 
348   @Override
349   public Encoder getEncoder(OutputStream os) {
350     return (compression == null)
351         ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
352   }
353 
354   public ByteStringCompressor getByteStringCompressor() {
355     
356     return new BaosAndCompressor();
357   }
358 
359   public ByteStringUncompressor getByteStringUncompressor() {
360     
361     return this.statelessUncompressor;
362   }
363 }