1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.coprocessor.Batch;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
35  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.PoolMap;
38  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
39  
40  import com.google.protobuf.Descriptors;
41  import com.google.protobuf.Message;
42  import com.google.protobuf.Service;
43  import com.google.protobuf.ServiceException;
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  @InterfaceAudience.Private
69  @Deprecated
70  public class HTablePool implements Closeable {
71    private final PoolMap<String, HTableInterface> tables;
72    private final int maxSize;
73    private final PoolType poolType;
74    private final Configuration config;
75    private final HTableInterfaceFactory tableFactory;
76  
77    
78  
79  
80    public HTablePool() {
81      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
82    }
83  
84    
85  
86  
87  
88  
89  
90  
91  
92    public HTablePool(final Configuration config, final int maxSize) {
93      this(config, maxSize, null, null);
94    }
95  
96    
97  
98  
99  
100 
101 
102 
103 
104 
105 
106 
107   public HTablePool(final Configuration config, final int maxSize,
108       final HTableInterfaceFactory tableFactory) {
109     this(config, maxSize, tableFactory, PoolType.Reusable);
110   }
111 
112   
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124   public HTablePool(final Configuration config, final int maxSize,
125       final PoolType poolType) {
126     this(config, maxSize, null, poolType);
127   }
128 
129   
130 
131 
132 
133 
134 
135 
136 
137 
138 
139 
140 
141 
142 
143 
144 
145 
146   public HTablePool(final Configuration config, final int maxSize,
147       final HTableInterfaceFactory tableFactory, PoolType poolType) {
148     
149     
150     this.config = config == null ? HBaseConfiguration.create() : config;
151     this.maxSize = maxSize;
152     this.tableFactory = tableFactory == null ? new HTableFactory()
153         : tableFactory;
154     if (poolType == null) {
155       this.poolType = PoolType.Reusable;
156     } else {
157       switch (poolType) {
158       case Reusable:
159       case ThreadLocal:
160         this.poolType = poolType;
161         break;
162       default:
163         this.poolType = PoolType.Reusable;
164         break;
165       }
166     }
167     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
168         this.maxSize);
169   }
170 
171   
172 
173 
174 
175 
176 
177 
178 
179 
180 
181 
182   public HTableInterface getTable(String tableName) {
183     
184     HTableInterface table = findOrCreateTable(tableName);
185     
186     
187     return new PooledHTable(table);
188   }
189 
190   
191 
192 
193 
194 
195 
196 
197 
198 
199 
200 
201 
202   private HTableInterface findOrCreateTable(String tableName) {
203     HTableInterface table = tables.get(tableName);
204     if (table == null) {
205       table = createHTable(tableName);
206     }
207     return table;
208   }
209 
210   
211 
212 
213 
214 
215 
216 
217 
218 
219 
220 
221 
222   public HTableInterface getTable(byte[] tableName) {
223     return getTable(Bytes.toString(tableName));
224   }
225 
226   
227 
228 
229 
230 
231 
232 
233 
234   public void putTable(HTableInterface table) throws IOException {
235     
236     
237     
238     
239     
240     
241     if (table instanceof PooledHTable) {
242       returnTable(((PooledHTable) table).getWrappedTable());
243     } else {
244       
245       
246       
247       
248       throw new IllegalArgumentException("not a pooled table: " + table);
249     }
250   }
251 
252   
253 
254 
255 
256 
257 
258 
259 
260 
261 
262   private void returnTable(HTableInterface table) throws IOException {
263     
264     String tableName = Bytes.toString(table.getTableName());
265     if (tables.size(tableName) >= maxSize) {
266       
267       this.tables.removeValue(tableName, table);
268       this.tableFactory.releaseHTableInterface(table);
269       return;
270     }
271     tables.put(tableName, table);
272   }
273 
274   protected HTableInterface createHTable(String tableName) {
275     return this.tableFactory.createHTableInterface(config,
276         Bytes.toBytes(tableName));
277   }
278 
279   
280 
281 
282 
283 
284 
285 
286 
287 
288 
289   public void closeTablePool(final String tableName) throws IOException {
290     Collection<HTableInterface> tables = this.tables.values(tableName);
291     if (tables != null) {
292       for (HTableInterface table : tables) {
293         this.tableFactory.releaseHTableInterface(table);
294       }
295     }
296     this.tables.remove(tableName);
297   }
298 
299   
300 
301 
302 
303 
304   public void closeTablePool(final byte[] tableName) throws IOException {
305     closeTablePool(Bytes.toString(tableName));
306   }
307 
308   
309 
310 
311 
312 
313 
314   public void close() throws IOException {
315     for (String tableName : tables.keySet()) {
316       closeTablePool(tableName);
317     }
318     this.tables.clear();
319   }
320 
321   public int getCurrentPoolSize(String tableName) {
322     return tables.size(tableName);
323   }
324 
325   
326 
327 
328 
329 
330   class PooledHTable implements HTableInterface {
331 
332     private boolean open = false;
333 
334     private HTableInterface table; 
335 
336     public PooledHTable(HTableInterface table) {
337       this.table = table;
338       this.open = true;
339     }
340 
341     @Override
342     public byte[] getTableName() {
343       checkState();
344       return table.getTableName();
345     }
346 
347     @Override
348     public TableName getName() {
349       return table.getName();
350     }
351 
352     @Override
353     public Configuration getConfiguration() {
354       checkState();
355       return table.getConfiguration();
356     }
357 
358     @Override
359     public HTableDescriptor getTableDescriptor() throws IOException {
360       checkState();
361       return table.getTableDescriptor();
362     }
363 
364     @Override
365     public boolean exists(Get get) throws IOException {
366       checkState();
367       return table.exists(get);
368     }
369 
370     @Override
371     public boolean[] existsAll(List<Get> gets) throws IOException {
372       checkState();
373       return table.existsAll(gets);
374     }
375 
376     @Override
377     public Boolean[] exists(List<Get> gets) throws IOException {
378       checkState();
379       return table.exists(gets);
380     }
381 
382     @Override
383     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
384         InterruptedException {
385       checkState();
386       table.batch(actions, results);
387     }
388 
389     
390 
391 
392 
393 
394     @Override
395     public Object[] batch(List<? extends Row> actions) throws IOException,
396         InterruptedException {
397       checkState();
398       return table.batch(actions);
399     }
400 
401     @Override
402     public Result get(Get get) throws IOException {
403       checkState();
404       return table.get(get);
405     }
406 
407     @Override
408     public Result[] get(List<Get> gets) throws IOException {
409       checkState();
410       return table.get(gets);
411     }
412 
413     @Override
414     @SuppressWarnings("deprecation")
415     @Deprecated
416     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
417       checkState();
418       return table.getRowOrBefore(row, family);
419     }
420 
421     @Override
422     public ResultScanner getScanner(Scan scan) throws IOException {
423       checkState();
424       return table.getScanner(scan);
425     }
426 
427     @Override
428     public ResultScanner getScanner(byte[] family) throws IOException {
429       checkState();
430       return table.getScanner(family);
431     }
432 
433     @Override
434     public ResultScanner getScanner(byte[] family, byte[] qualifier)
435         throws IOException {
436       checkState();
437       return table.getScanner(family, qualifier);
438     }
439 
440     @Override
441     public void put(Put put) throws IOException {
442       checkState();
443       table.put(put);
444     }
445 
446     @Override
447     public void put(List<Put> puts) throws IOException {
448       checkState();
449       table.put(puts);
450     }
451 
452     @Override
453     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
454         byte[] value, Put put) throws IOException {
455       checkState();
456       return table.checkAndPut(row, family, qualifier, value, put);
457     }
458 
459     @Override
460     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
461         CompareOp compareOp, byte[] value, Put put) throws IOException {
462       checkState();
463       return table.checkAndPut(row, family, qualifier, compareOp, value, put);
464     }
465 
466     @Override
467     public void delete(Delete delete) throws IOException {
468       checkState();
469       table.delete(delete);
470     }
471 
472     @Override
473     public void delete(List<Delete> deletes) throws IOException {
474       checkState();
475       table.delete(deletes);
476     }
477 
478     @Override
479     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
480         byte[] value, Delete delete) throws IOException {
481       checkState();
482       return table.checkAndDelete(row, family, qualifier, value, delete);
483     }
484 
485     @Override
486     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
487         CompareOp compareOp, byte[] value, Delete delete) throws IOException {
488       checkState();
489       return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
490     }
491 
492     @Override
493     public Result increment(Increment increment) throws IOException {
494       checkState();
495       return table.increment(increment);
496     }
497 
498     @Override
499     public long incrementColumnValue(byte[] row, byte[] family,
500         byte[] qualifier, long amount) throws IOException {
501       checkState();
502       return table.incrementColumnValue(row, family, qualifier, amount);
503     }
504 
505     @Override
506     public long incrementColumnValue(byte[] row, byte[] family,
507         byte[] qualifier, long amount, Durability durability) throws IOException {
508       checkState();
509       return table.incrementColumnValue(row, family, qualifier, amount,
510           durability);
511     }
512 
513     @Override
514     public boolean isAutoFlush() {
515       checkState();
516       return table.isAutoFlush();
517     }
518 
519     @Override
520     public void flushCommits() throws IOException {
521       checkState();
522       table.flushCommits();
523     }
524 
525     
526 
527 
528 
529 
530     public void close() throws IOException {
531       checkState();
532       open = false;
533       returnTable(table);
534     }
535 
536     @Override
537     public CoprocessorRpcChannel coprocessorService(byte[] row) {
538       checkState();
539       return table.coprocessorService(row);
540     }
541 
542     @Override
543     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
544         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
545         throws ServiceException, Throwable {
546       checkState();
547       return table.coprocessorService(service, startKey, endKey, callable);
548     }
549 
550     @Override
551     public <T extends Service, R> void coprocessorService(Class<T> service,
552         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
553         throws ServiceException, Throwable {
554       checkState();
555       table.coprocessorService(service, startKey, endKey, callable, callback);
556     }
557 
558     @Override
559     public String toString() {
560       return "PooledHTable{" + ", table=" + table + '}';
561     }
562 
563     
564 
565 
566 
567 
568     HTableInterface getWrappedTable() {
569       return table;
570     }
571 
572     @Override
573     public <R> void batchCallback(List<? extends Row> actions,
574         Object[] results, Callback<R> callback) throws IOException,
575         InterruptedException {
576       checkState();
577       table.batchCallback(actions, results, callback);
578     }
579 
580     
581 
582 
583 
584 
585 
586 
587     @Override
588     public <R> Object[] batchCallback(List<? extends Row> actions,
589         Callback<R> callback) throws IOException, InterruptedException {
590       checkState();
591       return table.batchCallback(actions,  callback);
592     }
593 
594     @Override
595     public void mutateRow(RowMutations rm) throws IOException {
596       checkState();
597       table.mutateRow(rm);
598     }
599 
600     @Override
601     public Result append(Append append) throws IOException {
602       checkState();
603       return table.append(append);
604     }
605 
606     @Override
607     public void setAutoFlush(boolean autoFlush) {
608       checkState();
609       table.setAutoFlush(autoFlush, autoFlush);
610     }
611 
612     @Override
613     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
614       checkState();
615       table.setAutoFlush(autoFlush, clearBufferOnFail);
616     }
617 
618     @Override
619     public void setAutoFlushTo(boolean autoFlush) {
620       table.setAutoFlushTo(autoFlush);
621     }
622 
623     @Override
624     public long getWriteBufferSize() {
625       checkState();
626       return table.getWriteBufferSize();
627     }
628 
629     @Override
630     public void setWriteBufferSize(long writeBufferSize) throws IOException {
631       checkState();
632       table.setWriteBufferSize(writeBufferSize);
633     }
634 
635     boolean isOpen() {
636       return open;
637     }
638 
639     private void checkState() {
640       if (!isOpen()) {
641         throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
642       }
643     }
644 
645     @Override
646     public long incrementColumnValue(byte[] row, byte[] family,
647         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
648       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
649     }
650 
651     @Override
652     public <R extends Message> Map<byte[], R> batchCoprocessorService(
653         Descriptors.MethodDescriptor method, Message request,
654         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
655       checkState();
656       return table.batchCoprocessorService(method, request, startKey, endKey,
657           responsePrototype);
658     }
659 
660     @Override
661     public <R extends Message> void batchCoprocessorService(
662         Descriptors.MethodDescriptor method, Message request,
663         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
664         throws ServiceException, Throwable {
665       checkState();
666       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
667     }
668 
669     @Override
670     public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
671         byte[] value, RowMutations mutation) throws IOException {
672       checkState();
673       return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
674     }
675   }
676 }