1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver.compactions;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.regionserver.RSRpcServices;
32  import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
33  import org.apache.hadoop.hbase.regionserver.StoreFile;
34  import org.apache.hadoop.hbase.regionserver.StoreUtils;
35  
36  
37  
38  
39  
40  
41  @InterfaceAudience.Private
42  public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
43    private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);
44  
45    public RatioBasedCompactionPolicy(Configuration conf,
46                                      StoreConfigInformation storeConfigInfo) {
47      super(conf, storeConfigInfo);
48    }
49  
50    
51  
52  
53  
54    @Override
55    public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
56      throws IOException {
57      boolean result = false;
58      long mcTime = getNextMajorCompactTime(filesToCompact);
59      if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
60        return result;
61      }
62      
63      long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
64      long now = System.currentTimeMillis();
65      if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
66        
67        long cfTTL = this.storeConfigInfo.getStoreFileTtl();
68        if (filesToCompact.size() == 1) {
69          
70          StoreFile sf = filesToCompact.iterator().next();
71          Long minTimestamp = sf.getMinimumTimestamp();
72          long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
73          if (sf.isMajorCompaction() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) {
74            float blockLocalityIndex =
75              sf.getHDFSBlockDistribution().getBlockLocalityIndex(
76              RSRpcServices.getHostname(comConf.conf, false));
77            if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
78              LOG.debug("Major compaction triggered on only store " + this
79                + "; to make hdfs blocks local, current blockLocalityIndex is "
80                + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
81              result = true;
82            } else {
83              LOG.debug("Skipping major compaction of " + this
84                + " because one (major) compacted file only, oldestTime " + oldest
85                + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
86                + " (min " + comConf.getMinLocalityToForceCompact() + ")");
87            }
88          } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
89            LOG.debug("Major compaction triggered on store " + this
90              + ", because keyvalues outdated; time since last major compaction "
91              + (now - lowTimestamp) + "ms");
92            result = true;
93          }
94        } else {
95          LOG.debug("Major compaction triggered on store " + this
96            + "; time since last major compaction " + (now - lowTimestamp) + "ms");
97        }
98        result = true;
99      }
100     return result;
101   }
102 
103   @Override
104   protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
105     boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
106     if (!tryingMajor) {
107       candidateSelection = filterBulk(candidateSelection);
108       candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
109       candidateSelection = checkMinFilesCriteria(candidateSelection,
110         comConf.getMinFilesToCompact());
111     }
112     return new CompactionRequest(candidateSelection);
113   }
114 
115   
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133 
134 
135 
136 
137 
138 
139 
140 
141 
142 
143 
144 
145   protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
146     boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
147     if (candidates.isEmpty()) {
148       return candidates;
149     }
150 
151     
152     int start = 0;
153     double ratio = comConf.getCompactionRatio();
154     if (mayUseOffPeak) {
155       ratio = comConf.getCompactionRatioOffPeak();
156       LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
157     }
158 
159     
160     final int countOfFiles = candidates.size();
161     long[] fileSizes = new long[countOfFiles];
162     long[] sumSize = new long[countOfFiles];
163     for (int i = countOfFiles - 1; i >= 0; --i) {
164       StoreFile file = candidates.get(i);
165       fileSizes[i] = file.getReader().length();
166       
167       int tooFar = i + comConf.getMaxFilesToCompact() - 1;
168       sumSize[i] = fileSizes[i]
169         + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
170         - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
171     }
172 
173 
174     while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
175       fileSizes[start] > Math.max(comConf.getMinCompactSize(),
176           (long) (sumSize[start + 1] * ratio))) {
177       ++start;
178     }
179     if (start < countOfFiles) {
180       LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
181         + " files from " + countOfFiles + " candidates");
182     } else if (mayBeStuck) {
183       
184       int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
185       if (filesToLeave >= 0) {
186         start = filesToLeave;
187       }
188     }
189     candidates.subList(0, start).clear();
190     return candidates;
191   }
192 
193   
194 
195 
196 
197 
198 
199   public boolean needsCompaction(final Collection<StoreFile> storeFiles,
200       final List<StoreFile> filesCompacting) {
201     int numCandidates = storeFiles.size() - filesCompacting.size();
202     return numCandidates >= comConf.getMinFilesToCompact();
203   }
204 
205   
206 
207 
208   public void setMinThreshold(int minThreshold) {
209     comConf.setMinFilesToCompact(minThreshold);
210   }
211 }