diff --git a/README.md b/README.md
index ee57925a1..5bde00e99 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ Simulate flow-level, inter-node network coordination including scaling and place
## Citing this work
-If you are using this work in whole or in part in your project, please cite it as follows:
+If you are using this work in whole or in part in your project, please cite it as follows:
```
@inproceedings{schneider2020coordination,
@@ -54,9 +54,9 @@ pip install -r requirements.txt
## Usage
-Type `coord-sim -h` for help using the simulator. For now, this should print
+Type `coord-sim -h` for help using the simulator. For now, this should print
-```
+```
$ coord-sim -h
usage: coord-sim [-h] -d DURATION -sf SF [-sfr SFR] -n NETWORK -c CONFIG
[-t TRACE] [-s SEED]
@@ -85,7 +85,7 @@ optional arguments:
You can use the following command as an example (run from the root project folder)
-```bash
+```bash
coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml
```
This will run a simulation on a provided GraphML network file and a YAML placement file for a duration of 20 timesteps.
@@ -95,7 +95,7 @@ This will run a simulation on a provided GraphML network file and a YAML placeme
By default, all SFs have a node resource consumption, which exactly equals the aggregated traffic that they have to handle.
-It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a
+It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a
function `resource_function(load)` (see examples [here](https://github.com/RealVNF/coordination-simulation/tree/master/params/services/resource_functions)).
To use these modules, they need to be referenced in the service file:
@@ -113,6 +113,15 @@ And the path to the folder with the Python modules needs to be passed via the `-
See PR https://github.com/RealVNF/coordination-simulation/pull/78 for details.
+### Egress nodes
+
+- A node can be set to be a `Egress` node in the `NodeType` attribute of the network file
+- If some nodes are set as `Egress` then only the simulator will randomly choose one of them as the Egress node for each flow in the network
+- If some nodes are set to be Egress then once the flow is processed we check if for the flow, `current node == egress node` . If Yes then we depart , otherwise we forward the flow to the egress_node using the shortest_path routing.
+- **Todo**: Ideally the coordination algorithms should keep the path(Ingress to Egress) of the flow in view while creating the schedule/placement.
+
+See [PR 137](https://github.com/RealVNF/coord-sim/pull/137) for details.
+
## Tests
```bash
diff --git a/params/networks/triangle.graphml b/params/networks/triangle.graphml
index 59f0fe11e..3a56dd7e0 100644
--- a/params/networks/triangle.graphml
+++ b/params/networks/triangle.graphml
@@ -88,7 +88,7 @@
1
-87.65005
Chicago
- Normal
+ Egress
10
diff --git a/src/coordsim/main.py b/src/coordsim/main.py
index e20d9d4a8..1e5552d8f 100644
--- a/src/coordsim/main.py
+++ b/src/coordsim/main.py
@@ -28,8 +28,8 @@ def main():
random.seed(args.seed)
numpy.random.seed(args.seed)
- # Parse network and get NetworkX object and ingress network list
- network, ing_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)
+ # Parse network, get NetworkX object ,ingress network list, and egress nodes list
+ network, ing_nodes, eg_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)
# use dummy placement and schedule for running simulator without algorithm
# TODO: make configurable via CLI
@@ -44,8 +44,8 @@ def main():
metrics = Metrics(network, sf_list)
# Create the simulator parameters object with the provided args
- params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, metrics, sf_placement=sf_placement,
- schedule=schedule)
+ params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics,
+ sf_placement=sf_placement, schedule=schedule)
log.info(params)
if 'trace_path' in config:
diff --git a/src/coordsim/network/flow.py b/src/coordsim/network/flow.py
index 5b75b8870..5291ccce2 100644
--- a/src/coordsim/network/flow.py
+++ b/src/coordsim/network/flow.py
@@ -9,8 +9,8 @@
class Flow:
- def __init__(self, flow_id, sfc, dr, size, creation_time,
- destination=None, current_sf=None, current_node_id=None, current_position=0, end2end_delay=0.0):
+ def __init__(self, flow_id, sfc, dr, size, creation_time, destination=None, egress_node_id=None, current_sf=None,
+ current_node_id=None, current_position=0, end2end_delay=0.0):
# Flow ID: Unique ID string
self.flow_id = flow_id
@@ -24,6 +24,10 @@ def __init__(self, flow_id, sfc, dr, size, creation_time,
self.current_sf = current_sf
# The current node that the flow is being processed in
self.current_node_id = current_node_id
+ # The specified ingress node of the flow. The flow will spawn at the ingress node.
+ self.ingress_node_id = current_node_id
+ # The specified egress node of the flow. The flow will depart at the egress node. Might be non-existent.
+ self.egress_node_id = egress_node_id
# The duration of the flow calculated in ms.
self.duration = (float(size) / float(dr)) * 1000 # Converted flow duration to ms
# Current flow position within the SFC
diff --git a/src/coordsim/reader/reader.py b/src/coordsim/reader/reader.py
index 34376af84..aaf549650 100644
--- a/src/coordsim/reader/reader.py
+++ b/src/coordsim/reader/reader.py
@@ -223,13 +223,16 @@ def read_network(file, node_cap=None, link_cap=None):
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
shortest_paths(networkx_network)
- # Filter ingress nodes
+ # Filter ingress and egress (if any) nodes
ing_nodes = []
+ eg_nodes = []
for node in networkx_network.nodes.items():
if node[1]["type"] == "Ingress":
ing_nodes.append(node)
+ if node[1]["type"] == "Egress":
+ eg_nodes.append(node[0])
- return networkx_network, ing_nodes
+ return networkx_network, ing_nodes, eg_nodes
def reset_cap(network):
diff --git a/src/coordsim/simulation/flowsimulator.py b/src/coordsim/simulation/flowsimulator.py
index 58a71a882..e873f5e51 100644
--- a/src/coordsim/simulation/flowsimulator.py
+++ b/src/coordsim/simulation/flowsimulator.py
@@ -31,6 +31,8 @@ def start(self):
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
log.info("Using nodes list {}\n".format(list(self.params.network.nodes.keys())))
log.info("Total of {} ingress nodes available\n".format(len(self.params.ing_nodes)))
+ if self.params.eg_nodes:
+ log.info("Total of {} egress nodes available\n".format(len(self.params.eg_nodes)))
for node in self.params.ing_nodes:
node_id = node[0]
self.env.process(self.generate_flow(node_id))
@@ -66,9 +68,13 @@ def generate_flow(self, node_id):
flow_sfc = np.random.choice([sfc for sfc in self.params.sfc_list.keys()])
# Get the flow's creation time (current environment time)
creation_time = self.env.now
+ # Set the egress node for the flow if some are specified in the network file
+ flow_egress_node = None
+ if self.params.eg_nodes:
+ flow_egress_node = random.choice(self.params.eg_nodes)
# Generate flow based on given params
flow = Flow(str(self.total_flow_count), flow_sfc, flow_dr, flow_size, creation_time,
- current_node_id=node_id)
+ current_node_id=node_id, egress_node_id=flow_egress_node)
# Update metrics for the generated flow
self.params.metrics.generated_flow(flow, node_id)
# Generate flows and schedule them at ingress node
@@ -230,10 +236,31 @@ def process_flow(self, flow, sfc):
log.info("Flow {} started departing sf {} at node {}. Time {}"
.format(flow.flow_id, current_sf, current_node_id, self.env.now))
- # Check if flow is currently in last SF, if so, then depart flow.
- if (flow.current_position == len(sfc) - 1):
- yield self.env.timeout(flow.duration)
- self.depart_flow(flow)
+ # Check if flow is currently in last SF, if so, then:
+ # - Check if the flow has some Egress node set or not. If not then just depart. If Yes then:
+ # - check if the current node is the egress node. If Yes then depart. If No then forward the flow to
+ # the egress node using the shortest_path
+
+ if flow.current_position == len(sfc) - 1:
+ if flow.current_node_id == flow.egress_node_id:
+ # Flow is processed and resides at egress node: depart flow
+ yield self.env.timeout(flow.duration)
+ self.depart_flow(flow)
+ elif flow.egress_node_id is None:
+ # Flow is processed and no egress node specified: depart flow
+ log.info(f'Flow {flow.flow_id} has no egress node, will depart from'
+ f' current node {flow.current_node_id}. Time {self.env.now}.')
+ yield self.env.timeout(flow.duration)
+ self.depart_flow(flow)
+ else:
+ # Remove the active flow from the SF after it departed the SF on current node towards egress
+ self.params.metrics.remove_active_flow(flow, current_node_id, current_sf)
+ # Forward flow to the egress node and then depart from there
+ yield self.env.process(self.forward_flow(flow, flow.egress_node_id))
+ yield self.env.timeout(flow.duration)
+ # In this situation the last sf was never active for the egress node,
+ # so we should not remove it from the metrics
+ self.depart_flow(flow, remove_active_flow=False)
else:
# Increment the position of the flow within SFC
flow.current_position += 1
@@ -278,13 +305,14 @@ def process_flow(self, flow, sfc):
self.params.metrics.dropped_flow(flow)
self.env.exit()
- def depart_flow(self, flow):
+ def depart_flow(self, flow, remove_active_flow=True):
"""
Process the flow at the requested SF of the current node.
"""
# Update metrics for the processed flow
self.params.metrics.completed_flow()
self.params.metrics.add_end2end_delay(flow.end2end_delay)
- self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
+ if remove_active_flow:
+ self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
log.info("Flow {} was processed and departed the network from {}. Time {}"
.format(flow.flow_id, flow.current_node_id, self.env.now))
diff --git a/src/coordsim/simulation/simulatorparams.py b/src/coordsim/simulation/simulatorparams.py
index cad59b13a..a2cb7edc0 100644
--- a/src/coordsim/simulation/simulatorparams.py
+++ b/src/coordsim/simulation/simulatorparams.py
@@ -10,12 +10,14 @@
class SimulatorParams:
- def __init__(self, network, ing_nodes, sfc_list, sf_list, config, metrics, prediction=False,
+ def __init__(self, network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics, prediction=False,
schedule=None, sf_placement=None):
# NetworkX network object: DiGraph
self.network = network
# Ingress nodes of the network (nodes at which flows arrive): list
self.ing_nodes = ing_nodes
+ # Egress nodes of the network (nodes at which flows may leave the network): list
+ self.eg_nodes = eg_nodes
# List of available SFCs and their child SFs: defaultdict(None)
self.sfc_list = sfc_list
# List of every SF and it's properties (e.g. processing_delay): defaultdict(None)
diff --git a/src/siminterface/simulator.py b/src/siminterface/simulator.py
index 8cb03e6a5..0732ddd9a 100644
--- a/src/siminterface/simulator.py
+++ b/src/siminterface/simulator.py
@@ -28,7 +28,7 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
# Create CSV writer
self.writer = ResultWriter(self.test_mode, self.test_dir)
# init network, sfc, sf, and config files
- self.network, self.ing_nodes = reader.read_network(self.network_file)
+ self.network, self.ing_nodes, self.eg_nodes = reader.read_network(self.network_file)
self.sfc_list = reader.get_sfc(service_functions_file)
self.sf_list = reader.get_sf(service_functions_file, resource_functions_path)
self.config = reader.get_config(config_file)
@@ -38,8 +38,8 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
# Check if future ingress traffic setting is enabled
if 'future_traffic' in self.config and self.config['future_traffic']:
self.prediction = True
- self.params = SimulatorParams(self.network, self.ing_nodes, self.sfc_list, self.sf_list, self.config,
- self.metrics, prediction=self.prediction)
+ self.params = SimulatorParams(self.network, self.ing_nodes, self.eg_nodes, self.sfc_list, self.sf_list,
+ self.config, self.metrics, prediction=self.prediction)
if self.prediction:
self.predictor = TrafficPredictor(self.params)
self.episode = 0
diff --git a/tests/test_simulator.py b/tests/test_simulator.py
index 4d1b30fdf..ca198d255 100644
--- a/tests/test_simulator.py
+++ b/tests/test_simulator.py
@@ -27,7 +27,7 @@ def setUp(self):
self.env = simpy.Environment()
# Configure simulator parameters
- network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
+ network, ing_nodes, eg_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE)
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH)
config = reader.get_config(CONFIG_FILE)
@@ -38,7 +38,7 @@ def setUp(self):
schedule = dummy_data.triangle_schedule
# Initialize Simulator and SimulatoParams objects
- self.simulator_params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, self.metrics,
+ self.simulator_params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, self.metrics,
sf_placement=sf_placement, schedule=schedule)
self.flow_simulator = FlowSimulator(self.env, self.simulator_params)
self.flow_simulator.start()