1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FSDataOutputStream;
23  import org.apache.hadoop.fs.Path;
24  import org.apache.hadoop.hbase.CellUtil;
25  import org.apache.hadoop.hbase.DoNotRetryIOException;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.HColumnDescriptor;
28  import org.apache.hadoop.hbase.HRegionInfo;
29  import org.apache.hadoop.hbase.HTableDescriptor;
30  import org.apache.hadoop.hbase.KeyValue;
31  import org.apache.hadoop.hbase.testclassification.SmallTests;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.io.hfile.HFile;
34  import org.apache.hadoop.hbase.io.hfile.HFileContext;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
37  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
38  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.Pair;
41  import org.apache.hadoop.hbase.wal.WAL;
42  import org.apache.hadoop.hbase.wal.WALKey;
43  import org.hamcrest.Description;
44  import org.hamcrest.Matcher;
45  import org.hamcrest.TypeSafeMatcher;
46  import org.jmock.Expectations;
47  import org.jmock.integration.junit4.JUnitRuleMockery;
48  import org.jmock.lib.concurrent.Synchroniser;
49  import org.junit.Before;
50  import org.junit.ClassRule;
51  import org.junit.Rule;
52  import org.junit.Test;
53  import org.junit.experimental.categories.Category;
54  import org.junit.rules.TemporaryFolder;
55  import org.junit.rules.TestName;
56  
57  import java.io.File;
58  import java.io.FileNotFoundException;
59  import java.io.FileOutputStream;
60  import java.io.IOException;
61  import java.util.ArrayList;
62  import java.util.Arrays;
63  import java.util.List;
64  import java.util.Random;
65  import java.util.concurrent.atomic.AtomicLong;
66  
67  import static java.util.Arrays.asList;
68  import static org.junit.Assert.assertEquals;
69  import static org.junit.Assert.assertNotNull;
70  import static org.junit.Assert.assertTrue;
71  
72  
73  
74  
75  @Category(SmallTests.class)
76  public class TestBulkLoad {
77  
78    @ClassRule
79    public static TemporaryFolder testFolder = new TemporaryFolder();
80    @Rule
81    public final JUnitRuleMockery context = new JUnitRuleMockery() {{
82      setThreadingPolicy(new Synchroniser());
83    }};
84    private final WAL log = context.mock(WAL.class);
85    private final Configuration conf = HBaseConfiguration.create();
86    private final Random random = new Random();
87    private final byte[] randomBytes = new byte[100];
88    private final byte[] family1 = Bytes.toBytes("family1");
89    private final byte[] family2 = Bytes.toBytes("family2");
90    private final Expectations callOnce;
91    @Rule
92    public TestName name = new TestName();
93  
94    public TestBulkLoad() throws IOException {
95      callOnce = new Expectations() {
96        {
97          oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
98                  with(any(WALKey.class)), with(bulkLogWalEditType(WALEdit.BULK_LOAD)),
99                  with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
100         will(returnValue(0l));
101         oneOf(log).sync(with(any(long.class)));
102       }
103     };
104   }
105 
106   @Before
107   public void before() throws IOException {
108     random.nextBytes(randomBytes);
109   }
110 
111   @Test
112   public void verifyBulkLoadEvent() throws IOException {
113     TableName tableName = TableName.valueOf("test", "test");
114     List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
115     byte[] familyName = familyPaths.get(0).getFirst();
116     String storeFileName = familyPaths.get(0).getSecond();
117     storeFileName = (new Path(storeFileName)).getName();
118     List<String> storeFileNames = new ArrayList<String>();
119     storeFileNames.add(storeFileName);
120     final Matcher<WALEdit> bulkEventMatcher = bulkLogWalEdit(WALEdit.BULK_LOAD,
121       tableName.toBytes(), familyName, storeFileNames);
122     Expectations expection = new Expectations() {
123       {
124         oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
125                 with(any(WALKey.class)), with(bulkEventMatcher),
126                 with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
127         will(returnValue(0l));
128         oneOf(log).sync(with(any(long.class)));
129       }
130     };
131     context.checking(expection);
132     testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
133         .bulkLoadHFiles(familyPaths, false, null);
134   }
135 
136   @Test
137   public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
138     testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<Pair<byte[], String>>(),
139       false, null);
140   }
141 
142   @Test
143   public void shouldBulkLoadSingleFamilyHLog() throws IOException {
144     context.checking(callOnce);
145     testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
146   }
147 
148   @Test
149   public void shouldBulkLoadManyFamilyHLog() throws IOException {
150     context.checking(callOnce);
151     testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
152         false, null);
153   }
154 
155   @Test
156   public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
157     context.checking(callOnce);
158     TableName tableName = TableName.valueOf("test", "test");
159     testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
160         .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
161   }
162 
163   @Test(expected = DoNotRetryIOException.class)
164   public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
165     testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
166       null);
167   }
168 
169   @Test(expected = DoNotRetryIOException.class)
170   public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
171       throws IOException {
172     testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
173   }
174 
175   @Test(expected = DoNotRetryIOException.class)
176   public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
177     testRegionWithFamilies()
178         .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
179             false, null);
180   }
181 
182   @Test(expected = FileNotFoundException.class)
183   public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
184     List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
185     testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
186   }
187 
188   private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
189     return new Pair<byte[], String>(family, "/tmp/does_not_exist");
190   }
191 
192   private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
193       throws IOException {
194     createHFileForFamilies(family);
195     return new Pair<byte[], String>(new byte[]{0x00, 0x01, 0x02}, "/tmp/does_not_exist");
196   }
197 
198 
199   private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
200                                                               byte[]... families)
201   throws IOException {
202     HRegionInfo hRegionInfo = new HRegionInfo(tableName);
203     HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
204     for (byte[] family : families) {
205       hTableDescriptor.addFamily(new HColumnDescriptor(family));
206     }
207 
208     
209     return HRegion.createHRegion(hRegionInfo,
210         new Path(testFolder.newFolder().toURI()),
211         conf,
212         hTableDescriptor,
213         log);
214 
215   }
216 
217   private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
218     TableName tableName = TableName.valueOf(name.getMethodName());
219     return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
220   }
221 
222   private List<Pair<byte[], String>> getBlankFamilyPaths(){
223     return new ArrayList<Pair<byte[], String>>();
224   }
225 
226   private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
227     List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
228     for (byte[] family : families) {
229       familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(family)));
230     }
231     return familyPaths;
232   }
233 
234   private String createHFileForFamilies(byte[] family) throws IOException {
235     HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
236     
237     File hFileLocation = testFolder.newFile();
238     FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation));
239     try {
240       hFileFactory.withOutputStream(out);
241       hFileFactory.withFileContext(new HFileContext());
242       HFile.Writer writer = hFileFactory.create();
243       try {
244         writer.append(new KeyValue(CellUtil.createCell(randomBytes,
245             family,
246             randomBytes,
247             0l,
248             KeyValue.Type.Put.getCode(),
249             randomBytes)));
250       } finally {
251         writer.close();
252       }
253     } finally {
254       out.close();
255     }
256     return hFileLocation.getAbsoluteFile().getAbsolutePath();
257   }
258 
259   private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
260     return new WalMatcher(typeBytes);
261   }
262 
263   private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
264       byte[] familyName, List<String> storeFileNames) {
265     return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
266   }
267 
268   private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
269     private final byte[] typeBytes;
270     private final byte[] tableName;
271     private final byte[] familyName;
272     private final List<String> storeFileNames;
273 
274     public WalMatcher(byte[] typeBytes) {
275       this(typeBytes, null, null, null);
276     }
277 
278     public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
279         List<String> storeFileNames) {
280       this.typeBytes = typeBytes;
281       this.tableName = tableName;
282       this.familyName = familyName;
283       this.storeFileNames = storeFileNames;
284     }
285 
286     @Override
287     protected boolean matchesSafely(WALEdit item) {
288       assertTrue(Arrays.equals(item.getCells().get(0).getQualifier(), typeBytes));
289       BulkLoadDescriptor desc;
290       try {
291         desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
292       } catch (IOException e) {
293         return false;
294       }
295       assertNotNull(desc);
296 
297       if (tableName != null) {
298         assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
299           tableName));
300       }
301 
302       if(storeFileNames != null) {
303         int index=0;
304         StoreDescriptor store = desc.getStores(0);
305         assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
306         assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
307         assertEquals(storeFileNames.size(), store.getStoreFileCount());
308       }
309 
310       return true;
311     }
312 
313     @Override
314     public void describeTo(Description description) {
315 
316     }
317   }
318 }