1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  
25  import java.io.IOException;
26  import java.util.AbstractMap.SimpleEntry;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.ScheduledExecutorService;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicLong;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.hbase.HBaseConfiguration;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HRegionInfo;
47  import org.apache.hadoop.hbase.HRegionLocation;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.classification.InterfaceStability;
52  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  @InterfaceAudience.Public
71  @InterfaceStability.Evolving
72  public class HTableMultiplexer {
73    private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
74  
75    public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
76        "hbase.tablemultiplexer.flush.period.ms";
77    public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
78    public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
79        "hbase.client.max.retries.in.queue";
80  
81    
82    private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
83        new ConcurrentHashMap<>();
84  
85    private final Configuration workerConf;
86    private final ClusterConnection conn;
87    private final ExecutorService pool;
88    private final int retryNum;
89    private final int perRegionServerBufferQueueSize;
90    private final int maxKeyValueSize;
91    private final ScheduledExecutorService executor;
92    private final long flushPeriod;
93  
94    
95  
96  
97  
98  
  /**
   * Creates a multiplexer backed by a new connection obtained from
   * {@link ConnectionFactory#createConnection(Configuration)}.
   *
   * @param conf configuration used both to create the connection and to configure this instance
   * @param perRegionServerBufferQueueSize capacity of the put queue kept per flush worker
   * @throws IOException if the connection cannot be created
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
      throws IOException {
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }
103 
104   
105 
106 
107 
108 
109 
110   public HTableMultiplexer(Connection conn, Configuration conf,
111       int perRegionServerBufferQueueSize) {
112     this.conn = (ClusterConnection) conn;
113     this.pool = HTable.getDefaultExecutor(conf);
114     this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
115         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
116     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
117     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
118     this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
119     int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
120     this.executor =
121         Executors.newScheduledThreadPool(initThreads,
122           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
123 
124     this.workerConf = HBaseConfiguration.create(conf);
125     
126     
127     this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
128   }
129 
130   
131 
132 
133 
134 
135   @SuppressWarnings("deprecation")
136   public synchronized void close() throws IOException {
137     if (!getConnection().isClosed()) {
138       getConnection().close();
139     }
140   }
141 
142   
143 
144 
145 
146 
147 
148 
149 
150   public boolean put(TableName tableName, final Put put) {
151     return put(tableName, put, this.retryNum);
152   }
153 
154   
155 
156 
157 
158 
159 
160 
161 
162   public List<Put> put(TableName tableName, final List<Put> puts) {
163     if (puts == null)
164       return null;
165 
166     List <Put> failedPuts = null;
167     boolean result;
168     for (Put put : puts) {
169       result = put(tableName, put, this.retryNum);
170       if (result == false) {
171 
172         
173         if (failedPuts == null) {
174           failedPuts = new ArrayList<Put>();
175         }
176         
177         failedPuts.add(put);
178       }
179     }
180     return failedPuts;
181   }
182 
183   
184 
185 
186   @Deprecated
187   public List<Put> put(byte[] tableName, final List<Put> puts) {
188     return put(TableName.valueOf(tableName), puts);
189   }
190 
191   
192 
193 
194 
195 
196 
197 
198   public boolean put(final TableName tableName, final Put put, int retry) {
199     if (retry <= 0) {
200       return false;
201     }
202 
203     try {
204       HTable.validatePut(put, maxKeyValueSize);
205       
206       ClusterConnection conn = (ClusterConnection) getConnection();
207       
208       
209       HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
210       if (loc != null) {
211         
212         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
213 
214         
215         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
216 
217         return queue.offer(s);
218       }
219     } catch (IOException e) {
220       LOG.debug("Cannot process the put " + put, e);
221     }
222     return false;
223   }
224 
225   
226 
227 
228   @Deprecated
229   public boolean put(final byte[] tableName, final Put put, int retry) {
230     return put(TableName.valueOf(tableName), put, retry);
231   }
232 
233   
234 
235 
236   @Deprecated
237   public boolean put(final byte[] tableName, Put put) {
238     return put(TableName.valueOf(tableName), put);
239   }
240 
241   
242 
243 
244   public HTableMultiplexerStatus getHTableMultiplexerStatus() {
245     return new HTableMultiplexerStatus(serverToFlushWorkerMap);
246   }
247 
248   @VisibleForTesting
249   LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
250     FlushWorker worker = serverToFlushWorkerMap.get(addr);
251     if (worker == null) {
252       synchronized (this.serverToFlushWorkerMap) {
253         worker = serverToFlushWorkerMap.get(addr);
254         if (worker == null) {
255           
256           worker = new FlushWorker(workerConf, this.conn, addr, this, perRegionServerBufferQueueSize,
257                   pool, executor);
258           this.serverToFlushWorkerMap.put(addr, worker);
259           executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
260         }
261       }
262     }
263     return worker.getQueue();
264   }
265 
266   @VisibleForTesting
267   ClusterConnection getConnection() {
268     return this.conn;
269   }
270 
271   
272 
273 
274 
275 
276   @InterfaceAudience.Public
277   @InterfaceStability.Evolving
278   public static class HTableMultiplexerStatus {
279     private long totalFailedPutCounter;
280     private long totalBufferedPutCounter;
281     private long maxLatency;
282     private long overallAverageLatency;
283     private Map<String, Long> serverToFailedCounterMap;
284     private Map<String, Long> serverToBufferedCounterMap;
285     private Map<String, Long> serverToAverageLatencyMap;
286     private Map<String, Long> serverToMaxLatencyMap;
287 
288     public HTableMultiplexerStatus(
289         Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
290       this.totalBufferedPutCounter = 0;
291       this.totalFailedPutCounter = 0;
292       this.maxLatency = 0;
293       this.overallAverageLatency = 0;
294       this.serverToBufferedCounterMap = new HashMap<String, Long>();
295       this.serverToFailedCounterMap = new HashMap<String, Long>();
296       this.serverToAverageLatencyMap = new HashMap<String, Long>();
297       this.serverToMaxLatencyMap = new HashMap<String, Long>();
298       this.initialize(serverToFlushWorkerMap);
299     }
300 
301     private void initialize(
302         Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
303       if (serverToFlushWorkerMap == null) {
304         return;
305       }
306 
307       long averageCalcSum = 0;
308       int averageCalcCount = 0;
309       for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
310           .entrySet()) {
311         HRegionLocation addr = entry.getKey();
312         FlushWorker worker = entry.getValue();
313 
314         long bufferedCounter = worker.getTotalBufferedCount();
315         long failedCounter = worker.getTotalFailedCount();
316         long serverMaxLatency = worker.getMaxLatency();
317         AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
318         
319         SimpleEntry<Long, Integer> averageComponents = averageCounter
320             .getComponents();
321         long serverAvgLatency = averageCounter.getAndReset();
322 
323         this.totalBufferedPutCounter += bufferedCounter;
324         this.totalFailedPutCounter += failedCounter;
325         if (serverMaxLatency > this.maxLatency) {
326           this.maxLatency = serverMaxLatency;
327         }
328         averageCalcSum += averageComponents.getKey();
329         averageCalcCount += averageComponents.getValue();
330 
331         this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
332             bufferedCounter);
333         this.serverToFailedCounterMap
334             .put(addr.getHostnamePort(),
335             failedCounter);
336         this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
337             serverAvgLatency);
338         this.serverToMaxLatencyMap
339             .put(addr.getHostnamePort(),
340             serverMaxLatency);
341       }
342       this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
343           / averageCalcCount : 0;
344     }
345 
346     public long getTotalBufferedCounter() {
347       return this.totalBufferedPutCounter;
348     }
349 
350     public long getTotalFailedCounter() {
351       return this.totalFailedPutCounter;
352     }
353 
354     public long getMaxLatency() {
355       return this.maxLatency;
356     }
357 
358     public long getOverallAverageLatency() {
359       return this.overallAverageLatency;
360     }
361 
362     public Map<String, Long> getBufferedCounterForEachRegionServer() {
363       return this.serverToBufferedCounterMap;
364     }
365 
366     public Map<String, Long> getFailedCounterForEachRegionServer() {
367       return this.serverToFailedCounterMap;
368     }
369 
370     public Map<String, Long> getMaxLatencyForEachRegionServer() {
371       return this.serverToMaxLatencyMap;
372     }
373 
374     public Map<String, Long> getAverageLatencyForEachRegionServer() {
375       return this.serverToAverageLatencyMap;
376     }
377   }
378 
379   @VisibleForTesting
380   static class PutStatus {
381     public final HRegionInfo regionInfo;
382     public final Put put;
383     public final int retryCount;
384 
385     public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
386       this.regionInfo = regionInfo;
387       this.put = put;
388       this.retryCount = retryCount;
389     }
390   }
391 
392   
393 
394 
395   private static class AtomicAverageCounter {
396     private long sum;
397     private int count;
398 
399     public AtomicAverageCounter() {
400       this.sum = 0L;
401       this.count = 0;
402     }
403 
404     public synchronized long getAndReset() {
405       long result = this.get();
406       this.reset();
407       return result;
408     }
409 
410     public synchronized long get() {
411       if (this.count == 0) {
412         return 0;
413       }
414       return this.sum / this.count;
415     }
416 
417     public synchronized SimpleEntry<Long, Integer> getComponents() {
418       return new SimpleEntry<Long, Integer>(sum, count);
419     }
420 
421     public synchronized void reset() {
422       this.sum = 0l;
423       this.count = 0;
424     }
425 
426     public synchronized void add(long value) {
427       this.sum += value;
428       this.count++;
429     }
430   }
431 
432   @VisibleForTesting
433   static class FlushWorker implements Runnable {
434     private final HRegionLocation addr;
435     private final LinkedBlockingQueue<PutStatus> queue;
436     private final HTableMultiplexer multiplexer;
437     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
438     private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
439     private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
440     private final AtomicLong maxLatency = new AtomicLong(0);
441 
442     private final AsyncProcess ap;
443     private final List<PutStatus> processingList = new ArrayList<>();
444     private final ScheduledExecutorService executor;
445     private final int maxRetryInQueue;
446     private final AtomicInteger retryInQueue = new AtomicInteger(0);
447 
448     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
449         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
450         ExecutorService pool, ScheduledExecutorService executor) {
451       this.addr = addr;
452       this.multiplexer = htableMultiplexer;
453       this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
454       RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
455       RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
456       this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
457       this.executor = executor;
458       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
459     }
460 
461     protected LinkedBlockingQueue<PutStatus> getQueue() {
462       return this.queue;
463     }
464 
465     public long getTotalFailedCount() {
466       return totalFailedPutCount.get();
467     }
468 
469     public long getTotalBufferedCount() {
470       return queue.size() + currentProcessingCount.get();
471     }
472 
473     public AtomicAverageCounter getAverageLatencyCounter() {
474       return this.averageLatency;
475     }
476 
477     public long getMaxLatency() {
478       return this.maxLatency.getAndSet(0);
479     }
480 
481     boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
482       
483       final int retryCount = ps.retryCount - 1;
484 
485       if (retryCount <= 0) {
486         
487         return false;
488       }
489 
490       int cnt = getRetryInQueue().incrementAndGet();
491       if (cnt > getMaxRetryInQueue()) {
492         
493         getRetryInQueue().decrementAndGet();
494         return false;
495       }
496 
497       final Put failedPut = ps.put;
498       
499       final TableName tableName = ps.regionInfo.getTable();
500 
501       long delayMs = getNextDelay(retryCount);
502       if (LOG.isDebugEnabled()) {
503         LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
504       }
505 
506       
507       
508       
509       
510       getExecutor().schedule(new Runnable() {
511         @Override
512         public void run() {
513           boolean succ = false;
514           try {
515             succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
516           } finally {
517             FlushWorker.this.getRetryInQueue().decrementAndGet();
518             if (!succ) {
519               FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
520             }
521           }
522         }
523       }, delayMs, TimeUnit.MILLISECONDS);
524       return true;
525     }
526 
527     @VisibleForTesting
528     long getNextDelay(int retryCount) {
529       return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
530           multiplexer.retryNum - retryCount - 1);
531     }
532 
533     @VisibleForTesting
534     AtomicInteger getRetryInQueue() {
535       return this.retryInQueue;
536     }
537 
538     @VisibleForTesting
539     int getMaxRetryInQueue() {
540       return this.maxRetryInQueue;
541     }
542 
543     @VisibleForTesting
544     AtomicLong getTotalFailedPutCount() {
545       return this.totalFailedPutCount;
546     }
547 
548     @VisibleForTesting
549     HTableMultiplexer getMultiplexer() {
550       return this.multiplexer;
551     }
552 
553     @VisibleForTesting
554     ScheduledExecutorService getExecutor() {
555       return this.executor;
556     }
557 
558     @Override
559     public void run() {
560       int failedCount = 0;
561       try {
562         long start = EnvironmentEdgeManager.currentTime();
563 
564         
565         processingList.clear();
566         queue.drainTo(processingList);
567         if (processingList.size() == 0) {
568           
569           return;
570         }
571 
572         currentProcessingCount.set(processingList.size());
573         
574         failedCount = processingList.size();
575 
576         List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
577         MultiAction<Row> actions = new MultiAction<>();
578         for (int i = 0; i < processingList.size(); i++) {
579           PutStatus putStatus = processingList.get(i);
580           Action<Row> action = new Action<Row>(putStatus.put, i);
581           actions.add(putStatus.regionInfo.getRegionName(), action);
582           retainedActions.add(action);
583         }
584 
585         
586         List<PutStatus> failed = null;
587         Object[] results = new Object[actions.size()];
588         ServerName server = addr.getServerName();
589         Map<ServerName, MultiAction<Row>> actionsByServer =
590             Collections.singletonMap(server, actions);
591         try {
592           AsyncRequestFuture arf =
593               ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
594                 null, actionsByServer, null);
595           arf.waitUntilDone();
596           if (arf.hasError()) {
597             
598             LOG.debug("Caught some exceptions when flushing puts to region server "
599                 + addr.getHostnamePort(), arf.getErrors());
600           }
601         } finally {
602           for (int i = 0; i < results.length; i++) {
603             if (results[i] instanceof Result) {
604               failedCount--;
605             } else {
606               if (failed == null) {
607                 failed = new ArrayList<PutStatus>();
608               }
609               failed.add(processingList.get(i));
610             }
611           }
612         }
613 
614         if (failed != null) {
615           
616           for (PutStatus putStatus : failed) {
617             if (resubmitFailedPut(putStatus, this.addr)) {
618               failedCount--;
619             }
620           }
621         }
622 
623         long elapsed = EnvironmentEdgeManager.currentTime() - start;
624         
625         averageLatency.add(elapsed);
626         if (elapsed > maxLatency.get()) {
627           maxLatency.set(elapsed);
628         }
629 
630         
631         if (LOG.isDebugEnabled()) {
632           LOG.debug("Processed " + currentProcessingCount + " put requests for "
633               + addr.getHostnamePort() + " and " + failedCount + " failed"
634               + ", latency for this send: " + elapsed);
635         }
636 
637         
638         currentProcessingCount.set(0);
639       } catch (RuntimeException e) {
640         
641         
642         LOG.debug(
643           "Caught some exceptions " + e + " when flushing puts to region server "
644               + addr.getHostnamePort(), e);
645       } catch (Exception e) {
646         if (e instanceof InterruptedException) {
647           Thread.currentThread().interrupt();
648         }
649         
650         LOG.debug(
651           "Caught some exceptions " + e + " when flushing puts to region server "
652               + addr.getHostnamePort(), e);
653       } finally {
654         
655         this.totalFailedPutCount.addAndGet(failedCount);
656       }
657     }
658   }
659 }