1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  package org.apache.hadoop.hbase.coordination;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.lang.math.RandomUtils;
30  import org.apache.commons.lang.mutable.MutableInt;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.SplitLogCounters;
39  import org.apache.hadoop.hbase.SplitLogTask;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.exceptions.DeserializationException;
42  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
43  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
44  import org.apache.hadoop.hbase.regionserver.Region;
45  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
46  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
47  import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
48  import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
49  import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
50  import org.apache.hadoop.hbase.util.CancelableProgressable;
51  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
53  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
54  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
56  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57  import org.apache.hadoop.util.StringUtils;
58  import org.apache.zookeeper.AsyncCallback;
59  import org.apache.zookeeper.KeeperException;
60  import org.apache.zookeeper.data.Stat;
61  
62  
63  
64  
65  
66  
67  @InterfaceAudience.Private
68  public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
69      SplitLogWorkerCoordination {
70  
  /** Logger shared by all instances of this class. */
  private static final Log LOG = LogFactory.getLog(ZkSplitLogWorkerCoordination.class);

  /** Timeout handed to {@code taskReadyLock.wait(...)} in {@link #taskLoop()} (milliseconds). */
  private static final int checkInterval = 5000; 
  /** Sentinel returned by {@code attemptToOwnTask} when the znode could not be claimed. */
  private static final int FAILED_TO_OWN_TASK = -1;

  /** Worker supplied to {@code init}; {@code stopTask()} is invoked on it when a task is preempted. */
  private  SplitLogWorker worker;

  /** Executor supplied to {@code init} and forwarded to every {@code WALSplitterHandler}. */
  private TaskExecutor splitTaskExecutor;

  /** Monitor on which {@link #taskLoop()} waits and {@code nodeChildrenChanged} notifies. */
  private final Object taskReadyLock = new Object();
  /** Incremented (under {@code taskReadyLock}) each time the split log znode's children change. */
  volatile int taskReadySeq = 0;
  /** Znode path most recently passed to {@code grabTask}; {@code null} until the first attempt. */
  private volatile String currentTask = null;
  /** Result of the last {@code attemptToOwnTask} call made from {@code grabTask}. */
  private int currentVersion;
  /** Set by {@link #stopProcessingTasks()}; polled by the task and retry loops. */
  private volatile boolean shouldStop = false;
  /** Guards {@code workerInGrabTask}; also held while reacting to data-change callbacks. */
  private final Object grabTaskLock = new Object();
  /** Set to true when {@code grabTask} starts and reset to false in its {@code finally} block. */
  private boolean workerInGrabTask = false;
  /**
   * Minimum gap between two heartbeats issued by the progress reporter built in
   * {@code submitTask}; compared against differences of
   * {@code EnvironmentEdgeManager.currentTime()} (presumably milliseconds -- confirm).
   */
  private int reportPeriod;
  /** Region server services supplied to {@code init}; {@code null} before that call. */
  private RegionServerServices server = null;
  /** Counter handed to every {@code WALSplitterHandler} and read when computing spare capacity. */
  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
  /** Upper bound read from {@code hbase.regionserver.wal.max.splitters} in {@code init}. */
  private int maxConcurrentTasks = 0;

  /** State manager passed to the constructor; used to look up this server's name. */
  private final ZkCoordinatedStateManager manager;
93  
94    public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager,
95        ZooKeeperWatcher watcher) {
96      super(watcher);
97      manager = zkCoordinatedStateManager;
98  
99    }
100 
101   
102 
103 
104   @Override
105   public void nodeChildrenChanged(String path) {
106     if (path.equals(watcher.splitLogZNode)) {
107       LOG.debug("tasks arrived or departed");
108       synchronized (taskReadyLock) {
109         taskReadySeq++;
110         taskReadyLock.notify();
111       }
112     }
113   }
114 
115   
116 
117 
118   @Override
119   public void nodeDataChanged(String path) {
120     
121     
122     synchronized (grabTaskLock) {
123       if (workerInGrabTask) {
124         
125         String taskpath = currentTask;
126         if (taskpath != null && taskpath.equals(path)) {
127           getDataSetWatchAsync();
128         }
129       }
130     }
131   }
132 
133   
134 
135 
136   @Override
137   public void init(RegionServerServices server, Configuration conf,
138       TaskExecutor splitExecutor, SplitLogWorker worker) {
139     this.server = server;
140     this.worker = worker;
141     this.splitTaskExecutor = splitExecutor;
142     maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
143     reportPeriod =
144         conf.getInt("hbase.splitlog.report.period",
145           conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
146             ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
147   }
148 
149   
150 
151   void getDataSetWatchFailure(String path) {
152     synchronized (grabTaskLock) {
153       if (workerInGrabTask) {
154         
155         String taskpath = currentTask;
156         if (taskpath != null && taskpath.equals(path)) {
157           LOG.info("retrying data watch on " + path);
158           SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
159           getDataSetWatchAsync();
160         } else {
161           
162           
163         }
164       }
165     }
166   }
167 
  /**
   * Issues an asynchronous {@code getData} on the current task znode, passing the watcher so a
   * watch is registered, and routes the result to a new {@link GetDataAsyncCallback}. The
   * queued-requests counter is incremented after the request has been issued.
   */
  public void getDataSetWatchAsync() {
    watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
    SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
  }
173 
174   void getDataSetWatchSuccess(String path, byte[] data) {
175     SplitLogTask slt;
176     try {
177       slt = SplitLogTask.parseFrom(data);
178     } catch (DeserializationException e) {
179       LOG.warn("Failed parse", e);
180       return;
181     }
182     synchronized (grabTaskLock) {
183       if (workerInGrabTask) {
184         
185         String taskpath = currentTask;
186         if (taskpath != null && taskpath.equals(path)) {
187           ServerName serverName = manager.getServer().getServerName();
188           
189           
190           
191           
192           
193           
194           if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
195               && !slt.isResigned(serverName)) {
196             LOG.info("task " + taskpath + " preempted from " + serverName
197                 + ", current task state and owner=" + slt.toString());
198             worker.stopTask();
199           }
200         }
201       }
202     }
203   }
204 
205   
206 
207 
208 
209 
210   private void grabTask(String path) {
211     Stat stat = new Stat();
212     byte[] data;
213     synchronized (grabTaskLock) {
214       currentTask = path;
215       workerInGrabTask = true;
216       if (Thread.interrupted()) {
217         return;
218       }
219     }
220     try {
221       try {
222         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
223           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
224           return;
225         }
226       } catch (KeeperException e) {
227         LOG.warn("Failed to get data for znode " + path, e);
228         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
229         return;
230       }
231       SplitLogTask slt;
232       try {
233         slt = SplitLogTask.parseFrom(data);
234       } catch (DeserializationException e) {
235         LOG.warn("Failed parse data for znode " + path, e);
236         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
237         return;
238       }
239       if (!slt.isUnassigned()) {
240         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
241         return;
242       }
243 
244       currentVersion =
245           attemptToOwnTask(true, watcher, server.getServerName(), path,
246             slt.getMode(), stat.getVersion());
247       if (currentVersion < 0) {
248         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
249         return;
250       }
251 
252       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
253         ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
254             new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
255         splitTaskDetails.setTaskNode(currentTask);
256         splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
257 
258         endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
259           SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
260         return;
261       }
262 
263       LOG.info("worker " + server.getServerName() + " acquired task " + path);
264       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
265       getDataSetWatchAsync();
266 
267       submitTask(path, slt.getMode(), currentVersion, reportPeriod);
268 
269       
270       try {
271         int sleepTime = RandomUtils.nextInt(500) + 500;
272         Thread.sleep(sleepTime);
273       } catch (InterruptedException e) {
274         LOG.warn("Interrupted while yielding for other region servers", e);
275         Thread.currentThread().interrupt();
276       }
277     } finally {
278       synchronized (grabTaskLock) {
279         workerInGrabTask = false;
280         
281         
282         Thread.interrupted();
283       }
284     }
285   }
286 
287   
288 
289 
290 
291 
292   void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
293       final int reportPeriod) {
294     final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
295 
296     CancelableProgressable reporter = new CancelableProgressable() {
297       private long last_report_at = 0;
298 
299       @Override
300       public boolean progress() {
301         long t = EnvironmentEdgeManager.currentTime();
302         if ((t - last_report_at) > reportPeriod) {
303           last_report_at = t;
304           int latestZKVersion =
305               attemptToOwnTask(false, watcher, server.getServerName(), curTask,
306                 mode, zkVersion.intValue());
307           if (latestZKVersion < 0) {
308             LOG.warn("Failed to heartbeat the task" + curTask);
309             return false;
310           }
311           zkVersion.setValue(latestZKVersion);
312         }
313         return true;
314       }
315     };
316     ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
317         new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
318     splitTaskDetails.setTaskNode(curTask);
319     splitTaskDetails.setCurTaskZKVersion(zkVersion);
320 
321     WALSplitterHandler hsh =
322         new WALSplitterHandler(server, this, splitTaskDetails, reporter,
323             this.tasksInProgress, splitTaskExecutor, mode);
324     server.getExecutorService().submit(hsh);
325   }
326 
327   
328 
329 
330 
331 
332 
333   private int calculateAvailableSplitters(int numTasks) {
334     
335     int availableRSs = 1;
336     try {
337       List<String> regionServers =
338           ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
339       availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
340     } catch (KeeperException e) {
341       
342       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
343     }
344 
345     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
346     expectedTasksPerRS = Math.max(1, expectedTasksPerRS); 
347     
348     return Math.min(expectedTasksPerRS, maxConcurrentTasks)
349         - this.tasksInProgress.get();
350   }
351 
352   
353 
354 
355 
356 
357 
358 
359 
360 
361 
362 
363 
364 
365   protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
366       ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
367     int latestZKVersion = FAILED_TO_OWN_TASK;
368     try {
369       SplitLogTask slt = new SplitLogTask.Owned(server, mode);
370       Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
371       if (stat == null) {
372         LOG.warn("zk.setData() returned null for path " + task);
373         SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
374         return FAILED_TO_OWN_TASK;
375       }
376       latestZKVersion = stat.getVersion();
377       SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
378       return latestZKVersion;
379     } catch (KeeperException e) {
380       if (!isFirstTime) {
381         if (e.code().equals(KeeperException.Code.NONODE)) {
382           LOG.warn("NONODE failed to assert ownership for " + task, e);
383         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
384           LOG.warn("BADVERSION failed to assert ownership for " + task, e);
385         } else {
386           LOG.warn("failed to assert ownership for " + task, e);
387         }
388       }
389     } catch (InterruptedException e1) {
390       LOG.warn("Interrupted while trying to assert ownership of " + task + " "
391           + StringUtils.stringifyException(e1));
392       Thread.currentThread().interrupt();
393     }
394     SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
395     return FAILED_TO_OWN_TASK;
396   }
397 
398   
399 
400 
401 
402 
403 
404 
405 
406 
407   @Override
408   public void taskLoop() throws InterruptedException {
409     while (!shouldStop) {
410       int seq_start = taskReadySeq;
411       List<String> paths = null;
412       paths = getTaskList();
413       if (paths == null) {
414         LOG.warn("Could not get tasks, did someone remove " + watcher.splitLogZNode
415             + " ... worker thread exiting.");
416         return;
417       }
418       
419       int offset = (int) (Math.random() * paths.size());
420       for (int i = 0; i < paths.size(); i++) {
421         if (DefaultWALProvider.isMetaFile(paths.get(i))) {
422           offset = i;
423           break;
424         }
425       }
426       int numTasks = paths.size();
427       for (int i = 0; i < numTasks; i++) {
428         int idx = (i + offset) % paths.size();
429         
430         
431         if (this.calculateAvailableSplitters(numTasks) > 0) {
432           grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
433         } else {
434           LOG.debug("Current region server " + server.getServerName() + " has "
435               + this.tasksInProgress.get() + " tasks in progress and can't take more.");
436           break;
437         }
438         if (shouldStop) {
439           return;
440         }
441       }
442       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
443       synchronized (taskReadyLock) {
444         while (seq_start == taskReadySeq) {
445           taskReadyLock.wait(checkInterval);
446           if (server != null) {
447             
448             Map<String, Region> recoveringRegions = server.getRecoveringRegions();
449             if (!recoveringRegions.isEmpty()) {
450               
451               
452               List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
453               int listSize = tmpCopy.size();
454               for (int i = 0; i < listSize; i++) {
455                 String region = tmpCopy.get(i);
456                 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
457                 try {
458                   if (ZKUtil.checkExists(watcher, nodePath) == -1) {
459                     server.getExecutorService().submit(
460                       new FinishRegionRecoveringHandler(server, region, nodePath));
461                   } else {
462                     
463                     
464                     
465                     
466                     
467                     break;
468                   }
469                 } catch (KeeperException e) {
470                   
471                   LOG.debug("Got a zookeeper when trying to open a recovering region", e);
472                   break;
473                 }
474               }
475             }
476           }
477         }
478       }
479     }
480   }
481 
482   private List<String> getTaskList() throws InterruptedException {
483     List<String> childrenPaths = null;
484     long sleepTime = 1000;
485     
486     
487     while (!shouldStop) {
488       try {
489         childrenPaths =
490             ZKUtil.listChildrenAndWatchForNewChildren(watcher,
491               watcher.splitLogZNode);
492         if (childrenPaths != null) {
493           return childrenPaths;
494         }
495       } catch (KeeperException e) {
496         LOG.warn("Could not get children of znode " + watcher.splitLogZNode, e);
497       }
498       LOG.debug("Retry listChildren of znode " + watcher.splitLogZNode
499           + " after sleep for " + sleepTime + "ms!");
500       Thread.sleep(sleepTime);
501     }
502     return childrenPaths;
503   }
504 
  /**
   * Delegates to {@code ZKSplitLog.markCorrupted} with the same arguments.
   *
   * @param rootDir passed through unchanged
   * @param name passed through unchanged
   * @param fs passed through unchanged
   */
  @Override
  public void markCorrupted(Path rootDir, String name, FileSystem fs) {
    ZKSplitLog.markCorrupted(rootDir, name, fs);
  }
509 
510   @Override
511   public boolean isReady() throws InterruptedException {
512     int result = -1;
513     try {
514       result = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
515     } catch (KeeperException e) {
516       
517       LOG.warn("Exception when checking for " + watcher.splitLogZNode
518           + " ... retrying", e);
519     }
520     if (result == -1) {
521       LOG.info(watcher.splitLogZNode
522           + " znode does not exist, waiting for master to create");
523       Thread.sleep(1000);
524     }
525     return (result != -1);
526   }
527 
  /**
   * @return the current value of the counter that {@code nodeChildrenChanged} increments
   */
  @Override
  public int getTaskReadySeq() {
    return taskReadySeq;
  }
532 
  /** Registers this object as a listener on the ZooKeeper watcher. */
  @Override
  public void registerListener() {
    watcher.registerListener(this);
  }
537 
  /** Unregisters this object from the ZooKeeper watcher. */
  @Override
  public void removeListener() {
    watcher.unregisterListener(this);
  }
542 
543 
  /**
   * Sets the stop flag that {@code taskLoop} and {@code getTaskList} poll. Only the flag is
   * set here; no thread is interrupted or notified by this method.
   */
  @Override
  public void stopProcessingTasks() {
    this.shouldStop = true;

  }
549 
  /**
   * @return {@code true} once {@link #stopProcessingTasks()} has been called
   */
  @Override
  public boolean isStop() {
    return shouldStop;
  }
554 
  /**
   * Delegates to {@code ZKSplitLog.getRegionFlushedSequenceId} using this instance's watcher.
   *
   * @param failedServerName passed through unchanged
   * @param key passed through unchanged
   * @return whatever the delegate returns
   * @throws IOException if the delegate throws it
   */
  @Override
  public RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
      throws IOException {
    return ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, key);
  }
560 
561   
562 
563 
564   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
565     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
566 
567     @Override
568     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
569       SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
570       if (rc != 0) {
571         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
572         getDataSetWatchFailure(path);
573         return;
574       }
575       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
576       getDataSetWatchSuccess(path, data);
577     }
578   }
579 
580   
581 
582 
583   
584 
585 
586 
587 
588 
589   @Override
590   public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) {
591     ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
592     String task = zkDetails.getTaskNode();
593     int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
594     try {
595       if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
596         LOG.info("successfully transitioned task " + task + " to final state " + slt);
597         ctr.incrementAndGet();
598         return;
599       }
600       LOG.warn("failed to transistion task " + task + " to end state " + slt
601           + " because of version mismatch ");
602     } catch (KeeperException.BadVersionException bve) {
603       LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch",
604         bve);
605     } catch (KeeperException.NoNodeException e) {
606       LOG.fatal(
607         "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
608     } catch (KeeperException e) {
609       LOG.warn("failed to end task, " + task + " " + slt, e);
610     }
611     SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
612   }
613 
614   
615 
616 
617 
618   public static class ZkSplitTaskDetails implements SplitTaskDetails {
619     private String taskNode;
620     private MutableInt curTaskZKVersion;
621 
622     public ZkSplitTaskDetails() {
623     }
624 
625     public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
626       this.taskNode = taskNode;
627       this.curTaskZKVersion = curTaskZKVersion;
628     }
629 
630     public String getTaskNode() {
631       return taskNode;
632     }
633 
634     public void setTaskNode(String taskNode) {
635       this.taskNode = taskNode;
636     }
637 
638     public MutableInt getCurTaskZKVersion() {
639       return curTaskZKVersion;
640     }
641 
642     public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
643       this.curTaskZKVersion = curTaskZKVersion;
644     }
645 
646     @Override
647     public String getWALFile() {
648       return ZKSplitLog.getFileName(taskNode);
649     }
650   }
651 
652 }