1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.util.concurrent.ExecutionException;
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.RunnableFuture;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.TimeoutException;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.htrace.Trace;
29  
30  
31  
32  
33  
34  
35  
36  @InterfaceAudience.Private
37  public class ResultBoundedCompletionService<V> {
38    private final RpcRetryingCallerFactory retryingCallerFactory;
39    private final Executor executor;
40    private final QueueingFuture<V>[] tasks; 
41    private volatile QueueingFuture<V> completed = null;
42    private volatile boolean cancelled = false;
43    
44    class QueueingFuture<T> implements RunnableFuture<T> {
45      private final RetryingCallable<T> future;
46      private T result = null;
47      private ExecutionException exeEx = null;
48      private volatile boolean cancelled = false;
49      private final int callTimeout;
50      private final RpcRetryingCaller<T> retryingCaller;
51      private boolean resultObtained = false;
52  
53  
54      public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
55        this.future = future;
56        this.callTimeout = callTimeout;
57        this.retryingCaller = retryingCallerFactory.<T>newCaller();
58      }
59  
60      @SuppressWarnings("unchecked")
61      @Override
62      public void run() {
63        try {
64          if (!cancelled) {
65            result = this.retryingCaller.callWithRetries(future, callTimeout);
66            resultObtained = true;
67          }
68        } catch (Throwable t) {
69          exeEx = new ExecutionException(t);
70        } finally {
71          synchronized (tasks) {
72            
73            if (!cancelled && completed == null) {
74              completed = (QueueingFuture<V>) QueueingFuture.this;
75            }
76  
77            
78            
79            tasks.notify();
80          }
81        }
82      }
83      @Override
84      public boolean cancel(boolean mayInterruptIfRunning) {
85        if (resultObtained || exeEx != null) return false;
86        retryingCaller.cancel();
87        if (future instanceof Cancellable) ((Cancellable)future).cancel();
88        cancelled = true;
89        return true;
90      }
91  
92      @Override
93      public boolean isCancelled() {
94        return cancelled;
95      }
96  
97      @Override
98      public boolean isDone() {
99        return resultObtained || exeEx != null;
100     }
101 
102     @Override
103     public T get() throws InterruptedException, ExecutionException {
104       try {
105         return get(1000, TimeUnit.DAYS);
106       } catch (TimeoutException e) {
107         throw new RuntimeException("You did wait for 1000 days here?", e);
108       }
109     }
110 
111     @Override
112     public T get(long timeout, TimeUnit unit)
113         throws InterruptedException, ExecutionException, TimeoutException {
114       synchronized (tasks) {
115         if (resultObtained) {
116           return result;
117         }
118         if (exeEx != null) {
119           throw exeEx;
120         }
121         unit.timedWait(tasks, timeout);
122       }
123       if (resultObtained) {
124         return result;
125       }
126       if (exeEx != null) {
127         throw exeEx;
128       }
129 
130       throw new TimeoutException("timeout=" + timeout + ", " + unit);
131     }
132   }
133 
134   @SuppressWarnings("unchecked")
135   public ResultBoundedCompletionService(
136       RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
137       int maxTasks) {
138     this.retryingCallerFactory = retryingCallerFactory;
139     this.executor = executor;
140     this.tasks = new QueueingFuture[maxTasks];
141   }
142 
143 
144   public void submit(RetryingCallable<V> task, int callTimeout, int id) {
145     QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
146     executor.execute(Trace.wrap(newFuture));
147     tasks[id] = newFuture;
148   }
149 
150   public QueueingFuture<V> take() throws InterruptedException {
151     synchronized (tasks) {
152       while (completed == null && !cancelled) tasks.wait();
153     }
154     return completed;
155   }
156 
157   public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
158     synchronized (tasks) {
159       if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
160     }
161     return completed;
162   }
163 
164   public void cancelAll() {
165     
166     synchronized (tasks) {
167       cancelled = true;
168     }
169     for (QueueingFuture<V> future : tasks) {
170       if (future != null) future.cancel(true);
171     }
172   }
173 }