1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.ThreadLocalRandom;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Abortable;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
35  import org.apache.hadoop.util.StringUtils;
36  
37  import com.google.common.base.Preconditions;
38  import com.google.common.base.Strings;
39  
40  @InterfaceAudience.Private
41  @InterfaceStability.Evolving
42  public abstract class RpcExecutor {
43    private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
44  
45    private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
46    private final List<Thread> handlers;
47    private final int handlerCount;
48    private final String name;
49    private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
50  
51    private boolean running;
52  
53    private Configuration conf = null;
54    private Abortable abortable = null;
55  
56    public RpcExecutor(final String name, final int handlerCount) {
57      this.handlers = new ArrayList<Thread>(handlerCount);
58      this.handlerCount = handlerCount;
59      this.name = Strings.nullToEmpty(name);
60    }
61  
62    public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
63        final Abortable abortable) {
64      this(name, handlerCount);
65      this.conf = conf;
66      this.abortable = abortable;
67    }
68  
69    public void start(final int port) {
70      running = true;
71      startHandlers(port);
72    }
73  
74    public void stop() {
75      running = false;
76      for (Thread handler : handlers) {
77        handler.interrupt();
78      }
79    }
80  
81    public int getActiveHandlerCount() {
82      return activeHandlerCount.get();
83    }
84  
85    
86    public abstract int getQueueLength();
87  
88    
89    public abstract void dispatch(final CallRunner callTask) throws InterruptedException;
90  
91    
92    protected abstract List<BlockingQueue<CallRunner>> getQueues();
93  
94    protected void startHandlers(final int port) {
95      List<BlockingQueue<CallRunner>> callQueues = getQueues();
96      startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
97    }
98  
99    protected void startHandlers(final String nameSuffix, final int numHandlers,
100       final List<BlockingQueue<CallRunner>> callQueues,
101       final int qindex, final int qsize, final int port) {
102     final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
103     for (int i = 0; i < numHandlers; i++) {
104       final int index = qindex + (i % qsize);
105       Thread t = new Thread(new Runnable() {
106         @Override
107         public void run() {
108           consumerLoop(callQueues.get(index));
109         }
110       });
111       t.setDaemon(true);
112       t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
113         ",queue=" + index + ",port=" + port);
114       t.start();
115       LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
116       handlers.add(t);
117     }
118   }
119 
120   protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
121     boolean interrupted = false;
122     double handlerFailureThreshhold =
123         conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
124           HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
125     try {
126       while (running) {
127         try {
128           MonitoredRPCHandler status = RpcServer.getStatus();
129           CallRunner task = myQueue.take();
130           task.setStatus(status);
131           try {
132             activeHandlerCount.incrementAndGet();
133             task.run();
134           } catch (Throwable e) {
135             if (e instanceof Error) {
136               int failedCount = failedHandlerCount.incrementAndGet();
137               if (handlerFailureThreshhold >= 0
138                   && failedCount > handlerCount * handlerFailureThreshhold) {
139                 String message =
140                     "Number of failed RpcServer handler exceeded threshhold "
141                         + handlerFailureThreshhold + "  with failed reason: "
142                         + StringUtils.stringifyException(e);
143                 if (abortable != null) {
144                   abortable.abort(message, e);
145                 } else {
146                   LOG.error("Received " + StringUtils.stringifyException(e)
147                     + " but not aborting due to abortable being null");
148                   throw e;
149                 }
150               } else {
151                 LOG.warn("RpcServer handler threads encountered errors "
152                     + StringUtils.stringifyException(e));
153               }
154             } else {
155               LOG.warn("RpcServer handler threads encountered exceptions "
156                   + StringUtils.stringifyException(e));
157             }
158           } finally {
159             activeHandlerCount.decrementAndGet();
160           }
161         } catch (InterruptedException e) {
162           interrupted = true;
163         }
164       }
165     } finally {
166       if (interrupted) {
167         Thread.currentThread().interrupt();
168       }
169     }
170   }
171 
172   public static abstract class QueueBalancer {
173     
174 
175 
176     public abstract int getNextQueue();
177   }
178 
179   public static QueueBalancer getBalancer(int queueSize) {
180     Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
181     if (queueSize == 1) {
182       return ONE_QUEUE;
183     } else {
184       return new RandomQueueBalancer(queueSize);
185     }
186   }
187 
188   
189 
190 
191   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
192 
193     @Override
194     public int getNextQueue() {
195       return 0;
196     }
197   };
198 
199   
200 
201 
202   private static class RandomQueueBalancer extends QueueBalancer {
203     private final int queueSize;
204 
205     public RandomQueueBalancer(int queueSize) {
206       this.queueSize = queueSize;
207     }
208 
209     public int getNextQueue() {
210       return ThreadLocalRandom.current().nextInt(queueSize);
211     }
212   }
213 }