1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.errorhandling.ForeignException;
32 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
33 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
34 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
35 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
36
37 import com.google.common.collect.Lists;
38
39 /**
40 * A globally-barriered distributed procedure. This class encapsulates state and methods for
41 * tracking and managing a distributed procedure, as well as aborting if any member encounters
42 * a problem or if a cancellation is requested.
43 * <p>
44 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
45 * method. The procedure contacts all members and waits for all subprocedures to execute
46 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
47 * send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed,
48 * the coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to
49 * execute the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all
50 * {@link Subprocedure#insideBarrier} executions complete at the members. When
51 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
52 * the coordinator. Once all members complete, the coordinator calls
53 * {@link #sendGlobalBarrierComplete()}.
54 * <p>
55 * If errors are encountered remotely, they are forwarded to the coordinator, and
56 * {@link Subprocedure#cleanup(Exception)} is called.
57 * <p>
58 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
59 * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
60 * an {@link ForeignException} to abort the procedure. This is particularly useful for situations
61 * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
62 * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
63 * <p>
64 * Users should generally not directly create or subclass instances of this. They are created
65 * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
66 * String, byte[], List)}}
67 */
68 @InterfaceAudience.Private
69 public class Procedure implements Callable<Void>, ForeignExceptionListener {
70 private static final Log LOG = LogFactory.getLog(Procedure.class);
71
72 //
73 // Arguments and naming
74 //
75
76 // Name of the procedure
77 final private String procName;
78 // Arguments for this procedure execution
79 final private byte[] args;
80
81 //
82 // Execution State
83 //
84 /** latch for waiting until all members have acquire in barrier state */
85 final CountDownLatch acquiredBarrierLatch;
86 /** latch for waiting until all members have executed and released their in barrier state */
87 final CountDownLatch releasedBarrierLatch;
88 /** latch for waiting until a procedure has completed */
89 final CountDownLatch completedLatch;
90 /** monitor to check for errors */
91 private final ForeignExceptionDispatcher monitor;
92
93 //
94 // Execution Timeout Handling.
95 //
96
97 /** frequency to check for errors (ms) */
98 protected final long wakeFrequency;
99 protected final TimeoutExceptionInjector timeoutInjector;
100
101 //
102 // Members' and Coordinator's state
103 //
104
105 /** lock to prevent nodes from acquiring and then releasing before we can track them */
106 private Object joinBarrierLock = new Object();
107 private final List<String> acquiringMembers;
108 private final List<String> inBarrierMembers;
109 private final HashMap<String, byte[]> dataFromFinishedMembers;
110 private ProcedureCoordinator coord;
111
112 /**
113 * Creates a procedure. (FOR TESTING)
114 *
115 * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
116 * @param coord coordinator to call back to for general errors (e.g.
117 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
118 * @param monitor error monitor to check for external errors
119 * @param wakeFreq frequency to check for errors while waiting
120 * @param timeout amount of time to allow the procedure to run before cancelling
121 * @param procName name of the procedure instance
122 * @param args argument data associated with the procedure instance
123 * @param expectedMembers names of the expected members
124 */
125 public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
126 long timeout, String procName, byte[] args, List<String> expectedMembers) {
127 this.coord = coord;
128 this.acquiringMembers = new ArrayList<String>(expectedMembers);
129 this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
130 this.dataFromFinishedMembers = new HashMap<String, byte[]>();
131 this.procName = procName;
132 this.args = args;
133 this.monitor = monitor;
134 this.wakeFrequency = wakeFreq;
135
136 int count = expectedMembers.size();
137 this.acquiredBarrierLatch = new CountDownLatch(count);
138 this.releasedBarrierLatch = new CountDownLatch(count);
139 this.completedLatch = new CountDownLatch(1);
140 this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
141 }
142
143 /**
144 * Create a procedure.
145 *
146 * Users should generally not directly create instances of this. They are created them
147 * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
148 * String, byte[], List)}}
149 *
150 * @param coord coordinator to call back to for general errors (e.g.
151 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
152 * @param wakeFreq frequency to check for errors while waiting
153 * @param timeout amount of time to allow the procedure to run before cancelling
154 * @param procName name of the procedure instance
155 * @param args argument data associated with the procedure instance
156 * @param expectedMembers names of the expected members
157 */
158 public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
159 String procName, byte[] args, List<String> expectedMembers) {
160 this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
161 expectedMembers);
162 }
163
164 public String getName() {
165 return procName;
166 }
167
168 /**
169 * @return String of the procedure members both trying to enter the barrier and already in barrier
170 */
171 public String getStatus() {
172 String waiting, done;
173 synchronized (joinBarrierLock) {
174 waiting = acquiringMembers.toString();
175 done = inBarrierMembers.toString();
176 }
177 return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
178 }
179
180 /**
181 * Get the ForeignExceptionDispatcher
182 * @return the Procedure's monitor.
183 */
184 public ForeignExceptionDispatcher getErrorMonitor() {
185 return monitor;
186 }
187
188 /**
189 * This call is the main execution thread of the barriered procedure. It sends messages and
190 * essentially blocks until all procedure members acquire or later complete but periodically
191 * checks for foreign exceptions.
192 */
193 @Override
194 @SuppressWarnings("finally")
195 final public Void call() {
196 LOG.info("Starting procedure '" + procName + "'");
197 // start the timer
198 timeoutInjector.start();
199
200 // run the procedure
201 try {
202 // start by checking for error first
203 monitor.rethrowException();
204 LOG.debug("Procedure '" + procName + "' starting 'acquire'");
205 sendGlobalBarrierStart();
206
207 // wait for all the members to report acquisition
208 LOG.debug("Waiting for all members to 'acquire'");
209 waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
210 monitor.rethrowException();
211
212 LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
213 sendGlobalBarrierReached();
214
215 // wait for all members to report barrier release
216 waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
217
218 // make sure we didn't get an error during in barrier execution and release
219 monitor.rethrowException();
220 LOG.info("Procedure '" + procName + "' execution completed");
221 } catch (Exception e) {
222 if (e instanceof InterruptedException) {
223 Thread.currentThread().interrupt();
224 }
225 String msg = "Procedure '" + procName +"' execution failed!";
226 LOG.error(msg, e);
227 receive(new ForeignException(getName(), e));
228 } finally {
229 LOG.debug("Running finish phase.");
230 sendGlobalBarrierComplete();
231 completedLatch.countDown();
232
233 // tell the timer we are done, if we get here successfully
234 timeoutInjector.complete();
235 return null;
236 }
237 }
238
239 /**
240 * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
241 * the {@link Subprocedure#acquireBarrier} step.
242 * @throws ForeignException
243 */
244 public void sendGlobalBarrierStart() throws ForeignException {
245 // start the procedure
246 LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
247 try {
248 // send procedure barrier start to specified list of members. cloning the list to avoid
249 // concurrent modification from the controller setting the prepared nodes
250 coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
251 } catch (IOException e) {
252 coord.rpcConnectionFailure("Can't reach controller.", e);
253 } catch (IllegalArgumentException e) {
254 throw new ForeignException(getName(), e);
255 }
256 }
257
258 /**
259 * Sends a message to all members that the global barrier condition has been satisfied. This
260 * should only be executed after all members have completed its
261 * {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member
262 * {@link Subprocedure#insideBarrier} method.
263 * @throws ForeignException
264 */
265 public void sendGlobalBarrierReached() throws ForeignException {
266 try {
267 // trigger to have member run {@link Subprocedure#insideBarrier}
268 coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
269 } catch (IOException e) {
270 coord.rpcConnectionFailure("Can't reach controller.", e);
271 }
272 }
273
274 /**
275 * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
276 * After this executes, the coordinator can assume that any state resources about this barrier
277 * procedure state has been released.
278 */
279 public void sendGlobalBarrierComplete() {
280 LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
281 try {
282 coord.getRpcs().resetMembers(this);
283 } catch (IOException e) {
284 coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
285 }
286 }
287
288 //
289 // Call backs from other external processes.
290 //
291
292 /**
293 * Call back triggered by an individual member upon successful local barrier acquisition
294 * @param member
295 */
296 public void barrierAcquiredByMember(String member) {
297 LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
298 + "' on coordinator");
299 if (this.acquiringMembers.contains(member)) {
300 synchronized (joinBarrierLock) {
301 if (this.acquiringMembers.remove(member)) {
302 this.inBarrierMembers.add(member);
303 acquiredBarrierLatch.countDown();
304 }
305 }
306 LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
307 } else {
308 LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
309 " Continuing on.");
310 }
311 }
312
313 /**
314 * Call back triggered by a individual member upon successful local in-barrier execution and
315 * release
316 * @param member
317 * @param dataFromMember
318 */
319 public void barrierReleasedByMember(String member, byte[] dataFromMember) {
320 boolean removed = false;
321 synchronized (joinBarrierLock) {
322 removed = this.inBarrierMembers.remove(member);
323 if (removed) {
324 releasedBarrierLatch.countDown();
325 }
326 }
327 if (removed) {
328 LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
329 + "', counting down latch. Waiting for " + releasedBarrierLatch.getCount()
330 + " more");
331 } else {
332 LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
333 + "', but we weren't waiting on it to release!");
334 }
335 dataFromFinishedMembers.put(member, dataFromMember);
336 }
337
338 /**
339 * Waits until the entire procedure has globally completed, or has been aborted. If an
340 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
341 * yet.
342 * @throws ForeignException
343 * @throws InterruptedException
344 */
345 public void waitForCompleted() throws ForeignException, InterruptedException {
346 waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
347 }
348
349 /**
350 * Waits until the entire procedure has globally completed, or has been aborted. If an
351 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
352 * yet.
353 * @return data returned from procedure members upon successfully completing subprocedure.
354 * @throws ForeignException
355 * @throws InterruptedException
356 */
357 public HashMap<String, byte[]> waitForCompletedWithRet() throws ForeignException, InterruptedException {
358 waitForCompleted();
359 return dataFromFinishedMembers;
360 }
361
362 /**
363 * Check if the entire procedure has globally completed, or has been aborted.
364 * @throws ForeignException
365 */
366 public boolean isCompleted() throws ForeignException {
367 // Rethrow exception if any
368 monitor.rethrowException();
369 return (completedLatch.getCount() == 0);
370 }
371
372 /**
373 * A callback that handles incoming ForeignExceptions.
374 */
375 @Override
376 public void receive(ForeignException e) {
377 monitor.receive(e);
378 }
379
380 /**
381 * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
382 * check for errors
383 * @param latch latch to wait on
384 * @param monitor monitor to check for errors while waiting
385 * @param wakeFrequency frequency to wake up and check for errors (in
386 * {@link TimeUnit#MILLISECONDS})
387 * @param latchDescription description of the latch, for logging
388 * @throws ForeignException type of error the monitor can throw, if the task fails
389 * @throws InterruptedException if we are interrupted while waiting on latch
390 */
391 public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
392 long wakeFrequency, String latchDescription) throws ForeignException,
393 InterruptedException {
394 boolean released = false;
395 while (!released) {
396 if (monitor != null) {
397 monitor.rethrowException();
398 }
399 /*
400 ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
401 + wakeFrequency + " ms)"); */
402 released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
403 }
404 // check error again in case an error raised during last wait
405 if (monitor != null) {
406 monitor.rethrowException();
407 }
408 }
409 }