1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19
20 package org.apache.hadoop.hbase;
21
22 import java.text.MessageFormat;
23
24 import junit.framework.Assert;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30
31 /**
32 * A class that provides a standard waitFor pattern
33 * See details at https://issues.apache.org/jira/browse/HBASE-7384
34 */
35 @InterfaceAudience.Private
36 public final class Waiter {
37
38 private static final Log LOG = LogFactory.getLog(Waiter.class);
39
40 /**
41 * System property name whose value is a scale factor to increase time out values dynamically used
42 * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
43 * {@link #waitFor(Configuration, long, long, Predicate)}, and
44 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method
45 * <p/>
46 * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout
47 */
48 public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
49
50 private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
51
52 private static float waitForRatio = -1;
53
54 private Waiter() {
55 }
56
57 /**
58 * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
59 * {@link #waitFor(Configuration, long, Predicate)},
60 * {@link #waitFor(Configuration, long, long, Predicate)} and
61 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class
62 * <p/>
63 * This is useful to dynamically adjust max time out values when same test cases run in different
64 * test machine settings without recompiling & re-deploying code.
65 * <p/>
66 * The value is obtained from the Java System property or configuration setting
67 * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>.
68 * @param conf the configuration
69 * @return the 'wait for ratio' for the current test run.
70 */
71 public static float getWaitForRatio(Configuration conf) {
72 if (waitForRatio < 0) {
73 // System property takes precedence over configuration setting
74 if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
75 waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
76 } else {
77 waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
78 }
79 }
80 return waitForRatio;
81 }
82
83 /**
84 * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and
85 * {@link Waiter#waitFor(Configuration, long, Predicate)} and
86 * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate) methods.
87 */
88 @InterfaceAudience.Private
89 public interface Predicate<E extends Exception> {
90
91 /**
92 * Perform a predicate evaluation.
93 * @return the boolean result of the evaluation.
94 * @throws Exception thrown if the predicate evaluation could not evaluate.
95 */
96 boolean evaluate() throws E;
97
98 }
99
100 /**
101 * A mixin interface, can be used with {@link Waiter} to explain failed state.
102 */
103 @InterfaceAudience.Private
104 public interface ExplainingPredicate<E extends Exception> extends Predicate<E> {
105
106 /**
107 * Perform a predicate evaluation.
108 *
109 * @return explanation of failed state
110 */
111 String explainFailure() throws E;
112
113 }
114
115 /**
116 * Makes the current thread sleep for the duration equal to the specified time in milliseconds
117 * multiplied by the {@link #getWaitForRatio(Configuration)}.
118 * @param conf the configuration
119 * @param time the number of milliseconds to sleep.
120 */
121 public static void sleep(Configuration conf, long time) {
122 try {
123 Thread.sleep((long) (getWaitForRatio(conf) * time));
124 } catch (InterruptedException ex) {
125 LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
126 }
127 }
128
129 /**
130 * Waits up to the duration equal to the specified timeout multiplied by the
131 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
132 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
133 * <code>false</code>.
134 * <p/>
135 * @param conf the configuration
136 * @param timeout the timeout in milliseconds to wait for the predicate.
137 * @param predicate the predicate to evaluate.
138 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
139 * wait is interrupted otherwise <code>-1</code> when times out
140 */
141 public static <E extends Exception> long waitFor(Configuration conf, long timeout,
142 Predicate<E> predicate) throws E {
143 return waitFor(conf, timeout, 100, true, predicate);
144 }
145
146 /**
147 * Waits up to the duration equal to the specified timeout multiplied by the
148 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
149 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
150 * <code>false</code>.
151 * <p/>
152 * @param conf the configuration
153 * @param timeout the max timeout in milliseconds to wait for the predicate.
154 * @param interval the interval in milliseconds to evaluate predicate.
155 * @param predicate the predicate to evaluate.
156 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
157 * wait is interrupted otherwise <code>-1</code> when times out
158 */
159 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
160 Predicate<E> predicate) throws E {
161 return waitFor(conf, timeout, interval, true, predicate);
162 }
163
164 /**
165 * Waits up to the duration equal to the specified timeout multiplied by the
166 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
167 * <code>true</code>, failing the test if the timeout is reached, the Predicate is still
168 * <code>false</code> and failIfTimeout is set as <code>true</code>.
169 * <p/>
170 * @param conf the configuration
171 * @param timeout the timeout in milliseconds to wait for the predicate.
172 * @param interval the interval in milliseconds to evaluate predicate.
173 * @param failIfTimeout indicates if should fail current test case when times out.
174 * @param predicate the predicate to evaluate.
175 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
176 * wait is interrupted otherwise <code>-1</code> when times out
177 */
178 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
179 boolean failIfTimeout, Predicate<E> predicate) throws E {
180 long started = System.currentTimeMillis();
181 long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
182 long mustEnd = started + adjustedTimeout;
183 long remainderWait = 0;
184 long sleepInterval = 0;
185 Boolean eval = false;
186 Boolean interrupted = false;
187
188 try {
189 LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
190 adjustedTimeout, getWaitForRatio(conf)));
191 while (!(eval = predicate.evaluate())
192 && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) {
193 try {
194 // handle tail case when remainder wait is less than one interval
195 sleepInterval = (remainderWait > interval) ? interval : remainderWait;
196 Thread.sleep(sleepInterval);
197 } catch (InterruptedException e) {
198 eval = predicate.evaluate();
199 interrupted = true;
200 break;
201 }
202 }
203 if (!eval) {
204 if (interrupted) {
205 LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
206 System.currentTimeMillis() - started));
207 } else if (failIfTimeout) {
208 String msg = getExplanation(predicate);
209 Assert.fail(MessageFormat
210 .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
211 } else {
212 String msg = getExplanation(predicate);
213 LOG.warn(
214 MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
215 }
216 }
217 return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1;
218 } catch (Exception ex) {
219 throw new RuntimeException(ex);
220 }
221 }
222
223 public static String getExplanation(Predicate explain) {
224 if (explain instanceof ExplainingPredicate) {
225 try {
226 return " " + ((ExplainingPredicate) explain).explainFailure();
227 } catch (Exception e) {
228 LOG.error("Failed to get explanation, ", e);
229 return e.getMessage();
230 }
231 } else {
232 return "";
233 }
234 }
235
236 }