1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.client;
20  
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.HBaseTestingUtility;
28  import org.apache.hadoop.hbase.testclassification.MediumTests;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
31  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
32  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
33  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.util.Threads;
36  import org.junit.AfterClass;
37  import org.junit.Assert;
38  import org.junit.BeforeClass;
39  import org.junit.Test;
40  import org.junit.experimental.categories.Category;
41  
42  import java.io.IOException;
43  import java.io.InterruptedIOException;
44  import java.net.SocketTimeoutException;
45  import java.util.ArrayList;
46  import java.util.List;
47  import java.util.concurrent.atomic.AtomicInteger;
48  
49  @Category(MediumTests.class)
50  public class TestClientOperationInterrupt {
51    private static final Log LOG = LogFactory.getLog(TestClientOperationInterrupt.class);
52  
53    private static HBaseTestingUtility util;
54    private static final TableName tableName = TableName.valueOf("test");
55    private static final byte[] dummy = Bytes.toBytes("dummy");
56    private static final byte[] row1 = Bytes.toBytes("r1");
57    private static final byte[] test = Bytes.toBytes("test");
58    private static Configuration conf;
59  
60    public static class TestCoprocessor extends BaseRegionObserver {
61      @Override
62      public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
63                           final Get get, final List<Cell> results) throws IOException {
64        Threads.sleep(2500);
65      }
66    }
67  
68  
69    @BeforeClass
70    public static void setUpBeforeClass() throws Exception {
71      conf = HBaseConfiguration.create();
72      conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
73          TestCoprocessor.class.getName());
74      util = new HBaseTestingUtility(conf);
75      util.startMiniCluster();
76  
77      Admin admin = util.getHBaseAdmin();
78      if (admin.tableExists(tableName)) {
79        if (admin.isTableEnabled(tableName)) {
80          admin.disableTable(tableName);
81        }
82        admin.deleteTable(tableName);
83      }
84      util.createTable(tableName, new byte[][]{dummy, test});
85  
86      Table ht = new HTable(conf, tableName);
87      Put p = new Put(row1);
88      p.add(dummy, dummy, dummy);
89      ht.put(p);
90    }
91  
92  
93    @Test
94    public void testInterrupt50Percent() throws IOException, InterruptedException {
95      final AtomicInteger noEx = new AtomicInteger(0);
96      final AtomicInteger badEx = new AtomicInteger(0);
97      final AtomicInteger noInt = new AtomicInteger(0);
98      final AtomicInteger done = new AtomicInteger(0);
99      List<Thread> threads = new ArrayList<Thread>();
100 
101     final int nbThread = 100;
102 
103     for (int i = 0; i < nbThread; i++) {
104       Thread t = new Thread() {
105         @Override
106         public void run() {
107           try {
108             Table ht = new HTable(conf, tableName);
109             Result r = ht.get(new Get(row1));
110             noEx.incrementAndGet();
111           } catch (IOException e) {
112             LOG.info("exception", e);
113             if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
114               badEx.incrementAndGet();
115             } else {
116               if (Thread.currentThread().isInterrupted()) {
117                 noInt.incrementAndGet();
118                 LOG.info("The thread should NOT be with the 'interrupt' status.");
119               }
120             }
121           } finally {
122             done.incrementAndGet();
123           }
124         }
125       };
126       t.setName("TestClientOperationInterrupt #" + i);
127       threads.add(t);
128       t.start();
129     }
130 
131     for (int i = 0; i < nbThread / 2; i++) {
132       threads.get(i).interrupt();
133     }
134 
135 
136     boolean stillAlive = true;
137     while (stillAlive) {
138       stillAlive = false;
139       for (Thread t : threads) {
140         if (t.isAlive()) {
141           stillAlive = true;
142         }
143       }
144       Threads.sleep(10);
145     }
146 
147     Assert.assertFalse(Thread.currentThread().isInterrupted());
148 
149     Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
150         noEx.get() == nbThread / 2 && badEx.get() == 0);
151 
152     
153     while (done.get() != nbThread){
154       Thread.sleep(1);
155     }
156 
157     Table ht = new HTable(conf, tableName);
158     Result r = ht.get(new Get(row1));
159     Assert.assertFalse(r.isEmpty());
160   }
161 
162   @AfterClass
163   public static void tearDownAfterClass() throws Exception {
164     util.shutdownMiniCluster();
165   }
166 }