1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.util;
21  
22  
23  import com.google.common.base.Supplier;
24  
25  import java.util.Comparator;
26  import java.util.Set;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentMap;
29  import java.util.concurrent.ConcurrentSkipListSet;
30  
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  
34  /**
35   * A simple concurrent map of sets. This is similar in concept to
36   * {@link com.google.common.collect.Multiset}, with the following exceptions:
37   * <ul>
38   *   <li>The set is thread-safe and concurrent: no external locking or
39   *   synchronization is required. This is important for the use case where
40   *   this class is used to index cached blocks by filename for their
41   *   efficient eviction from cache when the file is closed or compacted.</li>
42   *   <li>The expectation is that all entries may only be removed for a key
43   *   once no more additions of values are being made under that key.</li>
44   * </ul>
45   * @param <K> Key type
46   * @param <V> Value type
47   */
48  @InterfaceAudience.Private
49  @InterfaceStability.Evolving
50  public class ConcurrentIndex<K, V> {
51  
52    /** Container for the sets, indexed by key */
53    private final ConcurrentMap<K, Set<V>> container;
54  
55    /**
56     * A factory that constructs new instances of the sets if no set is
57     * associated with a given key.
58     */
59    private final Supplier<Set<V>> valueSetFactory;
60  
61    /**
62     * Creates an instance with a specified factory object for sets to be
63     * associated with a given key.
64     * @param valueSetFactory The factory instance
65     */
66    public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) {
67      this.valueSetFactory = valueSetFactory;
68      this.container = new ConcurrentHashMap<K, Set<V>>();
69    }
70  
71    /**
72     * Creates an instance using the DefaultValueSetFactory for sets,
73     * which in turn creates instances of {@link ConcurrentSkipListSet}
74     * @param valueComparator A {@link Comparator} for value types
75     */
76    public ConcurrentIndex(Comparator<V> valueComparator) {
77      this(new DefaultValueSetFactory<V>(valueComparator));
78    }
79  
80    /**
81     * Associate a new unique value with a specified key. Under the covers, the
82     * method employs optimistic concurrency: if no set is associated with a
83     * given key, we create a new set; if another thread comes in, creates,
84     * and associates a set with the same key in the mean-time, we simply add
85     * the value to the already created set.
86     * @param key The key
87     * @param value An additional unique value we want to associate with a key
88     */
89    public void put(K key, V value) {
90      Set<V> set = container.get(key);
91      if (set != null) {
92        set.add(value);
93      } else {
94        set = valueSetFactory.get();
95        set.add(value);
96        Set<V> existing = container.putIfAbsent(key, set);
97        if (existing != null) {
98          // If a set is already associated with a key, that means another
99          // writer has already come in and created the set for the given key.
100         // Pursuant to an optimistic concurrency policy, in this case we will
101         // simply add the value to the existing set associated with the key.
102         existing.add(value);
103       }
104     }
105   }
106 
107   /**
108    * Get all values associated with a specified key or null if no values are
109    * associated. <b>Note:</b> if the caller wishes to add or removes values
110    * to under the specified as they're iterating through the returned value,
111    * they should make a defensive copy; otherwise, a
112    * {@link java.util.ConcurrentModificationException} may be thrown.
113    * @param key The key
114    * @return All values associated with the specified key or null if no values
115    *         are associated with the key.
116    */
117   public Set<V> values(K key) {
118     return container.get(key);
119   }
120 
121   /**
122    * Removes the association between a specified key and value. If as a
123    * result of removing a value a set becomes empty, we remove the given
124    * set from the mapping as well.
125    * @param key The specified key
126    * @param value The value to disassociate with the key
127    */
128   public boolean remove(K key, V value) {
129     Set<V> set = container.get(key);
130     boolean success = false;
131     if (set != null) {
132       success = set.remove(value);
133       if (set.isEmpty()) {
134         container.remove(key);
135       }
136     }
137     return success;
138   }
139 
140   /**
141    * Default factory class for the sets associated with given keys. Creates
142    * a {@link ConcurrentSkipListSet} using the comparator passed into the
143    * constructor.
144    * @see ConcurrentSkipListSet
145    * @see Supplier
146    * @param <V> The value type. Should match value type of the
147    *           ConcurrentIndex instances of this object are passed to.
148    */
149   private static class DefaultValueSetFactory<V> implements Supplier<Set<V>> {
150     private final Comparator<V> comparator;
151 
152     /**
153      * Creates an instance that passes a specified comparator to the
154      * {@link ConcurrentSkipListSet}
155      * @param comparator The specified comparator
156      */
157     public DefaultValueSetFactory(Comparator<V> comparator) {
158       this.comparator = comparator;
159     }
160 
161     /**
162      * Creates a new {@link ConcurrentSkipListSet} instance using the
163      * comparator specified when the class instance was constructed.
164      * @return The instantiated {@link ConcurrentSkipListSet} object
165      */
166     @Override
167     public Set<V> get() {
168       return new ConcurrentSkipListSet<V>(comparator);
169     }
170   }
171 }