Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
}
},
"executions": [],
"scenario": "replica"
"scenario": "replica",
"clustered": false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"origin_minion_pool_id": null,
"destination_minion_pool_id": null,
"instance_osmorphing_minion_pool_mappings": {},
"clustered": false,
"executions": [
{
"created_at": "2019-07-11T10:06:47.000000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
"instances": {}
},
"id": "0460aa4d-6b16-4c98-bd56-27ee186e4a22",
"scenario": "replica"
"scenario": "replica",
"clustered": false
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@
"ubuntu-xenial": "echo 'anything you need'"
}
},
"scenario": "replica"
"scenario": "replica",
"clustered": false
}
}
}
9 changes: 9 additions & 0 deletions coriolis/api-refs/source/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ connection_info_schema:
in: body
type: object
required: false
clustered:
description: |
Present on transfer responses. ``true`` when more than one instance is
listed (multi-instance scheduling: sync barriers and shared-disk
coordination). Set by the server at creation from ``instances``; not
accepted on create.
in: body
type: boolean
required: false
deployment_cancel:
description: |
Object containing information about the type of deployment cancellation.
Expand Down
3 changes: 3 additions & 0 deletions coriolis/api-refs/source/transfer.inc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Response
- instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings
- user_scripts : user_scripts
- scenario: scenario_type
- clustered : clustered

**Example of Transfer List Response**

Expand Down Expand Up @@ -111,6 +112,7 @@ Response
- instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings
- user_scripts : user_scripts
- scenario: scenario_type
- clustered : clustered

**Example of Transfer Show Response**

Expand Down Expand Up @@ -183,6 +185,7 @@ Response
- instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings
- user_scripts : user_scripts
- scenario: scenario_type
- clustered : clustered

**Example of Transfer Create Response**

Expand Down
595 changes: 482 additions & 113 deletions coriolis/conductor/rpc/server.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions coriolis/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@
TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK"
TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
TASK_STATUS_FAILED_TO_SCHEDULE = "FAILED_TO_SCHEDULE"
TASK_STATUS_SYNCING = "SYNCING"

ACTIVE_TASK_STATUSES = [
TASK_STATUS_PENDING,
TASK_STATUS_STARTING,
TASK_STATUS_RUNNING,
TASK_STATUS_SYNCING,
TASK_STATUS_CANCELLING,
TASK_STATUS_CANCELLING_AFTER_COMPLETION
]
Expand Down Expand Up @@ -182,6 +184,11 @@
TASK_TYPE_POWER_ON_DESTINATION_MINION = "POWER_ON_DESTINATION_MINION"
TASK_TYPE_POWER_OFF_DESTINATION_MINION = "POWER_OFF_DESTINATION_MINION"

TASK_TYPES_TO_SYNC = [
TASK_TYPE_GET_INSTANCE_INFO,
TASK_TYPE_DEPLOY_TRANSFER_DISKS,
TASK_TYPE_SHUTDOWN_INSTANCE,
]

MINION_POOL_OPERATIONS_TASKS = [
TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS,
Expand Down Expand Up @@ -240,6 +247,7 @@
DISK_FORMAT_QCOW2 = 'qcow2'
DISK_FORMAT_VHD = 'vhd'
DISK_FORMAT_VHDX = 'vhdx'
VOLUME_INFO_REPLICATE_DISK_DATA = "replicate_disk_data"

DISK_ALLOCATION_TYPE_STATIC = "static"
DISK_ALLOCATION_TYPE_DYNAMIC = "dynamic"
Expand Down
2 changes: 2 additions & 0 deletions coriolis/db/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ def update_transfer_action_info_for_instance(
""" Updates the info for the given action with the provided dict.
Returns the updated value.
Sub-fields of the dict already in the info will get overwritten entirely!
After merging, volumes_info is updated so it stays aligned with export_info
"""
action = get_action(context, action_id, include_task_info=True)
if not new_instance_info:
Expand Down Expand Up @@ -794,6 +795,7 @@ def update_transfer_action_info_for_instance(

instance_info_old_copy = instance_info_old.copy()
instance_info_old_copy.update(new_instance_info)
utils.sync_instance_volumes_with_export(instance_info_old_copy)
action_info[instance] = instance_info_old_copy
action.info = action_info

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2026 Cloudbase Solutions Srl
# All Rights Reserved.

import sqlalchemy


def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine

base_transfer = sqlalchemy.Table(
'base_transfer_action', meta, autoload=True)
if 'clustered' in base_transfer.c:
return
# server_default so existing rows get a value when the column is added
# (MySQL stores booleans as TINYINT).
clustered = sqlalchemy.Column(
'clustered', sqlalchemy.Boolean, nullable=False,
server_default=sqlalchemy.text('0'))
base_transfer.create_column(clustered)
5 changes: 5 additions & 0 deletions coriolis/db/sqlalchemy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
sqlalchemy.Boolean, nullable=False, default=True)
skip_os_morphing = sqlalchemy.Column(
sqlalchemy.Boolean, nullable=False, default=False)
# Multi-instance transfer: enables cross-instance sync barriers and
# shared-disk handling. Must be set on INSERT (MySQL NOT NULL).
clustered = sqlalchemy.Column(
sqlalchemy.Boolean, nullable=False, default=False)

__mapper_args__ = {
'polymorphic_identity': 'base_transfer_action',
Expand Down Expand Up @@ -320,6 +324,7 @@ def to_dict(self, include_task_info=True, include_executions=True):
"user_scripts": self.user_scripts,
"clone_disks": self.clone_disks,
"skip_os_morphing": self.skip_os_morphing,
"clustered": bool(self.clustered),
}
if include_executions:
for ex in self.executions:
Expand Down
2 changes: 1 addition & 1 deletion coriolis/schemas/disk_sync_resources_info_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
},
"volume_dev": {
"type": "string",
"description": "String device path (ex: /dev/sdc) from within the temporary minion VM where the disk was attached."
"description": "Guest minion device path (e.g. /dev/disk/by-id/...) when the volume is attached; use \"\" for rows that do not represent a transferred block dev yet (e.g. shared-disk non-owners)."
}
},
"required": ["disk_id", "volume_dev"],
Expand Down
5 changes: 5 additions & 0 deletions coriolis/schemas/vm_export_info_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,14 @@
"type": "string",
"description": "The allocation scheme for the given disk (static = thick; dynamic = thin)",
"enum": ["static", "dynamic"]
},
"shareable": {
"type": "boolean",
"description": "Whether the disk is shared (multi-writer) and can be attached to multiple VMs simultaneously."
}
},
"required": [
"id",
"size_bytes"
]
}
Expand Down
6 changes: 6 additions & 0 deletions coriolis/tasks/minion_pool_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,12 @@ class _BaseAttachVolumesToTransferMinionTask(
def _get_volumes_info_from_task_info(cls, task_info):
return task_info["volumes_info"]

def _run(
self, ctxt, instance, origin, destination, task_info,
event_handler):
return super(_BaseAttachVolumesToTransferMinionTask, self)._run(
ctxt, instance, origin, destination, task_info, event_handler)

@classmethod
def get_required_task_info_properties(cls):
fields = super(
Expand Down
30 changes: 22 additions & 8 deletions coriolis/tasks/replica_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,27 @@ def _run(self, ctxt, instance, origin, destination, task_info,
source_environment = task_info['source_environment']

source_resources = task_info.get('source_resources', {})
volumes_info = provider.replicate_disks(
ctxt, connection_info, source_environment, instance,
source_resources, migr_source_conn_info, migr_target_conn_info,
volumes_info, incremental)
schemas.validate_value(
volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
volumes_to_replicate = [
vol for vol in volumes_info
if vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)]
pre_replicated_volumes = [
vol for vol in volumes_info
if not vol.get(constants.VOLUME_INFO_REPLICATE_DISK_DATA, True)]

replicated_volumes = []
if volumes_to_replicate:
replicated_volumes = provider.replicate_disks(
ctxt, connection_info, source_environment, instance,
source_resources, migr_source_conn_info, migr_target_conn_info,
volumes_to_replicate, incremental)
schemas.validate_value(
replicated_volumes, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
else:
LOG.info(
"No disks marked for replication for instance '%s'. "
"Using pre-provisioned volumes_info.", instance)

volumes_info = pre_replicated_volumes + replicated_volumes

volumes_info = _check_ensure_volumes_info_ordering(
export_info, volumes_info)
Expand Down Expand Up @@ -290,10 +305,9 @@ def _run(self, ctxt, instance, origin, destination, task_info,
event_handler)
connection_info = base.get_connection_info(ctxt, destination)

volumes_info = task_info.get("volumes_info", [])
volumes_info = provider.deploy_replica_disks(
ctxt, connection_info, target_environment, instance, export_info,
volumes_info)
task_info.get("volumes_info", []))
schemas.validate_value(
volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)

Expand Down
1 change: 1 addition & 0 deletions coriolis/tests/conductor/rpc/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,7 @@ def call_execute_transfer_tasks():
if has_origin_minion_pool else None,
destination_minion_pool_id=mock.sentinel.destination_minion_pool_id
if has_target_minion_pool else None,
clustered=False,
)
mock_get_transfer.return_value = mock_transfer

Expand Down
2 changes: 2 additions & 0 deletions coriolis/tests/db/sqlalchemy/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def test_to_dict(self):
transfer.info = mock.sentinel.info
transfer.clone_disks = True
transfer.skip_os_morphing = False
transfer.clustered = False
expected_result = {
"base_id": mock.sentinel.base_id,
"user_id": mock.sentinel.user_id,
Expand Down Expand Up @@ -314,6 +315,7 @@ def test_to_dict(self):
"info": mock.sentinel.info,
"clone_disks": True,
"skip_os_morphing": False,
"clustered": False,
}

result = transfer.to_dict()
Expand Down
2 changes: 2 additions & 0 deletions coriolis/tests/tasks/test_replica_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ def test__run(self, mock_unmarshal, mock_check_vol_info, mock_get_vol_info,
task_info.get.side_effect = [
task_info['incremental'], task_info['source_resources']]
prov_fun = mock_get_provider.return_value.replicate_disks
mock_get_vol_info.return_value = [{"disk_id": "disk_id1"}]
prov_fun.return_value = [{"disk_id": "disk_id1"}]
expected_result = {"volumes_info": mock_check_vol_info.return_value}
expected_validation_calls = [
mock.call.mock_validate_value(
Expand Down
80 changes: 80 additions & 0 deletions coriolis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,86 @@ def _exec_retry(*args, **kwargs):
return _retry_on_error


def cluster_disk_identity(disk_id_or_obj):
Comment thread
fabi200123 marked this conversation as resolved.
"""Return a stable key for matching the same disk across cluster nodes.

The provider is expected to return a ``disk_id`` (or ``id`` on export
``devices.disks`` entries) that is identical across all instances that
share the disk. For example, the VMware provider canonicalises the
datastore path of a multi-writer VMDK so every guest reports the same
string. Core just normalizes that value here for case-insensitive
comparison.
"""
if isinstance(disk_id_or_obj, dict):
disk_id = disk_id_or_obj.get("disk_id")
if disk_id is None:
disk_id = disk_id_or_obj.get("id")
else:
disk_id = disk_id_or_obj
if disk_id is None:
return None
s = str(disk_id).strip().lower()
return s if s else None


def ensure_volumes_info_volume_dev_default(volumes_info):
"""Ensure each volume row has a ``volume_dev`` string for schema validation

Use ``""`` when the minion has not attached the volume yet (e.g. non-owner
shared-disk waiters) or the provider did not set the field.
"""
if not volumes_info:
return
for vol in volumes_info:
if not isinstance(vol, dict):
continue
if "volume_dev" not in vol:
vol["volume_dev"] = ""


def sync_instance_volumes_with_export(instance_info):
"""Fill in volume list fields that should follow export_info.

Sets default ``volume_dev`` when missing, and copies the ``shareable``
flag from ``export_info['devices']['disks']`` onto each
``volumes_info`` entry where it applies. Safe to call more than once.

Invoked from ``update_transfer_action_info_for_instance`` after merging
new instance info, and in the conductor right before it sends
``task_info`` to a worker, so the DB and RPC payload stay aligned.
"""
if not isinstance(instance_info, dict):
return
vols = instance_info.get("volumes_info")
if vols is None:
return
ensure_volumes_info_volume_dev_default(vols)
export = instance_info.get("export_info")
if not export:
return
apply_export_disk_shareable_metadata_to_volumes_info(export, vols)


def apply_export_disk_shareable_metadata_to_volumes_info(
export_info, volumes_info):
"""Propagate shareable from export_info disks to volumes_info entries."""
if not export_info or not volumes_info:
return
disks = export_info.get("devices", {}).get("disks") or []
share_idents = set()
for d in disks:
if d.get("shareable"):
cid = cluster_disk_identity(d)
if cid:
share_idents.add(cid)
if not share_idents:
return
for vol in volumes_info:
cid = cluster_disk_identity(vol)
if cid and cid in share_idents:
vol["shareable"] = True


def get_udev_net_rules(net_ifaces_info):
content = ""
for name, mac_address in net_ifaces_info.items():
Expand Down
Loading