package org.apache.storm.scheduler;

import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;

/**
 * Scheduler that places executors on worker slots using two external text files.
 *
 * <ul>
 *   <li>The traffic file holds one record per line, split on {@code ';'}. Fields 0 and 1 are
 *       matched against {@code ExecutorDetails.toString()}; field 2 is the sort key
 *       (highest first).</li>
 *   <li>The resource file holds one record per line, split on {@code ','}. Field 0 is matched
 *       case-insensitively against {@code WorkerSlot.getNodeId()}; the file order decides which
 *       nodes are filled first.</li>
 * </ul>
 *
 * <p>Slot-freeing logic ({@link #schedule}, {@code badSlots}, {@link #slotsCanReassign}) runs
 * before the placement computed by {@link #MapExecutorToSlot}.
 */
public class WGStorm implements IScheduler {

    // NOTE(review): both paths are hard-coded to one developer's home directory. They are kept
    // unchanged so behaviour is identical, but they should come from configuration.
    private static final String TRAFFIC_FILE =
        "/home/malik/Asif/apache-storm-2.0.1-SNAPSHOT/conf/Traffic.yaml";
    private static final String RESOURCE_FILE =
        "/home/malik/Asif/apache-storm-2.0.1-SNAPSHOT/conf/Resource.yaml";

    /**
     * Number of executors named {@code "[n, n]"} (n = 1..3) that get dedicated placement.
     * The original code called the index {@code acker}; presumably these are the topology's
     * acker/system executors - TODO confirm.
     */
    private static final int RESERVED_EXECUTOR_COUNT = 3;

    /**
     * Works out which currently used slots do not fit the even distribution of
     * {@code numExecutors} over {@code numWorkers}.
     *
     * <p>Side effect: the slots that do fit are removed from {@code existingSlots}; the returned
     * set is the key-set view of what remains.
     *
     * @return the slots to free, or {@code null} when {@code numWorkers} is 0
     */
    private static Set<WorkerSlot> badSlots(Map<WorkerSlot, List<ExecutorDetails>> existingSlots,
                                            int numExecutors, int numWorkers) {
        if (numWorkers != 0) {
            // executors-per-worker -> number of workers that should have that many executors
            Map<Integer, Integer> distribution = Utils.integerDivided(numExecutors, numWorkers);
            Set<WorkerSlot> slots = new HashSet<WorkerSlot>();

            for (Entry<WorkerSlot, List<ExecutorDetails>> entry : existingSlots.entrySet()) {
                Integer executorCount = entry.getValue().size();
                Integer workerCount = distribution.get(executorCount);
                if (workerCount != null && workerCount > 0) {
                    slots.add(entry.getKey());
                    workerCount--;
                    distribution.put(executorCount, workerCount);
                }
            }

            for (WorkerSlot slot : slots) {
                existingSlots.remove(slot);
            }

            return existingSlots.keySet();
        }

        return null;
    }

    /**
     * Filters {@code slots} down to those whose node is not blacklisted, whose supervisor is
     * known to the cluster, and whose port is among that supervisor's ports.
     */
    public static Set<WorkerSlot> slotsCanReassign(Cluster cluster, Set<WorkerSlot> slots) {
        Set<WorkerSlot> result = new HashSet<WorkerSlot>();
        for (WorkerSlot slot : slots) {
            if (!cluster.isBlackListed(slot.getNodeId())) {
                SupervisorDetails supervisor = cluster.getSupervisorById(slot.getNodeId());
                if (supervisor != null) {
                    Set<Integer> ports = supervisor.getAllPorts();
                    if (ports != null && ports.contains(slot.getPort())) {
                        result.add(slot);
                    }
                }
            }
        }
        return result;
    }

    @Override
    public void prepare(Map<String, Object> conf) {
        //noop
    }

    @Override
    public Map<String, Map<String, Double>> config() {
        return new HashMap<>();
    }

    /**
     * For every topology that needs scheduling: frees the slots that do not fit an even
     * distribution, computes a placement with {@link #MapExecutorToSlot}, prints it to standard
     * output and assigns it on the cluster. Stops as soon as the cluster has no available slot.
     */
    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            if (cluster.getAvailableSlots().size() == 0) {
                break;
            }
            Set<ExecutorDetails> allExecutors = topology.getExecutors();

            Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
                EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
            Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
            for (List<ExecutorDetails> list : aliveAssigned.values()) {
                aliveExecutors.addAll(list);
            }

            Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
            int totalSlotsToUse = Math.min(topology.getNumWorkers(),
                                           canReassignSlots.size() + cluster.getAvailableSlots().size());

            Set<WorkerSlot> badSlots = null;
            if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
                badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
            }
            if (badSlots != null) {
                cluster.freeSlots(badSlots);
            }

            Map<ExecutorDetails, WorkerSlot> assignment =
                MapExecutorToSlot(cluster, topology, allExecutors, totalSlotsToUse);
            Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(assignment);

            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
                WorkerSlot nodePort = entry.getKey();
                List<ExecutorDetails> executors = entry.getValue();
                System.out.println(" ");
                System.out.println(nodePort);
                for (ExecutorDetails e : executors) {
                    System.out.print(e + " ");
                }
                System.out.println("");
                cluster.assign(nodePort, topology.getId(), executors);
            }
        }
    }

    /**
     * Computes an executor-to-slot placement for one topology.
     *
     * <p>Three passes:
     * <ol>
     *   <li>Walk the resource file in order; for each available slot on the listed node (up to
     *       {@code NumWorker} slots) place one reserved {@code "[n, n]"} executor (first three
     *       slots only), then the heaviest remaining traffic pair, then neighbours of executors
     *       already on that slot until the per-slot cap is reached.</li>
     *   <li>If executors are still unplaced, re-read the traffic file and put every executor not
     *       yet visited on the first recorded slot that is below the cap.</li>
     *   <li>When fewer than three workers are used, place the reserved executors that pass 1 did
     *       not reach on the last partially filled slot.</li>
     * </ol>
     *
     * <p>Executors named in the files but absent from the topology are skipped; the returned map
     * never contains a {@code null} key.
     *
     * @param cluster      cluster whose available slots are used
     * @param topology     topology whose executors are looked up by their string form
     * @param allExecutors all executors of the topology; only its size is used
     * @param NumWorker    number of slots to use; values {@code <= 0} yield an empty result
     * @return mapping from executor to the slot chosen for it (possibly empty, never null)
     */
    public Map<ExecutorDetails, WorkerSlot> MapExecutorToSlot(Cluster cluster, TopologyDetails topology,
                                                              Set<ExecutorDetails> allExecutors, int NumWorker) {
        Map<ExecutorDetails, WorkerSlot> assignment = new HashMap<ExecutorDetails, WorkerSlot>();
        if (NumWorker <= 0) {
            // Dividing by zero workers would produce a meaningless per-slot cap.
            return assignment;
        }

        int totalExecutors = allExecutors.size();
        int maxExecutorPerSlot = (int) Math.ceil(totalExecutors / (double) NumWorker);

        List<WorkerSlot> availableSlots = sortSlots(cluster.getAvailableSlots());
        if (availableSlots == null) {
            return assignment;
        }

        int assignedExecutors = 0;
        // Number of slots selected so far; also the next free index in slotOrdering.
        int slotsUsed = 0;
        int[] executorPerSlot = new int[availableSlots.size()];
        int[] slotOrdering = new int[availableSlots.size()];

        List<String[]> trafficList = sortTrafficList(readTrafficFromYaml());
        List<String[]> resourceList = readResourceFromYaml();
        Set<ExecutorDetails> executorDetailsSet = topology.getExecutorToComponent().keySet();
        List<String> visited = new ArrayList<String>();

        // Pass 1: fill slots node by node, in resource-file order.
        for (int k = 0; k < resourceList.size() && !trafficList.isEmpty(); k++) {
            String[] resource = resourceList.get(k);
            for (int j = 0;
                 j < availableSlots.size() && !trafficList.isEmpty()
                     && slotsUsed < NumWorker && slotsUsed < slotOrdering.length;
                 j++) {
                WorkerSlot slot = availableSlots.get(j);
                if (!slot.getNodeId().equalsIgnoreCase(resource[0])) {
                    continue;
                }

                int count = 0;
                List<String> slotVisited = new ArrayList<String>();
                slotOrdering[slotsUsed] = j;

                if (slotsUsed < RESERVED_EXECUTOR_COUNT) {
                    ExecutorDetails reserved = findExecutor(executorDetailsSet, reservedName(slotsUsed + 1));
                    if (reserved != null) {
                        assignment.put(reserved, slot);
                        assignedExecutors++;
                        count++;
                        executorPerSlot[j]++;
                    }
                }

                // The loop condition guarantees the list is non-empty here.
                String[] heaviestPair = trafficList.remove(0);
                for (ExecutorDetails executor : getNextExecutorDetails(heaviestPair, visited, executorDetailsSet)) {
                    assignment.put(executor, slot);
                    visited.add(executor.toString());
                    slotVisited.add(executor.toString());
                    assignedExecutors++;
                    count++;
                    executorPerSlot[j]++;
                }

                while (count < maxExecutorPerSlot && !trafficList.isEmpty()) {
                    ExecutorDetails neighbour =
                        getNextNeighbour(slotVisited, visited, trafficList, executorDetailsSet);
                    if (neighbour == null) {
                        break;
                    }
                    assignment.put(neighbour, slot);
                    visited.add(neighbour.toString());
                    slotVisited.add(neighbour.toString());
                    executorPerSlot[j]++;
                    assignedExecutors++;
                    count++;
                }
                slotsUsed++;
            }
        }

        // Pass 2: pass 1 consumed the traffic list, so reload it and place whatever is left.
        if (assignedExecutors < totalExecutors) {
            for (String[] pair : sortTrafficList(readTrafficFromYaml())) {
                for (int side = 0; side < 2; side++) {
                    String name = pair[side];
                    if (visited.contains(name)) {
                        continue;
                    }
                    ExecutorDetails executor = findExecutor(executorDetailsSet, name);
                    if (executor == null) {
                        // Named in the traffic file but not part of this topology.
                        continue;
                    }
                    // NOTE(review): entries of slotOrdering that pass 1 never filled are 0 and
                    // therefore refer to slot 0; this is kept from the original - confirm intent.
                    for (int slotIndex : slotOrdering) {
                        if (executorPerSlot[slotIndex] < maxExecutorPerSlot) {
                            assignment.put(executor, availableSlots.get(slotIndex));
                            visited.add(executor.toString());
                            executorPerSlot[slotIndex]++;
                            break;
                        }
                    }
                }
            }
        }

        // Pass 3: with fewer than three workers pass 1 could not place every reserved executor.
        if (NumWorker < RESERVED_EXECUTOR_COUNT) {
            for (int counter = executorPerSlot.length - 1; counter >= 0; counter--) {
                if (executorPerSlot[counter] > 0 && executorPerSlot[counter] < maxExecutorPerSlot) {
                    WorkerSlot slot = availableSlots.get(counter);
                    ExecutorDetails third = findExecutor(executorDetailsSet, reservedName(3));
                    if (third != null) {
                        assignment.put(third, slot);
                        executorPerSlot[counter]++;
                    }
                    if (NumWorker == 1) {
                        ExecutorDetails second = findExecutor(executorDetailsSet, reservedName(2));
                        if (second != null) {
                            assignment.put(second, slot);
                            executorPerSlot[counter]++;
                        }
                    }
                    break;
                }
            }
        }
        return assignment;
    }

    /** Builds the string {@code "[n, n]"} that is compared against {@code ExecutorDetails.toString()}. */
    private static String reservedName(int n) {
        return "[" + n + ", " + n + "]";
    }

    /**
     * Returns the executors whose string form equals field 0 or field 1 of {@code executor}
     * and that are not yet listed in {@code visited}.
     */
    private List<ExecutorDetails> getNextExecutorDetails(String[] executor, List<String> visited,
                                                         Set<ExecutorDetails> executorDetailsSet) {
        List<ExecutorDetails> matches = new ArrayList<ExecutorDetails>();
        for (ExecutorDetails candidate : executorDetailsSet) {
            String name = candidate.toString();
            boolean named = executor[0].equals(name) || executor[1].equals(name);
            if (named && !visited.contains(name)) {
                matches.add(candidate);
            }
        }
        return matches;
    }

    /**
     * Looks up the executor whose string form equals {@code s}.
     *
     * @return the first match, or {@code null} when there is none
     */
    private ExecutorDetails findExecutor(Set<ExecutorDetails> executorDetailsSet, String s) {
        for (ExecutorDetails candidate : executorDetailsSet) {
            if (s.equals(candidate.toString())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Scans {@code trafficList} in order for a record with one endpoint already on the current
     * slot. Every such record is removed from the list; the scan stops at the first one whose
     * other endpoint has not been placed anywhere yet.
     *
     * @return that other endpoint as an executor, or {@code null} when no record qualifies or
     *         the endpoint is not an executor of the topology
     */
    private ExecutorDetails getNextNeighbour(List<String> slotVisited, List<String> visited,
                                             List<String[]> trafficList,
                                             Set<ExecutorDetails> executorDetailsSet) {
        String found = null;
        for (int i = 0; i < trafficList.size() && found == null; i++) {
            String[] pair = trafficList.get(i);
            String other;
            if (slotVisited.contains(pair[0])) {
                other = pair[1];
            } else if (slotVisited.contains(pair[1])) {
                other = pair[0];
            } else {
                continue;
            }
            if (!slotVisited.contains(other) && !visited.contains(other)) {
                found = other;
            }
            // The record touches this slot, so it is consumed whether or not it produced a result.
            trafficList.remove(i--);
        }
        return found == null ? null : findExecutor(executorDetailsSet, found);
    }

    /**
     * Sorts traffic records in place by field 2, highest first, and returns the same list.
     *
     * <p>Values are compared numerically so that, for example, 100 ranks above 9. Records whose
     * field 2 is not a number sort after all numeric ones, in descending string order.
     */
    private List<String[]> sortTrafficList(List<String[]> trafficList) {
        Collections.sort(trafficList, new Comparator<String[]>() {
            @Override
            public int compare(String[] strings, String[] otherStrings) {
                Double weight = parseWeight(strings[2]);
                Double otherWeight = parseWeight(otherStrings[2]);
                if (weight != null && otherWeight != null) {
                    return Double.compare(otherWeight, weight);
                }
                if (weight != null) {
                    return -1;
                }
                if (otherWeight != null) {
                    return 1;
                }
                return otherStrings[2].compareTo(strings[2]);
            }
        });
        return trafficList;
    }

    /** Parses a traffic value; returns {@code null} when it is not a number. */
    private static Double parseWeight(String raw) {
        try {
            return Double.valueOf(raw.trim());
        } catch (NumberFormatException ex) {
            return null;
        }
    }

    /**
     * Orders slots round-robin across nodes, starting with the node that has the most slots.
     *
     * @return the reordered slots, or {@code null} when {@code availableSlots} is null or empty
     */
    public static List<WorkerSlot> sortSlots(List<WorkerSlot> availableSlots) {
        //For example, we have a three nodes(supervisor1, supervisor2, supervisor3) cluster:
        //slots before sort:
        //supervisor1:6700, supervisor1:6701,
        //supervisor2:6700, supervisor2:6701, supervisor2:6702,
        //supervisor3:6700, supervisor3:6703, supervisor3:6702, supervisor3:6701
        //slots after sort:
        //supervisor3:6700, supervisor2:6700, supervisor1:6700,
        //supervisor3:6701, supervisor2:6701, supervisor1:6701,
        //supervisor3:6702, supervisor2:6702,
        //supervisor3:6703

        if (availableSlots != null && availableSlots.size() > 0) {
            // group by node
            Map<String, List<WorkerSlot>> slotGroups = new TreeMap<>();
            for (WorkerSlot slot : availableSlots) {
                String node = slot.getNodeId();
                List<WorkerSlot> slots = slotGroups.get(node);
                if (slots == null) {
                    slots = new ArrayList<WorkerSlot>();
                    slotGroups.put(node, slots);
                }
                slots.add(slot);
            }

            // sort by port: from small to large
            for (List<WorkerSlot> slots : slotGroups.values()) {
                Collections.sort(slots, new Comparator<WorkerSlot>() {
                    @Override
                    public int compare(WorkerSlot o1, WorkerSlot o2) {
                        return Integer.compare(o1.getPort(), o2.getPort());
                    }
                });
            }

            // sort by available slots size: from large to small
            List<List<WorkerSlot>> list = new ArrayList<List<WorkerSlot>>(slotGroups.values());
            Collections.sort(list, new Comparator<List<WorkerSlot>>() {
                @Override
                public int compare(List<WorkerSlot> o1, List<WorkerSlot> o2) {
                    return Integer.compare(o2.size(), o1.size());
                }
            });

            return ServerUtils.interleaveAll(list);
        }

        return null;
    }

    /**
     * Reads the traffic file: one record per non-empty line, split on {@code ';'}.
     * Records with fewer than three fields are skipped because fields 0-2 are all required.
     * Prints a message and returns an empty list when the file does not exist.
     */
    private List<String[]> readTrafficFromYaml() {
        return readDelimitedFile(TRAFFIC_FILE, ";", 3, "Traffic file not found!");
    }

    /**
     * Reads the resource file: one record per non-empty line, split on {@code ','}.
     * Records without any field are skipped because field 0 is required.
     * Prints a message and returns an empty list when the file does not exist.
     */
    private List<String[]> readResourceFromYaml() {
        return readDelimitedFile(RESOURCE_FILE, ",", 1, "Resource file not found!");
    }

    /**
     * Shared line reader for the two input files.
     *
     * @param path            file to read
     * @param delimiter       field separator passed to {@link String#split(String)}
     * @param minFields       records with fewer fields are dropped
     * @param notFoundMessage text printed to standard output when the file is missing
     * @return the parsed records, in file order; a mutable list
     */
    private static List<String[]> readDelimitedFile(String path, String delimiter, int minFields,
                                                    String notFoundMessage) {
        List<String[]> rows = new ArrayList<String[]>();
        try (Scanner input = new Scanner(new File(path))) {
            while (input.hasNext()) {
                String line = input.nextLine();
                if (line.length() > 0) {
                    String[] fields = line.split(delimiter);
                    if (fields.length >= minFields) {
                        rows.add(fields);
                    }
                }
            }
        } catch (FileNotFoundException ex) {
            System.out.println(notFoundMessage);
        }
        return rows;
    }
}