1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.chaos.monkies;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.List;
24  
25  import org.apache.commons.lang.math.RandomUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.IntegrationTestingUtility;
29  import org.apache.hadoop.hbase.chaos.policies.Policy;
30  import org.apache.hadoop.hbase.util.Pair;
31  
32  
33  
34  
35  public class PolicyBasedChaosMonkey extends ChaosMonkey {
36  
37    private static final Log LOG = LogFactory.getLog(PolicyBasedChaosMonkey.class);
38    private static final long ONE_SEC = 1000;
39    private static final long FIVE_SEC = 5 * ONE_SEC;
40    private static final long ONE_MIN = 60 * ONE_SEC;
41  
42    public static final long TIMEOUT = ONE_MIN;
43  
44    final IntegrationTestingUtility util;
45  
46    
47  
48  
49  
50  
51    public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
52      this.util = util;
53      this.policies = policies;
54    }
55  
56    public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
57      this.util = util;
58      this.policies = policies.toArray(new Policy[policies.size()]);
59    }
60  
61  
62    
63    public static <T> T selectRandomItem(T[] items) {
64      return items[RandomUtils.nextInt(items.length)];
65    }
66  
67    
68    public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
69      int totalWeight = 0;
70      for (Pair<T, Integer> pair : items) {
71        totalWeight += pair.getSecond();
72      }
73  
74      int cutoff = RandomUtils.nextInt(totalWeight);
75      int cummulative = 0;
76      T item = null;
77  
78      
79      for (int i=0; i<items.size(); i++) {
80        int curWeight = items.get(i).getSecond();
81        if ( cutoff < cummulative + curWeight) {
82          item = items.get(i).getFirst();
83          break;
84        }
85        cummulative += curWeight;
86      }
87  
88      return item;
89    }
90  
91    
92    public static <T> List<T> selectRandomItems(T[] items, float ratio) {
93      int remaining = (int)Math.ceil(items.length * ratio);
94  
95      List<T> selectedItems = new ArrayList<T>(remaining);
96  
97      for (int i=0; i<items.length && remaining > 0; i++) {
98        if (RandomUtils.nextFloat() < ((float)remaining/(items.length-i))) {
99          selectedItems.add(items[i]);
100         remaining--;
101       }
102     }
103 
104     return selectedItems;
105   }
106 
107   private Policy[] policies;
108   private Thread[] monkeyThreads;
109 
110   @Override
111   public void start() throws Exception {
112     monkeyThreads = new Thread[policies.length];
113 
114     for (int i=0; i<policies.length; i++) {
115       policies[i].init(new Policy.PolicyContext(this.util));
116       Thread monkeyThread = new Thread(policies[i]);
117       monkeyThread.start();
118       monkeyThreads[i] = monkeyThread;
119     }
120   }
121 
122   @Override
123   public void stop(String why) {
124     if (policies == null) {
125       return;
126     }
127 
128     for (Policy policy : policies) {
129       policy.stop(why);
130     }
131   }
132 
133   @Override
134   public boolean isStopped() {
135     return policies[0].isStopped();
136   }
137 
138   
139 
140 
141 
142   @Override
143   public void waitForStop() throws InterruptedException {
144     if (monkeyThreads == null) {
145       return;
146     }
147     for (Thread monkeyThread : monkeyThreads) {
148       
149       monkeyThread.join();
150     }
151   }
152 
153   @Override
154   public boolean isDestructive() {
155     
156     return true;
157   }
158 }