1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.util.concurrent.ExecutionException;
21  import java.util.concurrent.TimeUnit;
22  
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.htrace.Span;
25  
26  /**
27   * A Future on a filesystem sync call.  It given to a client or 'Handler' for it to wait on till
28   * the sync completes.
29   *
30   * <p>Handlers coming in call append, append, append, and then do a flush/sync of
31   * the edits they have appended the WAL before returning. Since sync takes a while to
32   * complete, we give the Handlers back this sync future to wait on until the
33   * actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a
34   * sync runner thread; when it completes, it finishes up the future, the handler get or failed
35   * check completes and the Handler can then progress.
36   * <p>
37   * This is just a partial implementation of Future; we just implement get and
38   * failure.  Unimplemented methods throw {@link UnsupportedOperationException}.
39   * <p>
40   * There is not a one-to-one correlation between dfs sync invocations and
41   * instances of this class. A single dfs sync call may complete and mark many
42   * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
43   * call every time a Handler asks for it.
44   * <p>
45   * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even
46   * if it the first time, start the sync, then park the 'hitched' thread on a call to
47   * #get().
48   */
49  @InterfaceAudience.Private
50  class SyncFuture {
51    // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
52    // to coordinate on but it did not give any obvious advantage and some issues with order in which
53    // events happen.
54    private static final long NOT_DONE = 0;
55  
56    /**
57     * The sequence at which we were added to the ring buffer.
58     */
59    private long ringBufferSequence;
60  
61    /**
62     * The sequence that was set in here when we were marked done. Should be equal
63     * or > ringBufferSequence.  Put this data member into the NOT_DONE state while this
64     * class is in use.  But for the first position on construction, let it be -1 so we can
65     * immediately call {@link #reset(long, Span)} below and it will work.
66     */
67    private long doneSequence = -1;
68  
69    /**
70     * If error, the associated throwable. Set when the future is 'done'.
71     */
72    private Throwable throwable = null;
73  
74    private Thread t;
75  
76    /**
77     * Optionally carry a disconnected scope to the SyncRunner.
78     */
79    private Span span;
80  
81    /**
82     * Call this method to clear old usage and get it ready for new deploy. Call
83     * this method even if it is being used for the first time.
84     *
85     * @param sequence sequenceId from this Future's position in the RingBuffer
86     * @return this
87     */
88    synchronized SyncFuture reset(final long sequence) {
89      return reset(sequence, null);
90    }
91  
92    /**
93     * Call this method to clear old usage and get it ready for new deploy. Call
94     * this method even if it is being used for the first time.
95     *
96     * @param sequence sequenceId from this Future's position in the RingBuffer
97     * @param span curren span, detached from caller. Don't forget to attach it when
98     *             resuming after a call to {@link #get()}.
99     * @return this
100    */
101   synchronized SyncFuture reset(final long sequence, Span span) {
102     if (t != null && t != Thread.currentThread()) throw new IllegalStateException();
103     t = Thread.currentThread();
104     if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread());
105     this.doneSequence = NOT_DONE;
106     this.ringBufferSequence = sequence;
107     this.span = span;
108     return this;
109   }
110 
111   @Override
112   public synchronized String toString() {
113     return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
114   }
115 
116   synchronized long getRingBufferSequence() {
117     return this.ringBufferSequence;
118   }
119 
120   /**
121    * Retrieve the {@code span} instance from this Future. EventHandler calls
122    * this method to continue the span. Thread waiting on this Future musn't call
123    * this method until AFTER calling {@link #get()} and the future has been
124    * released back to the originating thread.
125    */
126   synchronized Span getSpan() {
127     return this.span;
128   }
129 
130   /**
131    * Used to re-attach a {@code span} to the Future. Called by the EventHandler
132    * after a it has completed processing and detached the span from its scope.
133    */
134   synchronized void setSpan(Span span) {
135     this.span = span;
136   }
137 
138   /**
139    * @param sequence Sync sequence at which this future 'completed'.
140    * @param t Can be null.  Set if we are 'completing' on error (and this 't' is the error).
141    * @return True if we successfully marked this outstanding future as completed/done.
142    * Returns false if this future is already 'done' when this method called.
143    */
144   synchronized boolean done(final long sequence, final Throwable t) {
145     if (isDone()) return false;
146     this.throwable = t;
147     if (sequence < this.ringBufferSequence) {
148       // Something badly wrong.
149       if (throwable == null) {
150         this.throwable = new IllegalStateException("sequence=" + sequence +
151           ", ringBufferSequence=" + this.ringBufferSequence);
152       }
153     }
154     // Mark done.
155     this.doneSequence = sequence;
156     // Wake up waiting threads.
157     notify();
158     return true;
159   }
160 
161   public boolean cancel(boolean mayInterruptIfRunning) {
162     throw new UnsupportedOperationException();
163   }
164 
165   public synchronized long get() throws InterruptedException, ExecutionException {
166     while (!isDone()) {
167       wait(1000);
168     }
169     if (this.throwable != null) throw new ExecutionException(this.throwable);
170     return this.doneSequence;
171   }
172 
173   public Long get(long timeout, TimeUnit unit)
174   throws InterruptedException, ExecutionException {
175     throw new UnsupportedOperationException();
176   }
177 
178   public boolean isCancelled() {
179     throw new UnsupportedOperationException();
180   }
181 
182   synchronized boolean isDone() {
183     return this.doneSequence != NOT_DONE;
184   }
185 
186   synchronized boolean isThrowable() {
187     return isDone() && getThrowable() != null;
188   }
189 
190   synchronized Throwable getThrowable() {
191     return this.throwable;
192   }
193 }