1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.security.token;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  import static org.junit.Assert.assertNotNull;
25  import static org.junit.Assert.assertTrue;
26  
27  import java.io.IOException;
28  import java.net.InetSocketAddress;
29  import java.security.PrivilegedExceptionAction;
30  import java.util.ArrayList;
31  import java.util.List;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.concurrent.ExecutorService;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.ChoreService;
39  import org.apache.hadoop.hbase.ClusterId;
40  import org.apache.hadoop.hbase.CoordinatedStateManager;
41  import org.apache.hadoop.hbase.Coprocessor;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.ServerName;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.ClusterConnection;
49  import org.apache.hadoop.hbase.client.Connection;
50  import org.apache.hadoop.hbase.client.ConnectionFactory;
51  import org.apache.hadoop.hbase.client.HTableInterface;
52  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
53  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
54  import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
55  import org.apache.hadoop.hbase.ipc.RpcClient;
56  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
57  import org.apache.hadoop.hbase.ipc.RpcServer;
58  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
59  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
60  import org.apache.hadoop.hbase.ipc.ServerRpcController;
61  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
62  import org.apache.hadoop.hbase.regionserver.HRegion;
63  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
64  import org.apache.hadoop.hbase.security.SecurityInfo;
65  import org.apache.hadoop.hbase.security.User;
66  import org.apache.hadoop.hbase.testclassification.MediumTests;
67  import org.apache.hadoop.hbase.util.Bytes;
68  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
69  import org.apache.hadoop.hbase.util.Sleeper;
70  import org.apache.hadoop.hbase.util.Strings;
71  import org.apache.hadoop.hbase.util.Threads;
72  import org.apache.hadoop.hbase.util.Writables;
73  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
74  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
75  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
76  import org.apache.hadoop.net.DNS;
77  import org.apache.hadoop.security.UserGroupInformation;
78  import org.apache.hadoop.security.authorize.PolicyProvider;
79  import org.apache.hadoop.security.authorize.Service;
80  import org.apache.hadoop.security.token.SecretManager;
81  import org.apache.hadoop.security.token.Token;
82  import org.apache.hadoop.security.token.TokenIdentifier;
83  import org.junit.AfterClass;
84  import org.junit.BeforeClass;
85  import org.junit.Test;
86  import org.junit.experimental.categories.Category;
87  
88  import com.google.protobuf.BlockingRpcChannel;
89  import com.google.protobuf.BlockingService;
90  import com.google.protobuf.RpcController;
91  import com.google.protobuf.ServiceException;
92  
93  
94  
95  
96  @Category(MediumTests.class)
97  public class TestTokenAuthentication {
98    static {
99      
100     
101     System.setProperty("java.security.krb5.realm", "hbase");
102     System.setProperty("java.security.krb5.kdc", "blah");
103   }
104   private static Log LOG = LogFactory.getLog(TestTokenAuthentication.class);
105 
106   public interface AuthenticationServiceSecurityInfo {}
107 
108   
109 
110 
111   private static class TokenServer extends TokenProvider
112   implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
113     private static Log LOG = LogFactory.getLog(TokenServer.class);
114     private Configuration conf;
115     private RpcServerInterface rpcServer;
116     private InetSocketAddress isa;
117     private ZooKeeperWatcher zookeeper;
118     private Sleeper sleeper;
119     private boolean started = false;
120     private boolean aborted = false;
121     private boolean stopped = false;
122     private long startcode;
123 
124     public TokenServer(Configuration conf) throws IOException {
125       this.conf = conf;
126       this.startcode = EnvironmentEdgeManager.currentTime();
127       
128       String hostname =
129         Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
130       int port = 0;
131       
132       InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
133       if (initialIsa.getAddress() == null) {
134         throw new IllegalArgumentException("Failed resolve of " + initialIsa);
135       }
136       final List<BlockingServiceAndInterface> sai =
137         new ArrayList<BlockingServiceAndInterface>(1);
138       BlockingService service =
139         AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
140       sai.add(new BlockingServiceAndInterface(service,
141         AuthenticationProtos.AuthenticationService.BlockingInterface.class));
142       this.rpcServer =
143         new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
144       InetSocketAddress address = rpcServer.getListenerAddress();
145       if (address == null) {
146         throw new IOException("Listener channel is closed");
147       }
148       this.isa = address;
149       this.sleeper = new Sleeper(1000, this);
150     }
151 
152     @Override
153     public Configuration getConfiguration() {
154       return conf;
155     }
156 
157     @Override
158     public ClusterConnection getConnection() {
159       return null;
160     }
161 
162     @Override
163     public MetaTableLocator getMetaTableLocator() {
164       return null;
165     }
166 
167     @Override
168     public ZooKeeperWatcher getZooKeeper() {
169       return zookeeper;
170     }
171 
172     @Override
173     public CoordinatedStateManager getCoordinatedStateManager() {
174       return null;
175     }
176 
177     @Override
178     public boolean isAborted() {
179       return aborted;
180     }
181 
182     @Override
183     public ServerName getServerName() {
184       return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
185     }
186 
187     @Override
188     public void abort(String reason, Throwable error) {
189       LOG.fatal("Aborting on: "+reason, error);
190       this.aborted = true;
191       this.stopped = true;
192       sleeper.skipSleepCycle();
193     }
194 
195     private void initialize() throws IOException {
196       
197       Configuration zkConf = new Configuration(conf);
198       zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
199       this.zookeeper = new ZooKeeperWatcher(zkConf, TokenServer.class.getSimpleName(),
200           this, true);
201       this.rpcServer.start();
202 
203       
204       final RegionServerServices mockServices = TEST_UTIL.createMockRegionServerService(rpcServer);
205 
206       
207       super.start(new RegionCoprocessorEnvironment() {
208         @Override
209         public HRegion getRegion() { return null; }
210 
211         @Override
212         public RegionServerServices getRegionServerServices() {
213           return mockServices;
214         }
215 
216         @Override
217         public ConcurrentMap<String, Object> getSharedData() { return null; }
218 
219         @Override
220         public int getVersion() { return 0; }
221 
222         @Override
223         public String getHBaseVersion() { return null; }
224 
225         @Override
226         public Coprocessor getInstance() { return null; }
227 
228         @Override
229         public int getPriority() { return 0; }
230 
231         @Override
232         public int getLoadSequence() { return 0; }
233 
234         @Override
235         public Configuration getConfiguration() { return conf; }
236 
237         @Override
238         public HTableInterface getTable(TableName tableName) throws IOException
239           { return null; }
240 
241         @Override
242         public HTableInterface getTable(TableName tableName, ExecutorService service)
243             throws IOException {
244           return null;
245         }
246 
247         @Override
248         public ClassLoader getClassLoader() {
249           return Thread.currentThread().getContextClassLoader();
250         }
251 
252         @Override
253         public HRegionInfo getRegionInfo() {
254           return null;
255         }
256       });
257 
258       started = true;
259     }
260 
261     public void run() {
262       try {
263         initialize();
264         while (!stopped) {
265           this.sleeper.sleep();
266         }
267       } catch (Exception e) {
268         abort(e.getMessage(), e);
269       }
270       this.rpcServer.stop();
271     }
272 
273     public boolean isStarted() {
274       return started;
275     }
276 
277     @Override
278     public void stop(String reason) {
279       LOG.info("Stopping due to: "+reason);
280       this.stopped = true;
281       sleeper.skipSleepCycle();
282     }
283 
284     @Override
285     public boolean isStopped() {
286       return stopped;
287     }
288 
289     public InetSocketAddress getAddress() {
290       return isa;
291     }
292 
293     public SecretManager<? extends TokenIdentifier> getSecretManager() {
294       return ((RpcServer)rpcServer).getSecretManager();
295     }
296 
297     @Override
298     public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
299         RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
300       throws ServiceException {
301       LOG.debug("Authentication token request from " + RpcServer.getRequestUserName());
302       
303       ServerRpcController serverController = new ServerRpcController();
304       BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
305           new BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>();
306       getAuthenticationToken(serverController, request, callback);
307       try {
308         serverController.checkFailed();
309         return callback.get();
310       } catch (IOException ioe) {
311         throw new ServiceException(ioe);
312       }
313     }
314 
315     @Override
316     public AuthenticationProtos.WhoAmIResponse whoAmI(
317         RpcController controller, AuthenticationProtos.WhoAmIRequest request)
318       throws ServiceException {
319       LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName());
320       
321       ServerRpcController serverController = new ServerRpcController();
322       BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
323           new BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse>();
324       whoAmI(serverController, request, callback);
325       try {
326         serverController.checkFailed();
327         return callback.get();
328       } catch (IOException ioe) {
329         throw new ServiceException(ioe);
330       }
331     }
332 
333     @Override
334     public ChoreService getChoreService() {
335       return null;
336     }
337   }
338 
339   private static HBaseTestingUtility TEST_UTIL;
340   private static TokenServer server;
341   private static Thread serverThread;
342   private static AuthenticationTokenSecretManager secretManager;
343   private static ClusterId clusterId = new ClusterId();
344 
345   @BeforeClass
346   public static void setupBeforeClass() throws Exception {
347     TEST_UTIL = new HBaseTestingUtility();
348     TEST_UTIL.startMiniZKCluster();
349     
350     SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
351       new SecurityInfo("hbase.test.kerberos.principal",
352         AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
353     
354     Configuration conf = TEST_UTIL.getConfiguration();
355     conf.set("hadoop.security.authentication", "kerberos");
356     conf.set("hbase.security.authentication", "kerberos");
357     conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
358     server = new TokenServer(conf);
359     serverThread = new Thread(server);
360     Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
361     
362     while (!server.isStarted() && !server.isStopped()) {
363       Thread.sleep(10);
364     }
365     server.rpcServer.refreshAuthManager(new PolicyProvider() {
366       @Override
367       public Service[] getServices() {
368         return new Service [] {
369           new Service("security.client.protocol.acl",
370             AuthenticationProtos.AuthenticationService.BlockingInterface.class)};
371       }
372     });
373     ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
374     secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
375     while(secretManager.getCurrentKey() == null) {
376       Thread.sleep(1);
377     }
378   }
379 
380   @AfterClass
381   public static void tearDownAfterClass() throws Exception {
382     server.stop("Test complete");
383     Threads.shutdown(serverThread);
384     TEST_UTIL.shutdownMiniZKCluster();
385   }
386 
387   @Test
388   public void testTokenCreation() throws Exception {
389     Token<AuthenticationTokenIdentifier> token =
390         secretManager.generateToken("testuser");
391 
392     AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
393     Writables.getWritable(token.getIdentifier(), ident);
394     assertEquals("Token username should match", "testuser",
395         ident.getUsername());
396     byte[] passwd = secretManager.retrievePassword(ident);
397     assertTrue("Token password and password from secret manager should match",
398         Bytes.equals(token.getPassword(), passwd));
399   }
400 
401   @Test
402   public void testTokenAuthentication() throws Exception {
403     UserGroupInformation testuser =
404         UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
405 
406     testuser.setAuthenticationMethod(
407         UserGroupInformation.AuthenticationMethod.TOKEN);
408     final Configuration conf = TEST_UTIL.getConfiguration();
409     UserGroupInformation.setConfiguration(conf);
410     Token<AuthenticationTokenIdentifier> token =
411         secretManager.generateToken("testuser");
412     LOG.debug("Got token: " + token.toString());
413     testuser.addToken(token);
414 
415     
416     testuser.doAs(new PrivilegedExceptionAction<Object>() {
417       public Object run() throws Exception {
418         Configuration c = server.getConfiguration();
419         RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
420         ServerName sn =
421             ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
422                 System.currentTimeMillis());
423         try {
424           BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
425               User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
426           AuthenticationProtos.AuthenticationService.BlockingInterface stub =
427               AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
428           AuthenticationProtos.WhoAmIResponse response =
429               stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
430           String myname = response.getUsername();
431           assertEquals("testuser", myname);
432           String authMethod = response.getAuthMethod();
433           assertEquals("TOKEN", authMethod);
434         } finally {
435           rpcClient.close();
436         }
437         return null;
438       }
439     });
440   }
441 
442   @Test
443   public void testUseExistingToken() throws Exception {
444     User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
445         new String[]{"testgroup"});
446     Token<AuthenticationTokenIdentifier> token =
447         secretManager.generateToken(user.getName());
448     assertNotNull(token);
449     user.addToken(token);
450 
451     
452     Token<AuthenticationTokenIdentifier> firstToken =
453         new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
454     assertNotNull(firstToken);
455     assertEquals(token, firstToken);
456 
457     Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
458     try {
459       assertFalse(TokenUtil.addTokenIfMissing(conn, user));
460       
461       Token<AuthenticationTokenIdentifier> secondToken =
462           new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
463       assertEquals(firstToken, secondToken);
464     } finally {
465       conn.close();
466     }
467   }
468 }