1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.IOException;
25  import java.util.Random;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
44  import org.junit.After;
45  import org.junit.Before;
46  import org.junit.BeforeClass;
47  import org.junit.Rule;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  import org.junit.rules.TestName;
51  
52  
53  
54  
55  
56  
57  @Category(MediumTests.class)
58  public class TestParallelPut {
59    static final Log LOG = LogFactory.getLog(TestParallelPut.class);
60    @Rule public TestName name = new TestName(); 
61    
62    private HRegion region = null;
63    private static HBaseTestingUtility HBTU = new HBaseTestingUtility();
64    private static final int THREADS100 = 100;
65  
66    
67    static byte[] tableName;
68    static final byte[] qual1 = Bytes.toBytes("qual1");
69    static final byte[] qual2 = Bytes.toBytes("qual2");
70    static final byte[] qual3 = Bytes.toBytes("qual3");
71    static final byte[] value1 = Bytes.toBytes("value1");
72    static final byte[] value2 = Bytes.toBytes("value2");
73    static final byte [] row = Bytes.toBytes("rowA");
74    static final byte [] row2 = Bytes.toBytes("rowB");
75  
76    @BeforeClass
77    public static void beforeClass() {
78      
79      HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
80    }
81  
82  
83    
84  
85  
86    @Before
87    public void setUp() throws Exception {
88      tableName = Bytes.toBytes(name.getMethodName());
89    }
90  
91    @After
92    public void tearDown() throws Exception {
93      EnvironmentEdgeManagerTestHelper.reset();
94      if (region != null) region.close(true);
95    }
96    
97    public String getName() {
98      return name.getMethodName();
99    }
100 
101   
102   
103   
104   
105 
106   
107 
108 
109   @Test
110   public void testPut() throws IOException {
111     LOG.info("Starting testPut");
112     this.region = initHRegion(tableName, getName(), fam1);
113 
114     long value = 1L;
115 
116     Put put = new Put(row);
117     put.add(fam1, qual1, Bytes.toBytes(value));
118     region.put(put);
119 
120     assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
121   }
122 
123   
124 
125 
126   @Test
127   public void testParallelPuts() throws IOException {
128 
129     LOG.info("Starting testParallelPuts");
130 
131     this.region = initHRegion(tableName, getName(), fam1);
132     int numOps = 1000; 
133 
134     
135     Putter[] all = new Putter[THREADS100];
136 
137     
138     for (int i = 0; i < THREADS100; i++) {
139       all[i] = new Putter(region, i, numOps);
140     }
141 
142     
143     for (int i = 0; i < THREADS100; i++) {
144       all[i].start();
145     }
146 
147     
148     for (int i = 0; i < THREADS100; i++) {
149       try {
150         all[i].join();
151       } catch (InterruptedException e) {
152         LOG.warn("testParallelPuts encountered InterruptedException." +
153                  " Ignoring....", e);
154       }
155     }
156     LOG.info("testParallelPuts successfully verified " + 
157              (numOps * THREADS100) + " put operations.");
158   }
159 
160 
161   private static void assertGet(final HRegion region, byte [] row, byte [] familiy,
162       byte[] qualifier, byte[] value) throws IOException {
163     
164     Get get = new Get(row);
165     get.addColumn(familiy, qualifier);
166     Result result = region.get(get);
167     assertEquals(1, result.size());
168 
169     Cell kv = result.rawCells()[0];
170     byte[] r = CellUtil.cloneValue(kv);
171     assertTrue(Bytes.compareTo(r, value) == 0);
172   }
173 
174   private HRegion initHRegion(byte [] tableName, String callingMethod,
175     byte[] ... families)
176   throws IOException {
177     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
178     for(byte [] family : families) {
179       htd.addFamily(new HColumnDescriptor(family));
180     }
181     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
182     return HBTU.createLocalHRegion(info, htd);
183   }
184 
185   
186 
187 
188   public static class Putter extends Thread {
189 
190     private final HRegion region;
191     private final int threadNumber;
192     private final int numOps;
193     private final Random rand = new Random();
194     byte [] rowkey = null;
195 
196     public Putter(HRegion region, int threadNumber, int numOps) {
197       this.region = region;
198       this.threadNumber = threadNumber;
199       this.numOps = numOps;
200       this.rowkey = Bytes.toBytes((long)threadNumber); 
201       setDaemon(true);
202     }
203 
204     @Override
205     public void run() {
206       byte[] value = new byte[100];
207       Put[]  in = new Put[1];
208 
209       
210       for (int i=0; i<numOps; i++) {
211         
212         rand.nextBytes(value);  
213 
214         
215         
216         Put put = new Put(rowkey);
217         put.add(fam1, qual1, value);
218         in[0] = put;
219         try {
220           OperationStatus[] ret = region.batchMutate(in);
221           assertEquals(1, ret.length);
222           assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
223           assertGet(this.region, rowkey, fam1, qual1, value);
224         } catch (IOException e) {
225           assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
226                      false);
227         }
228       }
229     }
230   }
231 }