Skip to content

Commit 37db355

Browse files
committed
Add post-transfer disk checksum verification
After each disk is written, compare the source (replicator) and destination (writer) checksums to catch any data corruption during transfer. The comparison happens while the writer device is still acquired so the checksum job can run. - Client.get_disk_checksum: calls GET /api/v1/dev/{disk}/checksum - HTTPBackupWriterImpl: - _create_checksum_job: calls POST /api/v2/device/{disk}/checksumJob - _delete_checksum_job: calls DELETE /api/v2/device/{disk}/checksumJob/{id} - _get_checksum_job_status: calls GET /api/v2/device/{disk}/checksumJob/{id} - get_disk_checksum: creates checksum job, waits for it to finish, and returns the checksum value and algorithm. - Replicator._verify_disk_checksum: compares both sides, raises on algorithm or value mismatch.
1 parent dae596f commit 37db355

File tree

5 files changed

+397
-3
lines changed

5 files changed

+397
-3
lines changed

coriolis/providers/backup_writers.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
cfg.BoolOpt('compress_transfers',
3232
default=True,
3333
help='Use compression if possible during disk transfers'),
34+
cfg.IntOpt('disk_checksum_timeout',
35+
default=3600,
36+
help='Maximum number of seconds to wait for a disk checksum '
37+
'job to complete on the backup writer. Larger disks may '
38+
'require a higher value.'),
3439
]
3540
CONF.register_opts(opts)
3641
_CORIOLIS_HTTP_WRITER_CMD = "coriolis-writer"
@@ -66,6 +71,10 @@
6671
15: "ERR_OUT_OF_BOUDS",
6772
}
6873

74+
_CHECKSUM_JOB_POLL_INTERVAL = 15 # seconds between writer checksum job polls
75+
_CHECKSUM_JOB_FINISHED = "finished"
76+
_CHECKSUM_JOB_FAILED = "failed"
77+
6978

7079
def _disable_lvm2_lvmetad(ssh):
7180
"""Disables lvm2-lvmetad service. This service is responsible
@@ -193,6 +202,10 @@ def write(self, data):
193202
def close(self):
194203
pass
195204

205+
def get_disk_checksum(self, algorithm):
206+
"""Returns the destination disk checksum, or None if unsupported."""
207+
return None
208+
196209

197210
class BaseBackupWriter(with_metaclass(abc.ABCMeta)):
198211
@abc.abstractmethod
@@ -735,6 +748,101 @@ def _wait_for_queues(self):
735748
LOG.info("Waiting for unfinished transfers to complete")
736749
time.sleep(0.5)
737750

751+
def _create_checksum_job(self, algorithm, start_offset=0, end_offset=0):
752+
"""Creates a full-disk checksum job on the writer.
753+
754+
The device must already be acquired.
755+
756+
:param algorithm: Checksumming algorithm to use.
757+
:param start_offset: Checksumming starts from this offset.
758+
:param end_offset: Checksumming stop at this offset.
759+
:returns: job ID string.
760+
"""
761+
self._ensure_session()
762+
uri = "%s/checksumJob" % self._uri
763+
headers = {"X-Client-Token": self._id}
764+
body = {
765+
"start_offset": start_offset,
766+
"end_offset": end_offset,
767+
"checksum_algorithm": algorithm,
768+
}
769+
770+
resp = self._session.post(
771+
uri, headers=headers, json=body,
772+
timeout=CONF.default_requests_timeout)
773+
resp.raise_for_status()
774+
775+
return resp.json()["job_id"]
776+
777+
def _get_checksum_job_status(self, job_id):
778+
"""Returns the current status of a writer checksum job."""
779+
self._ensure_session()
780+
uri = "%s/checksumJob/%s" % (self._uri, job_id)
781+
782+
resp = self._session.get(
783+
uri, timeout=CONF.default_requests_timeout)
784+
resp.raise_for_status()
785+
786+
return resp.json()
787+
788+
def _delete_checksum_job(self, job_id):
789+
"""Deletes a writer checksum job."""
790+
self._ensure_session()
791+
uri = "%s/checksumJob/%s" % (self._uri, job_id)
792+
793+
resp = self._session.delete(
794+
uri, timeout=CONF.default_requests_timeout)
795+
resp.raise_for_status()
796+
797+
def get_disk_checksum(self, algorithm, start_offset=0, end_offset=0):
798+
"""Computes and returns the checksum of the entire destination disk.
799+
800+
Must be called while the device is acquired (inside open() context).
801+
Flushes all pending writes before starting the checksum job.
802+
803+
:param algorithm: Checksumming algorithm to use.
804+
:param start_offset: Checksumming starts from this offset.
805+
:param end_offset: Checksumming stop at this offset.
806+
:returns: dict with 'checksum' and 'algorithm' keys.
807+
"""
808+
self._wait_for_queues()
809+
if self._exception:
810+
raise exception.CoriolisException(
811+
"Cannot checksum disk '%s', pending write error: %s" % (
812+
self._path, self._exception))
813+
814+
timeout = CONF.disk_checksum_timeout
815+
deadline = time.monotonic() + timeout
816+
job_id = self._create_checksum_job(algorithm)
817+
try:
818+
while True:
819+
status = self._get_checksum_job_status(job_id)
820+
execution_status = status.get("execution_status")
821+
if execution_status == _CHECKSUM_JOB_FINISHED:
822+
return {
823+
"checksum": status["checksum_value"],
824+
"algorithm": status["checksum_algorithm"],
825+
}
826+
827+
if execution_status == _CHECKSUM_JOB_FAILED:
828+
raise exception.CoriolisException(
829+
"Checksum job failed for disk '%s': %s" % (
830+
self._path, status.get("error_message", "")))
831+
832+
if time.monotonic() >= deadline:
833+
raise exception.CoriolisException(
834+
"Timed out waiting for checksum job for disk '%s' "
835+
"after %d seconds." % (self._path, timeout))
836+
837+
time.sleep(_CHECKSUM_JOB_POLL_INTERVAL)
838+
finally:
839+
try:
840+
self._delete_checksum_job(job_id)
841+
except Exception:
842+
LOG.warning(
843+
"Failed to delete checksum job %s for disk %s",
844+
job_id, self._path)
845+
738846
def close(self):
739847
self._closing = True
740848
self._wait_for_queues()

coriolis/providers/replicator.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,22 @@ def get_disk_size(self, disk):
204204
info.raise_for_status()
205205
return int(info.headers["Content-Length"])
206206

207+
@utils.retry_on_error()
208+
def get_disk_checksum(self, device):
209+
"""Returns the total checksum of the given disk.
210+
211+
:raises HTTPError: with HTTP 409 status if checksumming has not
212+
completed yet.
213+
:returns: dict with 'checksum' and 'algorithm' keys.
214+
"""
215+
uri = "%s/api/v1/dev/%s/checksum" % (self._base_uri, device)
216+
217+
resp = self._cli.get(
218+
uri, timeout=CONF.replicator.default_requests_timeout)
219+
resp.raise_for_status()
220+
221+
return resp.json()
222+
207223
@utils.retry_on_error()
208224
def download_chunk(self, disk, chunk):
209225
diskUri = self.raw_disk_uri(disk)
@@ -768,7 +784,42 @@ def _find_vol_state(self, name, state):
768784
return vol
769785
return None
770786

771-
def replicate_disks(self, source_volumes_info, backup_writer):
787+
def _verify_disk_checksum(self, dev_name, destination):
788+
"""Compares source and destination checksums for a transferred disk.
789+
790+
Must be called while the device is still acquired on the writer side.
791+
792+
:raises CoriolisException: if the checksum algorithms do not match, or
793+
if the checksums do not match.
794+
"""
795+
self._event_manager.progress_update(
796+
"Verifying disk integrity for /dev/%s" % dev_name)
797+
source = self._cli.get_disk_checksum(dev_name)
798+
writer = destination.get_disk_checksum(source["algorithm"])
799+
if writer is None:
800+
self._event_manager.progress_update(
801+
"Disk integrity check skipped for /dev/%s "
802+
"(writer does not support checksums)" % dev_name)
803+
return
804+
805+
if source["algorithm"] != writer["algorithm"]:
806+
raise exception.CoriolisException(
807+
"Checksum algorithm mismatch for disk '%s': "
808+
"source=%s, destination=%s" % (
809+
dev_name, source["algorithm"], writer["algorithm"]))
810+
811+
if source["checksum"] != writer["checksum"]:
812+
raise exception.CoriolisException(
813+
"Checksum mismatch for disk '%s': "
814+
"source=%s, destination=%s" % (
815+
dev_name, source["checksum"], writer["checksum"]))
816+
817+
self._event_manager.progress_update(
818+
"Disk integrity verified for /dev/%s (checksum: %s)" % (
819+
dev_name, source["checksum"]))
820+
821+
def replicate_disks(
822+
self, source_volumes_info, backup_writer, verify_checksum=False):
772823
"""
773824
Fetch the block diff and send it to the backup_writer.
774825
If the target_is_zeroed parameter is set to True, on initial
@@ -845,6 +896,8 @@ def replicate_disks(self, source_volumes_info, backup_writer):
845896
total += 1
846897
self._event_manager.set_percentage_step(
847898
perc_step, total)
899+
if verify_checksum:
900+
self._verify_disk_checksum(devName, destination)
848901
dst_vol["replica_state"] = state_for_vol
849902

850903
self._repl_state = curr_state

coriolis/resources/bin/replicator

-1.62 MB
Binary file not shown.

coriolis/tests/providers/test_backup_writers.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,153 @@ def test_close_with_exception(self, mock_release, mock_wait_for_queues):
10611061
level=logging.ERROR):
10621062
self.assertRaises(exception.CoriolisException, self.writer.close)
10631063

1064+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
1065+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
1066+
new_callable=mock.PropertyMock)
1067+
@mock.patch.object(backup_writers, 'CONF')
1068+
def test__create_checksum_job(self, mock_conf, mock_uri,
1069+
mock_ensure_session):
1070+
self.writer._set_info(self.info)
1071+
mock_uri.return_value = "https://host:port/api/v2/device/b64path"
1072+
self.writer._session = mock.MagicMock()
1073+
mock_resp = mock.MagicMock()
1074+
mock_resp.json.return_value = {"job_id": "test-job-id"}
1075+
self.writer._session.post.return_value = mock_resp
1076+
1077+
result = self.writer._create_checksum_job("sha256")
1078+
1079+
self.assertEqual("test-job-id", result)
1080+
self.writer._session.post.assert_called_once_with(
1081+
"https://host:port/api/v2/device/b64path/checksumJob",
1082+
headers={"X-Client-Token": self.info["id"]},
1083+
json={"start_offset": 0, "end_offset": 0,
1084+
"checksum_algorithm": "sha256"},
1085+
timeout=mock_conf.default_requests_timeout)
1086+
mock_resp.raise_for_status.assert_called_once()
1087+
1088+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
1089+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
1090+
new_callable=mock.PropertyMock)
1091+
@mock.patch.object(backup_writers, 'CONF')
1092+
def test__get_checksum_job_status(self, mock_conf, mock_uri,
1093+
mock_ensure_session):
1094+
self.writer._set_info(self.info)
1095+
mock_uri.return_value = "https://host:port/api/v2/device/b64path"
1096+
self.writer._session = mock.MagicMock()
1097+
mock_resp = mock.MagicMock()
1098+
self.writer._session.get.return_value = mock_resp
1099+
1100+
result = self.writer._get_checksum_job_status("test-job-id")
1101+
1102+
self.assertEqual(result, mock_resp.json.return_value)
1103+
self.writer._session.get.assert_called_once_with(
1104+
"https://host:port/api/v2/device/b64path/checksumJob/test-job-id",
1105+
timeout=mock_conf.default_requests_timeout)
1106+
mock_resp.raise_for_status.assert_called_once()
1107+
1108+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_ensure_session')
1109+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_uri',
1110+
new_callable=mock.PropertyMock)
1111+
@mock.patch.object(backup_writers, 'CONF')
1112+
def test__delete_checksum_job(self, mock_conf, mock_uri,
1113+
mock_ensure_session):
1114+
self.writer._set_info(self.info)
1115+
mock_uri.return_value = "https://host:port/api/v2/device/b64path"
1116+
self.writer._session = mock.MagicMock()
1117+
mock_resp = mock.MagicMock()
1118+
self.writer._session.delete.return_value = mock_resp
1119+
1120+
self.writer._delete_checksum_job("test-job-id")
1121+
1122+
self.writer._session.delete.assert_called_once_with(
1123+
"https://host:port/api/v2/device/b64path/checksumJob/test-job-id",
1124+
timeout=mock_conf.default_requests_timeout)
1125+
mock_resp.raise_for_status.assert_called_once()
1126+
1127+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1128+
'_delete_checksum_job')
1129+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1130+
'_get_checksum_job_status')
1131+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1132+
'_create_checksum_job')
1133+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
1134+
def test_get_disk_checksum(self, mock_wait, mock_create, mock_status,
1135+
mock_delete):
1136+
self.writer._set_info(self.info)
1137+
mock_create.return_value = "test-job-id"
1138+
mock_status.return_value = {
1139+
"execution_status": "finished",
1140+
"checksum_value": "abc123",
1141+
"checksum_algorithm": "sha256",
1142+
}
1143+
1144+
result = self.writer.get_disk_checksum("sha256")
1145+
1146+
self.assertEqual({"checksum": "abc123", "algorithm": "sha256"}, result)
1147+
mock_wait.assert_called_once()
1148+
mock_create.assert_called_once_with("sha256")
1149+
mock_delete.assert_called_once_with("test-job-id")
1150+
1151+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1152+
'_delete_checksum_job')
1153+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1154+
'_get_checksum_job_status')
1155+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1156+
'_create_checksum_job')
1157+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
1158+
def test_get_disk_checksum_job_failed(self, mock_wait, mock_create,
1159+
mock_status, mock_delete):
1160+
self.writer._set_info(self.info)
1161+
mock_create.return_value = "test-job-id"
1162+
mock_status.return_value = {
1163+
"execution_status": "failed",
1164+
"error_message": "disk error",
1165+
}
1166+
1167+
self.assertRaises(
1168+
exception.CoriolisException,
1169+
self.writer.get_disk_checksum, "sha256")
1170+
mock_delete.assert_called_once_with("test-job-id")
1171+
1172+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1173+
'_delete_checksum_job')
1174+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1175+
'_create_checksum_job')
1176+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
1177+
def test_get_disk_checksum_write_error(self, mock_wait, mock_create,
1178+
mock_delete):
1179+
self.writer._set_info(self.info)
1180+
self.writer._exception = exception.CoriolisException("write failed")
1181+
1182+
self.assertRaises(
1183+
exception.CoriolisException,
1184+
self.writer.get_disk_checksum, "sha256")
1185+
mock_create.assert_not_called()
1186+
mock_delete.assert_not_called()
1187+
1188+
@mock.patch('coriolis.providers.backup_writers.time.sleep')
1189+
@mock.patch('coriolis.providers.backup_writers.time.monotonic')
1190+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1191+
'_delete_checksum_job')
1192+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1193+
'_get_checksum_job_status')
1194+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl,
1195+
'_create_checksum_job')
1196+
@mock.patch.object(backup_writers.HTTPBackupWriterImpl, '_wait_for_queues')
1197+
def test_get_disk_checksum_timeout(self, mock_wait, mock_create,
1198+
mock_status, mock_delete,
1199+
mock_monotonic, mock_sleep):
1200+
self.writer._set_info(self.info)
1201+
mock_create.return_value = "test-job-id"
1202+
mock_status.return_value = {"execution_status": "running"}
1203+
# First call sets the deadline; second call (after the poll) exceeds it
1204+
mock_monotonic.side_effect = [0, 3601]
1205+
1206+
self.assertRaises(
1207+
exception.CoriolisException,
1208+
self.writer.get_disk_checksum, "sha256")
1209+
mock_delete.assert_called_once_with("test-job-id")
1210+
10641211

10651212
class HTTPBackupWriterBootstrapperTestcase(test_base.CoriolisBaseTestCase):
10661213
"""Test suite for the Coriolis HTTPBackupWriterBootstrapper class."""

0 commit comments

Comments
 (0)