1 /*
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase.util;
20
21 import java.util.concurrent.locks.ReentrantReadWriteLock;
22
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24
25 import com.google.common.annotations.VisibleForTesting;
26
27 /**
28 * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The
29 * intended usage for read lock is as follows:
30 *
31 * <pre>
32 * ReentrantReadWriteLock lock = idReadWriteLock.getLock(id);
33 * try {
34 * lock.readLock().lock();
35 * // User code.
36 * } finally {
37 * lock.readLock().unlock();
38 * }
39 * </pre>
40 *
41 * For write lock, use lock.writeLock()
42 */
43 @InterfaceAudience.Private
44 public class IdReadWriteLock {
45 // The number of lock we want to easily support. It's not a maximum.
46 private static final int NB_CONCURRENT_LOCKS = 1000;
47 // The pool to get entry from, entries are mapped by weak reference to make it able to be
48 // garbage-collected asap
49 private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool =
50 new WeakObjectPool<Long, ReentrantReadWriteLock>(
51 new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
52 @Override
53 public ReentrantReadWriteLock createObject(Long id) {
54 return new ReentrantReadWriteLock();
55 }
56 }, NB_CONCURRENT_LOCKS);
57
58 /**
59 * Get the ReentrantReadWriteLock corresponding to the given id
60 * @param id an arbitrary number to identify the lock
61 */
62 public ReentrantReadWriteLock getLock(long id) {
63 lockPool.purge();
64 ReentrantReadWriteLock readWriteLock = lockPool.get(id);
65 return readWriteLock;
66 }
67
68 /** For testing */
69 @VisibleForTesting
70 int purgeAndGetEntryPoolSize() {
71 System.gc();
72 Threads.sleep(200);
73 lockPool.purge();
74 return lockPool.size();
75 }
76
77 @VisibleForTesting
78 public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
79 for (ReentrantReadWriteLock readWriteLock;;) {
80 readWriteLock = lockPool.get(id);
81 if (readWriteLock != null) {
82 synchronized (readWriteLock) {
83 if (readWriteLock.getQueueLength() >= numWaiters) {
84 return;
85 }
86 }
87 }
88 Thread.sleep(50);
89 }
90 }
91 }