1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.replication.regionserver;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.hadoop.conf.Configured;
24  import org.apache.hadoop.fs.FileSystem;
25  import org.apache.hadoop.fs.Path;
26  import org.apache.hadoop.hbase.Abortable;
27  import org.apache.hadoop.hbase.ChoreService;
28  import org.apache.hadoop.hbase.CoordinatedStateManager;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.Server;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.client.ClusterConnection;
34  import org.apache.hadoop.hbase.util.FSUtils;
35  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
36  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
37  import org.apache.hadoop.util.Tool;
38  import org.apache.hadoop.util.ToolRunner;
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  public class ReplicationSyncUp extends Configured implements Tool {
50  
51    static final Log LOG = LogFactory.getLog(ReplicationSyncUp.class.getName());
52  
53    private static Configuration conf;
54  
55    private static final long SLEEP_TIME = 10000;
56  
57    
58    
59    public static void setConfigure(Configuration config) {
60      conf = config;
61    }
62  
63    
64  
65  
66  
67  
68    public static void main(String[] args) throws Exception {
69      if (conf == null) conf = HBaseConfiguration.create();
70      int ret = ToolRunner.run(conf, new ReplicationSyncUp(), args);
71      System.exit(ret);
72    }
73  
74    @Override
75    public int run(String[] args) throws Exception {
76      Replication replication;
77      ReplicationSourceManager manager;
78      FileSystem fs;
79      Path oldLogDir, logDir, rootDir;
80      ZooKeeperWatcher zkw;
81  
82      Abortable abortable = new Abortable() {
83        @Override
84        public void abort(String why, Throwable e) {
85        }
86  
87        @Override
88        public boolean isAborted() {
89          return false;
90        }
91      };
92  
93      zkw =
94          new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable,
95              true);
96  
97      rootDir = FSUtils.getRootDir(conf);
98      fs = FileSystem.get(conf);
99      oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
100     logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
101 
102     System.out.println("Start Replication Server start");
103     replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
104     manager = replication.getReplicationManager();
105     manager.init();
106 
107     try {
108       int numberOfOldSource = 1; 
109       while (numberOfOldSource > 0) {
110         Thread.sleep(SLEEP_TIME);
111         numberOfOldSource = manager.getOldSources().size();
112       }
113     } catch (InterruptedException e) {
114       System.err.println("didn't wait long enough:" + e);
115       return (-1);
116     }
117 
118     manager.join();
119     zkw.close();
120 
121     return (0);
122   }
123 
124   static class DummyServer implements Server {
125     String hostname;
126     ZooKeeperWatcher zkw;
127 
128     DummyServer(ZooKeeperWatcher zkw) {
129       
130       hostname = System.currentTimeMillis() + ".SyncUpTool.replication.org";
131       this.zkw = zkw;
132     }
133 
134     DummyServer(String hostname) {
135       this.hostname = hostname;
136     }
137 
138     @Override
139     public Configuration getConfiguration() {
140       return conf;
141     }
142 
143     @Override
144     public ZooKeeperWatcher getZooKeeper() {
145       return zkw;
146     }
147 
148     @Override
149     public CoordinatedStateManager getCoordinatedStateManager() {
150       return null;
151     }
152 
153     @Override
154     public MetaTableLocator getMetaTableLocator() {
155       return null;
156     }
157 
158     @Override
159     public ServerName getServerName() {
160       return ServerName.valueOf(hostname, 1234, 1L);
161     }
162 
163     @Override
164     public void abort(String why, Throwable e) {
165     }
166 
167     @Override
168     public boolean isAborted() {
169       return false;
170     }
171 
172     @Override
173     public void stop(String why) {
174     }
175 
176     @Override
177     public boolean isStopped() {
178       return false;
179     }
180 
181     @Override
182     public ClusterConnection getConnection() {
183       return null;
184     }
185 
186     @Override
187     public ChoreService getChoreService() {
188       return null;
189     }
190   }
191 }