1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase;
20
21 import java.util.concurrent.ScheduledThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27
28 import com.google.common.annotations.VisibleForTesting;
29
30 /**
31 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
32 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
33 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
34 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
35 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
36 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
37 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
38 * thread pool.
39 * <p>
40 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
41 * an entry being added to a queue, etc.
42 */
43 @InterfaceAudience.Private
44 public abstract class ScheduledChore implements Runnable {
45 private final Log LOG = LogFactory.getLog(this.getClass());
46
47 private final String name;
48
49 /**
50 * Default values for scheduling parameters should they be excluded during construction
51 */
52 private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
53 private final static long DEFAULT_INITIAL_DELAY = 0;
54
55 /**
56 * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
57 */
58 private final int period;
59 private final TimeUnit timeUnit;
60 private final long initialDelay;
61
62 /**
63 * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
64 * not scheduled.
65 */
66 private ChoreServicer choreServicer;
67
68 /**
69 * Variables that encapsulate the meaningful state information
70 */
71 private long timeOfLastRun = -1;
72 private long timeOfThisRun = -1;
73 private boolean initialChoreComplete = false;
74
75 /**
76 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
77 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
78 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
79 * command can cause many chores to stop together.
80 */
81 private final Stoppable stopper;
82
83 interface ChoreServicer {
84 /**
85 * Cancel any ongoing schedules that this chore has with the implementer of this interface.
86 */
87 public void cancelChore(ScheduledChore chore);
88 public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
89
90 /**
91 * @return true when the chore is scheduled with the implementer of this interface
92 */
93 public boolean isChoreScheduled(ScheduledChore chore);
94
95 /**
96 * This method tries to execute the chore immediately. If the chore is executing at the time of
97 * this call, the chore will begin another execution as soon as the current execution finishes
98 * <p>
99 * If the chore is not scheduled with a ChoreService, this call will fail.
100 * @return false when the chore could not be triggered immediately
101 */
102 public boolean triggerNow(ScheduledChore chore);
103
104 /**
105 * A callback that tells the implementer of this interface that one of the scheduled chores is
106 * missing its start time. The implication of a chore missing its start time is that the
107 * service's current means of scheduling may not be sufficient to handle the number of ongoing
108 * chores (the other explanation is that the chore's execution time is greater than its
109 * scheduled period). The service should try to increase its concurrency when this callback is
110 * received.
111 * @param chore The chore that missed its start time
112 */
113 public void onChoreMissedStartTime(ScheduledChore chore);
114 }
115
116 /**
117 * This constructor is for test only. It allows us to create an object and to call chore() on it.
118 */
119 protected ScheduledChore() {
120 this.name = null;
121 this.stopper = null;
122 this.period = 0;
123 this.initialDelay = DEFAULT_INITIAL_DELAY;
124 this.timeUnit = DEFAULT_TIME_UNIT;
125 }
126
127 /**
128 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
129 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
130 * @param period Period with which this Chore repeats execution when scheduled.
131 */
132 public ScheduledChore(final String name, Stoppable stopper, final int period) {
133 this(name, stopper, period, DEFAULT_INITIAL_DELAY);
134 }
135
136 /**
137 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
138 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
139 * @param period Period with which this Chore repeats execution when scheduled.
140 * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
141 * value of 0 means the chore will begin to execute immediately. Negative delays are
142 * invalid and will be corrected to a value of 0.
143 */
144 public ScheduledChore(final String name, Stoppable stopper, final int period,
145 final long initialDelay) {
146 this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
147 }
148
149 /**
150 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
151 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
152 * @param period Period with which this Chore repeats execution when scheduled.
153 * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
154 * value of 0 means the chore will begin to execute immediately. Negative delays are
155 * invalid and will be corrected to a value of 0.
156 * @param unit The unit that is used to measure period and initialDelay
157 */
158 public ScheduledChore(final String name, Stoppable stopper, final int period,
159 final long initialDelay, final TimeUnit unit) {
160 this.name = name;
161 this.stopper = stopper;
162 this.period = period;
163 this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
164 this.timeUnit = unit;
165 }
166
167 /**
168 * @see java.lang.Thread#run()
169 */
170 @Override
171 public void run() {
172 updateTimeTrackingBeforeRun();
173 if (missedStartTime() && isScheduled()) {
174 onChoreMissedStartTime();
175 if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
176 } else if (stopper.isStopped() || !isScheduled()) {
177 cancel(false);
178 cleanup();
179 if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
180 } else {
181 try {
182 if (!initialChoreComplete) {
183 initialChoreComplete = initialChore();
184 } else {
185 chore();
186 }
187 } catch (Throwable t) {
188 if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
189 if (this.stopper.isStopped()) {
190 cancel(false);
191 cleanup();
192 }
193 }
194 }
195 }
196
197 /**
198 * Update our time tracking members. Called at the start of an execution of this chore's run()
199 * method so that a correct decision can be made as to whether or not we missed the start time
200 */
201 private synchronized void updateTimeTrackingBeforeRun() {
202 timeOfLastRun = timeOfThisRun;
203 timeOfThisRun = System.currentTimeMillis();
204 }
205
206 /**
207 * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
208 * make the decision as to whether or not it would be worthwhile to increase the number of core
209 * pool threads
210 */
211 private synchronized void onChoreMissedStartTime() {
212 if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
213 }
214
215 /**
216 * @return How long has it been since this chore last run. Useful for checking if the chore has
217 * missed its scheduled start time by too large of a margin
218 */
219 synchronized long getTimeBetweenRuns() {
220 return timeOfThisRun - timeOfLastRun;
221 }
222
223 /**
224 * @return true when the time between runs exceeds the acceptable threshold
225 */
226 private synchronized boolean missedStartTime() {
227 return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
228 && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
229 }
230
231 private synchronized double getMaximumAllowedTimeBetweenRuns() {
232 // Threshold used to determine if the Chore's current run started too late
233 return 1.5 * timeUnit.toMillis(period);
234 }
235
236 private synchronized boolean isValidTime(final long time) {
237 return time > 0 && time <= System.currentTimeMillis();
238 }
239
240 /**
241 * @return false when the Chore is not currently scheduled with a ChoreService
242 */
243 public synchronized boolean triggerNow() {
244 if (choreServicer != null) {
245 return choreServicer.triggerNow(this);
246 } else {
247 return false;
248 }
249 }
250
251 synchronized void setChoreServicer(ChoreServicer service) {
252 // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
253 // is changing, cancel any existing schedules of this chore.
254 if (choreServicer != null && choreServicer != service) {
255 choreServicer.cancelChore(this, false);
256 }
257 choreServicer = service;
258 timeOfThisRun = System.currentTimeMillis();
259 }
260
261 public synchronized void cancel() {
262 cancel(true);
263 }
264
265 public synchronized void cancel(boolean mayInterruptIfRunning) {
266 if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning);
267
268 choreServicer = null;
269 }
270
271 public synchronized String getName() {
272 return name;
273 }
274
275 public synchronized Stoppable getStopper() {
276 return stopper;
277 }
278
279 public synchronized int getPeriod() {
280 return period;
281 }
282
283 public synchronized long getInitialDelay() {
284 return initialDelay;
285 }
286
287 public final synchronized TimeUnit getTimeUnit() {
288 return timeUnit;
289 }
290
291 public synchronized boolean isInitialChoreComplete() {
292 return initialChoreComplete;
293 }
294
295 @VisibleForTesting
296 synchronized ChoreServicer getChoreServicer() {
297 return choreServicer;
298 }
299
300 @VisibleForTesting
301 synchronized long getTimeOfLastRun() {
302 return timeOfLastRun;
303 }
304
305 @VisibleForTesting
306 synchronized long getTimeOfThisRun() {
307 return timeOfThisRun;
308 }
309
310 /**
311 * @return true when this Chore is scheduled with a ChoreService
312 */
313 public synchronized boolean isScheduled() {
314 return choreServicer != null && choreServicer.isChoreScheduled(this);
315 }
316
317 @VisibleForTesting
318 public synchronized void choreForTesting() {
319 chore();
320 }
321
322 /**
323 * The task to execute on each scheduled execution of the Chore
324 */
325 protected abstract void chore();
326
327 /**
328 * Override to run a task before we start looping.
329 * @return true if initial chore was successful
330 */
331 protected boolean initialChore() {
332 // Default does nothing
333 return true;
334 }
335
336 /**
337 * Override to run cleanup tasks when the Chore encounters an error and must stop running
338 */
339 protected synchronized void cleanup() {
340 }
341
342 @Override
343 public String toString() {
344 return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
345 + getTimeUnit() + "]";
346 }
347 }