1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.mob.mapreduce;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.UUID;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FSDataOutputStream;
35  import org.apache.hadoop.fs.FileStatus;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.fs.PathFilter;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.InvalidFamilyOperationException;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.KeyValueUtil;
45  import org.apache.hadoop.hbase.TableName;
46  import org.apache.hadoop.hbase.client.Admin;
47  import org.apache.hadoop.hbase.client.BufferedMutator;
48  import org.apache.hadoop.hbase.client.BufferedMutatorParams;
49  import org.apache.hadoop.hbase.client.Connection;
50  import org.apache.hadoop.hbase.client.ConnectionFactory;
51  import org.apache.hadoop.hbase.io.HFileLink;
52  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
53  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
54  import org.apache.hadoop.hbase.mob.MobConstants;
55  import org.apache.hadoop.hbase.mob.MobFile;
56  import org.apache.hadoop.hbase.mob.MobFileName;
57  import org.apache.hadoop.hbase.mob.MobUtils;
58  import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
59  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
60  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
61  import org.apache.hadoop.hbase.regionserver.BloomType;
62  import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
63  import org.apache.hadoop.hbase.regionserver.StoreFile;
64  import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.FSUtils;
67  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
68  import org.apache.hadoop.io.IOUtils;
69  import org.apache.hadoop.io.SequenceFile;
70  import org.apache.hadoop.io.SequenceFile.CompressionType;
71  import org.apache.hadoop.io.Text;
72  import org.apache.hadoop.io.Writable;
73  import org.apache.hadoop.mapreduce.Reducer;
74  import org.apache.zookeeper.KeeperException;
75  
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  @InterfaceAudience.Private
88  public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
89  
90    private static final Log LOG = LogFactory.getLog(SweepReducer.class);
91  
92    private SequenceFile.Writer writer = null;
93    private MemStoreWrapper memstore;
94    private Configuration conf;
95    private FileSystem fs;
96  
97    private Path familyDir;
98    private CacheConfig cacheConfig;
99    private long compactionBegin;
100   private BufferedMutator table;
101   private HColumnDescriptor family;
102   private long mobCompactionDelay;
103   private Path mobTableDir;
104 
105   @Override
106   protected void setup(Context context) throws IOException, InterruptedException {
107     this.conf = context.getConfiguration();
108     Connection c = ConnectionFactory.createConnection(this.conf);
109     this.fs = FileSystem.get(conf);
110     
111     mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
112     String tableName = conf.get(TableInputFormat.INPUT_TABLE);
113     String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
114     TableName tn = TableName.valueOf(tableName);
115     this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
116     Admin admin = c.getAdmin();
117     try {
118       family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
119       if (family == null) {
120         
121         throw new InvalidFamilyOperationException("Column family '" + familyName
122             + "' does not exist. It might be removed.");
123       }
124     } finally {
125       try {
126         admin.close();
127       } catch (IOException e) {
128         LOG.warn("Failed to close the HBaseAdmin", e);
129       }
130     }
131     
132     Configuration copyOfConf = new Configuration(conf);
133     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
134     this.cacheConfig = new CacheConfig(copyOfConf);
135 
136     table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024));
137     memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
138 
139     
140     
141     
142     this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
143     mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
144   }
145 
146   private SweepPartition createPartition(CompactionPartitionId id, Context context)
147     throws IOException {
148     return new SweepPartition(id, context);
149   }
150 
151   @Override
152   public void run(Context context) throws IOException, InterruptedException {
153     String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
154     String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
155     String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
156     ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
157         new DummyMobAbortable());
158     FSDataOutputStream fout = null;
159     try {
160       SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
161       tracker.start();
162       setup(context);
163       
164       String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
165       Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
166           .replace("-", MobConstants.EMPTY_STRING));
167       fout = fs.create(nameFilePath, true);
168       writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
169           String.class, CompressionType.NONE, null);
170       CompactionPartitionId id;
171       SweepPartition partition = null;
172       
173       while (context.nextKey()) {
174         Text key = context.getCurrentKey();
175         String keyString = key.toString();
176         id = createPartitionId(keyString);
177         if (null == partition || !id.equals(partition.getId())) {
178           
179           if (null != partition) {
180             
181             
182             partition.close();
183           }
184           
185           partition = createPartition(id, context);
186         }
187         if (partition != null) {
188           
189           partition.execute(key, context.getValues());
190         }
191       }
192       if (null != partition) {
193         partition.close();
194       }
195       writer.hflush();
196     } catch (KeeperException e) {
197       throw new IOException(e);
198     } finally {
199       cleanup(context);
200       zkw.close();
201       if (writer != null) {
202         IOUtils.closeStream(writer);
203       }
204       if (fout != null) {
205         IOUtils.closeStream(fout);
206       }
207       if (table != null) {
208         try {
209           table.close();
210         } catch (IOException e) {
211           LOG.warn(e);
212         }
213       }
214     }
215 
216   }
217 
218   
219 
220 
221 
222   public class SweepPartition {
223 
224     private final CompactionPartitionId id;
225     private final Context context;
226     private boolean memstoreUpdated = false;
227     private boolean mergeSmall = false;
228     private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
229     private final List<Path> toBeDeleted = new ArrayList<Path>();
230 
231     public SweepPartition(CompactionPartitionId id, Context context) throws IOException {
232       this.id = id;
233       this.context = context;
234       memstore.setPartitionId(id);
235       init();
236     }
237 
238     public CompactionPartitionId getId() {
239       return this.id;
240     }
241 
242     
243 
244 
245 
246 
247     private void init() throws IOException {
248       FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
249       if (null == fileStats) {
250         return;
251       }
252 
253       int smallFileCount = 0;
254       float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
255           MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
256       long compactionMergeableSize = conf.getLong(
257           MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
258           MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
259       
260       
261       for (FileStatus fileStat : fileStats) {
262         MobFileStatus mobFileStatus = null;
263         if (!HFileLink.isHFileLink(fileStat.getPath())) {
264           mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
265           if (mobFileStatus.needMerge()) {
266             smallFileCount++;
267           }
268           
269           fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
270         }
271       }
272       if (smallFileCount >= 2) {
273         
274         this.mergeSmall = true;
275       }
276     }
277 
278     
279 
280 
281 
282 
283     public void close() throws IOException {
284       if (null == id) {
285         return;
286       }
287       
288       if (memstoreUpdated) {
289         memstore.flushMemStore();
290       }
291       List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
292       
293       for (Path path : toBeDeleted) {
294         LOG.info("[In Partition close] Delete the file " + path + " in partition close");
295         storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
296       }
297       if (!storeFiles.isEmpty()) {
298         try {
299           MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
300               storeFiles);
301           context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
302         } catch (IOException e) {
303           LOG.error("Failed to archive the store files " + storeFiles, e);
304         }
305         storeFiles.clear();
306       }
307       fileStatusMap.clear();
308     }
309 
310     
311 
312 
313 
314 
315 
316     public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
317       if (null == values) {
318         return;
319       }
320       MobFileName mobFileName = MobFileName.create(fileName.toString());
321       LOG.info("[In reducer] The file name: " + fileName.toString());
322       MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
323       if (null == mobFileStat) {
324         LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
325         return;
326       }
327       
328       if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
329           <= mobCompactionDelay) {
330         return;
331       }
332       
333       writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
334       Set<KeyValue> kvs = new HashSet<KeyValue>();
335       for (KeyValue kv : values) {
336         if (kv.getValueLength() > Bytes.SIZEOF_INT) {
337           mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
338               Bytes.SIZEOF_INT));
339         }
340         kvs.add(kv.createKeyOnly(false));
341       }
342       
343       if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
344         context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
345         MobFile file = MobFile.create(fs,
346             new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
347         StoreFileScanner scanner = null;
348         file.open();
349         try {
350           scanner = file.getScanner();
351           scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
352           Cell cell;
353           while (null != (cell = scanner.next())) {
354             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
355             KeyValue keyOnly = kv.createKeyOnly(false);
356             if (kvs.contains(keyOnly)) {
357               
358               memstore.addToMemstore(kv);
359               memstoreUpdated = true;
360             }
361           }
362         } finally {
363           if (scanner != null) {
364             scanner.close();
365           }
366           file.close();
367         }
368         toBeDeleted.add(mobFileStat.getFileStatus().getPath());
369       }
370     }
371 
372     
373 
374 
375 
376 
377 
378 
379     private FileStatus[] listStatus(Path p, String prefix) throws IOException {
380       return fs.listStatus(p, new PathPrefixFilter(prefix));
381     }
382   }
383 
384   static class PathPrefixFilter implements PathFilter {
385 
386     private final String prefix;
387 
388     public PathPrefixFilter(String prefix) {
389       this.prefix = prefix;
390     }
391 
392     public boolean accept(Path path) {
393       return path.getName().startsWith(prefix, 0);
394     }
395 
396   }
397 
398   
399 
400 
401 
402 
403   private CompactionPartitionId createPartitionId(String fileNameAsString) {
404     MobFileName fileName = MobFileName.create(fileNameAsString);
405     return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
406   }
407 
408   
409 
410 
411   private static class MobFileStatus {
412     private FileStatus fileStatus;
413     private int validSize;
414     private long size;
415 
416     private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
417     private long compactionMergeableSize =
418         MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
419 
420     
421 
422 
423 
424 
425 
426 
427 
428 
429     public MobFileStatus(FileStatus fileStatus, float compactionRatio,
430         long compactionMergeableSize) {
431       this.fileStatus = fileStatus;
432       this.size = fileStatus.getLen();
433       validSize = 0;
434       this.compactionRatio = compactionRatio;
435       this.compactionMergeableSize = compactionMergeableSize;
436     }
437 
438     
439 
440 
441 
442     public void addValidSize(int size) {
443       this.validSize += size;
444     }
445 
446     
447 
448 
449 
450 
451     public boolean needClean() {
452       return validSize < compactionRatio * size;
453     }
454 
455     
456 
457 
458 
459 
460     public boolean needMerge() {
461       return this.size < compactionMergeableSize;
462     }
463 
464     
465 
466 
467 
468     public FileStatus getFileStatus() {
469       return fileStatus;
470     }
471   }
472 }