1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.master;
20  
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.util.Collection;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.MiniHBaseCluster;
37  import org.apache.hadoop.hbase.TableDescriptors;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Durability;
40  import org.apache.hadoop.hbase.client.HTable;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.client.RegionLocator;
43  import org.apache.hadoop.hbase.client.Result;
44  import org.apache.hadoop.hbase.client.ResultScanner;
45  import org.apache.hadoop.hbase.client.Scan;
46  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47  import org.apache.hadoop.hbase.regionserver.HRegionServer;
48  import org.apache.hadoop.hbase.testclassification.MediumTests;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.Threads;
51  import org.junit.AfterClass;
52  import org.junit.Assert;
53  import org.junit.Before;
54  import org.junit.BeforeClass;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  import org.mockito.Mockito;
58  import org.mockito.internal.util.reflection.Whitebox;
59  
60  
61  
62  
63  @Category(MediumTests.class)
64  public class TestZKBasedOpenCloseRegion {
  /** Logger shared by the tests and helpers in this class. */
  private static final Log LOG = LogFactory.getLog(TestZKBasedOpenCloseRegion.class);
  /** Testing utility that owns the mini cluster started in {@code beforeAllTests}. */
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  /** Name of the multi-region table created in {@code beforeAllTests}. */
  private static final TableName TABLENAME =
      TableName.valueOf("TestZKBasedOpenCloseRegion");
  /** Column families ("a", "b", "c") passed when the test table is created. */
  private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
    Bytes.toBytes("b"), Bytes.toBytes("c")};
  /**
   * Number of start keys reported for the test table. Set in {@code beforeAllTests} and
   * used as the threshold in {@code waitUntilAllRegionsAssigned}.
   */
  private static int countOfRegions;
72  
73    @BeforeClass public static void beforeAllTests() throws Exception {
74      Configuration c = TEST_UTIL.getConfiguration();
75      c.setBoolean("hbase.assignment.usezk", true);
76      c.setBoolean("dfs.support.append", true);
77      c.setInt("hbase.regionserver.info.port", 0);
78      TEST_UTIL.startMiniCluster(2);
79      TEST_UTIL.createMultiRegionTable(TABLENAME, FAMILIES);
80      HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
81      countOfRegions = -1;
82      try (RegionLocator r = t.getRegionLocator()) {
83        countOfRegions = r.getStartKeys().length;
84      }
85      waitUntilAllRegionsAssigned();
86      addToEachStartKey(countOfRegions);
87      t.close();
88      TEST_UTIL.getHBaseCluster().getMaster().assignmentManager.initializeHandlerTrackers();
89    }
90  
91    @AfterClass public static void afterAllTests() throws Exception {
92      TEST_UTIL.shutdownMiniCluster();
93    }
94  
95    @Before public void setup() throws IOException {
96      if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
97        
98        LOG.info("Started new server=" +
99          TEST_UTIL.getHBaseCluster().startRegionServer());
100 
101     }
102     waitUntilAllRegionsAssigned();
103     waitOnRIT();
104   }
105 
106   
107 
108 
109 
110   @Test (timeout=300000) public void testReOpenRegion()
111   throws Exception {
112     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
113     LOG.info("Number of region servers = " +
114       cluster.getLiveRegionServerThreads().size());
115 
116     int rsIdx = 0;
117     HRegionServer regionServer =
118       TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
119     HRegionInfo hri = getNonMetaRegion(
120       ProtobufUtil.getOnlineRegions(regionServer.getRSRpcServices()));
121     LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
122 
123     LOG.info("Unassign " + hri.getRegionNameAsString());
124     cluster.getMaster().assignmentManager.unassign(hri);
125 
126     while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) {
127       Threads.sleep(100);
128     }
129 
130     while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
131       Threads.sleep(100);
132     }
133 
134     LOG.info("Done with testReOpenRegion");
135   }
136 
137   private HRegionInfo getNonMetaRegion(final Collection<HRegionInfo> regions) {
138     HRegionInfo hri = null;
139     for (HRegionInfo i: regions) {
140       LOG.info(i.getRegionNameAsString());
141       if (!i.isMetaRegion()) {
142         hri = i;
143         break;
144       }
145     }
146     return hri;
147   }
148 
149   
150 
151 
152 
153 
154   @Test
155   public void testRSAlreadyProcessingRegion() throws Exception {
156     LOG.info("starting testRSAlreadyProcessingRegion");
157     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
158 
159     HRegionServer hr0 =
160         cluster.getLiveRegionServerThreads().get(0).getRegionServer();
161     HRegionServer hr1 =
162         cluster.getLiveRegionServerThreads().get(1).getRegionServer();
163     HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(hr0.getRSRpcServices()));
164 
165     
166     hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true);
167 
168     
169     TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
170         Bytes.toBytes(hr1.getServerName().toString()));
171 
172     
173     assertEquals(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()), null);
174 
175     
176     hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes());
177 
178     
179     hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(hr1.getRSRpcServices()));
180 
181     TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
182         Bytes.toBytes(hr0.getServerName().toString()));
183 
184     while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
185       Threads.sleep(100);
186     }
187 
188     
189     assertTrue(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()) == null);
190 
191   }
192 
193   private void waitOnRIT() {
194     
195     
196     while (TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
197         getRegionStates().isRegionsInTransition()) {
198       LOG.info("Waiting on regions in transition: " +
199         TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
200           getRegionStates().getRegionsInTransition());
201       Threads.sleep(10);
202     }
203   }
204 
205   
206 
207 
208 
209 
210   @Test
211   public void testRegionOpenFailsDueToIOException() throws Exception {
212     HRegionInfo REGIONINFO = new HRegionInfo(TableName.valueOf("t"),
213         HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
214     HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
215     TableDescriptors htd = Mockito.mock(TableDescriptors.class);
216     Object orizinalState = Whitebox.getInternalState(regionServer,"tableDescriptors");
217     Whitebox.setInternalState(regionServer, "tableDescriptors", htd);
218     Mockito.doThrow(new IOException()).when(htd).get((TableName) Mockito.any());
219     try {
220       ProtobufUtil.openRegion(null, regionServer.getRSRpcServices(),
221         regionServer.getServerName(), REGIONINFO);
222       fail("It should throw IOException ");
223     } catch (IOException e) {
224     }
225     Whitebox.setInternalState(regionServer, "tableDescriptors", orizinalState);
226     assertFalse("Region should not be in RIT",
227         regionServer.getRegionsInTransitionInRS().containsKey(REGIONINFO.getEncodedNameAsBytes()));
228   }
229 
230   private static void waitUntilAllRegionsAssigned()
231   throws IOException {
232     HTable meta = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
233     while (true) {
234       int rows = 0;
235       Scan scan = new Scan();
236       scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
237       ResultScanner s = meta.getScanner(scan);
238       for (Result r = null; (r = s.next()) != null;) {
239         byte [] b =
240           r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
241         if (b == null || b.length <= 0) {
242           break;
243         }
244         rows++;
245       }
246       s.close();
247       
248       if (rows >= countOfRegions) {
249         break;
250       }
251       LOG.info("Found=" + rows);
252       Threads.sleep(1000);
253     }
254     meta.close();
255   }
256 
257   
258 
259 
260 
261 
262 
263 
264   private static int addToEachStartKey(final int expected) throws IOException {
265     HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
266     HTable meta = new HTable(TEST_UTIL.getConfiguration(),
267         TableName.META_TABLE_NAME);
268     int rows = 0;
269     Scan scan = new Scan();
270     scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
271     ResultScanner s = meta.getScanner(scan);
272     for (Result r = null; (r = s.next()) != null;) {
273       HRegionInfo hri = HRegionInfo.getHRegionInfo(r);
274       if (hri == null) break;
275       if(!hri.getTable().equals(TABLENAME)) {
276         continue;
277       }
278       
279       byte [] row = getStartKey(hri);
280       Put p = new Put(row);
281       p.setDurability(Durability.SKIP_WAL);
282       p.add(getTestFamily(), getTestQualifier(), row);
283       t.put(p);
284       rows++;
285     }
286     s.close();
287     Assert.assertEquals(expected, rows);
288     t.close();
289     meta.close();
290     return rows;
291   }
292 
293   private static byte [] getStartKey(final HRegionInfo hri) {
294     return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
295         Bytes.toBytes("aaa"): hri.getStartKey();
296   }
297 
298   private static byte [] getTestFamily() {
299     return FAMILIES[0];
300   }
301 
302   private static byte [] getTestQualifier() {
303     return getTestFamily();
304   }
305 }
306