1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.util.List;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27
28 /**
29 * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
30 * invocations of {@link InternalScanner#next(java.util.List)} and
31 * {@link RegionScanner#next(java.util.List)}.
32 * <p>
33 * A ScannerContext instance should be updated periodically throughout execution whenever progress
34 * towards a limit has been made. Each limit can be checked via the appropriate checkLimit method.
35 * <p>
36 * Once a limit has been reached, the scan will stop. The invoker of
37 * {@link InternalScanner#next(java.util.List)} or {@link RegionScanner#next(java.util.List)} can
38 * use the appropriate check*Limit methods to see exactly which limits have been reached.
39 * Alternatively, {@link #checkAnyLimitReached(LimitScope)} is provided to see if ANY limit was
40 * reached
41 * <p>
42 * {@link NoLimitScannerContext#NO_LIMIT} is an immutable static definition that can be used
43 * whenever a {@link ScannerContext} is needed but limits do not need to be enforced.
44 * <p>
45 * NOTE: It is important that this class only ever expose setter methods that can be safely skipped
46 * when limits should be NOT enforced. This is because of the necessary immutability of the class
47 * {@link NoLimitScannerContext}. If a setter cannot be safely skipped, the immutable nature of
48 * {@link NoLimitScannerContext} will lead to incorrect behavior.
49 */
50 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
51 @InterfaceStability.Evolving
52 public class ScannerContext {
53 private final Log LOG = LogFactory.getLog(this.getClass());
54
55 /**
56 * Two sets of the same fields. One for the limits, another for the progress towards those limits
57 */
58 LimitFields limits;
59 LimitFields progress;
60
61 /**
62 * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
63 * or {@link RegionScanner#next(java.util.List)}.
64 */
65 NextState scannerState;
66 private static final NextState DEFAULT_STATE = NextState.MORE_VALUES;
67
68 /**
69 * Used as an indication to invocations of {@link InternalScanner#next(java.util.List)} and
70 * {@link RegionScanner#next(java.util.List)} that, if true, the progress tracked within this
71 * {@link ScannerContext} instance should be considered while evaluating the limits. Useful for
72 * enforcing a set of limits across multiple calls (i.e. the limit may not be reached in a single
73 * invocation, but any progress made should be considered in future invocations)
74 * <p>
75 * Defaulting this value to false means that, by default, any tracked progress will be wiped clean
76 * on invocations to {@link InternalScanner#next(java.util.List)} and
77 * {@link RegionScanner#next(java.util.List)} and the call will be treated as though no progress
78 * has been made towards the limits so far.
79 * <p>
80 * This is an important mechanism. Users of Internal/Region scanners expect that they can define
81 * some limits and then repeatedly invoke {@link InternalScanner#next(List)} or
82 * {@link RegionScanner#next(List)} where each invocation respects these limits separately.
83 * <p>
84 * For example: <code><pre>
85 * ScannerContext context = new ScannerContext.newBuilder().setBatchLimit(5).build();
86 * RegionScanner scanner = ...
87 * List<Cell> results = new ArrayList<Cell>();
88 * while(scanner.next(results, context)) {
89 * // Do something with a batch of 5 cells
90 * }
91 * </pre></code> However, in the case of RPCs, the server wants to be able to define a set of
92 * limits for a particular RPC request and have those limits respected across multiple
93 * invocations. This means that the progress made towards the limits in earlier calls will be
94 * saved and considered in future invocations
95 */
96 boolean keepProgress;
97 private static boolean DEFAULT_KEEP_PROGRESS = false;
98
99 ScannerContext(boolean keepProgress, LimitFields limitsToCopy) {
100 this.limits = new LimitFields();
101 if (limitsToCopy != null) this.limits.copy(limitsToCopy);
102
103 // Progress fields are initialized to 0
104 progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
105
106 this.keepProgress = keepProgress;
107 this.scannerState = DEFAULT_STATE;
108 }
109
110 /**
111 * @return true if the progress tracked so far in this instance will be considered during an
112 * invocation of {@link InternalScanner#next(java.util.List)} or
113 * {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far
114 * should not be considered and should instead be wiped away via {@link #clearProgress()}
115 */
116 boolean getKeepProgress() {
117 return keepProgress;
118 }
119
120 void setKeepProgress(boolean keepProgress) {
121 this.keepProgress = keepProgress;
122 }
123
124 /**
125 * Progress towards the batch limit has been made. Increment internal tracking of batch progress
126 */
127 void incrementBatchProgress(int batch) {
128 int currentBatch = progress.getBatch();
129 progress.setBatch(currentBatch + batch);
130 }
131
132 /**
133 * Progress towards the size limit has been made. Increment internal tracking of size progress
134 */
135 void incrementSizeProgress(long size) {
136 long currentSize = progress.getSize();
137 progress.setSize(currentSize + size);
138 }
139
140 /**
141 * Update the time progress with {@link System#currentTimeMillis()}
142 */
143 void updateTimeProgress() {
144 progress.setTime(System.currentTimeMillis());
145 }
146
147 int getBatchProgress() {
148 return progress.getBatch();
149 }
150
151 long getSizeProgress() {
152 return progress.getSize();
153 }
154
155 long getTimeProgress() {
156 return progress.getTime();
157 }
158
159 void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
160 setBatchProgress(batchProgress);
161 setSizeProgress(sizeProgress);
162 setTimeProgress(timeProgress);
163 }
164
165 void setSizeProgress(long sizeProgress) {
166 progress.setSize(sizeProgress);
167 }
168
169 void setBatchProgress(int batchProgress) {
170 progress.setBatch(batchProgress);
171 }
172
173 void setTimeProgress(long timeProgress) {
174 progress.setTime(timeProgress);
175 }
176
177 /**
178 * Clear away any progress that has been made so far. All progress fields are reset to initial
179 * values
180 */
181 void clearProgress() {
182 progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
183 }
184
185 /**
186 * Note that this is not a typical setter. This setter returns the {@link NextState} that was
187 * passed in so that methods can be invoked against the new state. Furthermore, this pattern
188 * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
189 * new state, thus preserving the immutability of {@link NoLimitScannerContext}
190 * @param state
191 * @return The state that was passed in.
192 */
193 NextState setScannerState(NextState state) {
194 if (!NextState.isValidState(state)) {
195 throw new IllegalArgumentException("Cannot set to invalid state: " + state);
196 }
197
198 this.scannerState = state;
199 return state;
200 }
201
202 /**
203 * @return true when a partial result is formed. A partial result is formed when a limit is
204 * reached in the middle of a row.
205 */
206 boolean partialResultFormed() {
207 return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
208 || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW;
209 }
210
211 /**
212 * @param checkerScope
213 * @return true if the batch limit can be enforced in the checker's scope
214 */
215 boolean hasBatchLimit(LimitScope checkerScope) {
216 return limits.canEnforceBatchLimitFromScope(checkerScope) && limits.getBatch() > 0;
217 }
218
219 /**
220 * @param checkerScope
221 * @return true if the size limit can be enforced in the checker's scope
222 */
223 boolean hasSizeLimit(LimitScope checkerScope) {
224 return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
225 }
226
227 /**
228 * @param checkerScope
229 * @return true if the time limit can be enforced in the checker's scope
230 */
231 boolean hasTimeLimit(LimitScope checkerScope) {
232 return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
233 }
234
235 /**
236 * @param checkerScope
237 * @return true if any limit can be enforced within the checker's scope
238 */
239 boolean hasAnyLimit(LimitScope checkerScope) {
240 return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope);
241 }
242
243 /**
244 * @param scope The scope in which the size limit will be enforced
245 */
246 void setSizeLimitScope(LimitScope scope) {
247 limits.setSizeScope(scope);
248 }
249
250 /**
251 * @param scope The scope in which the time limit will be enforced
252 */
253 void setTimeLimitScope(LimitScope scope) {
254 limits.setTimeScope(scope);
255 }
256
257 int getBatchLimit() {
258 return limits.getBatch();
259 }
260
261 long getSizeLimit() {
262 return limits.getSize();
263 }
264
265 long getTimeLimit() {
266 return limits.getTime();
267 }
268
269 /**
270 * @param checkerScope The scope that the limit is being checked from
271 * @return true when the limit is enforceable from the checker's scope and it has been reached
272 */
273 boolean checkBatchLimit(LimitScope checkerScope) {
274 return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch();
275 }
276
277 /**
278 * @param checkerScope The scope that the limit is being checked from
279 * @return true when the limit is enforceable from the checker's scope and it has been reached
280 */
281 boolean checkSizeLimit(LimitScope checkerScope) {
282 return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
283 }
284
285 /**
286 * @param checkerScope The scope that the limit is being checked from. The time limit is always
287 * checked against {@link System#currentTimeMillis()}
288 * @return true when the limit is enforceable from the checker's scope and it has been reached
289 */
290 boolean checkTimeLimit(LimitScope checkerScope) {
291 return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
292 }
293
294 /**
295 * @param checkerScope The scope that the limits are being checked from
296 * @return true when some limit is enforceable from the checker's scope and it has been reached
297 */
298 boolean checkAnyLimitReached(LimitScope checkerScope) {
299 return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope)
300 || checkTimeLimit(checkerScope);
301 }
302
303 @Override
304 public String toString() {
305 StringBuilder sb = new StringBuilder();
306 sb.append("{");
307
308 sb.append("limits:");
309 sb.append(limits);
310
311 sb.append(", progress:");
312 sb.append(progress);
313
314 sb.append(", keepProgress:");
315 sb.append(keepProgress);
316
317 sb.append(", state:");
318 sb.append(scannerState);
319
320 sb.append("}");
321 return sb.toString();
322 }
323
324 public static Builder newBuilder() {
325 return new Builder();
326 }
327
328 public static Builder newBuilder(boolean keepProgress) {
329 return new Builder(keepProgress);
330 }
331
332 public static final class Builder {
333 boolean keepProgress = DEFAULT_KEEP_PROGRESS;
334 LimitFields limits = new LimitFields();
335
336 private Builder() {
337 }
338
339 private Builder(boolean keepProgress) {
340 this.keepProgress = keepProgress;
341 }
342
343 public Builder setKeepProgress(boolean keepProgress) {
344 this.keepProgress = keepProgress;
345 return this;
346 }
347
348 public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
349 limits.setSize(sizeLimit);
350 limits.setSizeScope(sizeScope);
351 return this;
352 }
353
354 public Builder setTimeLimit(LimitScope timeScope, long timeLimit) {
355 limits.setTime(timeLimit);
356 limits.setTimeScope(timeScope);
357 return this;
358 }
359
360 public Builder setBatchLimit(int batchLimit) {
361 limits.setBatch(batchLimit);
362 return this;
363 }
364
365 public ScannerContext build() {
366 return new ScannerContext(keepProgress, limits);
367 }
368 }
369
370 /**
371 * The possible states a scanner may be in following a call to {@link InternalScanner#next(List)}
372 */
373 public enum NextState {
374 MORE_VALUES(true, false),
375 NO_MORE_VALUES(false, false),
376 SIZE_LIMIT_REACHED(true, true),
377
378 /**
379 * Special case of size limit reached to indicate that the size limit was reached in the middle
380 * of a row and thus a partial results was formed
381 */
382 SIZE_LIMIT_REACHED_MID_ROW(true, true),
383 TIME_LIMIT_REACHED(true, true),
384
385 /**
386 * Special case of time limit reached to indicate that the time limit was reached in the middle
387 * of a row and thus a partial results was formed
388 */
389 TIME_LIMIT_REACHED_MID_ROW(true, true),
390 BATCH_LIMIT_REACHED(true, true);
391
392 private boolean moreValues;
393 private boolean limitReached;
394
395 private NextState(boolean moreValues, boolean limitReached) {
396 this.moreValues = moreValues;
397 this.limitReached = limitReached;
398 }
399
400 /**
401 * @return true when the state indicates that more values may follow those that have been
402 * returned
403 */
404 public boolean hasMoreValues() {
405 return this.moreValues;
406 }
407
408 /**
409 * @return true when the state indicates that a limit has been reached and scan should stop
410 */
411 public boolean limitReached() {
412 return this.limitReached;
413 }
414
415 public static boolean isValidState(NextState state) {
416 return state != null;
417 }
418
419 public static boolean hasMoreValues(NextState state) {
420 return isValidState(state) && state.hasMoreValues();
421 }
422 }
423
424 /**
425 * The various scopes where a limit can be enforced. Used to differentiate when a limit should be
426 * enforced or not.
427 */
428 public enum LimitScope {
429 /**
430 * Enforcing a limit between rows means that the limit will not be considered until all the
431 * cells for a particular row have been retrieved
432 */
433 BETWEEN_ROWS(0),
434
435 /**
436 * Enforcing a limit between cells means that the limit will be considered after each full cell
437 * has been retrieved
438 */
439 BETWEEN_CELLS(1);
440
441 /**
442 * When enforcing a limit, we must check that the scope is appropriate for enforcement.
443 * <p>
444 * To communicate this concept, each scope has a depth. A limit will be enforced if the depth of
445 * the checker's scope is less than or equal to the limit's scope. This means that when checking
446 * limits, the checker must know their own scope (i.e. are they checking the limits between
447 * rows, between cells, etc...)
448 */
449 int depth;
450
451 LimitScope(int depth) {
452 this.depth = depth;
453 }
454
455 int depth() {
456 return depth;
457 }
458
459 /**
460 * @param checkerScope The scope in which the limit is being checked
461 * @return true when the checker is in a scope that indicates the limit can be enforced. Limits
462 * can be enforced from "higher or equal" scopes (i.e. the checker's scope is at a
463 * lesser depth than the limit)
464 */
465 boolean canEnforceLimitFromScope(LimitScope checkerScope) {
466 return checkerScope != null && checkerScope.depth() <= depth;
467 }
468 }
469
470 /**
471 * The different fields that can be used as limits in calls to
472 * {@link InternalScanner#next(java.util.List)} and {@link RegionScanner#next(java.util.List)}
473 */
474 private static class LimitFields {
475 /**
476 * Default values of the limit fields. Defined such that if a field does NOT change from its
477 * default, it will not be enforced
478 */
479 private static int DEFAULT_BATCH = -1;
480 private static long DEFAULT_SIZE = -1L;
481 private static long DEFAULT_TIME = -1L;
482
483 /**
484 * Default scope that is assigned to a limit if a scope is not specified.
485 */
486 private static final LimitScope DEFAULT_SCOPE = LimitScope.BETWEEN_ROWS;
487
488 // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
489 // batch scope
490 int batch = DEFAULT_BATCH;
491
492 LimitScope sizeScope = DEFAULT_SCOPE;
493 long size = DEFAULT_SIZE;
494
495 LimitScope timeScope = DEFAULT_SCOPE;
496 long time = DEFAULT_TIME;
497
498 /**
499 * Fields keep their default values.
500 */
501 LimitFields() {
502 }
503
504 LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
505 setFields(batch, sizeScope, size, timeScope, time);
506 }
507
508 void copy(LimitFields limitsToCopy) {
509 if (limitsToCopy != null) {
510 setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
511 limitsToCopy.getTimeScope(), limitsToCopy.getTime());
512 }
513 }
514
515 /**
516 * Set all fields together.
517 * @param batch
518 * @param sizeScope
519 * @param size
520 */
521 void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
522 setBatch(batch);
523 setSizeScope(sizeScope);
524 setSize(size);
525 setTimeScope(timeScope);
526 setTime(time);
527 }
528
529 int getBatch() {
530 return this.batch;
531 }
532
533 void setBatch(int batch) {
534 this.batch = batch;
535 }
536
537 /**
538 * @param checkerScope
539 * @return true when the limit can be enforced from the scope of the checker
540 */
541 boolean canEnforceBatchLimitFromScope(LimitScope checkerScope) {
542 return LimitScope.BETWEEN_CELLS.canEnforceLimitFromScope(checkerScope);
543 }
544
545 long getSize() {
546 return this.size;
547 }
548
549 void setSize(long size) {
550 this.size = size;
551 }
552
553 /**
554 * @return {@link LimitScope} indicating scope in which the size limit is enforced
555 */
556 LimitScope getSizeScope() {
557 return this.sizeScope;
558 }
559
560 /**
561 * Change the scope in which the size limit is enforced
562 */
563 void setSizeScope(LimitScope scope) {
564 this.sizeScope = scope;
565 }
566
567 /**
568 * @param checkerScope
569 * @return true when the limit can be enforced from the scope of the checker
570 */
571 boolean canEnforceSizeLimitFromScope(LimitScope checkerScope) {
572 return this.sizeScope.canEnforceLimitFromScope(checkerScope);
573 }
574
575 long getTime() {
576 return this.time;
577 }
578
579 void setTime(long time) {
580 this.time = time;
581 }
582
583 /**
584 * @return {@link LimitScope} indicating scope in which the time limit is enforced
585 */
586 LimitScope getTimeScope() {
587 return this.timeScope;
588 }
589
590 /**
591 * Change the scope in which the time limit is enforced
592 */
593 void setTimeScope(LimitScope scope) {
594 this.timeScope = scope;
595 }
596
597 /**
598 * @param checkerScope
599 * @return true when the limit can be enforced from the scope of the checker
600 */
601 boolean canEnforceTimeLimitFromScope(LimitScope checkerScope) {
602 return this.sizeScope.canEnforceLimitFromScope(checkerScope);
603 }
604
605 @Override
606 public String toString() {
607 StringBuilder sb = new StringBuilder();
608 sb.append("{");
609
610 sb.append("batch:");
611 sb.append(batch);
612
613 sb.append(", size:");
614 sb.append(size);
615
616 sb.append(", sizeScope:");
617 sb.append(sizeScope);
618
619 sb.append(", time:");
620 sb.append(time);
621
622 sb.append(", timeScope:");
623 sb.append(timeScope);
624
625 sb.append("}");
626 return sb.toString();
627 }
628 }
629 }