1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.mob;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.Tag;
35  import org.apache.hadoop.hbase.TagType;
36  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
37  import org.apache.hadoop.hbase.regionserver.*;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.util.StringUtils;
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  @InterfaceAudience.Private
57  public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
58  
59    private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
60    private final Object flushLock = new Object();
61    private long mobCellValueSizeThreshold = 0;
62    private Path targetPath;
63    private HMobStore mobStore;
64  
65    public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
66      super(conf, store);
67      mobCellValueSizeThreshold = store.getFamily().getMobThreshold();
68      this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
69          store.getColumnFamilyName());
70      if (!this.store.getFileSystem().exists(targetPath)) {
71        this.store.getFileSystem().mkdirs(targetPath);
72      }
73      this.mobStore = (HMobStore) store;
74    }
75  
76    
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  
88  
89  
90    @Override
91    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
92        MonitoredTask status) throws IOException {
93      ArrayList<Path> result = new ArrayList<Path>();
94      int cellsCount = snapshot.getCellsCount();
95      if (cellsCount == 0) return result; 
96  
97      
98      long smallestReadPoint = store.getSmallestReadPoint();
99      InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
100     if (scanner == null) {
101       return result; 
102     }
103     StoreFile.Writer writer;
104     try {
105       
106       
107       synchronized (flushLock) {
108         status.setStatus("Flushing " + store + ": creating writer");
109         
110         writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
111             false, true, true);
112         writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
113         try {
114           
115           
116           performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
117         } finally {
118           finalizeWriter(writer, cacheFlushId, status);
119         }
120       }
121     } finally {
122       scanner.close();
123     }
124     LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
125         + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
126         ", hasBloomFilter=" + writer.hasGeneralBloom() +
127         ", into tmp file " + writer.getPath());
128     result.add(writer.getPath());
129     return result;
130   }
131 
132   
133 
134 
135 
136 
137 
138 
139 
140 
141 
142 
143 
144 
145 
146 
147 
148 
149   protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
150       InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
151     StoreFile.Writer mobFileWriter = null;
152     int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
153         HConstants.COMPACTION_KV_MAX_DEFAULT);
154     long mobCount = 0;
155     long mobSize = 0;
156     long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
157     mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
158         store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
159     
160     
161     byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
162     try {
163       Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
164           .getName());
165       List<Cell> cells = new ArrayList<Cell>();
166       boolean hasMore;
167       ScannerContext scannerContext =
168               ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
169 
170       do {
171         hasMore = scanner.next(cells, scannerContext);
172         if (!cells.isEmpty()) {
173           for (Cell c : cells) {
174             
175             
176             
177             if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
178                 || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
179               writer.append(c);
180             } else {
181               
182               mobFileWriter.append(c);
183               mobSize += c.getValueLength();
184               mobCount++;
185 
186               
187               
188               KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
189               writer.append(reference);
190             }
191           }
192           cells.clear();
193         }
194       } while (hasMore);
195     } finally {
196       status.setStatus("Flushing mob file " + store + ": appending metadata");
197       mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
198       status.setStatus("Flushing mob file " + store + ": closing flushed file");
199       mobFileWriter.close();
200     }
201 
202     if (mobCount > 0) {
203       
204       
205       
206       
207       mobStore.commitFile(mobFileWriter.getPath(), targetPath);
208       mobStore.updateMobFlushCount();
209       mobStore.updateMobFlushedCellsCount(mobCount);
210       mobStore.updateMobFlushedCellsSize(mobSize);
211     } else {
212       try {
213         
214         store.getFileSystem().delete(mobFileWriter.getPath(), true);
215       } catch (IOException e) {
216         LOG.error("Failed to delete the temp mob file", e);
217       }
218     }
219   }
220 }