1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase;
20  
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
37  
38  
39  
40  
41  
42  
43  
44  
45  @InterfaceAudience.Private
46  public class HBaseClusterManager extends Configured implements ClusterManager {
47    private static final String SIGKILL = "SIGKILL";
48    private static final String SIGSTOP = "SIGSTOP";
49    private static final String SIGCONT = "SIGCONT";
50  
51    protected static final Log LOG = LogFactory.getLog(HBaseClusterManager.class);
52    private String sshUserName;
53    private String sshOptions;
54  
55    
56  
57  
58  
59    private static final String DEFAULT_TUNNEL_CMD = "/usr/bin/ssh %1$s %2$s%3$s%4$s \"%5$s\"";
60    private String tunnelCmd;
61  
62    private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
63    private static final int DEFAULT_RETRY_ATTEMPTS = 5;
64  
65    private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
66    private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
67  
68    protected RetryCounterFactory retryCounterFactory;
69  
70    @Override
71    public void setConf(Configuration conf) {
72      super.setConf(conf);
73      if (conf == null) {
74        
75        return;
76      }
77      sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
78      String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
79      sshOptions = System.getenv("HBASE_SSH_OPTS");
80      if (!extraSshOptions.isEmpty()) {
81        sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
82      }
83      sshOptions = (sshOptions == null) ? "" : sshOptions;
84      tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
85      
86      if ((sshUserName != null && sshUserName.length() > 0) ||
87          (sshOptions != null && sshOptions.length() > 0)) {
88        LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
89      }
90  
91      this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
92          .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
93          .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
94    }
95  
96    
97  
98  
99    protected class RemoteShell extends Shell.ShellCommandExecutor {
100     private String hostname;
101 
102     public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
103         long timeout) {
104       super(execString, dir, env, timeout);
105       this.hostname = hostname;
106     }
107 
108     public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
109       super(execString, dir, env);
110       this.hostname = hostname;
111     }
112 
113     public RemoteShell(String hostname, String[] execString, File dir) {
114       super(execString, dir);
115       this.hostname = hostname;
116     }
117 
118     public RemoteShell(String hostname, String[] execString) {
119       super(execString);
120       this.hostname = hostname;
121     }
122 
123     @Override
124     public String[] getExecString() {
125       String at = sshUserName.isEmpty() ? "" : "@";
126       String remoteCmd = StringUtils.join(super.getExecString(), " ");
127       String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd);
128       LOG.info("Executing full command [" + cmd + "]");
129       return new String[] { "/usr/bin/env", "bash", "-c", cmd };
130     }
131 
132     @Override
133     public void execute() throws IOException {
134       super.execute();
135     }
136   }
137 
138   
139 
140 
141 
142 
143   static abstract class CommandProvider {
144 
145     enum Operation {
146       START, STOP, RESTART
147     }
148 
149     public abstract String getCommand(ServiceType service, Operation op);
150 
151     public String isRunningCommand(ServiceType service) {
152       return findPidCommand(service);
153     }
154 
155     protected String findPidCommand(ServiceType service) {
156       return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
157           service);
158     }
159 
160     public String signalCommand(ServiceType service, String signal) {
161       return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
162     }
163   }
164 
165   
166 
167 
168   static class HBaseShellCommandProvider extends CommandProvider {
169     private final String hbaseHome;
170     private final String confDir;
171 
172     HBaseShellCommandProvider(Configuration conf) {
173       hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
174         System.getenv("HBASE_HOME"));
175       String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
176         System.getenv("HBASE_CONF_DIR"));
177       if (tmp != null) {
178         confDir = String.format("--config %s", tmp);
179       } else {
180         confDir = "";
181       }
182     }
183 
184     @Override
185     public String getCommand(ServiceType service, Operation op) {
186       return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
187           op.toString().toLowerCase(), service);
188     }
189   }
190 
191   public HBaseClusterManager() {
192   }
193 
194   protected CommandProvider getCommandProvider(ServiceType service) {
195     
196     
197     return new HBaseShellCommandProvider(getConf());
198   }
199 
200   
201 
202 
203 
204 
205   private Pair<Integer, String> exec(String hostname, String... cmd) throws IOException {
206     LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
207 
208     RemoteShell shell = new RemoteShell(hostname, cmd);
209     try {
210       shell.execute();
211     } catch (Shell.ExitCodeException ex) {
212       
213       String output = shell.getOutput();
214       
215       throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
216         + ", stdout: " + output);
217     }
218 
219     LOG.info("Executed remote command, exit code:" + shell.getExitCode()
220         + " , output:" + shell.getOutput());
221 
222     return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
223   }
224 
225   private Pair<Integer, String> execWithRetries(String hostname, String... cmd)
226       throws IOException {
227     RetryCounter retryCounter = retryCounterFactory.create();
228     while (true) {
229       try {
230         return exec(hostname, cmd);
231       } catch (IOException e) {
232         retryOrThrow(retryCounter, e, hostname, cmd);
233       }
234       try {
235         retryCounter.sleepUntilNextRetry();
236       } catch (InterruptedException ex) {
237         
238         LOG.warn("Sleep Interrupted:" + ex);
239       }
240     }
241   }
242 
243   private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
244       String hostname, String[] cmd) throws E {
245     if (retryCounter.shouldRetry()) {
246       LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
247         + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
248           + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
249       return;
250     }
251     throw ex;
252   }
253 
254   private void exec(String hostname, ServiceType service, Operation op) throws IOException {
255     execWithRetries(hostname, getCommandProvider(service).getCommand(service, op));
256   }
257 
258   @Override
259   public void start(ServiceType service, String hostname, int port) throws IOException {
260     exec(hostname, service, Operation.START);
261   }
262 
263   @Override
264   public void stop(ServiceType service, String hostname, int port) throws IOException {
265     exec(hostname, service, Operation.STOP);
266   }
267 
268   @Override
269   public void restart(ServiceType service, String hostname, int port) throws IOException {
270     exec(hostname, service, Operation.RESTART);
271   }
272 
273   public void signal(ServiceType service, String signal, String hostname) throws IOException {
274     execWithRetries(hostname, getCommandProvider(service).signalCommand(service, signal));
275   }
276 
277   @Override
278   public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
279     String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service))
280         .getSecond();
281     return ret.length() > 0;
282   }
283 
284   @Override
285   public void kill(ServiceType service, String hostname, int port) throws IOException {
286     signal(service, SIGKILL, hostname);
287   }
288 
289   @Override
290   public void suspend(ServiceType service, String hostname, int port) throws IOException {
291     signal(service, SIGSTOP, hostname);
292   }
293 
294   @Override
295   public void resume(ServiceType service, String hostname, int port) throws IOException {
296     signal(service, SIGCONT, hostname);
297   }
298 }