1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.io;
19  
20  import java.nio.ByteBuffer;
21  import java.util.Queue;
22  import java.util.concurrent.atomic.AtomicLong;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.util.BoundedArrayQueue;
29  
30  import com.google.common.annotations.VisibleForTesting;
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  @InterfaceAudience.Private
49  public class BoundedByteBufferPool {
50    private final Log LOG = LogFactory.getLog(this.getClass());
51  
52    @VisibleForTesting
53    final Queue<ByteBuffer> buffers;
54  
55    
56    private final int maxByteBufferSizeToCache;
57  
58    
59    @VisibleForTesting
60    volatile int runningAverage;
61  
62    
63    private volatile int totalReservoirCapacity;
64  
65    
66    private AtomicLong allocations = new AtomicLong(0);
67  
68    private ReentrantLock lock =  new ReentrantLock();
69  
70    
71  
72  
73  
74  
75    public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
76        final int maxToCache) {
77      this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
78      this.runningAverage = initialByteBufferSize;
79      this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
80    }
81  
82    public ByteBuffer getBuffer() {
83      ByteBuffer bb = null;
84      lock.lock();
85      try {
86        bb = this.buffers.poll();
87        if (bb != null) {
88          this.totalReservoirCapacity -= bb.capacity();
89        }
90      } finally {
91        lock.unlock();
92      }
93      if (bb != null) {
94        
95        bb.clear();
96      } else {
97        bb = ByteBuffer.allocate(this.runningAverage);
98        this.allocations.incrementAndGet();
99      }
100     if (LOG.isTraceEnabled()) {
101       LOG.trace("runningAverage=" + this.runningAverage +
102         ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
103         ", alloctions=" + this.allocations.get());
104     }
105     return bb;
106   }
107 
108   public void putBuffer(ByteBuffer bb) {
109     
110     if (bb.capacity() > this.maxByteBufferSizeToCache) return;
111     boolean success = false;
112     int average = 0;
113     lock.lock();
114     try {
115       success = this.buffers.offer(bb);
116       if (success) {
117         this.totalReservoirCapacity += bb.capacity();
118         average = this.totalReservoirCapacity / this.buffers.size(); 
119       }
120     } finally {
121       lock.unlock();
122     }
123     if (!success) {
124       LOG.warn("At capacity: " + this.buffers.size());
125     } else {
126       if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
127         this.runningAverage = average;
128       }
129     }
130   }
131 }