/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

import com.google.common.annotations.VisibleForTesting;

/**
 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
 * for streaming calculation of targeted high-percentile epsilon-approximate
 * quantiles.
 *
 * This is a generalization of the earlier work by Greenwald and Khanna (GK);
 * it allows a different error bound for each targeted quantile, which makes
 * the calculation of high percentiles far more efficient.
 *
 * See: Cormode, Korn, Muthukrishnan, and Srivastava,
 * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
 *
 * Greenwald and Khanna,
 * "Space-efficient online computation of quantile summaries" in SIGMOD 2001
 *
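 * A rough usage sketch (this example assumes a
 * {@code MetricQuantile(double quantile, double error)} constructor carrying
 * the target quantile and its allowed error; {@code observedLatencies} is a
 * stand-in for caller-supplied measurements):
 * <pre>{@code
 *   MetricQuantile[] targets = new MetricQuantile[] {
 *       new MetricQuantile(0.50, 0.050),
 *       new MetricQuantile(0.99, 0.001)};
 *   MetricSampleQuantiles estimator = new MetricSampleQuantiles(targets);
 *   for (long latency : observedLatencies) {
 *     estimator.insert(latency);
 *   }
 *   // throws IOException if nothing has been inserted yet
 *   Map<MetricQuantile, Long> snapshot = estimator.snapshot();
 *   long p99 = snapshot.get(targets[1]);
 * }</pre>
 *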
 */
@InterfaceAudience.Private
public class MetricSampleQuantiles {

  /**
   * Total number of items in stream
   */
  private long count = 0;

  /**
   * Current list of sampled items, maintained in sorted order with error bounds
   */
  private LinkedList<SampleItem> samples;

  /**
   * Buffers incoming items to be inserted in batch. Items are inserted into
   * the buffer linearly. When the buffer fills, it is flushed into the samples
   * list in its entirety.
   */
  private long[] buffer = new long[500];
  private int bufferCount = 0;

  /**
   * Array of Quantiles that we care about, along with desired error.
   */
  private final MetricQuantile[] quantiles;

  public MetricSampleQuantiles(MetricQuantile[] quantiles) {
    this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
    this.samples = new LinkedList<SampleItem>();
  }

  /**
   * Specifies the allowable error for this rank, depending on which quantiles
   * are being targeted.
   *
   * This is the f(r_i, n) function from the CKMS paper. It's basically how wide
   * the range of this rank can be.
   *
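   * As implemented here, for each tracked quantile phi with allowed error
   * epsilon and current sample count n = samples.size(), the bound is:
   * <pre>{@code
   *   rank <= phi * n : 2 * epsilon * (n - rank) / (1 - phi)
   *   rank >  phi * n : 2 * epsilon * rank / phi
   * }</pre>
   * The minimum of these bounds over all tracked quantiles is returned.
   *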
   * @param rank
   *          the index in the list of samples
   * @return the allowable error for the given rank
   */
  private double allowableError(int rank) {
    int size = samples.size();
    double minError = size + 1;
    for (MetricQuantile q : quantiles) {
      double error;
      if (rank <= q.quantile * size) {
        error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
      } else {
        error = (2.0 * q.error * rank) / q.quantile;
      }
      if (error < minError) {
        minError = error;
      }
    }

    return minError;
  }

  /**
   * Add a new value from the stream.
   *
   * @param v the value to insert
   */
  public synchronized void insert(long v) {
    buffer[bufferCount] = v;
    bufferCount++;

    count++;

    if (bufferCount == buffer.length) {
      insertBatch();
      compress();
    }
  }

  /**
   * Merges items from the buffer into the samples list in one pass.
   * This is more efficient than doing an insert on every item.
   */
  private void insertBatch() {
    if (bufferCount == 0) {
      return;
    }

    Arrays.sort(buffer, 0, bufferCount);

    // Base case: no samples
    int start = 0;
    if (samples.size() == 0) {
      SampleItem newItem = new SampleItem(buffer[0], 1, 0);
      samples.add(newItem);
      start++;
    }

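    // Walk the (sorted) sample list and the (sorted) buffer together: each
    // buffered value is inserted immediately before the first existing sample
    // that is larger than it, so the list stays sorted after a single pass.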
    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem item = it.next();
    for (int i = start; i < bufferCount; i++) {
      long v = buffer[i];
      while (it.nextIndex() < samples.size() && item.value < v) {
        item = it.next();
      }
      // If we found that bigger item, back up so we insert ourselves before it
      if (item.value > v) {
        it.previous();
      }
      // We use different indexes for the edge comparisons, because of the above
      // if statement that adjusts the iterator
      int delta;
      if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
        delta = 0;
      } else {
        delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
      }
      SampleItem newItem = new SampleItem(v, 1, delta);
      it.add(newItem);
      item = newItem;
    }

    bufferCount = 0;
  }

  /**
   * Try to remove extraneous items from the set of sampled items. This checks
   * if an item is unnecessary based on the desired error bounds, and merges it
   * into the following item if it is.
   */
  private void compress() {
    if (samples.size() < 2) {
      return;
    }

    ListIterator<SampleItem> it = samples.listIterator();
    SampleItem prev = null;
    SampleItem next = it.next();

    while (it.hasNext()) {
      prev = next;
      next = it.next();
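      // prev can be merged into next when the combined uncertainty
      // (prev.g + next.g + next.delta) still fits within the allowable error
      // for this rank.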
      if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
        next.g += prev.g;
        // Remove prev. it.remove() kills the last thing returned.
        it.previous();
        it.previous();
        it.remove();
        // it.next() is now equal to next, skip it back forward again
        it.next();
      }
    }
  }

  /**
   * Get the estimated value at the specified quantile.
   *
   * @param quantile Queried quantile, e.g. 0.50 or 0.99.
   * @return Estimated value at that quantile.
   * @throws IOException if no samples are present
   */
  private long query(double quantile) throws IOException {
    if (samples.size() == 0) {
      throw new IOException("No samples present");
    }

    int rankMin = 0;
    int desired = (int) (quantile * count);

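    // Walk the samples, accumulating the lowest possible rank of the previous
    // item; return that item's value once the next item's highest possible
    // rank overshoots the desired rank by more than half the allowable error.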
    for (int i = 1; i < samples.size(); i++) {
      SampleItem prev = samples.get(i - 1);
      SampleItem cur = samples.get(i);

      rankMin += prev.g;

      if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
        return prev.value;
      }
    }

    // edge case of wanting max value
    return samples.get(samples.size() - 1).value;
  }

  /**
   * Get a snapshot of the current values of all the tracked quantiles.
   *
   * @return snapshot of the tracked quantiles
   * @throws IOException
   *           if no items have been added to the estimator
   */
  public synchronized Map<MetricQuantile, Long> snapshot() throws IOException {
    // flush the buffer first for best results
    insertBatch();
    Map<MetricQuantile, Long> values = new HashMap<MetricQuantile, Long>(quantiles.length);
    for (int i = 0; i < quantiles.length; i++) {
      values.put(quantiles[i], query(quantiles[i].quantile));
    }

    return values;
  }


  /**
   * Returns the number of items that the estimator has processed.
   *
   * @return the total number of items processed
   */
  public synchronized long getCount() {
    return count;
  }

  /**
   * Returns the number of samples kept by the estimator.
   *
   * @return the current number of samples
   */
  @VisibleForTesting
  public synchronized int getSampleCount() {
    return samples.size();
  }

  /**
   * Resets the estimator, clearing out all previously inserted items.
   */
  public synchronized void clear() {
    count = 0;
    bufferCount = 0;
    samples.clear();
  }

  /**
   * Describes a measured value passed to the estimator, tracking additional
   * metadata required by the CKMS algorithm.
   */
  private static class SampleItem {

    /**
     * Value of the sampled item (e.g. a measured latency value)
     */
    public final long value;

    /**
     * Difference between the lowest possible rank of the previous item and
     * the lowest possible rank of this item.
     *
     * The sum of the g values of this item and all preceding items yields
     * the lower bound on this item's rank.
     */
    public int g;

    /**
     * Difference between the item's greatest possible rank and lowest possible
     * rank.
     */
    public final int delta;

    public SampleItem(long value, int lowerDelta, int delta) {
      this.value = value;
      this.g = lowerDelta;
      this.delta = delta;
    }

    @Override
    public String toString() {
      return String.format("%d, %d, %d", value, g, delta);
    }
  }
}