Storm测试
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm测试相关的知识,希望对你有一定的参考价值。
package storm.scheduler; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.log4j.Logger; import backtype.storm.scheduler.Cluster; import backtype.storm.scheduler.EvenScheduler; import backtype.storm.scheduler.ExecutorDetails; import backtype.storm.scheduler.IScheduler; import backtype.storm.scheduler.SupervisorDetails; import backtype.storm.scheduler.Topologies; import backtype.storm.scheduler.TopologyDetails; import backtype.storm.scheduler.WorkerSlot; /** * 基于Storm Topology 热边的调度算法 * * @author wxweven * @version 1.0 * @email [email protected] * @blog http://wxweven.com * @Copyright: Copyright (c) wxweven 2009 - 2016 */ public class OnlineScheduler implements IScheduler { private static final int DEFAULT_RESCHEDULE_TIMEOUT = 180; // 单位为s private Logger logger = Logger.getLogger(OnlineScheduler.class); private AssignmentTracker assignmentTracker = new AssignmentTracker(); private long lastRescheduling; /** * 重写IScheduler中的方法,实现具体的调度算法 */ @Override public void schedule(Topologies topologies, Cluster cluster) { logger.info("HotEdge Scheduler"); if (!topologies.getTopologies().isEmpty()) { int rescheduleTimeout = DEFAULT_RESCHEDULE_TIMEOUT; for (TopologyDetails topology : topologies.getTopologies()) { rescheduleTimeout = Integer.parseInt(topology.getConf() .get(Utils.RESCHEDULE_TIMEOUT).toString()); } long now = System.currentTimeMillis(); long elapsedTime = (now - lastRescheduling) / 1000; // s if (lastRescheduling == 0 || elapsedTime >= rescheduleTimeout) { //执行具体的调度算法 doSchedule(topologies, cluster); } } new EvenScheduler().schedule(topologies, cluster); assignmentTracker.checkAssignment(topologies, cluster); } /** * 实现具体的改进调度算法 * * @param topologies * 提交的Topology * @param cluster * 集群 */ private void doSchedule(Topologies topologies, Cluster cluster) { try { // 从数据库中获取提交的Topology List<String> dbTopologies = DataManager.getInstance().getTopologies(); List<String> topologiesToBeRemoved = new ArrayList<String>(dbTopologies); List<String> stormTopologyList = new ArrayList<String>(); int trafficImprovement = 0; for (TopologyDetails topology : topologies.getTopologies()) { topologiesToBeRemoved.remove(topology.getId()); stormTopologyList.add(topology.getId()); for (Object key : topology.getConf().keySet()) { logger.debug("- " + key + ": " + topology.getConf().get(key)); } trafficImprovement = Integer.parseInt(topology.getConf() .get(Utils.TRAFFIC_IMPROVEMENT).toString()); } dbTopologies.removeAll(topologiesToBeRemoved); if (!topologiesToBeRemoved.isEmpty()) { DataManager.getInstance().removeTopologies(topologiesToBeRemoved); logger.info("Topologies succesfully removed from DB"); } // 计算调度策略 TrafficManager.getInstance().clear(); computeBestScheduling(dbTopologies, topologies, cluster); Map<Node, List<Slot>> bestAssignment = TrafficManager.getInstance().getAssignments(); int bestInterNodeTraffic = TrafficManager.getInstance().computeInterNodeTraffic(); int currentInterNodeTraffic = DataManager.getInstance().getCurrentInterNodeTraffic(); List<Node> overloadedNodeList = DataManager.getInstance().getOverloadedNodes(); logger.info("These nodes are currently overloaded: " + Utils.collectionToString(overloadedNodeList)); logger.info("Currently, the inter-node traffic is " + currentInterNodeTraffic + " tuple/s"); if (bestAssignment != null) { logger.info("The best assignment can lead to an inter-node traffic of " + bestInterNodeTraffic + " tuple/s"); boolean reschedulingDueToOverloading = false; boolean reschedulingDueToInterNodeTraffic = false; if (!overloadedNodeList.isEmpty()) { logger.info("Check how the new assignment can offload some of the currently overloaded nodes"); for (Node node : overloadedNodeList) { Node nodeAfterTheAssignment = null; for (Node n : bestAssignment.keySet()) { if (n.equals(node)) { nodeAfterTheAssignment = n; break; } } if (nodeAfterTheAssignment != null) { logger.info("Node " + node.getName() + " currenlty has a load of " + node.getLoad() + " Hz/s, and the assignment can lead to " + nodeAfterTheAssignment.getLoad()); if (node.getLoad() > nodeAfterTheAssignment.getLoad()) { reschedulingDueToOverloading = true; } } else { logger.warn("Node " + node.getName() + " currenlty has a load of " + node.getLoad() + " Hz/s, but it doesn‘t appear in the new assignment"); } } } if (reschedulingDueToOverloading) { logger.info("A rescheduling is required to offload currently overloaded nodes"); } int trafficThreshold = (int) (currentInterNodeTraffic * (1 - (float) trafficImprovement / 100)); logger.info("Minimum traffic threshold is " + trafficThreshold + " tuple/s"); if (trafficThreshold >= bestInterNodeTraffic) { logger.info("A rescheduling is required to lower inter-node traffic"); reschedulingDueToInterNodeTraffic = true; } if (reschedulingDueToInterNodeTraffic || reschedulingDueToOverloading) { logger.info("Let‘s apply the best assignment!!"); lastRescheduling = System.currentTimeMillis(); // free all available slots for (SupervisorDetails supervisor : cluster.getSupervisors().values()) { List<Integer> usedPorts = cluster.getUsedPorts(supervisor); for (int usedPort : usedPorts) { cluster.freeSlot(new WorkerSlot(supervisor.getId(), usedPort)); } } for (Node node : bestAssignment.keySet()) { SupervisorDetails supervisor = cluster.getSupervisorsByHost(node.getName()) .get(0); List<WorkerSlot> availableSlots = cluster.getAvailableSlots(supervisor); int slotIndex = 0; for (Slot slot : bestAssignment.get(node)) { logger.info("Assigning executors of slot " + slot); String topology = slot.getTopology().getTopologyID(); List<ExecutorDetails> executorList = new ArrayList<ExecutorDetails>(); // here a match is required to link scheduler // executors to storm executors Collection<ExecutorDetails> allTopologyExecutors = topologies.getById( topology).getExecutors(); for (Executor executor : slot.getExecutors()) { for (ExecutorDetails executorDetails : allTopologyExecutors) { if (executor.match(executorDetails)) { executorList.add(executorDetails); break; } } } cluster.assign(availableSlots.get(slotIndex), topology, executorList); logger.info("We assigned executors:" + Utils.collectionToString(executorList) + " to slot: [" + availableSlots.get(slotIndex).getNodeId() + ", " + availableSlots.get(slotIndex).getPort() + "]"); slotIndex++; } } DataManager.getInstance().removeTopologies(dbTopologies); } } else { logger.info("No assignment has been simulated"); } } catch (Throwable t) { logger.error("Error occurred during scheduling", t); } } private void computeBestScheduling(List<String> dbTopologies, Topologies stormTopologies, Cluster cluster) throws Exception { logger.info("-- First phase --"); List<Topology> topologyList = new ArrayList<Topology>(); for (String topologyID : dbTopologies) { logger.info("Topology ID: " + topologyID); TopologyDetails topologyDetails = stormTopologies.getById(topologyID); Topology topology = new Topology(topologyDetails); topology.setTotalLoad(DataManager.getInstance().getTotalLoad(topologyID)); topologyList.add(topology); List<ExecutorPair> interExecutorTrafficList = TrafficManager.getInstance() .getInterExecutorTrafficList(topologyID); logger.info("Inter-executor traffic stats: " + Utils.collectionToString(interExecutorTrafficList)); if (interExecutorTrafficList.isEmpty()) { logger.info("Traffic stats are not complete yet, skip this topology"); } else { for (ExecutorPair executorPair : interExecutorTrafficList) { logger.debug("Executor pair: " + executorPair); List<Slot> slotList = topology.getContainingSlotList(executorPair.getSource(), executorPair.getDestination()); logger.debug("Slots that already contain either executors: " + Utils.collectionToString(slotList)); if (slotList.isEmpty()) { logger.debug("Both executors have not been assigned yet, try to add them to the least loaded slot"); Slot leastLoadedSlot = topology.getLeastLoadedSlot( executorPair.getSource(), executorPair.getDestination()); if (leastLoadedSlot != null) { logger.debug("Least loaded slot able to get both the executors: " + leastLoadedSlot); leastLoadedSlot.assign(executorPair.getSource()); leastLoadedSlot.assign(executorPair.getDestination()); logger.info("Executors " + executorPair.getSource() + " and " + executorPair.getDestination() + " assigned to slot " + leastLoadedSlot); } else { logger.debug("No slot exists that can get both the executors, assign them to distinct slots"); leastLoadedSlot = topology.getLeastLoadedSlot(executorPair.getSource()); if (leastLoadedSlot == null) { throw new RuntimeException( "Cannot find a slot able to get executor " + executorPair.getSource() + " for topology " + topology); } logger.debug("Least loaded slot for source executor: " + leastLoadedSlot); leastLoadedSlot.assign(executorPair.getSource()); logger.info("Executor " + executorPair.getSource() + " assigned to slot " + leastLoadedSlot); if (leastLoadedSlot.canAccept(executorPair.getDestination())) { logger.debug("After having added executor " + executorPair.getSource() + ", the slot " + leastLoadedSlot + " can also get the executor " + executorPair.getDestination()); leastLoadedSlot.assign(executorPair.getDestination()); } else { logger.debug("After having added executor " + executorPair.getSource() + ", the slot " + leastLoadedSlot + " cannot get the executor " + executorPair.getDestination()); leastLoadedSlot = topology.getLeastLoadedSlot(executorPair .getDestination()); if (leastLoadedSlot == null) { throw new RuntimeException( "Cannot find a slot able to get executor " + executorPair.getDestination() + " for topology " + topology); } logger.debug("Least loaded slot for destination executor: " + leastLoadedSlot); leastLoadedSlot.assign(executorPair.getDestination()); } logger.info("Executor " + executorPair.getDestination() + " assigned to slot " + leastLoadedSlot); } } else { logger.debug("Some executor has been already assigned, compute the best assignment using the slot(s) found before and the least loaded one"); Slot leastLoadedSlot = topology.getLeastLoadedSlot(executorPair); logger.debug("Least loaded slot: " + leastLoadedSlot); if (leastLoadedSlot != null && !slotList.contains(leastLoadedSlot)) { slotList.add(leastLoadedSlot); } logger.debug("Slots to use: " + Utils.collectionToString(slotList)); logger.debug("Remove source and destination from the slots they are currently assigned to"); for (Slot slot : slotList) { if (slot.contains(executorPair.getSource())) { slot.remove(executorPair.getSource()); } if (slot.contains(executorPair.getDestination())) { slot.remove(executorPair.getDestination()); } } logger.debug("Slots to use after such removals: " + Utils.collectionToString(slotList)); logger.debug("Check every possible combination"); Slot bestSlotForSource = null; Slot bestSlotForDestination = null; int minInterSlotTraffic = -1; for (Slot slotForSource : slotList) { for (Slot slotForDestination : slotList) { logger.debug("Assigning executor " + executorPair.getSource() + " to slot " + slotForSource + " and executor " + executorPair.getDestination() + " to slot " + slotForDestination + "..."); boolean assignmentOk = true; if (slotForSource.canAccept(executorPair.getSource())) { slotForSource.assign(executorPair.getSource()); } else { logger.debug("Slot " + slotForSource + " is imbalanced, cannot be used to add more executors"); assignmentOk = false; } if (slotForDestination.canAccept(executorPair.getDestination())) { slotForDestination.assign(executorPair.getDestination()); } else { logger.debug("Slot " + slotForDestination + " is imbalanced, cannot be used to add more executors"); assignmentOk = false; } if (assignmentOk) { int interSlotTraffic = TrafficManager.getInstance() .computeInterSlotTraffic(topologyID); logger.debug("...the inter-slot traffic is " + interSlotTraffic + " tuple/s"); if (minInterSlotTraffic == -1 || interSlotTraffic < minInterSlotTraffic) { bestSlotForSource = slotForSource; bestSlotForDestination = slotForDestination; minInterSlotTraffic = interSlotTraffic; } } if (slotForSource.contains(executorPair.getSource())) { slotForSource.remove(executorPair.getSource()); } if (slotForDestination.contains(executorPair.getDestination())) { slotForDestination.remove(executorPair.getDestination()); } } } if (bestSlotForSource == null || bestSlotForDestination == null) { throw new Exception("Cannot find a possible assignment of executors " + executorPair.getSource() + " and " + executorPair.getDestination() + " to slots " + Utils.collectionToString(slotList)); } logger.debug("The best assignment is executor " + executorPair.getSource() + " to slot " + bestSlotForSource + " and executor " + executorPair.getDestination() + " to slot " + bestSlotForDestination + ", with inter-slot traffic " + minInterSlotTraffic + " tuple/s"); bestSlotForSource.assign(executorPair.getSource()); bestSlotForDestination.assign(executorPair.getDestination()); logger.info("Executor " + executorPair.getSource() + " assigned to slot " + bestSlotForSource); logger.info("Executor " + executorPair.getDestination() + " assigned to slot " + bestSlotForDestination); } /* end if (!slotList.isEmpty()) */ logger.debug("Assignment of executors " + executorPair + " completed"); } /* end for (ExecutorPair executorPair : executorPairList) */ logger.info("Current assignment: " + Utils.collectionToString(topology.getSlots())); logger.info("Check for empty slots"); List<Slot> emptySlotList = topology.getEmptySlots(); if (emptySlotList.isEmpty()) { logger.info("No empty slots, the assignment is succesfully completed"); } else { logger.info("Empty slots: " + Utils.collectionToString(emptySlotList)); List<Slot> usedSlotList = topology.getUsedSlots(); for (Slot emptySlot : emptySlotList) { logger.debug("Find an executor to assign to slot " + emptySlot); Executor bestExecutor = null; Slot bestSlot = null; int bestInterSlotTraffic = -1; for (Slot usedSlot : usedSlotList) { if (usedSlot.getExecutors().size() > 1) { logger.debug("Check the executors of slot " + usedSlot); List<Executor> executorList = new ArrayList<Executor>( usedSlot.getExecutors()); for (Executor executor : executorList) { usedSlot.remove(executor); emptySlot.assign(executor); int interSlotTraffic = TrafficManager.getInstance() .computeInterSlotTraffic(topologyID); logger.debug("Moving executor " + executor + ", the inter-slot traffic is " + interSlotTraffic + " tuple/s"); if (bestInterSlotTraffic == -1 || interSlotTraffic < bestInterSlotTraffic) { bestExecutor = executor; bestSlot = usedSlot; bestInterSlotTraffic = interSlotTraffic; } emptySlot.remove(executor); usedSlot.assign(executor); } } } if (bestSlot != null) { logger.debug("The best assignment is moving executor " + bestExecutor + " from slot " + bestSlot + " to slot " + emptySlot + " with an inter-slot traffic of " + bestInterSlotTraffic + " tuple/s"); bestSlot.remove(bestExecutor); emptySlot.assign(bestExecutor); logger.info("Executor " + bestExecutor + " moved from slot " + bestSlot + " to slot " + emptySlot); } else { logger.warn("Cannot find an executor to move to slot " + emptySlot); } } /* end for (Slot emptySlot : emptySlotList) */ } logger.info("Next assignment: " + Utils.collectionToString(topology.getSlots())); TrafficManager.getInstance().compileInterSlotTraffic(); } /* end if (!executorPairList.isEmpty()) */ logger.info("Assignments of executors for topology " + topologyID + " completed"); } /* end for (String topologyID : dbTopologies) */ logger.info("First phase completed!"); List<SlotPair> interSlotTrafficList = TrafficManager.getInstance() .getInterSlotTrafficList(); logger.info("Inter-slot traffic stats: " + Utils.collectionToString(interSlotTrafficList)); // second phase logger.info("-- Second phase --"); NodeManager nodeManager = new NodeManager(topologyList, cluster); if (nodeManager.getNodeCount() == 0) { logger.info("No nodes have been configured yet, cannot determine any scheduling"); } else { for (SlotPair slotPair : interSlotTrafficList) { logger.info("Slot pair: " + slotPair); List<Node> nodeList = TrafficManager.getInstance().getContainingNodeList( slotPair.getFirst(), slotPair.getSecond()); if (nodeList.isEmpty()) { logger.debug("Both slots have not been assigned yet, try to add them to the least loaded node"); Node leastLoadedNode = nodeManager.getLeastLoadedNode(slotPair.getFirst(), slotPair.getSecond()); if (leastLoadedNode != null) { logger.debug("Least loaded node able to get both the slots: " + leastLoadedNode); leastLoadedNode.assign(slotPair.getFirst()); leastLoadedNode.assign(slotPair.getSecond()); logger.info("Slots " + slotPair.getFirst() + " and " + slotPair.getSecond() + " assigned to node " + leastLoadedNode + " (Slots of topology " + slotPair.getFirst().getTopology().getTopologyID() + " in this node: " + leastLoadedNode.getTopologySlotCount(slotPair.getFirst() .getTopology().getTopologyID())); } else { logger.debug("No node exists that can get both the slots, assign them to distinct nodes"); leastLoadedNode = nodeManager.getLeastLoadedNode(slotPair.getFirst()); if (leastLoadedNode == null) { throw new RuntimeException( "Cannot find a node able to sustain the load of the slot " + slotPair.getFirst()); } logger.debug("Assign slot " + slotPair.getFirst() + " to node " + leastLoadedNode); leastLoadedNode.assign(slotPair.getFirst()); logger.debug("Least loaded node after such assignment: " + leastLoadedNode); logger.info("Slot " + slotPair.getFirst() + " assigned to node " + leastLoadedNode + " (Slots of topology " + slotPair.getFirst().getTopology().getTopologyID() + " in this node: " + leastLoadedNode.getTopologySlotCount(slotPair.getFirst() .getTopology().getTopologyID())); leastLoadedNode = nodeManager.getLeastLoadedNode(slotPair.getSecond()); if (leastLoadedNode == null) { throw new RuntimeException( "Cannot find a node able to sustain the load of the slot " + slotPair.getSecond()); } logger.debug("Assign slot " + slotPair.getSecond() + " to node " + leastLoadedNode); leastLoadedNode.assign(slotPair.getSecond()); logger.debug("Least loaded node after such assignment: " + leastLoadedNode); logger.info("Slot " + slotPair.getSecond() + " assigned to node " + leastLoadedNode + " (Slots of topology " + slotPair.getSecond().getTopology().getTopologyID() + " in this node: " + leastLoadedNode.getTopologySlotCount(slotPair.getSecond() .getTopology().getTopologyID())); } } else { logger.debug("Either slot has been already assigned to nodes " + Utils.collectionToString(nodeList)); Node leastLoadedNode = nodeManager.getLeastLoadedNode(slotPair); logger.debug("The least loaded node able to sustain the traffic of the slot with the lowest traffic is " + leastLoadedNode); if (leastLoadedNode != null && !nodeList.contains(leastLoadedNode)) { nodeList.add(leastLoadedNode); } logger.debug("Nodes to use: " + Utils.collectionToString(nodeList)); logger.debug("Remove slots " + slotPair.getFirst() + " and " + slotPair.getSecond() + " from these nodes"); for (Node node : nodeList) { if (node.contains(slotPair.getFirst())) { node.remove(slotPair.getFirst()); } if (node.contains(slotPair.getSecond())) { node.remove(slotPair.getSecond()); } } logger.debug("Nodes after such removals: " + Utils.collectionToString(nodeList)); logger.debug("Check every possible combination"); Node bestNodeForFirst = null; Node bestNodeForSecond = null; int minInterNodeTraffic = -1; for (Node nodeForFirst : nodeList) { for (Node nodeForSecond : nodeList) { logger.debug("Assigning slot " + slotPair.getFirst() + " to node " + nodeForFirst + " and slot " + slotPair.getSecond() + " to node " + nodeForSecond + "..."); boolean assignmentOK = true; if (nodeForFirst.canAssign(slotPair.getFirst())) { nodeForFirst.assign(slotPair.getFirst()); } else { logger.debug("Cannot assign slot " + slotPair.getFirst() + " to noe " + nodeForFirst); assignmentOK = false; } if (nodeForSecond.canAssign(slotPair.getSecond())) { nodeForSecond.assign(slotPair.getSecond()); } else { logger.debug("Cannot assign slot " + slotPair.getSecond() + " to node " + nodeForSecond); assignmentOK = false; } if (assignmentOK) { int tmpInterNodeTraffic = TrafficManager.getInstance() .computeInterNodeTraffic(); logger.debug("...the inter-node traffic is " + tmpInterNodeTraffic + " tuple/s"); if (minInterNodeTraffic == -1 || tmpInterNodeTraffic < minInterNodeTraffic) { bestNodeForFirst = nodeForFirst; bestNodeForSecond = nodeForSecond; minInterNodeTraffic = tmpInterNodeTraffic; } } if (nodeForFirst.contains(slotPair.getFirst())) { nodeForFirst.remove(slotPair.getFirst()); } if (nodeForSecond.contains(slotPair.getSecond())) { nodeForSecond.remove(slotPair.getSecond()); } } } logger.debug("The best assignment is slot " + slotPair.getFirst() + " to node " + bestNodeForFirst + " and slot " + slotPair.getSecond() + " to node " + bestNodeForSecond + ", with inter-node traffic " + minInterNodeTraffic + " tuple/s"); bestNodeForFirst.assign(slotPair.getFirst()); bestNodeForSecond.assign(slotPair.getSecond()); logger.info("Slot " + slotPair.getFirst() + " assigned to node " + bestNodeForFirst + " (Slots of topology " + slotPair.getFirst().getTopology().getTopologyID() + " in this node: " + bestNodeForFirst.getTopologySlotCount(slotPair.getFirst() .getTopology().getTopologyID())); logger.info("Slot " + slotPair.getSecond() + " assigned to node " + bestNodeForSecond + " (Slots of topology " + slotPair.getSecond().getTopology().getTopologyID() + " in this node: " + bestNodeForSecond.getTopologySlotCount(slotPair.getSecond() .getTopology().getTopologyID())); } /* end if (!nodeList.isEmpty()) */ logger.info("Assignment of slots " + slotPair + " completed"); } /* end for (SlotPair slotPair : interSlotTrafficList) */ if (TrafficManager.getInstance().getAssignments() != null) { logger.info("Intermediate assignment: " + Utils.collectionToString(TrafficManager.getInstance().getAssignments() .keySet())); } /* * ensure that the slots of a given topology are assigned to the * proper number of nodes, otherwise the chances of * parallelization/pipelining are not rightly exploited */ logger.info("Check whether all the topologies are using the desired number of nodes"); for (Topology topology : topologyList) { // int numberOfNodesToUse = Math.min(topology.getSlots().size(), // nodeManager.getNodeCount()); int numberOfNodesToUse = topology.getNumberOfNodesToUse(nodeManager.getNodeCount()); List<Node> usedNodeList = null; while (TrafficManager.getInstance().getNodeList(topology) != null && (usedNodeList = new ArrayList<Node>(TrafficManager.getInstance() .getNodeList(topology))).size() < numberOfNodesToUse) { logger.info("Topology " + topology + " is using " + usedNodeList.size() + " nodes, while it should use " + numberOfNodesToUse); Node bestUsedNode = null; Node bestUnusedNode = null; Slot bestSlot = null; int bestTraffic = -1; for (Node usedNode : usedNodeList) { // check if this node has more than one slot for that // topology int topologySlotCount = 0; List<Slot> nodeSlotList = new ArrayList<Slot>(usedNode.getSlotList()); for (Slot slot : nodeSlotList) { if (slot.getTopology() == topology) { topologySlotCount++; } } if (topologySlotCount > 1) { for (Slot slot : nodeSlotList) { Node unusedNode = nodeManager.getUnusedNode(usedNodeList, slot); if (unusedNode != null) { usedNode.remove(slot); unusedNode.assign(slot); int traffic = TrafficManager.getInstance() .computeInterNodeTraffic(); logger.info("Moving slot " + slot + " from node " + usedNode + " to node " + unusedNode + ", the traffic becomes " + traffic + " tuple/s"); if (bestUsedNode == null || traffic < bestTraffic) { bestUsedNode = usedNode; bestUnusedNode = unusedNode; bestSlot = slot; bestTraffic = traffic; } unusedNode.remove(slot); usedNode.assign(slot); } /* end if (unusedNode != null) */ } /* end for (Slot slot : nodeSlotList) */ } /* end if (topologySlotCount > 1) */ } /* end for (Node usedNode : usedNodeList) */ if (bestUnusedNode != null) { logger.info("The best is moving slot " + bestSlot + " from node " + bestUsedNode + " to node " + bestUnusedNode + ", with a traffic of " + bestTraffic + " tuple/s"); bestUsedNode.remove(bestSlot); bestUnusedNode.assign(bestSlot); } else { logger.info("Cannot find a way to make topology " + topology + " use the desired number of nodes"); break; } } /* end while (number of used nodes < number of nodes to use */ } /* end for (Topology topology : topologyList) */ } logger.info("Second phase completed!"); if (TrafficManager.getInstance().getAssignments() != null) { logger.info("Final assignment: " + Utils.collectionToString(TrafficManager.getInstance().getAssignments() .keySet())); } } }
1 package storm.scheduler; 2 3 import java.lang.management.ManagementFactory; 4 import java.lang.management.ThreadMXBean; 5 import java.util.HashMap; 6 import java.util.Map; 7 import java.util.Set; 8 9 import cpuinfo.CPUInfo; 10 11 /** 12 * 负载监视器 13 * @author wxweven 14 * @version 1.0 15 * @email [email protected] 16 * @blog http://wxweven.com 17 * @Copyright: Copyright (c) wxweven 2009 - 2016 18 */ 19 public class LoadMonitor { 20 21 private static final int SECS_TO_NANOSECS = 1000000000; 22 private static LoadMonitor instance = null; 23 private final long cpuSpeed; // Hz 24 Map<Long, Long> loadHistory; 25 26 public static LoadMonitor getInstance() { 27 if (instance == null) { 28 instance = new LoadMonitor(); 29 } 30 return instance; 31 } 32 33 private LoadMonitor() { 34 cpuSpeed = CPUInfo.getInstance().getCoreInfo(0).getSpeed(); 35 } 36 37 public Map<Long, Long> getLoadInfo(Set<Long> threadIds) { 38 // get current load 39 Map<Long, Long> currentLoadInfo = new HashMap<Long, Long>(); 40 ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); 41 for (long id : threadIds) { 42 currentLoadInfo.put(id, threadBean.getThreadCpuTime(id)); 43 } 44 45 // compute difference wrt history 46 Map<Long, Long> loadInfo = new HashMap<Long, Long>(); 47 for (long id : threadIds) { 48 // Long oldObj = (loadHistory != null)?loadHistory.get(id):0; 49 // long old = (oldObj != null)?oldObj.longValue():0; 50 long old = 0; 51 if (loadHistory != null && loadHistory.get(id) != null) { 52 old = loadHistory.get(id); 53 } 54 double deltaTime = (double)(currentLoadInfo.get(id) - old) / SECS_TO_NANOSECS; // sec 55 loadInfo.put(id, (long)(deltaTime * cpuSpeed)); 56 } 57 58 // replace history with current 59 loadHistory = currentLoadInfo; 60 61 return loadInfo; 62 } 63 }
以上是关于Storm测试的主要内容,如果未能解决你的问题,请参考以下文章