1 /*
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27
28 import com.google.common.annotations.VisibleForTesting;
29
30 /**
31 * Allows multiple concurrent clients to lock on a numeric id with a minimal
32 * memory overhead. The intended usage is as follows:
33 *
34 * <pre>
35 * IdLock.Entry lockEntry = idLock.getLockEntry(id);
36 * try {
37 * // User code.
38 * } finally {
39 * idLock.releaseLockEntry(lockEntry);
40 * }</pre>
41 */
42 @InterfaceAudience.Private
43 public class IdLock {
44
45 /** An entry returned to the client as a lock object */
46 public static class Entry {
47 private final long id;
48 private int numWaiters;
49 private boolean isLocked = true;
50
51 private Entry(long id) {
52 this.id = id;
53 }
54
55 public String toString() {
56 return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
57 + isLocked;
58 }
59 }
60
61 private ConcurrentMap<Long, Entry> map =
62 new ConcurrentHashMap<Long, Entry>();
63
64 /**
65 * Blocks until the lock corresponding to the given id is acquired.
66 *
67 * @param id an arbitrary number to lock on
68 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
69 * the lock
70 * @throws IOException if interrupted
71 */
72 public Entry getLockEntry(long id) throws IOException {
73 Entry entry = new Entry(id);
74 Entry existing;
75 while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
76 synchronized (existing) {
77 if (existing.isLocked) {
78 ++existing.numWaiters; // Add ourselves to waiters.
79 while (existing.isLocked) {
80 try {
81 existing.wait();
82 } catch (InterruptedException e) {
83 --existing.numWaiters; // Remove ourselves from waiters.
84 throw new InterruptedIOException(
85 "Interrupted waiting to acquire sparse lock");
86 }
87 }
88
89 --existing.numWaiters; // Remove ourselves from waiters.
90 existing.isLocked = true;
91 return existing;
92 }
93 // If the entry is not locked, it might already be deleted from the
94 // map, so we cannot return it. We need to get our entry into the map
95 // or get someone else's locked entry.
96 }
97 }
98 return entry;
99 }
100
101 /**
102 * Must be called in a finally block to decrease the internal counter and
103 * remove the monitor object for the given id if the caller is the last
104 * client.
105 *
106 * @param entry the return value of {@link #getLockEntry(long)}
107 */
108 public void releaseLockEntry(Entry entry) {
109 synchronized (entry) {
110 entry.isLocked = false;
111 if (entry.numWaiters > 0) {
112 entry.notify();
113 } else {
114 map.remove(entry.id);
115 }
116 }
117 }
118
119 /** For testing */
120 void assertMapEmpty() {
121 assert map.size() == 0;
122 }
123
124 @VisibleForTesting
125 public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
126 for (Entry entry;;) {
127 entry = map.get(id);
128 if (entry != null) {
129 synchronized (entry) {
130 if (entry.numWaiters >= numWaiters) {
131 return;
132 }
133 }
134 }
135 Thread.sleep(100);
136 }
137 }
138 }