1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.security;
20  
21  import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
22  import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
23  import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertSame;
26  
27  import java.io.File;
28  import java.io.IOException;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.List;
32  import java.util.Properties;
33  
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.CommonConfigurationKeys;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
40  import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
41  import org.apache.hadoop.hbase.ipc.RpcClient;
42  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
43  import org.apache.hadoop.hbase.ipc.RpcClientImpl;
44  import org.apache.hadoop.hbase.ipc.RpcServer;
45  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
46  import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
47  import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
48  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
49  import org.apache.hadoop.hbase.testclassification.SmallTests;
50  import org.apache.hadoop.minikdc.MiniKdc;
51  import org.apache.hadoop.security.UserGroupInformation;
52  import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
53  import org.junit.AfterClass;
54  import org.junit.BeforeClass;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  import org.mockito.Mockito;
58  
59  import com.google.common.collect.Lists;
60  import com.google.protobuf.BlockingRpcChannel;
61  import com.google.protobuf.BlockingService;
62  
63  @Category(SmallTests.class)
64  public class TestSecureRPC {
65  
66    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67  
68    private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
69        .getPath());
70  
71    private static MiniKdc KDC;
72  
73    private static String HOST = "localhost";
74  
75    private static String PRINCIPAL;
76  
77    @BeforeClass
78    public static void setUp() throws Exception {
79      Properties conf = MiniKdc.createConf();
80      conf.put(MiniKdc.DEBUG, true);
81      KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
82      KDC.start();
83      PRINCIPAL = "hbase/" + HOST;
84      KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
85      HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
86      HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
87    }
88  
89    @AfterClass
90    public static void tearDown() throws IOException {
91      if (KDC != null) {
92        KDC.stop();
93      }
94      TEST_UTIL.cleanupTestDir();
95    }
96  
97    @Test
98    public void testRpc() throws Exception {
99      testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
100   }
101 
102   @Test
103   public void testAsyncRpc() throws Exception {
104     testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
105   }
106 
107   private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
108       throws Exception {
109     String krbKeytab = getKeytabFileForTesting();
110     String krbPrincipal = getPrincipalForTesting();
111 
112     Configuration cnf = new Configuration();
113     cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
114     UserGroupInformation.setConfiguration(cnf);
115     UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
116     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
117     UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
118 
119     
120     assertSame(ugi, ugi2);
121     assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
122     assertEquals(krbPrincipal, ugi.getUserName());
123 
124     Configuration conf = getSecuredConfiguration();
125     conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
126     SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
127     Mockito.when(securityInfoMock.getServerPrincipal())
128         .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
129     SecurityInfo.addInfo("TestDelayedService", securityInfoMock);
130 
131     boolean delayReturnValue = false;
132     InetSocketAddress isa = new InetSocketAddress(HOST, 0);
133     TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
134     BlockingService service =
135         TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
136 
137     RpcServerInterface rpcServer =
138         new RpcServer(null, "testSecuredDelayedRpc",
139             Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
140             conf, new FifoRpcScheduler(conf, 1));
141     rpcServer.start();
142     RpcClient rpcClient =
143         RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
144     try {
145       InetSocketAddress address = rpcServer.getListenerAddress();
146       if (address == null) {
147         throw new IOException("Listener channel is closed");
148       }
149       BlockingRpcChannel channel =
150           rpcClient.createBlockingRpcChannel(
151             ServerName.valueOf(address.getHostName(), address.getPort(),
152             System.currentTimeMillis()), User.getCurrent(), 5000);
153       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
154           TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
155       List<Integer> results = new ArrayList<Integer>();
156       TestThread th1 = new TestThread(stub, true, results);
157       th1.start();
158       th1.join();
159 
160       assertEquals(0xDEADBEEF, results.get(0).intValue());
161     } finally {
162       rpcClient.close();
163       rpcServer.stop();
164     }
165   }
166 }