1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.util.ByteRange;
30 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
31
32 import com.google.common.base.Preconditions;
33
34 /**
35 * A memstore-local allocation buffer.
36 * <p>
37 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
38 * big (2MB) byte[] chunks from and then doles it out to threads that request
39 * slices into the array.
40 * <p>
41 * The purpose of this class is to combat heap fragmentation in the
42 * regionserver. By ensuring that all KeyValues in a given memstore refer
43 * only to large chunks of contiguous memory, we ensure that large blocks
44 * get freed up when the memstore is flushed.
45 * <p>
46 * Without the MSLAB, the byte array allocated during insertion end up
47 * interleaved throughout the heap, and the old generation gets progressively
48 * more fragmented until a stop-the-world compacting collection occurs.
49 * <p>
50 * TODO: we should probably benchmark whether word-aligning the allocations
51 * would provide a performance improvement - probably would speed up the
52 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
53 * anyway
54 */
55 @InterfaceAudience.Private
56 public class HeapMemStoreLAB implements MemStoreLAB {
57
58 static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
59 static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
60 static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
61 static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
62 // allocator
63
64 private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
65 // A queue of chunks contained by this memstore
66 private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
67 final int chunkSize;
68 final int maxAlloc;
69 private final MemStoreChunkPool chunkPool;
70
71 // This flag is for closing this instance, its set when clearing snapshot of
72 // memstore
73 private volatile boolean closed = false;
74 // This flag is for reclaiming chunks. Its set when putting chunks back to
75 // pool
76 private AtomicBoolean reclaimed = new AtomicBoolean(false);
77 // Current count of open scanners which reading data from this MemStoreLAB
78 private final AtomicInteger openScannerCount = new AtomicInteger();
79
80 // Used in testing
81 public HeapMemStoreLAB() {
82 this(new Configuration());
83 }
84
85 public HeapMemStoreLAB(Configuration conf) {
86 chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
87 maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
88 this.chunkPool = MemStoreChunkPool.getPool(conf);
89
90 // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
91 Preconditions.checkArgument(
92 maxAlloc <= chunkSize,
93 MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
94 }
95
96 /**
97 * Allocate a slice of the given length.
98 *
99 * If the size is larger than the maximum size specified for this
100 * allocator, returns null.
101 */
102 @Override
103 public ByteRange allocateBytes(int size) {
104 Preconditions.checkArgument(size >= 0, "negative size");
105
106 // Callers should satisfy large allocations directly from JVM since they
107 // don't cause fragmentation as badly.
108 if (size > maxAlloc) {
109 return null;
110 }
111
112 while (true) {
113 Chunk c = getOrMakeChunk();
114
115 // Try to allocate from this chunk
116 int allocOffset = c.alloc(size);
117 if (allocOffset != -1) {
118 // We succeeded - this is the common case - small alloc
119 // from a big buffer
120 return new SimpleMutableByteRange(c.data, allocOffset, size);
121 }
122
123 // not enough space!
124 // try to retire this chunk
125 tryRetireChunk(c);
126 }
127 }
128
129 /**
130 * Close this instance since it won't be used any more, try to put the chunks
131 * back to pool
132 */
133 @Override
134 public void close() {
135 this.closed = true;
136 // We could put back the chunks to pool for reusing only when there is no
137 // opening scanner which will read their data
138 if (chunkPool != null && openScannerCount.get() == 0
139 && reclaimed.compareAndSet(false, true)) {
140 chunkPool.putbackChunks(this.chunkQueue);
141 }
142 }
143
144 /**
145 * Called when opening a scanner on the data of this MemStoreLAB
146 */
147 @Override
148 public void incScannerCount() {
149 this.openScannerCount.incrementAndGet();
150 }
151
152 /**
153 * Called when closing a scanner on the data of this MemStoreLAB
154 */
155 @Override
156 public void decScannerCount() {
157 int count = this.openScannerCount.decrementAndGet();
158 if (chunkPool != null && count == 0 && this.closed
159 && reclaimed.compareAndSet(false, true)) {
160 chunkPool.putbackChunks(this.chunkQueue);
161 }
162 }
163
164 /**
165 * Try to retire the current chunk if it is still
166 * <code>c</code>. Postcondition is that curChunk.get()
167 * != c
168 */
169 private void tryRetireChunk(Chunk c) {
170 curChunk.compareAndSet(c, null);
171 // If the CAS succeeds, that means that we won the race
172 // to retire the chunk. We could use this opportunity to
173 // update metrics on external fragmentation.
174 //
175 // If the CAS fails, that means that someone else already
176 // retired the chunk for us.
177 }
178
179 /**
180 * Get the current chunk, or, if there is no current chunk,
181 * allocate a new one from the JVM.
182 */
183 private Chunk getOrMakeChunk() {
184 while (true) {
185 // Try to get the chunk
186 Chunk c = curChunk.get();
187 if (c != null) {
188 return c;
189 }
190
191 // No current chunk, so we want to allocate one. We race
192 // against other allocators to CAS in an uninitialized chunk
193 // (which is cheap to allocate)
194 c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
195 if (curChunk.compareAndSet(null, c)) {
196 // we won race - now we need to actually do the expensive
197 // allocation step
198 c.init();
199 this.chunkQueue.add(c);
200 return c;
201 } else if (chunkPool != null) {
202 chunkPool.putbackChunk(c);
203 }
204 // someone else won race - that's fine, we'll try to grab theirs
205 // in the next iteration of the loop.
206 }
207 }
208
209 /**
210 * A chunk of memory out of which allocations are sliced.
211 */
212 static class Chunk {
213 /** Actual underlying data */
214 private byte[] data;
215
216 private static final int UNINITIALIZED = -1;
217 private static final int OOM = -2;
218 /**
219 * Offset for the next allocation, or the sentinel value -1
220 * which implies that the chunk is still uninitialized.
221 * */
222 private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
223
224 /** Total number of allocations satisfied from this buffer */
225 private AtomicInteger allocCount = new AtomicInteger();
226
227 /** Size of chunk in bytes */
228 private final int size;
229
230 /**
231 * Create an uninitialized chunk. Note that memory is not allocated yet, so
232 * this is cheap.
233 * @param size in bytes
234 */
235 Chunk(int size) {
236 this.size = size;
237 }
238
239 /**
240 * Actually claim the memory for this chunk. This should only be called from
241 * the thread that constructed the chunk. It is thread-safe against other
242 * threads calling alloc(), who will block until the allocation is complete.
243 */
244 public void init() {
245 assert nextFreeOffset.get() == UNINITIALIZED;
246 try {
247 if (data == null) {
248 data = new byte[size];
249 }
250 } catch (OutOfMemoryError e) {
251 boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
252 assert failInit; // should be true.
253 throw e;
254 }
255 // Mark that it's ready for use
256 boolean initted = nextFreeOffset.compareAndSet(
257 UNINITIALIZED, 0);
258 // We should always succeed the above CAS since only one thread
259 // calls init()!
260 Preconditions.checkState(initted,
261 "Multiple threads tried to init same chunk");
262 }
263
264 /**
265 * Reset the offset to UNINITIALIZED before before reusing an old chunk
266 */
267 void reset() {
268 if (nextFreeOffset.get() != UNINITIALIZED) {
269 nextFreeOffset.set(UNINITIALIZED);
270 allocCount.set(0);
271 }
272 }
273
274 /**
275 * Try to allocate <code>size</code> bytes from the chunk.
276 * @return the offset of the successful allocation, or -1 to indicate not-enough-space
277 */
278 public int alloc(int size) {
279 while (true) {
280 int oldOffset = nextFreeOffset.get();
281 if (oldOffset == UNINITIALIZED) {
282 // The chunk doesn't have its data allocated yet.
283 // Since we found this in curChunk, we know that whoever
284 // CAS-ed it there is allocating it right now. So spin-loop
285 // shouldn't spin long!
286 Thread.yield();
287 continue;
288 }
289 if (oldOffset == OOM) {
290 // doh we ran out of ram. return -1 to chuck this away.
291 return -1;
292 }
293
294 if (oldOffset + size > data.length) {
295 return -1; // alloc doesn't fit
296 }
297
298 // Try to atomically claim this chunk
299 if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
300 // we got the alloc
301 allocCount.incrementAndGet();
302 return oldOffset;
303 }
304 // we raced and lost alloc, try again
305 }
306 }
307
308 @Override
309 public String toString() {
310 return "Chunk@" + System.identityHashCode(this) +
311 " allocs=" + allocCount.get() + "waste=" +
312 (data.length - nextFreeOffset.get());
313 }
314 }
315 }