1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.ListIterator;
27  import java.util.Map;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.Server;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.MetaTableAccessor;
45  import org.apache.hadoop.hbase.client.HConnection;
46  import org.apache.hadoop.hbase.client.Mutation;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
49  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
50  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
51  import org.apache.hadoop.hbase.security.User;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.CancelableProgressable;
54  import org.apache.hadoop.hbase.util.ConfigUtil;
55  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56  import org.apache.hadoop.hbase.util.FSUtils;
57  import org.apache.hadoop.hbase.util.HasThread;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.util.PairOfSameType;
60  import org.apache.zookeeper.KeeperException;
61  
62  import com.google.common.util.concurrent.ThreadFactoryBuilder;
63  
64  @InterfaceAudience.Private
65  public class SplitTransactionImpl implements SplitTransaction {
66    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
67  
68    
69  
70  
71    private final HRegion parent;
72    private HRegionInfo hri_a;
73    private HRegionInfo hri_b;
74    private long fileSplitTimeout = 30000;
75    public SplitTransactionCoordination.SplitTransactionDetails std;
76    boolean useZKForAssignment;
77  
78    
79  
80  
81    private final byte [] splitrow;
82  
83    
84  
85  
86  
87    private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
88    private Server server;
89    private RegionServerServices rsServices;
90  
91    public static class JournalEntryImpl implements JournalEntry {
92      private SplitTransactionPhase type;
93      private long timestamp;
94  
95      public JournalEntryImpl(SplitTransactionPhase type) {
96        this(type, EnvironmentEdgeManager.currentTime());
97      }
98  
99      public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
100       this.type = type;
101       this.timestamp = timestamp;
102     }
103 
104     @Override
105     public String toString() {
106       StringBuilder sb = new StringBuilder();
107       sb.append(type);
108       sb.append(" at ");
109       sb.append(timestamp);
110       return sb.toString();
111     }
112 
113     @Override
114     public SplitTransactionPhase getPhase() {
115       return type;
116     }
117 
118     @Override
119     public long getTimeStamp() {
120       return timestamp;
121     }
122   }
123 
124   
125 
126 
127   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
128 
129   
130 
131 
132   private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
133 
134   
135 
136 
137 
138 
139   public SplitTransactionImpl(final Region r, final byte [] splitrow) {
140     this.parent = (HRegion)r;
141     this.splitrow = splitrow;
142     this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
143     useZKForAssignment = ConfigUtil.useZKForAssignment(parent.getBaseConf());
144   }
145 
146   private void transition(SplitTransactionPhase nextPhase) throws IOException {
147     transition(nextPhase, false);
148   }
149 
150   private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
151       throws IOException {
152     if (!isRollback) {
153       
154       
155       this.journal.add(new JournalEntryImpl(nextPhase));
156     }
157     for (int i = 0; i < listeners.size(); i++) {
158       TransactionListener listener = listeners.get(i);
159       if (!isRollback) {
160         listener.transition(this, currentPhase, nextPhase);
161       } else {
162         listener.rollback(this, currentPhase, nextPhase);
163       }
164     }
165     currentPhase = nextPhase;
166   }
167 
168   
169 
170 
171 
172 
173   public boolean prepare() throws IOException {
174     if (!this.parent.isSplittable()) return false;
175     
176     if (this.splitrow == null) return false;
177     HRegionInfo hri = this.parent.getRegionInfo();
178     parent.prepareToSplit();
179     
180     byte [] startKey = hri.getStartKey();
181     byte [] endKey = hri.getEndKey();
182     if (Bytes.equals(startKey, splitrow) ||
183         !this.parent.getRegionInfo().containsRow(splitrow)) {
184       LOG.info("Split row is not inside region key range or is equal to " +
185           "startkey: " + Bytes.toStringBinary(this.splitrow));
186       return false;
187     }
188     long rid = getDaughterRegionIdTimestamp(hri);
189     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
190     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
191 
192     transition(SplitTransactionPhase.PREPARED);
193 
194     return true;
195   }
196 
197   
198 
199 
200 
201 
202   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
203     long rid = EnvironmentEdgeManager.currentTime();
204     
205     
206     if (rid < hri.getRegionId()) {
207       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
208         " but current time here is " + rid);
209       rid = hri.getRegionId() + 1;
210     }
211     return rid;
212   }
213 
214   private static IOException closedByOtherException = new IOException(
215       "Failed to close region: already closed by another thread");
216 
217   
218 
219 
220 
221 
222 
223 
224 
225 
226 
227   @Deprecated
228   
229       final RegionServerServices services) throws IOException {
230     return createDaughters(server, services, null);
231   }
232 
233   
234       final RegionServerServices services, User user) throws IOException {
235     LOG.info("Starting split of region " + this.parent);
236     if ((server != null && server.isStopped()) ||
237         (services != null && services.isStopping())) {
238       throw new IOException("Server is stopped or stopping");
239     }
240     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
241       "Unsafe to hold write lock while performing RPCs";
242 
243     transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
244 
245     
246     if (this.parent.getCoprocessorHost() != null) {
247       if (user == null) {
248         
249         parent.getCoprocessorHost().preSplit();
250         parent.getCoprocessorHost().preSplit(splitrow);
251       } else {
252         try {
253           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
254             @Override
255             public Void run() throws Exception {
256               parent.getCoprocessorHost().preSplit();
257               parent.getCoprocessorHost().preSplit(splitrow);
258               return null;
259             }
260           });
261         } catch (InterruptedException ie) {
262           InterruptedIOException iioe = new InterruptedIOException();
263           iioe.initCause(ie);
264           throw iioe;
265         }
266       }
267     }
268 
269     transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
270 
271     
272     boolean testing = server == null? true:
273         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
274     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
275         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
276           this.fileSplitTimeout);
277 
278     PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
279 
280     final List<Mutation> metaEntries = new ArrayList<Mutation>();
281     boolean ret = false;
282     if (this.parent.getCoprocessorHost() != null) {
283       if (user == null) {
284         ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
285       } else {
286         try {
287           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
288             @Override
289             public Boolean run() throws Exception {
290               return parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
291             }
292           });
293         } catch (InterruptedException ie) {
294           InterruptedIOException iioe = new InterruptedIOException();
295           iioe.initCause(ie);
296           throw iioe;
297         }
298       }
299       if (ret) {
300           throw new IOException("Coprocessor bypassing region "
301             + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
302       }
303       try {
304         for (Mutation p : metaEntries) {
305           HRegionInfo.parseRegionName(p.getRow());
306         }
307       } catch (IOException e) {
308         LOG.error("Row key of mutation from coprossor is not parsable as region name."
309             + "Mutations from coprocessor should only for hbase:meta table.");
310         throw e;
311       }
312     }
313 
314     
315     
316     
317     
318     
319     
320     
321     
322     
323     
324     
325     
326     
327     
328     
329     transition(SplitTransactionPhase.PONR);
330 
331     
332     
333     
334     
335     
336     if (!testing && useZKForAssignment) {
337       if (metaEntries == null || metaEntries.isEmpty()) {
338         MetaTableAccessor.splitRegion(server.getConnection(),
339           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
340           daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
341           parent.getTableDesc().getRegionReplication());
342       } else {
343         offlineParentInMetaAndputMetaEntries(server.getConnection(),
344           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
345               .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
346               parent.getTableDesc().getRegionReplication());
347       }
348     } else if (services != null && !useZKForAssignment) {
349       if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
350           parent.getRegionInfo(), hri_a, hri_b)) {
351         
352         throw new IOException("Failed to notify master that split passed PONR: "
353           + parent.getRegionInfo().getRegionNameAsString());
354       }
355     }
356     return daughterRegions;
357   }
358 
359   public PairOfSameType<Region> stepsBeforePONR(final Server server,
360       final RegionServerServices services, boolean testing) throws IOException {
361 
362     if (useCoordinatedStateManager(server)) {
363       if (std == null) {
364         std =
365             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
366                 .getSplitTransactionCoordination().getDefaultDetails();
367       }
368       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
369           .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
370             hri_a, hri_b);
371     } else if (services != null && !useZKForAssignment) {
372       if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
373           parent.getRegionInfo(), hri_a, hri_b)) {
374         throw new IOException("Failed to get ok from master to split "
375           + parent.getRegionInfo().getRegionNameAsString());
376       }
377     }
378 
379     transition(SplitTransactionPhase.SET_SPLITTING);
380 
381     if (useCoordinatedStateManager(server)) {
382       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
383           .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
384             hri_b, std);
385     }
386 
387     this.parent.getRegionFileSystem().createSplitsDir();
388 
389     transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
390 
391     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
392     Exception exceptionToThrow = null;
393     try{
394       hstoreFilesToSplit = this.parent.close(false);
395     } catch (Exception e) {
396       exceptionToThrow = e;
397     }
398     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
399       
400       
401       
402       
403       
404       exceptionToThrow = closedByOtherException;
405     }
406     if (exceptionToThrow != closedByOtherException) {
407       transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
408     }
409     if (exceptionToThrow != null) {
410       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
411       throw new IOException(exceptionToThrow);
412     }
413     if (!testing) {
414       services.removeFromOnlineRegions(this.parent, null);
415     }
416 
417     transition(SplitTransactionPhase.OFFLINED_PARENT);
418 
419     
420     
421     
422     
423     
424     
425     Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
426 
427     
428     
429     
430     
431     transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
432 
433     assertReferenceFileCount(expectedReferences.getFirst(),
434         this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
435     Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
436     assertReferenceFileCount(expectedReferences.getFirst(),
437         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
438 
439     
440     transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
441 
442     assertReferenceFileCount(expectedReferences.getSecond(),
443         this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
444     Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
445     assertReferenceFileCount(expectedReferences.getSecond(),
446         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
447 
448     return new PairOfSameType<Region>(a, b);
449   }
450 
451   void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
452       throws IOException {
453     if (expectedReferenceFileCount != 0 &&
454         expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
455           dir)) {
456       throw new IOException("Failing split. Expected reference file count isn't equal.");
457     }
458   }
459 
460   
461 
462 
463 
464 
465 
466 
467 
468 
469   
470       final RegionServerServices services, Region a, Region b)
471       throws IOException {
472     boolean stopped = server != null && server.isStopped();
473     boolean stopping = services != null && services.isStopping();
474     
475     if (stopped || stopping) {
476       LOG.info("Not opening daughters " +
477           b.getRegionInfo().getRegionNameAsString() +
478           " and " +
479           a.getRegionInfo().getRegionNameAsString() +
480           " because stopping=" + stopping + ", stopped=" + stopped);
481     } else {
482       
483       DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
484       DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
485       aOpener.start();
486       bOpener.start();
487       try {
488         aOpener.join();
489         if (aOpener.getException() == null) {
490           transition(SplitTransactionPhase.OPENED_REGION_A);
491         }
492         bOpener.join();
493         if (bOpener.getException() == null) {
494           transition(SplitTransactionPhase.OPENED_REGION_B);
495         }
496       } catch (InterruptedException e) {
497         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
498       }
499       if (aOpener.getException() != null) {
500         throw new IOException("Failed " +
501           aOpener.getName(), aOpener.getException());
502       }
503       if (bOpener.getException() != null) {
504         throw new IOException("Failed " +
505           bOpener.getName(), bOpener.getException());
506       }
507       if (services != null) {
508         try {
509           if (useZKForAssignment) {
510             
511             services.postOpenDeployTasks(b);
512           } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
513               parent.getRegionInfo(), hri_a, hri_b)) {
514             throw new IOException("Failed to report split region to master: "
515               + parent.getRegionInfo().getShortNameToLog());
516           }
517           
518           services.addToOnlineRegions(b);
519           if (useZKForAssignment) {
520             services.postOpenDeployTasks(a);
521           }
522           services.addToOnlineRegions(a);
523         } catch (KeeperException ke) {
524           throw new IOException(ke);
525         }
526       }
527     }
528   }
529 
530   public PairOfSameType<Region> execute(final Server server,
531     final RegionServerServices services)
532         throws IOException {
533     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
534       LOG.warn("Should use execute(Server, RegionServerServices, User)");
535     }
536     return execute(server, services, null);
537   }
538 
539   
540 
541 
542 
543 
544 
545 
546 
547 
548 
549   @Override
550   public PairOfSameType<Region> execute(final Server server,
551       final RegionServerServices services, User user) throws IOException {
552     this.server = server;
553     this.rsServices = services;
554     useZKForAssignment = server == null ? true :
555       ConfigUtil.useZKForAssignment(server.getConfiguration());
556     if (useCoordinatedStateManager(server)) {
557       std =
558           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
559               .getSplitTransactionCoordination().getDefaultDetails();
560     }
561     PairOfSameType<Region> regions = createDaughters(server, services, user);
562     if (this.parent.getCoprocessorHost() != null) {
563       if (user == null) {
564         parent.getCoprocessorHost().preSplitAfterPONR();
565       } else {
566         try {
567           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
568             @Override
569             public Void run() throws Exception {
570               parent.getCoprocessorHost().preSplitAfterPONR();
571               return null;
572             }
573           });
574         } catch (InterruptedException ie) {
575           InterruptedIOException iioe = new InterruptedIOException();
576           iioe.initCause(ie);
577           throw iioe;
578         }
579       }
580     }
581     regions = stepsAfterPONR(server, services, regions, user);
582 
583     transition(SplitTransactionPhase.COMPLETED);
584 
585     return regions;
586   }
587 
588 	@Deprecated
589   public PairOfSameType<Region> stepsAfterPONR(final Server server,
590       final RegionServerServices services, final PairOfSameType<Region> regions)
591       throws IOException {
592     return stepsAfterPONR(server, services, regions, null);
593   }
594 
595   public PairOfSameType<Region> stepsAfterPONR(final Server server,
596       final RegionServerServices services, final PairOfSameType<Region> regions, User user)
597       throws IOException {
598     openDaughters(server, services, regions.getFirst(), regions.getSecond());
599     if (useCoordinatedStateManager(server)) {
600       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
601           .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
602             regions.getSecond(), std, parent);
603     }
604 
605     transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
606 
607     
608     if (parent.getCoprocessorHost() != null) {
609       if (user == null) {
610         this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
611       } else {
612         try {
613           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
614             @Override
615             public Void run() throws Exception {
616               parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
617               return null;
618             }
619           });
620         } catch (InterruptedException ie) {
621           InterruptedIOException iioe = new InterruptedIOException();
622           iioe.initCause(ie);
623           throw iioe;
624         }
625       }
626     }
627 
628     transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
629 
630     return regions;
631   }
632 
633   private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
634       HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
635       ServerName serverName, List<Mutation> metaEntries, int regionReplication)
636           throws IOException {
637     List<Mutation> mutations = metaEntries;
638     HRegionInfo copyOfParent = new HRegionInfo(parent);
639     copyOfParent.setOffline(true);
640     copyOfParent.setSplit(true);
641 
642     
643     Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
644     MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
645     mutations.add(putParent);
646     
647     
648     Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
649     Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
650 
651     addLocation(putA, serverName, 1); 
652     addLocation(putB, serverName, 1);
653     mutations.add(putA);
654     mutations.add(putB);
655 
656     
657     
658     for (int i = 1; i < regionReplication; i++) {
659       addEmptyLocation(putA, i);
660       addEmptyLocation(putB, i);
661     }
662 
663     MetaTableAccessor.mutateMetaTable(hConnection, mutations);
664   }
665 
666   private static Put addEmptyLocation(final Put p, int replicaId){
667     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
668     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
669       null);
670     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
671     return p;
672   }
673 
674   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
675     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
676       Bytes.toBytes(sn.getHostAndPort()));
677     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
678       Bytes.toBytes(sn.getStartcode()));
679     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
680         Bytes.toBytes(openSeqNum));
681     return p;
682   }
683 
684   
685 
686 
687 
688   class DaughterOpener extends HasThread {
689     private final Server server;
690     private final HRegion r;
691     private Throwable t = null;
692 
693     DaughterOpener(final Server s, final HRegion r) {
694       super((s == null? "null-services": s.getServerName()) +
695         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
696       setDaemon(true);
697       this.server = s;
698       this.r = r;
699     }
700 
701     
702 
703 
704 
705     Throwable getException() {
706       return this.t;
707     }
708 
709     @Override
710     public void run() {
711       try {
712         openDaughterRegion(this.server, r);
713       } catch (Throwable t) {
714         this.t = t;
715       }
716     }
717   }
718 
719   
720 
721 
722 
723 
724 
725 
726   void openDaughterRegion(final Server server, final HRegion daughter)
727   throws IOException, KeeperException {
728     HRegionInfo hri = daughter.getRegionInfo();
729     LoggingProgressable reporter = server == null ? null
730         : new LoggingProgressable(hri, server.getConfiguration().getLong(
731             "hbase.regionserver.split.daughter.open.log.interval", 10000));
732     daughter.openHRegion(reporter);
733   }
734 
735   static class LoggingProgressable implements CancelableProgressable {
736     private final HRegionInfo hri;
737     private long lastLog = -1;
738     private final long interval;
739 
740     LoggingProgressable(final HRegionInfo hri, final long interval) {
741       this.hri = hri;
742       this.interval = interval;
743     }
744 
745     @Override
746     public boolean progress() {
747       long now = EnvironmentEdgeManager.currentTime();
748       if (now - lastLog > this.interval) {
749         LOG.info("Opening " + this.hri.getRegionNameAsString());
750         this.lastLog = now;
751       }
752       return true;
753     }
754   }
755 
756   private boolean useCoordinatedStateManager(final Server server) {
757     return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
758   }
759 
760   
761 
762 
763 
764 
765 
766   private Pair<Integer, Integer> splitStoreFiles(
767       final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
768       throws IOException {
769     if (hstoreFilesToSplit == null) {
770       
771       throw new IOException("Close returned empty list of StoreFiles");
772     }
773     
774     
775     
776     int nbFiles = 0;
777     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
778         nbFiles += entry.getValue().size();
779     }
780     if (nbFiles == 0) {
781       
782       return new Pair<Integer, Integer>(0,0);
783     }
784     
785     
786     int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
787                 HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
788             Runtime.getRuntime().availableProcessors());
789     
790     int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
791                 defMaxThreads), nbFiles);
792     LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
793             " using " + maxThreads + " threads");
794     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
795     builder.setNameFormat("StoreFileSplitter-%1$d");
796     ThreadFactory factory = builder.build();
797     ThreadPoolExecutor threadPool =
798       (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
799     List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
800 
801     
802     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
803       for (StoreFile sf: entry.getValue()) {
804         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
805         futures.add(threadPool.submit(sfs));
806       }
807     }
808     
809     threadPool.shutdown();
810 
811     
812     try {
813       boolean stillRunning = !threadPool.awaitTermination(
814           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
815       if (stillRunning) {
816         threadPool.shutdownNow();
817         
818         while (!threadPool.isTerminated()) {
819           Thread.sleep(50);
820         }
821         throw new IOException("Took too long to split the" +
822             " files and create the references, aborting split");
823       }
824     } catch (InterruptedException e) {
825       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
826     }
827 
828     int created_a = 0;
829     int created_b = 0;
830     
831     for (Future<Pair<Path, Path>> future : futures) {
832       try {
833         Pair<Path, Path> p = future.get();
834         created_a += p.getFirst() != null ? 1 : 0;
835         created_b += p.getSecond() != null ? 1 : 0;
836       } catch (InterruptedException e) {
837         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
838       } catch (ExecutionException e) {
839         throw new IOException(e);
840       }
841     }
842 
843     if (LOG.isDebugEnabled()) {
844       LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
845           + " storefiles, Daughter B: " + created_b + " storefiles.");
846     }
847     return new Pair<Integer, Integer>(created_a, created_b);
848   }
849 
850   private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
851       throws IOException {
852     if (LOG.isDebugEnabled()) {
853         LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
854                   this.parent);
855     }
856     HRegionFileSystem fs = this.parent.getRegionFileSystem();
857     String familyName = Bytes.toString(family);
858 
859     Path path_a =
860         fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
861           this.parent.getSplitPolicy());
862     Path path_b =
863         fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
864           this.parent.getSplitPolicy());
865     if (LOG.isDebugEnabled()) {
866         LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
867                   this.parent);
868     }
869     return new Pair<Path,Path>(path_a, path_b);
870   }
871 
872   
873 
874 
875 
876   class StoreFileSplitter implements Callable<Pair<Path,Path>> {
877     private final byte[] family;
878     private final StoreFile sf;
879 
880     
881 
882 
883 
884 
885     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
886       this.sf = sf;
887       this.family = family;
888     }
889 
890     public Pair<Path,Path> call() throws IOException {
891       return splitStoreFile(family, sf);
892     }
893   }
894   
895   @Override
896   public boolean rollback(final Server server, final RegionServerServices services)
897       throws IOException {
898     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
899       LOG.warn("Should use rollback(Server, RegionServerServices, User)");
900     }
901     return rollback(server, services, null);
902   }
903 
904   
905 
906 
907 
908 
909 
910 
911   @Override
912   @SuppressWarnings("deprecation")
913   public boolean rollback(final Server server, final RegionServerServices services, User user)
914   throws IOException {
915     
916     if (this.parent.getCoprocessorHost() != null) {
917       if (user == null) {
918         this.parent.getCoprocessorHost().preRollBackSplit();
919       } else {
920         try {
921           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
922             @Override
923             public Void run() throws Exception {
924               parent.getCoprocessorHost().preRollBackSplit();
925               return null;
926             }
927           });
928         } catch (InterruptedException ie) {
929           InterruptedIOException iioe = new InterruptedIOException();
930           iioe.initCause(ie);
931           throw iioe;
932         }
933       }
934     }
935 
936     boolean result = true;
937     ListIterator<JournalEntry> iterator =
938       this.journal.listIterator(this.journal.size());
939     
940     while (iterator.hasPrevious()) {
941       JournalEntry je = iterator.previous();
942 
943       transition(je.getPhase(), true);
944 
945       switch(je.getPhase()) {
946 
947       case SET_SPLITTING:
948         if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
949           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
950               .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
951         } else if (services != null && !useZKForAssignment
952             && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
953                 parent.getRegionInfo(), hri_a, hri_b)) {
954           return false;
955         }
956         break;
957 
958       case CREATE_SPLIT_DIR:
959         this.parent.writestate.writesEnabled = true;
960         this.parent.getRegionFileSystem().cleanupSplitsDir();
961         break;
962 
963       case CLOSED_PARENT_REGION:
964         try {
965           
966           
967           
968           
969           
970           this.parent.initialize();
971         } catch (IOException e) {
972           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
973             this.parent.getRegionInfo().getRegionNameAsString(), e);
974           throw new RuntimeException(e);
975         }
976         break;
977 
978       case STARTED_REGION_A_CREATION:
979         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
980         break;
981 
982       case STARTED_REGION_B_CREATION:
983         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
984         break;
985 
986       case OFFLINED_PARENT:
987         if (services != null) services.addToOnlineRegions(this.parent);
988         break;
989 
990       case PONR:
991         
992         
993         
994         
995         return false;
996 
997       
998       case STARTED:
999       case PREPARED:
1000       case BEFORE_PRE_SPLIT_HOOK:
1001       case AFTER_PRE_SPLIT_HOOK:
1002       case BEFORE_POST_SPLIT_HOOK:
1003       case AFTER_POST_SPLIT_HOOK:
1004       case OPENED_REGION_A:
1005       case OPENED_REGION_B:
1006       case COMPLETED:
1007         break;
1008 
1009       default:
1010         throw new RuntimeException("Unhandled journal entry: " + je);
1011       }
1012     }
1013     
1014     if (this.parent.getCoprocessorHost() != null) {
1015       if (user == null) {
1016         this.parent.getCoprocessorHost().postRollBackSplit();
1017       } else {
1018         try {
1019           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1020             @Override
1021             public Void run() throws Exception {
1022               parent.getCoprocessorHost().postRollBackSplit();
1023               return null;
1024             }
1025           });
1026         } catch (InterruptedException ie) {
1027           InterruptedIOException iioe = new InterruptedIOException();
1028           iioe.initCause(ie);
1029           throw iioe;
1030         }
1031       }
1032     }
1033     return result;
1034   }
1035 
1036   HRegionInfo getFirstDaughter() {
1037     return hri_a;
1038   }
1039 
1040   HRegionInfo getSecondDaughter() {
1041     return hri_b;
1042   }
1043 
1044   @Override
1045   public List<JournalEntry> getJournal() {
1046     return journal;
1047   }
1048 
1049   @Override
1050   public SplitTransaction registerTransactionListener(TransactionListener listener) {
1051     listeners.add(listener);
1052     return this;
1053   }
1054 
1055   @Override
1056   public Server getServer() {
1057     return server;
1058   }
1059 
1060   @Override
1061   public RegionServerServices getRegionServerServices() {
1062     return rsServices;
1063   }
1064 }