1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.security.token;
20  
21  import java.io.IOException;
22  import java.lang.reflect.UndeclaredThrowableException;
23  import java.security.PrivilegedExceptionAction;
24  
25  import com.google.protobuf.ServiceException;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.ConnectionFactory;
35  import org.apache.hadoop.hbase.client.Table;
36  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
39  import org.apache.hadoop.hbase.security.User;
40  import org.apache.hadoop.hbase.security.UserProvider;
41  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
42  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43  import org.apache.hadoop.io.Text;
44  import org.apache.hadoop.mapred.JobConf;
45  import org.apache.hadoop.mapreduce.Job;
46  import org.apache.hadoop.security.UserGroupInformation;
47  import org.apache.hadoop.security.token.Token;
48  import org.apache.zookeeper.KeeperException;
49  
50  
51  
52  
53  @InterfaceAudience.Public
54  @InterfaceStability.Evolving
55  public class TokenUtil {
56    
57    private static Log LOG = LogFactory.getLog(TokenUtil.class);
58  
59    
60  
61  
62  
63  
64  
65    @Deprecated
66    public static Token<AuthenticationTokenIdentifier> obtainToken(
67        Configuration conf) throws IOException {
68      try (Connection connection = ConnectionFactory.createConnection(conf)) {
69        return obtainToken(connection);
70      }
71    }
72  
73    
74  
75  
76  
77  
78    public static Token<AuthenticationTokenIdentifier> obtainToken(
79        Connection conn) throws IOException {
80      Table meta = null;
81      try {
82        meta = conn.getTable(TableName.META_TABLE_NAME);
83        CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
84        AuthenticationProtos.AuthenticationService.BlockingInterface service =
85            AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
86        AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
87            AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
88  
89        return ProtobufUtil.toToken(response.getToken());
90      } catch (ServiceException se) {
91        ProtobufUtil.toIOException(se);
92      } finally {
93        if (meta != null) {
94          meta.close();
95        }
96      }
97      
98      return null;
99    }
100 
101   
102 
103 
104 
105 
106   public static Token<AuthenticationTokenIdentifier> obtainToken(
107       final Connection conn, User user) throws IOException, InterruptedException {
108     return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
109       @Override
110       public Token<AuthenticationTokenIdentifier> run() throws Exception {
111         return obtainToken(conn);
112       }
113     });
114   }
115 
116 
117   private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
118       throws IOException {
119     return token.getService() != null
120         ? token.getService() : new Text("default");
121   }
122 
123   
124 
125 
126 
127 
128 
129 
130 
131 
132   @Deprecated
133   public static void obtainAndCacheToken(final Configuration conf,
134                                          UserGroupInformation user)
135       throws IOException, InterruptedException {
136     Connection conn = ConnectionFactory.createConnection(conf);
137     try {
138       UserProvider userProvider = UserProvider.instantiate(conf);
139       obtainAndCacheToken(conn, userProvider.create(user));
140     } finally {
141       conn.close();
142     }
143   }
144 
145   
146 
147 
148 
149 
150 
151 
152 
153   public static void obtainAndCacheToken(final Connection conn,
154       User user)
155       throws IOException, InterruptedException {
156     try {
157       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
158 
159       if (token == null) {
160         throw new IOException("No token returned for user " + user.getName());
161       }
162       if (LOG.isDebugEnabled()) {
163         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
164             user.getName());
165       }
166       user.addToken(token);
167     } catch (IOException ioe) {
168       throw ioe;
169     } catch (InterruptedException ie) {
170       throw ie;
171     } catch (RuntimeException re) {
172       throw re;
173     } catch (Exception e) {
174       throw new UndeclaredThrowableException(e,
175           "Unexpected exception obtaining token for user " + user.getName());
176     }
177   }
178 
179   
180 
181 
182 
183 
184 
185 
186 
187 
188 
189   @Deprecated
190   public static void obtainTokenForJob(final Configuration conf,
191                                        UserGroupInformation user, Job job)
192       throws IOException, InterruptedException {
193     Connection conn = ConnectionFactory.createConnection(conf);
194     try {
195       UserProvider userProvider = UserProvider.instantiate(conf);
196       obtainTokenForJob(conn, userProvider.create(user), job);
197     } finally {
198       conn.close();
199     }
200   }
201 
202   
203 
204 
205 
206 
207 
208 
209 
210 
211   public static void obtainTokenForJob(final Connection conn,
212       User user, Job job)
213       throws IOException, InterruptedException {
214     try {
215       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
216 
217       if (token == null) {
218         throw new IOException("No token returned for user " + user.getName());
219       }
220       Text clusterId = getClusterId(token);
221       if (LOG.isDebugEnabled()) {
222         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
223             user.getName() + " on cluster " + clusterId.toString());
224       }
225       job.getCredentials().addToken(clusterId, token);
226     } catch (IOException ioe) {
227       throw ioe;
228     } catch (InterruptedException ie) {
229       throw ie;
230     } catch (RuntimeException re) {
231       throw re;
232     } catch (Exception e) {
233       throw new UndeclaredThrowableException(e,
234           "Unexpected exception obtaining token for user " + user.getName());
235     }
236   }
237 
238   
239 
240 
241 
242 
243 
244 
245 
246 
247   @Deprecated
248   public static void obtainTokenForJob(final JobConf job,
249                                        UserGroupInformation user)
250       throws IOException, InterruptedException {
251     Connection conn = ConnectionFactory.createConnection(job);
252     try {
253       UserProvider userProvider = UserProvider.instantiate(job);
254       obtainTokenForJob(conn, job, userProvider.create(user));
255     } finally {
256       conn.close();
257     }
258   }
259 
260   
261 
262 
263 
264 
265 
266 
267 
268 
269   public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
270       throws IOException, InterruptedException {
271     try {
272       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
273 
274       if (token == null) {
275         throw new IOException("No token returned for user " + user.getName());
276       }
277       Text clusterId = getClusterId(token);
278       if (LOG.isDebugEnabled()) {
279         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
280             user.getName() + " on cluster " + clusterId.toString());
281       }
282       job.getCredentials().addToken(clusterId, token);
283     } catch (IOException ioe) {
284       throw ioe;
285     } catch (InterruptedException ie) {
286       throw ie;
287     } catch (RuntimeException re) {
288       throw re;
289     } catch (Exception e) {
290       throw new UndeclaredThrowableException(e,
291           "Unexpected exception obtaining token for user "+user.getName());
292     }
293   }
294 
295   
296 
297 
298 
299 
300 
301 
302 
303 
304 
305   public static void addTokenForJob(final Connection conn, final JobConf job, User user)
306       throws IOException, InterruptedException {
307 
308     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
309     if (token == null) {
310       token = obtainToken(conn, user);
311     }
312     job.getCredentials().addToken(token.getService(), token);
313   }
314 
315   
316 
317 
318 
319 
320 
321 
322 
323 
324 
325   public static void addTokenForJob(final Connection conn, User user, Job job)
326       throws IOException, InterruptedException {
327     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
328     if (token == null) {
329       token = obtainToken(conn, user);
330     }
331     job.getCredentials().addToken(token.getService(), token);
332   }
333 
334   
335 
336 
337 
338 
339 
340 
341 
342 
343 
344   public static boolean addTokenIfMissing(Connection conn, User user)
345       throws IOException, InterruptedException {
346     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
347     if (token == null) {
348       token = obtainToken(conn, user);
349       user.getUGI().addToken(token.getService(), token);
350       return true;
351     }
352     return false;
353   }
354 
355   
356 
357 
358 
359   private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
360       throws IOException, InterruptedException {
361     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
362     try {
363       String clusterId = ZKClusterId.readClusterIdZNode(zkw);
364       if (clusterId == null) {
365         throw new IOException("Failed to get cluster ID");
366       }
367       return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
368     } catch (KeeperException e) {
369       throw new IOException(e);
370     } finally {
371       zkw.close();
372     }
373   }
374 }