1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.LinkedHashSet;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.util.Bytes;
27 import org.apache.hadoop.hbase.util.ClassSize;
28
29 /**
30 * Manages the read/write consistency within memstore. This provides
31 * an interface for readers to determine what entries to ignore, and
32 * a mechanism for writers to obtain new write numbers, then "commit"
33 * the new writes for readers to read (thus forming atomic transactions).
34 */
35 @InterfaceAudience.Private
36 public class MultiVersionConsistencyControl {
37 static final long NO_WRITE_NUMBER = 0;
38 private volatile long memstoreRead = 0;
39 private final Object readWaiters = new Object();
40
41 // This is the pending queue of writes.
42 private final LinkedHashSet<WriteEntry> writeQueue =
43 new LinkedHashSet<WriteEntry>();
44
45 /**
46 * Default constructor. Initializes the memstoreRead/Write points to 0.
47 */
48 public MultiVersionConsistencyControl() {
49 }
50
51 /**
52 * Initializes the memstoreRead/Write points appropriately.
53 * @param startPoint
54 */
55 public void initialize(long startPoint) {
56 synchronized (writeQueue) {
57 writeQueue.clear();
58 memstoreRead = startPoint;
59 }
60 }
61
62 /**
63 *
64 * @param initVal The value we used initially and expected it'll be reset later
65 * @return WriteEntry instance.
66 */
67 WriteEntry beginMemstoreInsert() {
68 return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
69 }
70
71 /**
72 * Get a mvcc write number before an actual one(its log sequence Id) being assigned
73 * @param sequenceId
74 * @return long a faked write number which is bigger enough not to be seen by others before a real
75 * one is assigned
76 */
77 public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
78 // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
79 // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
80 // because each handler could increment sequence num twice and max concurrent in-flight
81 // transactions is the number of RPC handlers.
82 // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
83 // changes touch same row key.
84 // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
85 // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all.
86 // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done?
87 return sequenceId.incrementAndGet() + 1000000000;
88 }
89
90 /**
91 * This function starts a MVCC transaction with current region's log change sequence number. Since
92 * we set change sequence number when flushing current change to WAL(late binding), the flush
93 * order may differ from the order to start a MVCC transaction. For example, a change begins a
94 * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
95 * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
96 * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
97 * big number is safe because we only need it to prevent current change being seen and the number
98 * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
99 * for MVCC to align with flush sequence.
100 * @param curSeqNum
101 * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
102 */
103 public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
104 return beginMemstoreInsertWithSeqNum(curSeqNum, false);
105 }
106
107 private WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum, boolean complete) {
108 WriteEntry e = new WriteEntry(curSeqNum);
109 if (complete) {
110 e.markCompleted();
111 }
112 synchronized (writeQueue) {
113 writeQueue.add(e);
114 return e;
115 }
116 }
117
118 /**
119 * Complete a {@link WriteEntry} that was created by
120 * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
121 * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
122 * visible to MVCC readers.
123 * @throws IOException
124 */
125 public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
126 throws IOException {
127 if(e == null) return;
128 if (seqId != null) {
129 e.setWriteNumber(seqId.getSequenceId());
130 } else {
131 // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
132 // function beginMemstoreInsertWithSeqNum in case of failures
133 e.setWriteNumber(NO_WRITE_NUMBER);
134 }
135 waitForPreviousTransactionsComplete(e);
136 }
137
138 /**
139 * Cancel a write insert that failed.
140 * Removes the write entry without advancing read point or without interfering with write
141 * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method
142 * will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see
143 * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark
144 * it as for special handling).
145 * @param writeEntry Failed attempt at write. Does cleanup.
146 */
147 public void cancelMemstoreInsert(WriteEntry writeEntry) {
148 // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance
149 // readpoint and gets my little writeEntry completed and removed from queue of outstanding
150 // events which seems right. St.Ack 20150901.
151 writeEntry.setWriteNumber(NO_WRITE_NUMBER);
152 advanceMemstore(writeEntry);
153 }
154
155 /**
156 * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
157 * end of this call, the global read point is at least as large as the write point of the passed
158 * in WriteEntry. Thus, the write is visible to MVCC readers.
159 */
160 public void completeMemstoreInsert(WriteEntry e) {
161 waitForPreviousTransactionsComplete(e);
162 }
163
164 /**
165 * Mark the {@link WriteEntry} as complete and advance the read point as
166 * much as possible.
167 *
168 * How much is the read point advanced?
169 * Let S be the set of all write numbers that are completed and where all previous write numbers
170 * are also completed. Then, the read point is advanced to the supremum of S.
171 *
172 * @param e
173 * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
174 */
175 boolean advanceMemstore(WriteEntry e) {
176 long nextReadValue = -1;
177 synchronized (writeQueue) {
178 e.markCompleted();
179
180 while (!writeQueue.isEmpty()) {
181 WriteEntry queueFirst = writeQueue.iterator().next();
182 if (queueFirst.isCompleted()) {
183 // Using Max because Edit complete in WAL sync order not arriving order
184 nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
185 writeQueue.remove(queueFirst);
186 } else {
187 break;
188 }
189 }
190
191 if (nextReadValue > memstoreRead) {
192 memstoreRead = nextReadValue;
193 }
194
195 // notify waiters on writeQueue before return
196 writeQueue.notifyAll();
197 }
198
199 if (nextReadValue > 0) {
200 synchronized (readWaiters) {
201 readWaiters.notifyAll();
202 }
203 }
204
205 if (memstoreRead >= e.getWriteNumber()) {
206 return true;
207 }
208 return false;
209 }
210
211 /**
212 * Advances the current read point to be given seqNum if it is smaller than
213 * that.
214 */
215 void advanceMemstoreReadPointIfNeeded(long seqNum) {
216 synchronized (writeQueue) {
217 if (this.memstoreRead < seqNum) {
218 memstoreRead = seqNum;
219 }
220 }
221 }
222
223 /**
224 * Wait for all previous MVCC transactions complete
225 */
226 public void waitForPreviousTransactionsComplete() {
227 WriteEntry w = beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER, true);
228 waitForPreviousTransactionsComplete(w);
229 }
230
231 public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
232 boolean interrupted = false;
233 WriteEntry w = waitedEntry;
234 w.markCompleted();
235
236 try {
237 WriteEntry firstEntry = null;
238 do {
239 synchronized (writeQueue) {
240 if (writeQueue.isEmpty()) {
241 break;
242 }
243 firstEntry = writeQueue.iterator().next();
244 if (firstEntry == w) {
245 // all previous in-flight transactions are done
246 break;
247 }
248 // WriteEntry already was removed from the queue by another handler
249 if (!writeQueue.contains(w)) {
250 break;
251 }
252 try {
253 writeQueue.wait(0);
254 } catch (InterruptedException ie) {
255 // We were interrupted... finish the loop -- i.e. cleanup --and then
256 // on our way out, reset the interrupt flag.
257 interrupted = true;
258 break;
259 }
260 }
261 } while (firstEntry != null);
262 } finally {
263 advanceMemstore(w);
264 }
265 if (interrupted) {
266 Thread.currentThread().interrupt();
267 }
268 }
269
270 public long memstoreReadPoint() {
271 return memstoreRead;
272 }
273
274 public static class WriteEntry {
275 private long writeNumber;
276 private volatile boolean completed = false;
277
278 WriteEntry(long writeNumber) {
279 this.writeNumber = writeNumber;
280 }
281 void markCompleted() {
282 this.completed = true;
283 }
284 boolean isCompleted() {
285 return this.completed;
286 }
287 long getWriteNumber() {
288 return this.writeNumber;
289 }
290 void setWriteNumber(long val){
291 this.writeNumber = val;
292 }
293 }
294
295 public static final long FIXED_SIZE = ClassSize.align(
296 ClassSize.OBJECT +
297 2 * Bytes.SIZEOF_LONG +
298 2 * ClassSize.REFERENCE);
299
300 }