1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.SortedSet;
30  import java.util.TreeSet;
31  import java.util.UUID;
32  import java.util.concurrent.ConcurrentSkipListSet;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.classification.InterfaceStability;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Abortable;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.HTableInterface;
50  import org.apache.hadoop.hbase.client.HTableWrapper;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
53  import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
54  import org.apache.hadoop.hbase.util.VersionInfo;
55  
56  
57  
58  
59  
60  
61  
62  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
63  @InterfaceStability.Evolving
64  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
65    public static final String REGION_COPROCESSOR_CONF_KEY =
66        "hbase.coprocessor.region.classes";
67    public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
68        "hbase.coprocessor.regionserver.classes";
69    public static final String USER_REGION_COPROCESSOR_CONF_KEY =
70        "hbase.coprocessor.user.region.classes";
71    public static final String MASTER_COPROCESSOR_CONF_KEY =
72        "hbase.coprocessor.master.classes";
73    public static final String WAL_COPROCESSOR_CONF_KEY =
74      "hbase.coprocessor.wal.classes";
75    public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
76    public static final boolean DEFAULT_ABORT_ON_ERROR = true;
77    public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
78    public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
79    public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
80      "hbase.coprocessor.user.enabled";
81    public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
82  
83    private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
84    protected Abortable abortable;
85    
86    protected SortedSet<E> coprocessors =
87        new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
88    protected Configuration conf;
89    
90    protected String pathPrefix;
91    protected AtomicInteger loadSequence = new AtomicInteger();
92  
93    public CoprocessorHost(Abortable abortable) {
94      this.abortable = abortable;
95      this.pathPrefix = UUID.randomUUID().toString();
96    }
97  
98    
99  
100 
101 
102 
103 
104 
105 
106   private static Set<String> coprocessorNames =
107       Collections.synchronizedSet(new HashSet<String>());
108 
109   public static Set<String> getLoadedCoprocessors() {
110     synchronized (coprocessorNames) {
111       return new HashSet(coprocessorNames);
112     }
113   }
114 
115   
116 
117 
118 
119 
120 
121 
122   public Set<String> getCoprocessors() {
123     Set<String> returnValue = new TreeSet<String>();
124     for(CoprocessorEnvironment e: coprocessors) {
125       returnValue.add(e.getInstance().getClass().getSimpleName());
126     }
127     return returnValue;
128   }
129 
130   
131 
132 
133 
134   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
135     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
136       DEFAULT_COPROCESSORS_ENABLED);
137     if (!coprocessorsEnabled) {
138       return;
139     }
140 
141     Class<?> implClass = null;
142 
143     
144     String[] defaultCPClasses = conf.getStrings(confKey);
145     if (defaultCPClasses == null || defaultCPClasses.length == 0)
146       return;
147 
148     int priority = Coprocessor.PRIORITY_SYSTEM;
149     List<E> configured = new ArrayList<E>();
150     for (String className : defaultCPClasses) {
151       className = className.trim();
152       if (findCoprocessor(className) != null) {
153         continue;
154       }
155       ClassLoader cl = this.getClass().getClassLoader();
156       Thread.currentThread().setContextClassLoader(cl);
157       try {
158         implClass = cl.loadClass(className);
159         configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
160         LOG.info("System coprocessor " + className + " was loaded " +
161             "successfully with priority (" + priority++ + ").");
162       } catch (Throwable t) {
163         
164         abortServer(className, t);
165       }
166     }
167 
168     
169     coprocessors.addAll(configured);
170   }
171 
172   
173 
174 
175 
176 
177 
178 
179 
180   public E load(Path path, String className, int priority,
181       Configuration conf) throws IOException {
182     Class<?> implClass = null;
183     LOG.debug("Loading coprocessor class " + className + " with path " +
184         path + " and priority " + priority);
185 
186     ClassLoader cl = null;
187     if (path == null) {
188       try {
189         implClass = getClass().getClassLoader().loadClass(className);
190       } catch (ClassNotFoundException e) {
191         throw new IOException("No jar path specified for " + className);
192       }
193     } else {
194       cl = CoprocessorClassLoader.getClassLoader(
195         path, getClass().getClassLoader(), pathPrefix, conf);
196       try {
197         implClass = cl.loadClass(className);
198       } catch (ClassNotFoundException e) {
199         throw new IOException("Cannot load external coprocessor class " + className, e);
200       }
201     }
202 
203     
204     Thread currentThread = Thread.currentThread();
205     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
206     try{
207       
208       currentThread.setContextClassLoader(cl);
209       E cpInstance = loadInstance(implClass, priority, conf);
210       return cpInstance;
211     } finally {
212       
213       currentThread.setContextClassLoader(hostClassLoader);
214     }
215   }
216 
217   
218 
219 
220 
221 
222 
223   public void load(Class<?> implClass, int priority, Configuration conf)
224       throws IOException {
225     E env = loadInstance(implClass, priority, conf);
226     coprocessors.add(env);
227   }
228 
229   
230 
231 
232 
233 
234 
235   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
236       throws IOException {
237     if (!Coprocessor.class.isAssignableFrom(implClass)) {
238       throw new IOException("Configured class " + implClass.getName() + " must implement "
239           + Coprocessor.class.getName() + " interface ");
240     }
241 
242     
243     Coprocessor impl;
244     Object o = null;
245     try {
246       o = implClass.newInstance();
247       impl = (Coprocessor)o;
248     } catch (InstantiationException e) {
249       throw new IOException(e);
250     } catch (IllegalAccessException e) {
251       throw new IOException(e);
252     }
253     
254     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
255     if (env instanceof Environment) {
256       ((Environment)env).startup();
257     }
258     
259     
260     coprocessorNames.add(implClass.getName());
261     return env;
262   }
263 
264   
265 
266 
267   public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
268       int priority, int sequence, Configuration conf);
269 
270   public void shutdown(CoprocessorEnvironment e) {
271     if (e instanceof Environment) {
272       if (LOG.isDebugEnabled()) {
273         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
274       }
275       ((Environment)e).shutdown();
276     } else {
277       LOG.warn("Shutdown called on unknown environment: "+
278           e.getClass().getName());
279     }
280   }
281 
282   
283 
284 
285 
286 
287   public Coprocessor findCoprocessor(String className) {
288     for (E env: coprocessors) {
289       if (env.getInstance().getClass().getName().equals(className) ||
290           env.getInstance().getClass().getSimpleName().equals(className)) {
291         return env.getInstance();
292       }
293     }
294     return null;
295   }
296 
297   
298 
299 
300 
301 
302   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
303     ArrayList<T> ret = new ArrayList<T>();
304 
305     for (E env: coprocessors) {
306       Coprocessor cp = env.getInstance();
307 
308       if(cp != null) {
309         if (cls.isAssignableFrom(cp.getClass())) {
310           ret.add((T)cp);
311         }
312       }
313     }
314     return ret;
315   }
316 
317   
318 
319 
320 
321 
322   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
323     for (E env: coprocessors) {
324       if (env.getInstance().getClass().getName().equals(className) ||
325           env.getInstance().getClass().getSimpleName().equals(className)) {
326         return env;
327       }
328     }
329     return null;
330   }
331 
332   
333 
334 
335 
336 
337   Set<ClassLoader> getExternalClassLoaders() {
338     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
339     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
340     for (E env : coprocessors) {
341       ClassLoader cl = env.getInstance().getClass().getClassLoader();
342       if (cl != systemClassLoader ){
343         
344         externalClassLoaders.add(cl);
345       }
346     }
347     return externalClassLoaders;
348   }
349 
350   
351 
352 
353 
354   static class EnvironmentPriorityComparator
355       implements Comparator<CoprocessorEnvironment> {
356     @Override
357     public int compare(final CoprocessorEnvironment env1,
358         final CoprocessorEnvironment env2) {
359       if (env1.getPriority() < env2.getPriority()) {
360         return -1;
361       } else if (env1.getPriority() > env2.getPriority()) {
362         return 1;
363       }
364       if (env1.getLoadSequence() < env2.getLoadSequence()) {
365         return -1;
366       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
367         return 1;
368       }
369       return 0;
370     }
371   }
372 
373   
374 
375 
376   public static class Environment implements CoprocessorEnvironment {
377 
378     
379     public Coprocessor impl;
380     
381     protected int priority = Coprocessor.PRIORITY_USER;
382     
383     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
384     
385     protected List<HTableInterface> openTables =
386       Collections.synchronizedList(new ArrayList<HTableInterface>());
387     private int seq;
388     private Configuration conf;
389     private ClassLoader classLoader;
390 
391     
392 
393 
394 
395 
396     public Environment(final Coprocessor impl, final int priority,
397         final int seq, final Configuration conf) {
398       this.impl = impl;
399       this.classLoader = impl.getClass().getClassLoader();
400       this.priority = priority;
401       this.state = Coprocessor.State.INSTALLED;
402       this.seq = seq;
403       this.conf = conf;
404     }
405 
406     
407     public void startup() throws IOException {
408       if (state == Coprocessor.State.INSTALLED ||
409           state == Coprocessor.State.STOPPED) {
410         state = Coprocessor.State.STARTING;
411         Thread currentThread = Thread.currentThread();
412         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
413         try {
414           currentThread.setContextClassLoader(this.getClassLoader());
415           impl.start(this);
416           state = Coprocessor.State.ACTIVE;
417         } finally {
418           currentThread.setContextClassLoader(hostClassLoader);
419         }
420       } else {
421         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
422             " because not inactive (state="+state.toString()+")");
423       }
424     }
425 
426     
427     protected void shutdown() {
428       if (state == Coprocessor.State.ACTIVE) {
429         state = Coprocessor.State.STOPPING;
430         Thread currentThread = Thread.currentThread();
431         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
432         try {
433           currentThread.setContextClassLoader(this.getClassLoader());
434           impl.stop(this);
435           state = Coprocessor.State.STOPPED;
436         } catch (IOException ioe) {
437           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
438         } finally {
439           currentThread.setContextClassLoader(hostClassLoader);
440         }
441       } else {
442         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
443             " because not active (state="+state.toString()+")");
444       }
445       synchronized (openTables) {
446         
447         for (HTableInterface table: openTables) {
448           try {
449             ((HTableWrapper)table).internalClose();
450           } catch (IOException e) {
451             
452             LOG.warn("Failed to close " +
453                 Bytes.toStringBinary(table.getTableName()), e);
454           }
455         }
456       }
457     }
458 
459     @Override
460     public Coprocessor getInstance() {
461       return impl;
462     }
463 
464     @Override
465     public ClassLoader getClassLoader() {
466       return classLoader;
467     }
468 
469     @Override
470     public int getPriority() {
471       return priority;
472     }
473 
474     @Override
475     public int getLoadSequence() {
476       return seq;
477     }
478 
479     
480     @Override
481     public int getVersion() {
482       return Coprocessor.VERSION;
483     }
484 
485     
486     @Override
487     public String getHBaseVersion() {
488       return VersionInfo.getVersion();
489     }
490 
491     @Override
492     public Configuration getConfiguration() {
493       return conf;
494     }
495 
496     
497 
498 
499 
500 
501 
502     @Override
503     public HTableInterface getTable(TableName tableName) throws IOException {
504       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
505     }
506 
507     
508 
509 
510 
511 
512 
513     @Override
514     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
515       return HTableWrapper.createWrapper(openTables, tableName, this, pool);
516     }
517   }
518 
519   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
520     abortServer(environment.getInstance().getClass().getName(), e);
521   }
522 
523   protected void abortServer(final String coprocessorName, final Throwable e) {
524     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
525     LOG.error(message, e);
526     if (abortable != null) {
527       abortable.abort(message, e);
528     } else {
529       LOG.warn("No available Abortable, process was not aborted");
530     }
531   }
532 
533   
534 
535 
536 
537 
538 
539 
540 
541 
542 
543 
544 
545 
546 
547 
548   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
549       throws IOException {
550     if (e instanceof IOException) {
551       throw (IOException)e;
552     }
553     
554     
555     
556     
557     
558     
559     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
560       
561       abortServer(env, e);
562     } else {
563       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
564           "environment because it threw:  " + e,e);
565       coprocessors.remove(env);
566       try {
567         shutdown(env);
568       } catch (Exception x) {
569         LOG.error("Uncaught exception when shutting down coprocessor '"
570             + env.toString() + "'", x);
571       }
572       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
573           "' threw: '" + e + "' and has been removed from the active " +
574           "coprocessor set.", e);
575     }
576   }
577 
578   
579 
580 
581 
582 
583 
584 
585 
586 
587 
588 
589 
590 
591 
592 
593 
594 
595 
596 
597 
598   @InterfaceAudience.Private
599   protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
600       final String methodName, final Class<?>... parameterTypes) {
601     boolean useLegacy;
602     
603     try {
604       clazz.getDeclaredMethod(methodName, parameterTypes);
605       LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
606           "signature. Skipping legacy support for invocations in '" + clazz +"'.");
607       useLegacy = false;
608     } catch (NoSuchMethodException exception) {
609       useLegacy = true;
610     } catch (SecurityException exception) {
611       LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
612           "' requires legacy support; assuming it does. If you get later errors about legacy " +
613           "coprocessor use, consider updating your security policy to allow access to the package" +
614           " and declared members of your implementation.");
615       LOG.debug("Details of Security Manager rejection.", exception);
616       useLegacy = true;
617     }
618     return useLegacy;
619   }
620 
621   
622 
623 
624   private static final Set<Class<? extends Coprocessor>> legacyWarning =
625       new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
626           new Comparator<Class<? extends Coprocessor>>() {
627             @Override
628             public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
629               if (c1.equals(c2)) {
630                 return 0;
631               }
632               return c1.getName().compareTo(c2.getName());
633             }
634           });
635 
636   
637 
638 
639 
640 
641 
642   @InterfaceAudience.Private
643   protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
644     if(legacyWarning.add(clazz)) {
645       LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
646           " deprecated API. Your coprocessor will not see these events.  Please update '" + clazz +
647           "'. Details of the problem: " + message);
648     }
649   }
650 }