1 /**
2 * Copyright The Apache Software Foundation
3 *
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with this
6 * work for additional information regarding copyright ownership. The ASF
7 * licenses this file to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.hadoop.hbase.io.hfile.bucket;
20
21
22 import java.util.Comparator;
23 import java.util.Map;
24 import java.util.Map.Entry;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
28 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
29
30 import com.google.common.collect.MinMaxPriorityQueue;
31
32 /**
33 * A memory-bound queue that will grow until an element brings total size larger
34 * than maxSize. From then on, only entries that are sorted larger than the
35 * smallest current entry will be inserted/replaced.
36 *
37 * <p>
38 * Use this when you want to find the largest elements (according to their
39 * ordering, not their heap size) that consume as close to the specified maxSize
40 * as possible. Default behavior is to grow just above rather than just below
41 * specified max.
42 */
43 @InterfaceAudience.Private
44 public class CachedEntryQueue {
45
46 private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
47
48 private long cacheSize;
49 private long maxSize;
50
51 /**
52 * @param maxSize the target size of elements in the queue
53 * @param blockSize expected average size of blocks
54 */
55 public CachedEntryQueue(long maxSize, long blockSize) {
56 int initialSize = (int) (maxSize / blockSize);
57 if (initialSize == 0) {
58 initialSize++;
59 }
60 queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
61
62 public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
63 Entry<BlockCacheKey, BucketEntry> entry2) {
64 return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
65 }
66
67 }).expectedSize(initialSize).create();
68 cacheSize = 0;
69 this.maxSize = maxSize;
70 }
71
72 /**
73 * Attempt to add the specified entry to this queue.
74 * <p>
75 * If the queue is smaller than the max size, or if the specified element is
76 * ordered after the smallest element in the queue, the element will be added
77 * to the queue. Otherwise, there is no side effect of this call.
78 * @param entry a bucket entry with key to try to add to the queue
79 */
80 public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
81 if (cacheSize < maxSize) {
82 queue.add(entry);
83 cacheSize += entry.getValue().getLength();
84 } else {
85 BucketEntry head = queue.peek().getValue();
86 if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
87 cacheSize += entry.getValue().getLength();
88 cacheSize -= head.getLength();
89 if (cacheSize > maxSize) {
90 queue.poll();
91 } else {
92 cacheSize += head.getLength();
93 }
94 queue.add(entry);
95 }
96 }
97 }
98
99 /**
100 * @return The next element in this queue, or {@code null} if the queue is
101 * empty.
102 */
103 public Map.Entry<BlockCacheKey, BucketEntry> poll() {
104 return queue.poll();
105 }
106
107 /**
108 * @return The last element in this queue, or {@code null} if the queue is
109 * empty.
110 */
111 public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
112 return queue.pollLast();
113 }
114
115 /**
116 * Total size of all elements in this queue.
117 * @return size of all elements currently in queue, in bytes
118 */
119 public long cacheSize() {
120 return cacheSize;
121 }
122 }