1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.client;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.hbase.classification.InterfaceAudience;
23 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
24
25 import java.util.List;
26 import java.util.Map;
27
28 /**
29 * A wrapper for a runnable for a group of actions for a single regionserver.
30 * <p>
31 * This can be used to build up the actions that should be taken and then
32 * </p>
33 * <p>
34 * This class exists to simulate using a ScheduledExecutorService with just a regular
35 * ExecutorService and Runnables. It is used for legacy reasons in the the client; this could
36 * only be removed if we change the expectations in HTable around the pool the client is able to
37 * pass in and even if we deprecate the current APIs would require keeping this class around
38 * for the interim to bridge between the legacy ExecutorServices and the scheduled pool.
39 * </p>
40 */
41 @InterfaceAudience.Private
42 public class DelayingRunner<T> implements Runnable {
43 private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
44
45 private final Object sleepLock = new Object();
46 private boolean triggerWake = false;
47 private long sleepTime;
48 private MultiAction<T> actions = new MultiAction<T>();
49 private Runnable runnable;
50
51 public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) {
52 this.sleepTime = sleepTime;
53 add(e);
54 }
55
56 public void setRunner(Runnable runner) {
57 this.runnable = runner;
58 }
59
60 @Override
61 public void run() {
62 if (!sleep()) {
63 LOG.warn(
64 "Interrupted while sleeping for expected sleep time " + sleepTime + " ms");
65 }
66 //TODO maybe we should consider switching to a listenableFuture for the actual callable and
67 // then handling the results/errors as callbacks. That way we can decrement outstanding tasks
68 // even if we get interrupted here, but for now, we still need to run so we decrement the
69 // outstanding tasks
70 this.runnable.run();
71 }
72
73 /**
74 * Sleep for an expected amount of time.
75 * <p>
76 * This is nearly a copy of what the Sleeper does, but with the ability to know if you
77 * got interrupted while sleeping.
78 * </p>
79 *
80 * @return <tt>true</tt> if the sleep completely entirely successfully,
81 * but otherwise <tt>false</tt> if the sleep was interrupted.
82 */
83 private boolean sleep() {
84 long now = EnvironmentEdgeManager.currentTime();
85 long startTime = now;
86 long waitTime = sleepTime;
87 while (waitTime > 0) {
88 long woke = -1;
89 try {
90 synchronized (sleepLock) {
91 if (triggerWake) break;
92 sleepLock.wait(waitTime);
93 }
94 woke = EnvironmentEdgeManager.currentTime();
95 } catch (InterruptedException iex) {
96 return false;
97 }
98 // Recalculate waitTime.
99 woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke;
100 waitTime = waitTime - (woke - startTime);
101 }
102 return true;
103 }
104
105 public void add(Map.Entry<byte[], List<Action<T>>> e) {
106 actions.add(e.getKey(), e.getValue());
107 }
108
109 public MultiAction<T> getActions() {
110 return actions;
111 }
112
113 public long getSleepTime() {
114 return sleepTime;
115 }
116 }