1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.security.PrivilegedAction;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import javax.security.auth.callback.Callback;
35  import javax.security.auth.callback.UnsupportedCallbackException;
36  import javax.security.sasl.AuthorizeCallback;
37  import javax.security.sasl.Sasl;
38  import javax.security.sasl.SaslServer;
39  
40  import org.apache.commons.cli.CommandLine;
41  import org.apache.commons.cli.CommandLineParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionGroup;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.commons.cli.PosixParser;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54  import org.apache.hadoop.hbase.filter.ParseFilter;
55  import org.apache.hadoop.hbase.http.InfoServer;
56  import org.apache.hadoop.hbase.security.SecurityUtil;
57  import org.apache.hadoop.hbase.security.UserProvider;
58  import org.apache.hadoop.hbase.thrift.CallQueue;
59  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
60  import org.apache.hadoop.hbase.thrift.ThriftMetrics;
61  import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
62  import org.apache.hadoop.hbase.util.Strings;
63  import org.apache.hadoop.net.DNS;
64  import org.apache.hadoop.security.UserGroupInformation;
65  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
66  import org.apache.hadoop.util.GenericOptionsParser;
67  import org.apache.thrift.TException;
68  import org.apache.thrift.TProcessor;
69  import org.apache.thrift.protocol.TBinaryProtocol;
70  import org.apache.thrift.protocol.TCompactProtocol;
71  import org.apache.thrift.protocol.TProtocol;
72  import org.apache.thrift.protocol.TProtocolFactory;
73  import org.apache.thrift.server.THsHaServer;
74  import org.apache.thrift.server.TNonblockingServer;
75  import org.apache.thrift.server.TServer;
76  import org.apache.thrift.server.TThreadPoolServer;
77  import org.apache.thrift.transport.TFramedTransport;
78  import org.apache.thrift.transport.TNonblockingServerSocket;
79  import org.apache.thrift.transport.TNonblockingServerTransport;
80  import org.apache.thrift.transport.TSaslServerTransport;
81  import org.apache.thrift.transport.TServerSocket;
82  import org.apache.thrift.transport.TServerTransport;
83  import org.apache.thrift.transport.TTransportException;
84  import org.apache.thrift.transport.TTransportFactory;
85  
86  import com.google.common.util.concurrent.ThreadFactoryBuilder;
87  
88  
89  
90  
91  
92  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
93  @SuppressWarnings({ "rawtypes", "unchecked" })
94  public class ThriftServer {
95    private static final Log log = LogFactory.getLog(ThriftServer.class);
96  
97    
98  
99  
100 
101 
102 
103 
104 
105 
106   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
107 
108   public static final int DEFAULT_LISTEN_PORT = 9090;
109 
110 
  /**
   * Creates a ThriftServer instance. The constructor body is empty; the
   * server itself is configured and launched from {@link #main(String[])}.
   */
  public ThriftServer() {
  }
113 
114   private static void printUsage() {
115     HelpFormatter formatter = new HelpFormatter();
116     formatter.printHelp("Thrift", null, getOptions(),
117         "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
118             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
119             " send a kill signal to the thrift server pid",
120         true);
121   }
122 
123   private static Options getOptions() {
124     Options options = new Options();
125     options.addOption("b", "bind", true,
126         "Address to bind the Thrift server to. [default: 0.0.0.0]");
127     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
128     options.addOption("f", "framed", false, "Use framed transport");
129     options.addOption("c", "compact", false, "Use the compact protocol");
130     options.addOption("h", "help", false, "Print help information");
131     options.addOption(null, "infoport", true, "Port for web UI");
132 
133     OptionGroup servers = new OptionGroup();
134     servers.addOption(
135         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
136     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
137     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
138     options.addOptionGroup(servers);
139     return options;
140   }
141 
142   private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
143       throws ParseException, IOException {
144     GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
145     String[] remainingArgs = genParser.getRemainingArgs();
146     CommandLineParser parser = new PosixParser();
147     return parser.parse(options, remainingArgs);
148   }
149 
150   private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
151     if (isCompact) {
152       log.debug("Using compact protocol");
153       return new TCompactProtocol.Factory();
154     } else {
155       log.debug("Using binary protocol");
156       return new TBinaryProtocol.Factory();
157     }
158   }
159 
160   private static TTransportFactory getTTransportFactory(
161       String qop, String name, String host, boolean framed, int frameSize) {
162     if (framed) {
163       if (qop != null) {
164         throw new RuntimeException("Thrift server authentication"
165           + " doesn't work with framed transport yet");
166       }
167       log.debug("Using framed transport");
168       return new TFramedTransport.Factory(frameSize);
169     } else if (qop == null) {
170       return new TTransportFactory();
171     } else {
172       Map<String, String> saslProperties = new HashMap<String, String>();
173       saslProperties.put(Sasl.QOP, qop);
174       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
175       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
176         new SaslGssCallbackHandler() {
177           @Override
178           public void handle(Callback[] callbacks)
179               throws UnsupportedCallbackException {
180             AuthorizeCallback ac = null;
181             for (Callback callback : callbacks) {
182               if (callback instanceof AuthorizeCallback) {
183                 ac = (AuthorizeCallback) callback;
184               } else {
185                 throw new UnsupportedCallbackException(callback,
186                     "Unrecognized SASL GSSAPI Callback");
187               }
188             }
189             if (ac != null) {
190               String authid = ac.getAuthenticationID();
191               String authzid = ac.getAuthorizationID();
192               if (!authid.equals(authzid)) {
193                 ac.setAuthorized(false);
194               } else {
195                 ac.setAuthorized(true);
196                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
197                 log.info("Effective user: " + userName);
198                 ac.setAuthorizedID(userName);
199               }
200             }
201           }
202         });
203       return saslFactory;
204     }
205   }
206 
207   
208 
209 
210   private static InetSocketAddress bindToPort(String bindValue, int listenPort)
211       throws UnknownHostException {
212     try {
213       if (bindValue == null) {
214         return new InetSocketAddress(listenPort);
215       } else {
216         return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
217       }
218     } catch (UnknownHostException e) {
219       throw new RuntimeException("Could not bind to provided ip address", e);
220     }
221   }
222 
223   private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
224       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
225     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
226     log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
227     TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
228     serverArgs.processor(processor);
229     serverArgs.transportFactory(transportFactory);
230     serverArgs.protocolFactory(protocolFactory);
231     return new TNonblockingServer(serverArgs);
232   }
233 
234   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
235       TProcessor processor, TTransportFactory transportFactory,
236       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
237       throws TTransportException {
238     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
239     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
240     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
241     ExecutorService executorService = createExecutor(
242         serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads(), metrics);
243     serverArgs.executorService(executorService);
244     serverArgs.processor(processor);
245     serverArgs.transportFactory(transportFactory);
246     serverArgs.protocolFactory(protocolFactory);
247     return new THsHaServer(serverArgs);
248   }
249 
250   private static ExecutorService createExecutor(
251       int minWorkers, int maxWorkers, ThriftMetrics metrics) {
252     CallQueue callQueue = new CallQueue(
253         new LinkedBlockingQueue<Call>(), metrics);
254     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
255     tfb.setDaemon(true);
256     tfb.setNameFormat("thrift2-worker-%d");
257     return new ThreadPoolExecutor(minWorkers, maxWorkers,
258             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
259   }
260 
261   private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
262       TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
263     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
264     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
265     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
266     serverArgs.processor(processor);
267     serverArgs.transportFactory(transportFactory);
268     serverArgs.protocolFactory(protocolFactory);
269     return new TThreadPoolServer(serverArgs);
270   }
271 
272   
273 
274 
275 
276 
277   protected static void registerFilters(Configuration conf) {
278     String[] filters = conf.getStrings("hbase.thrift.filters");
279     if(filters != null) {
280       for(String filterClass: filters) {
281         String[] filterPart = filterClass.split(":");
282         if(filterPart.length != 2) {
283           log.warn("Invalid filter specification " + filterClass + " - skipping");
284         } else {
285           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
286         }
287       }
288     }
289   }
290 
291   
292 
293 
294 
295 
  /**
   * Entry point: parses the command line, resolves the bind address, ports and
   * security settings, builds the protocol/transport/processor stack, optionally
   * starts the info web server, and finally runs the selected Thrift server
   * implementation as the current user.
   *
   * @param args command-line arguments; must contain "start" and must not
   *             contain "stop", otherwise usage is printed and the JVM exits
   *             with status 1
   * @throws Exception on any argument, configuration, security or transport failure
   */
  public static void main(String[] args) throws Exception {
    TServer server = null;
    Options options = getOptions();
    Configuration conf = HBaseConfiguration.create();
    CommandLine cmd = parseArguments(conf, options, args);

    // Print usage and exit (status 1) when help is requested, when "start" is
    // missing from the leftover arguments, or when "stop" is present.
    List<?> argList = cmd.getArgList();
    if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
      printUsage();
      System.exit(1);
    }

    // Bind address: the command line wins and is written back into the
    // configuration; otherwise fall back to the configured value (may be null).
    String bindAddress;
    if (cmd.hasOption("bind")) {
      bindAddress = cmd.getOptionValue("bind");
      conf.set("hbase.thrift.info.bindAddress", bindAddress);
    } else {
      bindAddress = conf.get("hbase.thrift.info.bindAddress");
    }

    // Listen port: command line first, then configuration, then the default.
    int listenPort = 0;
    try {
      if (cmd.hasOption("port")) {
        listenPort = Integer.parseInt(cmd.getOptionValue("port"));
      } else {
        listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
      }
    } catch (NumberFormatException e) {
      throw new RuntimeException("Could not parse the value provided for the port option", e);
    }

    // host and name are only used for the SASL server definition; they stay
    // null unless security / QOP are configured below.
    String host = null;
    String name = null;

    UserProvider userProvider = UserProvider.instantiate(conf);
    // Log in from the keytab only when both Hadoop and HBase security are enabled.
    boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
      && userProvider.isHBaseSecurityEnabled();
    if (securityEnabled) {
      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase.thrift.dns.interface", "default"),
        conf.get("hbase.thrift.dns.nameserver", "default")));
      userProvider.login("hbase.thrift.keytab.file",
        "hbase.thrift.kerberos.principal", host);
    }

    // The server is later run as this user (see doAs at the end).
    UserGroupInformation realUser = userProvider.getCurrent().getUGI();
    // A configured QOP turns on SASL; it must be one of the three accepted
    // values and requires security to be enabled.
    String qop = conf.get(THRIFT_QOP_KEY);
    if (qop != null) {
      if (!qop.equals("auth") && !qop.equals("auth-int")
          && !qop.equals("auth-conf")) {
        throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
          + ", it must be 'auth', 'auth-int', or 'auth-conf'");
      }
      if (!securityEnabled) {
        throw new IOException("Thrift server must"
          + " run in secure mode to support authentication");
      }
      // Derive the SASL protocol name from the configured Kerberos principal.
      name = SecurityUtil.getUserFromPrincipal(
        conf.get("hbase.thrift.kerberos.principal"));
    }

    boolean nonblocking = cmd.hasOption("nonblocking");
    boolean hsha = cmd.hasOption("hsha");

    ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);

    // Server implementation: thread pool unless nonblocking or hsha was requested.
    String implType = "threadpool";
    if (nonblocking) {
      implType = "nonblocking";
    } else if (hsha) {
      implType = "hsha";
    }

    // Record the effective server type and port in the configuration.
    conf.set("hbase.regionserver.thrift.server.type", implType);
    conf.setInt("hbase.regionserver.thrift.port", listenPort);
    registerFilters(conf);

    // Protocol: compact if requested on the command line or in the configuration.
    boolean compact = cmd.hasOption("compact") ||
        conf.getBoolean("hbase.regionserver.thrift.compact", false);
    TProtocolFactory protocolFactory = getTProtocolFactory(compact);
    final ThriftHBaseServiceHandler hbaseHandler =
      new ThriftHBaseServiceHandler(conf, userProvider);
    THBaseService.Iface handler =
      ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
    final THBaseService.Processor p = new THBaseService.Processor(handler);
    conf.setBoolean("hbase.regionserver.thrift.compact", compact);
    TProcessor processor = p;

    // Framed transport is forced for the nonblocking and hsha servers.
    boolean framed = cmd.hasOption("framed") ||
        conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
    // NOTE(review): the frame size is computed in int arithmetic, so a
    // configured value of 2048 MB or more overflows - confirm whether such
    // values need to be rejected.
    TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
        conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
    InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
    conf.setBoolean("hbase.regionserver.thrift.framed", framed);
    if (qop != null) {
      // With SASL enabled, wrap the processor so that each call first sets the
      // handler's effective user to the SASL authorization id of the connection.
      processor = new TProcessor() {
        @Override
        public boolean process(TProtocol inProt,
            TProtocol outProt) throws TException {
          TSaslServerTransport saslServerTransport =
            (TSaslServerTransport)inProt.getTransport();
          SaslServer saslServer = saslServerTransport.getSaslServer();
          String principal = saslServer.getAuthorizationID();
          hbaseHandler.setEffectiveUser(principal);
          return p.process(inProt, outProt);
        }
      };
    }

    // An info port given on the command line overrides the configured one.
    try {
      if (cmd.hasOption("infoport")) {
        String val = cmd.getOptionValue("infoport");
        conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
        log.debug("Web UI port set to " + val);
      }
    } catch (NumberFormatException e) {
      log.error("Could not parse the value provided for the infoport option", e);
      printUsage();
      System.exit(1);
    }

    // Start the info server unless the port is negative. It reads the same
    // bind-address key as the Thrift listener, defaulting to 0.0.0.0.
    int port = conf.getInt("hbase.thrift.info.port", 9095);
    if (port >= 0) {
      conf.setLong("startcode", System.currentTimeMillis());
      String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
      InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
      infoServer.setAttribute("hbase.conf", conf);
      infoServer.start();
    }

    // Build the selected server implementation.
    if (nonblocking) {
      server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
    } else if (hsha) {
      server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
    } else {
      server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
    }

    // Run serve() as realUser; a final copy is needed for the anonymous class.
    final TServer tserver = server;
    realUser.doAs(
      new PrivilegedAction<Object>() {
        @Override
        public Object run() {
          tserver.serve();
          return null;
        }
      });
  }
458 }