1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase;
20  
21  import com.google.common.collect.Sets;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
26  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
29  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
30  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
31  import org.apache.zookeeper.KeeperException;
32  
33  import java.io.IOException;
34  import java.util.List;
35  import java.util.NavigableMap;
36  import java.util.NavigableSet;
37  import java.util.concurrent.ConcurrentSkipListMap;
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  @InterfaceAudience.Private
50  public class ZKNamespaceManager extends ZooKeeperListener {
51    private static Log LOG = LogFactory.getLog(ZKNamespaceManager.class);
52    private final String nsZNode;
53    private volatile NavigableMap<String,NamespaceDescriptor> cache;
54  
55    public ZKNamespaceManager(ZooKeeperWatcher zkw) throws IOException {
56      super(zkw);
57      nsZNode = ZooKeeperWatcher.namespaceZNode;
58      cache = new ConcurrentSkipListMap<String, NamespaceDescriptor>();
59    }
60  
61    public void start() throws IOException {
62      watcher.registerListener(this);
63      try {
64        if (ZKUtil.watchAndCheckExists(watcher, nsZNode)) {
65          List<ZKUtil.NodeAndData> existing =
66              ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
67          if (existing != null) {
68            refreshNodes(existing);
69          }
70        } else {
71          ZKUtil.createWithParents(watcher, nsZNode);
72        }
73      } catch (KeeperException e) {
74        throw new IOException("Failed to initialize ZKNamespaceManager", e);
75      }
76    }
77  
78    public NamespaceDescriptor get(String name) {
79      return cache.get(name);
80    }
81  
82    public void update(NamespaceDescriptor ns) throws IOException {
83      writeNamespace(ns);
84      cache.put(ns.getName(), ns);
85    }
86  
87    public void remove(String name) throws IOException {
88      deleteNamespace(name);
89      cache.remove(name);
90    }
91  
92    public NavigableSet<NamespaceDescriptor> list() throws IOException {
93      NavigableSet<NamespaceDescriptor> ret =
94          Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
95      for(NamespaceDescriptor ns: cache.values()) {
96        ret.add(ns);
97      }
98      return ret;
99    }
100 
101   @Override
102   public void nodeCreated(String path) {
103     if (nsZNode.equals(path)) {
104       try {
105         List<ZKUtil.NodeAndData> nodes =
106             ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
107         refreshNodes(nodes);
108       } catch (KeeperException ke) {
109         String msg = "Error reading data from zookeeper";
110         LOG.error(msg, ke);
111         watcher.abort(msg, ke);
112       } catch (IOException e) {
113         String msg = "Error parsing data from zookeeper";
114         LOG.error(msg, e);
115         watcher.abort(msg, e);
116       }
117     }
118   }
119 
120   @Override
121   public void nodeDeleted(String path) {
122     if (nsZNode.equals(ZKUtil.getParent(path))) {
123       String nsName = ZKUtil.getNodeName(path);
124       cache.remove(nsName);
125     }
126   }
127 
128   @Override
129   public void nodeDataChanged(String path) {
130     if (nsZNode.equals(ZKUtil.getParent(path))) {
131       try {
132         byte[] data = ZKUtil.getDataAndWatch(watcher, path);
133         NamespaceDescriptor ns =
134             ProtobufUtil.toNamespaceDescriptor(
135                 HBaseProtos.NamespaceDescriptor.parseFrom(data));
136         cache.put(ns.getName(), ns);
137       } catch (KeeperException ke) {
138         String msg = "Error reading data from zookeeper for node "+path;
139         LOG.error(msg, ke);
140         
141         watcher.abort(msg, ke);
142       } catch (IOException ioe) {
143         String msg = "Error deserializing namespace: "+path;
144         LOG.error(msg, ioe);
145         watcher.abort(msg, ioe);
146       }
147     }
148   }
149 
150   @Override
151   public void nodeChildrenChanged(String path) {
152     if (nsZNode.equals(path)) {
153       try {
154         List<ZKUtil.NodeAndData> nodes =
155             ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
156         refreshNodes(nodes);
157       } catch (KeeperException ke) {
158         LOG.error("Error reading data from zookeeper for path "+path, ke);
159         watcher.abort("Zookeeper error get node children for path "+path, ke);
160       } catch (IOException e) {
161         LOG.error("Error deserializing namespace child from: "+path, e);
162         watcher.abort("Error deserializing namespace child from: " + path, e);
163       }
164     }
165   }
166 
167   private void deleteNamespace(String name) throws IOException {
168     String zNode = ZKUtil.joinZNode(nsZNode, name);
169     try {
170       ZKUtil.deleteNode(watcher, zNode);
171     } catch (KeeperException e) {
172       LOG.error("Failed updating permissions for namespace "+name, e);
173       throw new IOException("Failed updating permissions for namespace "+name, e);
174     }
175   }
176 
177   private void writeNamespace(NamespaceDescriptor ns) throws IOException {
178     String zNode = ZKUtil.joinZNode(nsZNode, ns.getName());
179     try {
180       ZKUtil.createWithParents(watcher, zNode);
181       ZKUtil.updateExistingNodeData(watcher, zNode,
182           ProtobufUtil.toProtoNamespaceDescriptor(ns).toByteArray(), -1);
183     } catch (KeeperException e) {
184       LOG.error("Failed updating permissions for namespace "+ns.getName(), e);
185       throw new IOException("Failed updating permissions for namespace "+ns.getName(), e);
186     }
187   }
188 
189   private void refreshNodes(List<ZKUtil.NodeAndData> nodes) throws IOException {
190     for (ZKUtil.NodeAndData n : nodes) {
191       if (n.isEmpty()) continue;
192       String path = n.getNode();
193       String namespace = ZKUtil.getNodeName(path);
194       byte[] nodeData = n.getData();
195       if (LOG.isDebugEnabled()) {
196         LOG.debug("Updating namespace cache from node "+namespace+" with data: "+
197             Bytes.toStringBinary(nodeData));
198       }
199       NamespaceDescriptor ns =
200           ProtobufUtil.toNamespaceDescriptor(
201               HBaseProtos.NamespaceDescriptor.parseFrom(nodeData));
202       cache.put(ns.getName(), ns);
203     }
204   }
205 }