1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.mob.mapreduce;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.Tag;
35  import org.apache.hadoop.hbase.TagType;
36  import org.apache.hadoop.hbase.client.BufferedMutator;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.io.crypto.Encryption;
39  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
40  import org.apache.hadoop.hbase.mob.MobConstants;
41  import org.apache.hadoop.hbase.mob.MobUtils;
42  import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
43  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
44  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
45  import org.apache.hadoop.hbase.regionserver.MemStore;
46  import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
47  import org.apache.hadoop.hbase.regionserver.StoreFile;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.mapreduce.Reducer.Context;
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  @InterfaceAudience.Private
65  public class MemStoreWrapper {
66  
67    private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
68  
69    private MemStore memstore;
70    private long flushSize;
71    private CompactionPartitionId partitionId;
72    private Context context;
73    private Configuration conf;
74    private BufferedMutator table;
75    private HColumnDescriptor hcd;
76    private Path mobFamilyDir;
77    private FileSystem fs;
78    private CacheConfig cacheConfig;
79    private Encryption.Context cryptoContext = Encryption.Context.NONE;
80  
81    public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
82      HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
83      this.memstore = memstore;
84      this.context = context;
85      this.fs = fs;
86      this.table = table;
87      this.hcd = hcd;
88      this.conf = context.getConfiguration();
89      this.cacheConfig = cacheConfig;
90      flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
91          MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
92      mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
93      cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
94    }
95  
96    public void setPartitionId(CompactionPartitionId partitionId) {
97      this.partitionId = partitionId;
98    }
99  
100   
101 
102 
103 
104   private void flushMemStoreIfNecessary() throws IOException {
105     if (memstore.heapSize() >= flushSize) {
106       flushMemStore();
107     }
108   }
109 
110   
111 
112 
113 
114   public void flushMemStore() throws IOException {
115     MemStoreSnapshot snapshot = memstore.snapshot();
116     internalFlushCache(snapshot);
117     memstore.clearSnapshot(snapshot.getId());
118   }
119 
120   
121 
122 
123 
124 
125 
126   private void internalFlushCache(final MemStoreSnapshot snapshot)
127       throws IOException {
128     if (snapshot.getCellsCount() == 0) {
129       return;
130     }
131     
132     String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
133     StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
134       new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
135       partitionId.getStartKey(), cacheConfig, cryptoContext);
136 
137     String relativePath = mobFileWriter.getPath().getName();
138     LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
139 
140     byte[] referenceValue = Bytes.toBytes(relativePath);
141     KeyValueScanner scanner = snapshot.getScanner();
142     Cell cell = null;
143     while (null != (cell = scanner.next())) {
144       mobFileWriter.append(cell);
145     }
146     scanner.close();
147     
148     
149     mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
150     mobFileWriter.close();
151 
152     MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
153     context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
154     
155     scanner = snapshot.getScanner();
156     scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
157     cell = null;
158     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName()
159       .toString()));
160     long updatedCount = 0;
161     while (null != (cell = scanner.next())) {
162       KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
163       Put put =
164           new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
165       put.add(reference);
166       table.mutate(put);
167       updatedCount++;
168     }
169     table.flush();
170     context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
171     scanner.close();
172   }
173 
174   
175 
176 
177 
178 
179   public void addToMemstore(KeyValue kv) throws IOException {
180     memstore.add(kv);
181     
182     flushMemStoreIfNecessary();
183   }
184 
185 }