1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  import static org.mockito.Mockito.mock;
24  import static org.mockito.Mockito.when;
25  
26  import java.io.IOException;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.List;
30  
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.HTableDescriptor;
41  import org.apache.hadoop.hbase.testclassification.SmallTests;
42  import org.apache.hadoop.hbase.Stoppable;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.client.Durability;
45  import org.apache.hadoop.hbase.client.Get;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.wal.WALFactory;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.FSUtils;
51  import org.apache.hadoop.hbase.util.StoppableImplementation;
52  import org.junit.Assert;
53  import org.junit.Before;
54  import org.junit.Test;
55  import org.junit.experimental.categories.Category;
56  
57  @Category(SmallTests.class)
58  public class TestStoreFileRefresherChore {
59  
60    private HBaseTestingUtility TEST_UTIL;
61    private Path testDir;
62  
63    @Before
64    public void setUp() throws IOException {
65      TEST_UTIL = new HBaseTestingUtility();
66      testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore");
67      FSUtils.setRootDir(TEST_UTIL.getConfiguration(), testDir);
68    }
69  
70    private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
71      HTableDescriptor htd = new HTableDescriptor(tableName);
72      for (byte[] family : families) {
73        HColumnDescriptor hcd = new HColumnDescriptor(family);
74        
75        hcd.setMaxVersions(Integer.MAX_VALUE);
76        htd.addFamily(hcd);
77      }
78      return htd;
79    }
80  
81    static class FailingHRegionFileSystem extends HRegionFileSystem {
82      boolean fail = false;
83      FailingHRegionFileSystem(Configuration conf, FileSystem fs, Path tableDir, HRegionInfo regionInfo) {
84        super(conf, fs, tableDir, regionInfo);
85      }
86  
87      @Override
88      public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
89        if (fail) {
90          throw new IOException("simulating FS failure");
91        }
92        return super.getStoreFiles(familyName);
93      }
94    }
95  
96    private Region initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
97        throws IOException {
98      Configuration conf = TEST_UTIL.getConfiguration();
99      Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
100 
101     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
102 
103     HRegionFileSystem fs = new FailingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir,
104       info);
105     final Configuration walConf = new Configuration(conf);
106     FSUtils.setRootDir(walConf, tableDir);
107     final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId);
108     HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes()), conf, htd, null);
109 
110     region.initialize();
111 
112     return region;
113   }
114 
115   private void putData(Region region, int startRow, int numRows, byte[] qf, byte[]... families)
116       throws IOException {
117     for (int i = startRow; i < startRow + numRows; i++) {
118       Put put = new Put(Bytes.toBytes("" + i));
119       put.setDurability(Durability.SKIP_WAL);
120       for (byte[] family : families) {
121         put.add(family, qf, null);
122       }
123       region.put(put);
124     }
125   }
126 
127   private void verifyData(Region newReg, int startRow, int numRows, byte[] qf, byte[]... families)
128       throws IOException {
129     for (int i = startRow; i < startRow + numRows; i++) {
130       byte[] row = Bytes.toBytes("" + i);
131       Get get = new Get(row);
132       for (byte[] family : families) {
133         get.addColumn(family, qf);
134       }
135       Result result = newReg.get(get);
136       Cell[] raw = result.rawCells();
137       assertEquals(families.length, result.size());
138       for (int j = 0; j < families.length; j++) {
139         assertTrue(CellUtil.matchingRow(raw[j], row));
140         assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
141         assertTrue(CellUtil.matchingQualifier(raw[j], qf));
142       }
143     }
144   }
145 
146   static class StaleStorefileRefresherChore extends StorefileRefresherChore {
147     boolean isStale = false;
148     public StaleStorefileRefresherChore(int period, HRegionServer regionServer,
149         Stoppable stoppable) {
150       super(period, false, regionServer, stoppable);
151     }
152     @Override
153     protected boolean isRegionStale(String encodedName, long time) {
154       return isStale;
155     }
156   }
157 
158   @Test (timeout = 60000)
159   public void testIsStale() throws IOException {
160     int period = 0;
161     byte[][] families = new byte[][] {Bytes.toBytes("cf")};
162     byte[] qf = Bytes.toBytes("cq");
163 
164     HRegionServer regionServer = mock(HRegionServer.class);
165     List<Region> regions = new ArrayList<Region>();
166     when(regionServer.getOnlineRegionsLocalContext()).thenReturn(regions);
167     when(regionServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
168 
169     HTableDescriptor htd = getTableDesc(TableName.valueOf("testIsStale"), families);
170     Region primary = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0);
171     Region replica1 = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 1);
172     regions.add(primary);
173     regions.add(replica1);
174 
175     StaleStorefileRefresherChore chore = new StaleStorefileRefresherChore(period, regionServer, new StoppableImplementation());
176 
177     
178     putData(primary, 0, 100, qf, families);
179     primary.flush(true);
180     verifyData(primary, 0, 100, qf, families);
181 
182     try {
183       verifyData(replica1, 0, 100, qf, families);
184       Assert.fail("should have failed");
185     } catch(AssertionError ex) {
186       
187     }
188     chore.chore();
189     verifyData(replica1, 0, 100, qf, families);
190 
191     
192     ((FailingHRegionFileSystem)((HRegion)replica1).getRegionFileSystem()).fail = true;
193 
194     
195     putData(primary, 100, 100, qf, families);
196     primary.flush(true);
197     verifyData(primary, 0, 200, qf, families);
198 
199     chore.chore(); 
200 
201     verifyData(replica1, 0, 100, qf, families);
202     try {
203       verifyData(replica1, 100, 100, qf, families);
204       Assert.fail("should have failed");
205     } catch(AssertionError ex) {
206       
207     }
208 
209     chore.isStale = true;
210     chore.chore(); 
211     try {
212       verifyData(replica1, 0, 100, qf, families);
213       Assert.fail("should have failed with IOException");
214     } catch(IOException ex) {
215       
216     }
217   }
218 }