1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.lang.reflect.Method;
25  import java.net.InetSocketAddress;
26  import java.net.URI;
27  import java.util.HashSet;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.Collection;
31  
32  import com.google.common.collect.Sets;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.classification.InterfaceStability;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hdfs.DistributedFileSystem;
41  import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
42  
43  
44  
45  
46  
47  @InterfaceAudience.Private
48  @InterfaceStability.Evolving
49  public class FSHDFSUtils extends FSUtils {
50    private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
51    private static Class dfsUtilClazz;
52    private static Method getNNAddressesMethod;
53  
54    
55  
56  
57  
58  
59    private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
60                                                        Configuration conf) {
61      Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
62      String serviceName = fs.getCanonicalServiceName();
63  
64      if (serviceName.startsWith("ha-hdfs")) {
65        try {
66          if (dfsUtilClazz == null) {
67            dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
68          }
69          if (getNNAddressesMethod == null) {
70            try {
71              
72              
73              getNNAddressesMethod =
74                  dfsUtilClazz.getMethod("getNNServiceRpcAddressesForCluster", Configuration.class);
75            } catch (NoSuchMethodException e) {
76              
77              getNNAddressesMethod =
78                  dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
79            }
80  
81          }
82  
83          Map<String, Map<String, InetSocketAddress>> addressMap =
84                  (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
85                          .invoke(null, conf);
86          for (Map.Entry<String, Map<String, InetSocketAddress>> entry : addressMap.entrySet()) {
87            Map<String, InetSocketAddress> nnMap = entry.getValue();
88            for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
89              InetSocketAddress addr = e2.getValue();
90              addresses.add(addr);
91            }
92          }
93        } catch (Exception e) {
94          LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
95        }
96      } else {
97        URI uri = fs.getUri();
98        int port = uri.getPort();
99        if (port < 0) {
100         int idx = serviceName.indexOf(':');
101         port = Integer.parseInt(serviceName.substring(idx+1));
102       }
103       InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
104       addresses.add(addr);
105     }
106 
107     return addresses;
108   }
109 
110   
111 
112 
113 
114 
115 
116   public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
117     
118     
119     String srcServiceName = srcFs.getCanonicalServiceName();
120     String desServiceName = desFs.getCanonicalServiceName();
121 
122     if (srcServiceName == null || desServiceName == null) {
123       return false;
124     }
125     if (srcServiceName.equals(desServiceName)) {
126       return true;
127     }
128     if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
129       Collection<String> internalNameServices =
130           conf.getTrimmedStringCollection("dfs.internal.nameservices");
131       if (!internalNameServices.isEmpty()) {
132         if (internalNameServices.contains(srcServiceName.split(":")[1])) {
133           return true;
134         } else {
135           return false;
136         }
137       }
138     }
139     if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
140       
141       
142       
143       Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
144       Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
145       if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
146         return true;
147       }
148     }
149 
150     return false;
151   }
152 
153   
154 
155 
156   @Override
157   public void recoverFileLease(final FileSystem fs, final Path p,
158       Configuration conf, CancelableProgressable reporter)
159   throws IOException {
160     
161     if (!(fs instanceof DistributedFileSystem)) return;
162     recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter);
163   }
164 
165   
166 
167 
168 
169 
170 
171 
172 
173 
174 
175 
176 
177 
178 
179 
180 
181 
182 
183 
184 
185 
186 
187 
188 
189 
190 
191   boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
192       final Configuration conf, final CancelableProgressable reporter)
193   throws IOException {
194     LOG.info("Recovering lease on dfs file " + p);
195     long startWaiting = EnvironmentEdgeManager.currentTime();
196     
197     
198     
199     long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
200     
201     long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
202     
203     
204     
205     
206     
207     long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 61 * 1000);
208 
209     Method isFileClosedMeth = null;
210     
211     boolean findIsFileClosedMeth = true;
212     boolean recovered = false;
213     
214     for (int nbAttempt = 0; !recovered; nbAttempt++) {
215       recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
216       if (recovered) break;
217       checkIfCancelled(reporter);
218       if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break;
219       try {
220         
221         if (nbAttempt == 0) {
222           Thread.sleep(firstPause);
223         } else {
224           
225           
226           long localStartWaiting = EnvironmentEdgeManager.currentTime();
227           while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) <
228               subsequentPauseBase * nbAttempt) {
229             Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
230             if (findIsFileClosedMeth) {
231               try {
232                 isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
233                   new Class[]{ Path.class });
234               } catch (NoSuchMethodException nsme) {
235                 LOG.debug("isFileClosed not available");
236               } finally {
237                 findIsFileClosedMeth = false;
238               }
239             }
240             if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
241               recovered = true;
242               break;
243             }
244             checkIfCancelled(reporter);
245           }
246         }
247       } catch (InterruptedException ie) {
248         InterruptedIOException iioe = new InterruptedIOException();
249         iioe.initCause(ie);
250         throw iioe;
251       }
252     }
253     return recovered;
254   }
255 
256   boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
257       final int nbAttempt, final Path p, final long startWaiting) {
258     if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
259       LOG.warn("Cannot recoverLease after trying for " +
260         conf.getInt("hbase.lease.recovery.timeout", 900000) +
261         "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
262         getLogMessageDetail(nbAttempt, p, startWaiting));
263       return true;
264     }
265     return false;
266   }
267 
268   
269 
270 
271 
272 
273 
274 
275 
276 
277   boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
278       final long startWaiting)
279   throws FileNotFoundException {
280     boolean recovered = false;
281     try {
282       recovered = dfs.recoverLease(p);
283       LOG.info("recoverLease=" + recovered + ", " +
284         getLogMessageDetail(nbAttempt, p, startWaiting));
285     } catch (IOException e) {
286       if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
287         
288         throw new FileNotFoundException("The given WAL wasn't found at " + p);
289       } else if (e instanceof FileNotFoundException) {
290         throw (FileNotFoundException)e;
291       }
292       LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
293     }
294     return recovered;
295   }
296 
297   
298 
299 
300 
301 
302 
303   private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
304     return "attempt=" + nbAttempt + " on file=" + p + " after " +
305       (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
306   }
307 
308   
309 
310 
311 
312 
313 
314 
315   private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
316     try {
317       return (Boolean) m.invoke(dfs, p);
318     } catch (SecurityException e) {
319       LOG.warn("No access", e);
320     } catch (Exception e) {
321       LOG.warn("Failed invocation for " + p.toString(), e);
322     }
323     return false;
324   }
325 
326   void checkIfCancelled(final CancelableProgressable reporter)
327   throws InterruptedIOException {
328     if (reporter == null) return;
329     if (!reporter.progress()) throw new InterruptedIOException("Operation cancelled");
330   }
331 }