package org.apache.storm.scheduler; import com.google.common.collect.Sets; import java.io.File; import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Scanner; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.storm.utils.Utils; public class RAScheduling implements IScheduler { private TopologyDetails t; private Cluster cluster; @Override public void prepare(Map conf) {//noop } @Override public void schedule(Topologies topologies, Cluster cluster) { this.cluster = cluster; Collection topologyDetails = topologies.getTopologies(); for (TopologyDetails t : topologyDetails) { this.t = t; Map> aliveAssigned = EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, t.getId()); Set aliveExecutors = new HashSet(); for (List list : aliveAssigned.values()) { aliveExecutors.addAll(list); } int NumWorker = t.getNumWorkers(); Set allExecutors = t.getExecutors(); List availableSlots = cluster.getAvailableSlots(); Set reassignExecutors = Sets.difference(allExecutors, aliveExecutors); Map slotMap = MapExecutorToSlot(t.getId(), reassignExecutors, NumWorker, availableSlots); AllocateSlotsToNodes(slotMap); } } @Override public Map> config() { return new HashMap<>(); } public Map MapExecutorToSlot(String t, Set allExecutors, int NumWorker, List availableSlots) { HashMap> slotMap = new HashMap<>(); int AssignedSlots = 0, AssignedExecutors = 0; int TotalExecutors = allExecutors.size(); List executors = new ArrayList(allExecutors); Map assignment = new HashMap(); ArrayList trafficList = new ArrayList(); try { trafficList = ReadFromYaml(); } catch (Exception ex) { System.out.println(ex.getMessage()); } int MaxExecutorPerSlot = (int) Math.ceil(TotalExecutors / NumWorker); if (NumWorker == 1) { for (int i = 0; i < availableSlots.size(); i++) { assignment.put(executors.get(i), availableSlots.get(0)); } Map> nodePortToExecutors = Utils.reverseMap(assignment); for (Map.Entry> entry : nodePortToExecutors.entrySet()) { WorkerSlot nodePort = entry.getKey(); List executor = entry.getValue(); cluster.assign(nodePort, t, executor); } } else if (NumWorker >= TotalExecutors) { for (int i = 0; i < availableSlots.size(); i++) { assignment.put(executors.get(i), availableSlots.get(i)); } Map> nodePortToExecutors = Utils.reverseMap(assignment); for (Map.Entry> entry : nodePortToExecutors.entrySet()) { WorkerSlot nodePort = entry.getKey(); List executor = entry.getValue(); cluster.assign(nodePort, t, executor); } } else { int temp; String[] tem = new String[1]; for (int j = 0; j < trafficList.size(); j++) { String[] x = (String[]) trafficList.get(0); temp = Integer.parseInt(x[2]); for (int i = j + 1; i < trafficList.size(); i++) { String[] c = (String[]) trafficList.get(i); if (temp > Integer.parseInt(c[2])) { tem = (String[]) trafficList.get(i); trafficList.set(i, trafficList.get(j)); trafficList.set(j, tem); } } } ArrayList resourceList = null; int count = 0; try { resourceList = ReadResourceFromYaml(); } catch (Exception ex) { System.out.println(ex.getMessage()); } Map map = this.t.getExecutorToComponent(); Set executorDetailsSet = map.keySet(); List availableSlotsList = cluster.getAvailableSlots(); for (int i = 0; i < trafficList.size(); i++) { String[] executor = (String[]) trafficList.get(i); for (ExecutorDetails executorDetails : executorDetailsSet) { if (executor[0].equals(executorDetails)) { for (int k = 0; k < resourceList.size(); k++) { String[] resource = (String[]) resourceList.get(k); for (int j = 0; j < availableSlotsList.size(); j++) { String nodeId = availableSlotsList.get(j).getNodeId(); if (nodeId.equalsIgnoreCase(resource[0])) { assignment.put(executorDetails, availableSlotsList.get(j)); AssignedExecutors++; AssignedSlots++; count++; if (count < MaxExecutorPerSlot) { for (ExecutorDetails executorDetails1 : executorDetailsSet) { if (executor[1].equals(executorDetails1)) { assignment.put(executorDetails1, availableSlotsList.get(j)); AssignedExecutors++; AssignedSlots++; count++; } } } } count = 0; } } } } } if (AssignedExecutors < TotalExecutors) { for (int i = 0; i < executors.size(); i++) { assignment.put(executors.get(i), availableSlots.get(i % availableSlots.size())); } } Map> nodePortToExecutors = Utils.reverseMap(assignment); for (Map.Entry> entry : nodePortToExecutors.entrySet()) { WorkerSlot nodePort = entry.getKey(); List executor = entry.getValue(); cluster.assign(nodePort, t, executor); } } return assignment; } /** * Function that read the hard coded values from YAML. file */ private ArrayList ReadFromYaml() throws Exception { ArrayList check = new ArrayList(); try { Scanner in = new Scanner(new File("/home/malik/Downloads/apache-storm-2.0.1-SNAPSHOT/conf/file.txt")); String s; while (in.hasNext()) { s = in.nextLine(); if (s.equalsIgnoreCase("# Test traffic")) { while (in.hasNext()) { s = in.nextLine(); if (s.equalsIgnoreCase("# Resources")) { break; } check.add(s.split(",")); } } } in.close(); return check; } catch (FileNotFoundException ex) { throw new Exception("File specified not found"); } } private void AllocateSlotsToNodes(Map slotMap) { } private ArrayList ReadResourceFromYaml() throws Exception { ArrayList check = new ArrayList(); try { Scanner in = new Scanner(new File("/home/malik/Downloads/apache-storm-2.0.1-SNAPSHOT/conf/file.txt")); String s; while (in.hasNext()) { s = in.nextLine(); if (s.equalsIgnoreCase("# Resources")) { while (in.hasNext()) { check.add(in.nextLine().split(",")); } } } in.close(); return check; } catch (FileNotFoundException ex) { throw new Exception("File specified not found"); } } }