From afd2f75e4cb38929ca0112845da72fff4fc77df3 Mon Sep 17 00:00:00 2001 From: Adnan Manzoor Date: Tue, 14 Apr 2020 11:47:53 +0200 Subject: [PATCH 1/3] Adding the eggress node handling function --- params/networks/triangle.graphml | 2 +- src/coordsim/main.py | 8 ++--- src/coordsim/network/flow.py | 8 +++-- src/coordsim/reader/reader.py | 7 ++-- src/coordsim/simulation/flowsimulator.py | 42 ++++++++++++++++++---- src/coordsim/simulation/simulatorparams.py | 4 ++- src/siminterface/simulator.py | 6 ++-- 7 files changed, 57 insertions(+), 20 deletions(-) diff --git a/params/networks/triangle.graphml b/params/networks/triangle.graphml index 59f0fe11e..3a56dd7e0 100644 --- a/params/networks/triangle.graphml +++ b/params/networks/triangle.graphml @@ -88,7 +88,7 @@ 1 -87.65005 Chicago - Normal + Egress 10 diff --git a/src/coordsim/main.py b/src/coordsim/main.py index e20d9d4a8..1e5552d8f 100644 --- a/src/coordsim/main.py +++ b/src/coordsim/main.py @@ -28,8 +28,8 @@ def main(): random.seed(args.seed) numpy.random.seed(args.seed) - # Parse network and get NetworkX object and ingress network list - network, ing_nodes = reader.read_network(args.network, node_cap=10, link_cap=10) + # Parse network, get NetworkX object ,ingress network list, and egress nodes list + network, ing_nodes, eg_nodes = reader.read_network(args.network, node_cap=10, link_cap=10) # use dummy placement and schedule for running simulator without algorithm # TODO: make configurable via CLI @@ -44,8 +44,8 @@ def main(): metrics = Metrics(network, sf_list) # Create the simulator parameters object with the provided args - params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, metrics, sf_placement=sf_placement, - schedule=schedule) + params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics, + sf_placement=sf_placement, schedule=schedule) log.info(params) if 'trace_path' in config: diff --git a/src/coordsim/network/flow.py b/src/coordsim/network/flow.py index 5b75b8870..5291ccce2 100644 --- a/src/coordsim/network/flow.py +++ b/src/coordsim/network/flow.py @@ -9,8 +9,8 @@ class Flow: - def __init__(self, flow_id, sfc, dr, size, creation_time, - destination=None, current_sf=None, current_node_id=None, current_position=0, end2end_delay=0.0): + def __init__(self, flow_id, sfc, dr, size, creation_time, destination=None, egress_node_id=None, current_sf=None, + current_node_id=None, current_position=0, end2end_delay=0.0): # Flow ID: Unique ID string self.flow_id = flow_id @@ -24,6 +24,10 @@ def __init__(self, flow_id, sfc, dr, size, creation_time, self.current_sf = current_sf # The current node that the flow is being processed in self.current_node_id = current_node_id + # The specified ingress node of the flow. The flow will spawn at the ingress node. + self.ingress_node_id = current_node_id + # The specified egress node of the flow. The flow will depart at the egress node. Might be non-existent. + self.egress_node_id = egress_node_id # The duration of the flow calculated in ms. self.duration = (float(size) / float(dr)) * 1000 # Converted flow duration to ms # Current flow position within the SFC diff --git a/src/coordsim/reader/reader.py b/src/coordsim/reader/reader.py index 34376af84..aaf549650 100644 --- a/src/coordsim/reader/reader.py +++ b/src/coordsim/reader/reader.py @@ -223,13 +223,16 @@ def read_network(file, node_cap=None, link_cap=None): # Setting the all-pairs shortest path in the NetworkX network as a graph attribute shortest_paths(networkx_network) - # Filter ingress nodes + # Filter ingress and egress (if any) nodes ing_nodes = [] + eg_nodes = [] for node in networkx_network.nodes.items(): if node[1]["type"] == "Ingress": ing_nodes.append(node) + if node[1]["type"] == "Egress": + eg_nodes.append(node[0]) - return networkx_network, ing_nodes + return networkx_network, ing_nodes, eg_nodes def reset_cap(network): diff --git a/src/coordsim/simulation/flowsimulator.py b/src/coordsim/simulation/flowsimulator.py index 58a71a882..ffb5c9e0f 100644 --- a/src/coordsim/simulation/flowsimulator.py +++ b/src/coordsim/simulation/flowsimulator.py @@ -31,6 +31,8 @@ def start(self): # Setting the all-pairs shortest path in the NetworkX network as a graph attribute log.info("Using nodes list {}\n".format(list(self.params.network.nodes.keys()))) log.info("Total of {} ingress nodes available\n".format(len(self.params.ing_nodes))) + if self.params.eg_nodes: + log.info("Total of {} egress nodes available\n".format(len(self.params.eg_nodes))) for node in self.params.ing_nodes: node_id = node[0] self.env.process(self.generate_flow(node_id)) @@ -66,9 +68,13 @@ def generate_flow(self, node_id): flow_sfc = np.random.choice([sfc for sfc in self.params.sfc_list.keys()]) # Get the flow's creation time (current environment time) creation_time = self.env.now + # Set the egress node for the flow if some are specified in the network file + flow_egress_node = None + if self.params.eg_nodes: + flow_egress_node = random.choice(self.params.eg_nodes) # Generate flow based on given params flow = Flow(str(self.total_flow_count), flow_sfc, flow_dr, flow_size, creation_time, - current_node_id=node_id) + current_node_id=node_id, egress_node_id=flow_egress_node) # Update metrics for the generated flow self.params.metrics.generated_flow(flow, node_id) # Generate flows and schedule them at ingress node @@ -230,10 +236,31 @@ def process_flow(self, flow, sfc): log.info("Flow {} started departing sf {} at node {}. Time {}" .format(flow.flow_id, current_sf, current_node_id, self.env.now)) - # Check if flow is currently in last SF, if so, then depart flow. - if (flow.current_position == len(sfc) - 1): - yield self.env.timeout(flow.duration) - self.depart_flow(flow) + # Check if flow is currently in last SF, if so, then: + # - Check if the flow has some Egress node set or not. If not then just depart. If Yes then: + # - check if the current node is the egress node. If Yes then depart. If No then forward the flow to + # the egress node using the shortest_path + + if flow.current_position == len(sfc) - 1: + if flow.current_node_id == flow.egress_node_id: + # Flow is processed and resides at egress node: depart flow + yield self.env.timeout(flow.duration) + self.depart_flow(flow) + elif flow.egress_node_id is None: + # Flow is processed and no egress node specified: depart flow + log.info(f'Flow {flow.flow_id} has no egress node, will depart from' + f' current node {flow.current_node_id}. Time {self.env.now}.') + yield self.env.timeout(flow.duration) + self.depart_flow(flow) + else: + # Remove the active flow from the SF after it departed the SF on current node towards egress + self.params.metrics.remove_active_flow(flow, current_node_id, current_sf) + # Forward flow to the egress node and then depart from there + yield self.env.process(self.forward_flow(flow, flow.egress_node_id)) + yield self.env.timeout(flow.duration) + # In this situation the last sf was never active for the egress node, + # so we should not remove it from the metrics + self.depart_flow(flow, remove_active_flow=False) else: # Increment the position of the flow within SFC flow.current_position += 1 @@ -278,13 +305,14 @@ def process_flow(self, flow, sfc): self.params.metrics.dropped_flow(flow) self.env.exit() - def depart_flow(self, flow): + def depart_flow(self, flow, remove_active_flow=True): """ Process the flow at the requested SF of the current node. """ # Update metrics for the processed flow self.params.metrics.completed_flow() self.params.metrics.add_end2end_delay(flow.end2end_delay) - self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf) + if remove_active_flow: + self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf) log.info("Flow {} was processed and departed the network from {}. Time {}" .format(flow.flow_id, flow.current_node_id, self.env.now)) diff --git a/src/coordsim/simulation/simulatorparams.py b/src/coordsim/simulation/simulatorparams.py index cad59b13a..a2cb7edc0 100644 --- a/src/coordsim/simulation/simulatorparams.py +++ b/src/coordsim/simulation/simulatorparams.py @@ -10,12 +10,14 @@ class SimulatorParams: - def __init__(self, network, ing_nodes, sfc_list, sf_list, config, metrics, prediction=False, + def __init__(self, network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics, prediction=False, schedule=None, sf_placement=None): # NetworkX network object: DiGraph self.network = network # Ingress nodes of the network (nodes at which flows arrive): list self.ing_nodes = ing_nodes + # Egress nodes of the network (nodes at which flows may leave the network): list + self.eg_nodes = eg_nodes # List of available SFCs and their child SFs: defaultdict(None) self.sfc_list = sfc_list # List of every SF and it's properties (e.g. processing_delay): defaultdict(None) diff --git a/src/siminterface/simulator.py b/src/siminterface/simulator.py index 8cb03e6a5..0732ddd9a 100644 --- a/src/siminterface/simulator.py +++ b/src/siminterface/simulator.py @@ -28,7 +28,7 @@ def __init__(self, network_file, service_functions_file, config_file, resource_ # Create CSV writer self.writer = ResultWriter(self.test_mode, self.test_dir) # init network, sfc, sf, and config files - self.network, self.ing_nodes = reader.read_network(self.network_file) + self.network, self.ing_nodes, self.eg_nodes = reader.read_network(self.network_file) self.sfc_list = reader.get_sfc(service_functions_file) self.sf_list = reader.get_sf(service_functions_file, resource_functions_path) self.config = reader.get_config(config_file) @@ -38,8 +38,8 @@ def __init__(self, network_file, service_functions_file, config_file, resource_ # Check if future ingress traffic setting is enabled if 'future_traffic' in self.config and self.config['future_traffic']: self.prediction = True - self.params = SimulatorParams(self.network, self.ing_nodes, self.sfc_list, self.sf_list, self.config, - self.metrics, prediction=self.prediction) + self.params = SimulatorParams(self.network, self.ing_nodes, self.eg_nodes, self.sfc_list, self.sf_list, + self.config, self.metrics, prediction=self.prediction) if self.prediction: self.predictor = TrafficPredictor(self.params) self.episode = 0 From 1a637cc613062ee77c24af5b7ce309783848d623 Mon Sep 17 00:00:00 2001 From: Adnan Manzoor Date: Tue, 14 Apr 2020 12:08:05 +0200 Subject: [PATCH 2/3] Fixing flake8 and nose2 errors --- src/coordsim/simulation/flowsimulator.py | 2 +- tests/test_simulator.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/coordsim/simulation/flowsimulator.py b/src/coordsim/simulation/flowsimulator.py index ffb5c9e0f..e873f5e51 100644 --- a/src/coordsim/simulation/flowsimulator.py +++ b/src/coordsim/simulation/flowsimulator.py @@ -249,7 +249,7 @@ def process_flow(self, flow, sfc): elif flow.egress_node_id is None: # Flow is processed and no egress node specified: depart flow log.info(f'Flow {flow.flow_id} has no egress node, will depart from' - f' current node {flow.current_node_id}. Time {self.env.now}.') + f' current node {flow.current_node_id}. Time {self.env.now}.') yield self.env.timeout(flow.duration) self.depart_flow(flow) else: diff --git a/tests/test_simulator.py b/tests/test_simulator.py index 4d1b30fdf..ca198d255 100644 --- a/tests/test_simulator.py +++ b/tests/test_simulator.py @@ -27,7 +27,7 @@ def setUp(self): self.env = simpy.Environment() # Configure simulator parameters - network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10) + network, ing_nodes, eg_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10) sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE) sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH) config = reader.get_config(CONFIG_FILE) @@ -38,7 +38,7 @@ def setUp(self): schedule = dummy_data.triangle_schedule # Initialize Simulator and SimulatoParams objects - self.simulator_params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, self.metrics, + self.simulator_params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, self.metrics, sf_placement=sf_placement, schedule=schedule) self.flow_simulator = FlowSimulator(self.env, self.simulator_params) self.flow_simulator.start() From 69ec4ef1ca3432e896d20dbfd8e25c69c838c19a Mon Sep 17 00:00:00 2001 From: Adnan Manzoor Date: Thu, 16 Apr 2020 11:14:04 +0200 Subject: [PATCH 3/3] Updating Readme --- README.md | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ee57925a1..5bde00e99 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Simulate flow-level, inter-node network coordination including scaling and place ## Citing this work -If you are using this work in whole or in part in your project, please cite it as follows: +If you are using this work in whole or in part in your project, please cite it as follows: ``` @inproceedings{schneider2020coordination, @@ -54,9 +54,9 @@ pip install -r requirements.txt ## Usage -Type `coord-sim -h` for help using the simulator. For now, this should print +Type `coord-sim -h` for help using the simulator. For now, this should print -``` +``` $ coord-sim -h usage: coord-sim [-h] -d DURATION -sf SF [-sfr SFR] -n NETWORK -c CONFIG [-t TRACE] [-s SEED] @@ -85,7 +85,7 @@ optional arguments: You can use the following command as an example (run from the root project folder) -```bash +```bash coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml ``` This will run a simulation on a provided GraphML network file and a YAML placement file for a duration of 20 timesteps. @@ -95,7 +95,7 @@ This will run a simulation on a provided GraphML network file and a YAML placeme By default, all SFs have a node resource consumption, which exactly equals the aggregated traffic that they have to handle. -It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a +It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a function `resource_function(load)` (see examples [here](https://github.com/RealVNF/coordination-simulation/tree/master/params/services/resource_functions)). To use these modules, they need to be referenced in the service file: @@ -113,6 +113,15 @@ And the path to the folder with the Python modules needs to be passed via the `- See PR https://github.com/RealVNF/coordination-simulation/pull/78 for details. +### Egress nodes + +- A node can be set to be a `Egress` node in the `NodeType` attribute of the network file +- If some nodes are set as `Egress` then only the simulator will randomly choose one of them as the Egress node for each flow in the network +- If some nodes are set to be Egress then once the flow is processed we check if for the flow, `current node == egress node` . If Yes then we depart , otherwise we forward the flow to the egress_node using the shortest_path routing. +- **Todo**: Ideally the coordination algorithms should keep the path(Ingress to Egress) of the flow in view while creating the schedule/placement. + +See [PR 137](https://github.com/RealVNF/coord-sim/pull/137) for details. + ## Tests ```bash