1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.replication.regionserver;
19
20 import org.apache.hadoop.hbase.classification.InterfaceAudience;
21 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
22
23 /**
24 * Per-peer per-node throttling controller for replication: enabled if
25 * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed
26 * to peer within each cycle won't exceed 'bandwidth' bytes
27 */
28 @InterfaceAudience.Private
29 public class ReplicationThrottler {
30 private final boolean enabled;
31 private final double bandwidth;
32 private long cyclePushSize;
33 private long cycleStartTick;
34
35 /**
36 * ReplicationThrottler constructor
37 * If bandwidth less than 1, throttling is disabled
38 * @param bandwidth per cycle(100ms)
39 */
40 public ReplicationThrottler(final double bandwidth) {
41 this.bandwidth = bandwidth;
42 this.enabled = this.bandwidth > 0;
43 if (this.enabled) {
44 this.cyclePushSize = 0;
45 this.cycleStartTick = EnvironmentEdgeManager.currentTime();
46 }
47 }
48
49 /**
50 * If throttling is enabled
51 * @return true if throttling is enabled
52 */
53 public boolean isEnabled() {
54 return this.enabled;
55 }
56
57 /**
58 * Get how long the caller should sleep according to the current size and
59 * current cycle's total push size and start tick, return the sleep interval
60 * for throttling control.
61 * @param size is the size of edits to be pushed
62 * @return sleep interval for throttling control
63 */
64 public long getNextSleepInterval(final int size) {
65 if (!this.enabled) {
66 return 0;
67 }
68
69 long sleepTicks = 0;
70 long now = EnvironmentEdgeManager.currentTime();
71 // 1. if cyclePushSize exceeds bandwidth, we need to sleep some
72 // following cycles to amortize, this case can occur when a single push
73 // exceeds the bandwidth
74 if ((double)this.cyclePushSize > bandwidth) {
75 double cycles = Math.ceil((double)this.cyclePushSize / bandwidth);
76 long shouldTillTo = this.cycleStartTick + (long)(cycles * 100);
77 if (shouldTillTo > now) {
78 sleepTicks = shouldTillTo - now;
79 } else {
80 // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here!
81 this.cycleStartTick = now;
82 }
83 this.cyclePushSize = 0;
84 } else {
85 long nextCycleTick = this.cycleStartTick + 100; //a cycle is 100ms
86 if (now >= nextCycleTick) {
87 // 2. switch to next cycle if the current cycle has passed
88 this.cycleStartTick = now;
89 this.cyclePushSize = 0;
90 } else if (this.cyclePushSize > 0 &&
91 (double)(this.cyclePushSize + size) >= bandwidth) {
92 // 3. delay the push to next cycle if exceeds throttling bandwidth.
93 // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case
94 // where a cycle's first push size(currentSize) > bandwidth
95 sleepTicks = nextCycleTick - now;
96 this.cyclePushSize = 0;
97 }
98 }
99 return sleepTicks;
100 }
101
102 /**
103 * Add current size to the current cycle's total push size
104 * @param size is the current size added to the current cycle's
105 * total push size
106 */
107 public void addPushSize(final int size) {
108 if (this.enabled) {
109 this.cyclePushSize += size;
110 }
111 }
112
113 /**
114 * Reset the cycle start tick to NOW
115 */
116 public void resetStartTick() {
117 if (this.enabled) {
118 this.cycleStartTick = EnvironmentEdgeManager.currentTime();
119 }
120 }
121 }