1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.Map.Entry;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34
35 /**
36 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
37 * periodically while sharing threads. The ChoreService is backed by a
38 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
39 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
40 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
41 * <p>
42 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
43 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
44 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
45 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
46 * there may be a need to increase the number of threads if it is noticed that chores are no longer
47 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
48 * made to reduce the number of running threads to see if chores can still meet their start times
49 * with a smaller thread pool.
50 * <p>
51 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
52 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
53 */
54 @InterfaceAudience.Private
55 public class ChoreService implements ChoreServicer {
56 private final Log LOG = LogFactory.getLog(this.getClass());
57
58 /**
59 * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
60 */
61 public final static int MIN_CORE_POOL_SIZE = 1;
62
63 /**
64 * This thread pool is used to schedule all of the Chores
65 */
66 private final ScheduledThreadPoolExecutor scheduler;
67
68 /**
69 * Maps chores to their futures. Futures are used to control a chore's schedule
70 */
71 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
72
73 /**
74 * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
75 * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
76 * increase the core pool size by 1 (otherwise a single long running chore whose execution is
77 * longer than its period would be able to spawn too many threads).
78 */
79 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
80
81 /**
82 * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
83 * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
84 * running on. The prefix is useful because it allows us to monitor how the thread pool of a
85 * particular service changes over time VIA thread dumps.
86 */
87 private final String coreThreadPoolPrefix;
88
89 /**
90 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
91 * spawned by this service
92 */
93 public ChoreService(final String coreThreadPoolPrefix) {
94 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
95 }
96
97 /**
98 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
99 * spawned by this service
100 * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor
101 * to during initialization. The default size is 1, but specifying a larger size may be
102 * beneficial if you know that 1 thread will not be enough.
103 */
104 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
105 this.coreThreadPoolPrefix = coreThreadPoolPrefix;
106 if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
107 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
108 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
109 scheduler.setRemoveOnCancelPolicy(true);
110 scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
111 choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
112 }
113
114 /**
115 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
116 * spawned by this service
117 */
118 public static ChoreService getInstance(final String coreThreadPoolPrefix) {
119 return new ChoreService(coreThreadPoolPrefix);
120 }
121
122 /**
123 * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
124 * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
125 * with a single ChoreService instance).
126 * @return true when the chore was successfully scheduled. false when the scheduling failed
127 * (typically occurs when a chore is scheduled during shutdown of service)
128 */
129 public synchronized boolean scheduleChore(ScheduledChore chore) {
130 if (chore == null) return false;
131
132 try {
133 chore.setChoreServicer(this);
134 ScheduledFuture<?> future =
135 scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
136 chore.getTimeUnit());
137 scheduledChores.put(chore, future);
138 return true;
139 } catch (Exception exception) {
140 if (LOG.isInfoEnabled()) {
141 LOG.info("Could not successfully schedule chore: " + chore.getName());
142 }
143 return false;
144 }
145 }
146
147 /**
148 * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
149 * yet then this call is equivalent to a call to scheduleChore.
150 */
151 private synchronized void rescheduleChore(ScheduledChore chore) {
152 if (chore == null) return;
153
154 if (scheduledChores.containsKey(chore)) {
155 ScheduledFuture<?> future = scheduledChores.get(chore);
156 future.cancel(false);
157 }
158 scheduleChore(chore);
159 }
160
161 @Override
162 public synchronized void cancelChore(ScheduledChore chore) {
163 cancelChore(chore, true);
164 }
165
166 @Override
167 public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
168 if (chore != null && scheduledChores.containsKey(chore)) {
169 ScheduledFuture<?> future = scheduledChores.get(chore);
170 future.cancel(mayInterruptIfRunning);
171 scheduledChores.remove(chore);
172
173 // Removing a chore that was missing its start time means it may be possible
174 // to reduce the number of threads
175 if (choresMissingStartTime.containsKey(chore)) {
176 choresMissingStartTime.remove(chore);
177 requestCorePoolDecrease();
178 }
179 }
180 }
181
182 @Override
183 public synchronized boolean isChoreScheduled(ScheduledChore chore) {
184 return chore != null && scheduledChores.containsKey(chore)
185 && !scheduledChores.get(chore).isDone();
186 }
187
188 @Override
189 public synchronized boolean triggerNow(ScheduledChore chore) {
190 if (chore == null) {
191 return false;
192 } else {
193 rescheduleChore(chore);
194 return true;
195 }
196 }
197
198 /**
199 * @return number of chores that this service currently has scheduled
200 */
201 int getNumberOfScheduledChores() {
202 return scheduledChores.size();
203 }
204
205 /**
206 * @return number of chores that this service currently has scheduled that are missing their
207 * scheduled start time
208 */
209 int getNumberOfChoresMissingStartTime() {
210 return choresMissingStartTime.size();
211 }
212
213 /**
214 * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
215 */
216 int getCorePoolSize() {
217 return scheduler.getCorePoolSize();
218 }
219
220 /**
221 * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
222 * daemon threads, and thus, don't prevent the JVM from shutting down
223 */
224 static class ChoreServiceThreadFactory implements ThreadFactory {
225 private final String threadPrefix;
226 private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
227 private AtomicInteger threadNumber = new AtomicInteger(1);
228
229 /**
230 * @param threadPrefix The prefix given to all threads created by this factory
231 */
232 public ChoreServiceThreadFactory(final String threadPrefix) {
233 this.threadPrefix = threadPrefix;
234 }
235
236 @Override
237 public Thread newThread(Runnable r) {
238 Thread thread =
239 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
240 thread.setDaemon(true);
241 return thread;
242 }
243 }
244
245 /**
246 * Represents a request to increase the number of core pool threads. Typically a request
247 * originates from the fact that the current core pool size is not sufficient to service all of
248 * the currently running Chores
249 * @return true when the request to increase the core pool size succeeds
250 */
251 private synchronized boolean requestCorePoolIncrease() {
252 // There is no point in creating more threads than scheduledChores.size since scheduled runs
253 // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
254 // amongst occurrences of the same chore).
255 if (scheduler.getCorePoolSize() < scheduledChores.size()) {
256 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
257 printChoreServiceDetails("requestCorePoolIncrease");
258 return true;
259 }
260 return false;
261 }
262
263 /**
264 * Represents a request to decrease the number of core pool threads. Typically a request
265 * originates from the fact that the current core pool size is more than sufficient to service the
266 * running Chores.
267 */
268 private synchronized void requestCorePoolDecrease() {
269 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
270 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
271 printChoreServiceDetails("requestCorePoolDecrease");
272 }
273 }
274
275 @Override
276 public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
277 if (chore == null || !scheduledChores.containsKey(chore)) return;
278
279 // If the chore has not caused an increase in the size of the core thread pool then request an
280 // increase. This allows each chore missing its start time to increase the core pool size by
281 // at most 1.
282 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
283 choresMissingStartTime.put(chore, requestCorePoolIncrease());
284 }
285
286 // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
287 // the chore is NOT rescheduled, future executions of this chore will be delayed more and
288 // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
289 // idle threads to chores based on how delayed they are.
290 rescheduleChore(chore);
291 printChoreDetails("onChoreMissedStartTime", chore);
292 }
293
294 /**
295 * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
296 * in the middle of execution will be interrupted and shutdown. This service will be unusable
297 * after this method has been called (i.e. future scheduling attempts will fail).
298 */
299 public void shutdown() {
300 scheduler.shutdownNow();
301 if (LOG.isInfoEnabled()) {
302 LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
303 + " on shutdown");
304 }
305 cancelAllChores(true);
306 scheduledChores.clear();
307 choresMissingStartTime.clear();
308 }
309
310 /**
311 * @return true when the service is shutdown and thus cannot be used anymore
312 */
313 public boolean isShutdown() {
314 return scheduler.isShutdown();
315 }
316
317 /**
318 * @return true when the service is shutdown and all threads have terminated
319 */
320 public boolean isTerminated() {
321 return scheduler.isTerminated();
322 }
323
324 private void cancelAllChores(final boolean mayInterruptIfRunning) {
325 ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
326 // Build list of chores to cancel so we can iterate through a set that won't change
327 // as chores are cancelled. If we tried to cancel each chore while iterating through
328 // keySet the results would be undefined because the keySet would be changing
329 for (ScheduledChore chore : scheduledChores.keySet()) {
330 choresToCancel.add(chore);
331 }
332 for (ScheduledChore chore : choresToCancel) {
333 cancelChore(chore, mayInterruptIfRunning);
334 }
335 choresToCancel.clear();
336 }
337
338 /**
339 * Prints a summary of important details about the chore. Used for debugging purposes
340 */
341 private void printChoreDetails(final String header, ScheduledChore chore) {
342 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
343 output.put(header, "");
344 output.put("Chore name: ", chore.getName());
345 output.put("Chore period: ", Integer.toString(chore.getPeriod()));
346 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
347
348 for (Entry<String, String> entry : output.entrySet()) {
349 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
350 }
351 }
352
353 /**
354 * Prints a summary of important details about the service. Used for debugging purposes
355 */
356 private void printChoreServiceDetails(final String header) {
357 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
358 output.put(header, "");
359 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
360 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
361 output.put("ChoreService missingStartTimeCount: ",
362 Integer.toString(getNumberOfChoresMissingStartTime()));
363
364 for (Entry<String, String> entry : output.entrySet()) {
365 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
366 }
367 }
368 }