Add suite test to check incremental restore works well

This commit is contained in:
Olivier 'reivilibre' 2021-09-13 18:45:03 +01:00
parent ff7e3b83ea
commit a1e17454d7

View File

@ -1,10 +1,16 @@
import subprocess
import time
from pathlib import Path
from random import Random
from tempfile import TemporaryDirectory
from unittest import TestCase
from helpers import DirectoryDescriptor, generate_random_dir, scan_dir
from helpers import (
DirectoryDescriptor,
generate_random_dir,
randomly_mutate_directory_in_descriptor,
scan_dir,
)
from helpers.datman_helpers import get_hostname, set_up_simple_datman
from helpers.yama_helpers import set_up_simple_yama
@ -129,3 +135,90 @@ kind = {{ stdout = "blahblah.txt" }}
self.assertIn("\x1b[31m\x1b[1mFAILED", last_line)
td.cleanup()
def test_backup_incremental(self):
td = TemporaryDirectory("test_backup_incremental")
tdpath = Path(td.name)
datman_path = tdpath.joinpath("datman")
src_path = datman_path.joinpath("srca")
yama_path = datman_path.joinpath("main")
set_up_simple_datman(datman_path)
set_up_simple_yama(yama_path)
rng = Random()
seed = rng.randint(0, 9001)
print(f"seed: {seed}")
rng.seed(seed)
initial_descriptor, _ = generate_random_dir(rng, src_path, 32)
print("storing")
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
# now mutate and store incremental
randomly_mutate_directory_in_descriptor(initial_descriptor, src_path, rng)
mutated_descriptor = scan_dir(src_path)
time.sleep(2)
timestamp_formatted = time.strftime("%Y-%m-%d %H:%M:%S")
time.sleep(2)
subprocess.check_call(("datman", "backup-one", "srca", "main"), cwd=datman_path)
print("extracting")
dest1_path = tdpath.joinpath("desta1")
dest2_path = tdpath.joinpath("desta2")
subprocess.check_call(
(
"datman",
"extract",
"--skip-metadata",
"--accept-partial",
"--before",
timestamp_formatted,
"main",
"../desta1",
),
cwd=datman_path,
)
subprocess.check_call(
(
"datman",
"extract",
"--skip-metadata",
"--accept-partial",
"main",
"../desta2",
),
cwd=datman_path,
)
# this will be wrapped in a directory that starts with the name srca+
extracted_dir_descriptor_wrapper = scan_dir(dest1_path)
contents = extracted_dir_descriptor_wrapper.contents
self.assertEqual(len(contents), 1)
key, value = next(iter(contents.items()))
self.assertTrue(key.startswith("srca+"))
self.assertIsInstance(value, DirectoryDescriptor)
key, value = next(iter(value.contents.items()))
self.assertEqual(key, "srca")
self.assertEqual(value.ignore_metadata(), initial_descriptor.ignore_metadata())
extracted_dir_descriptor_wrapper = scan_dir(dest2_path)
contents = extracted_dir_descriptor_wrapper.contents
self.assertEqual(len(contents), 1)
key, value = next(iter(contents.items()))
self.assertTrue(key.startswith("srca+"))
self.assertIsInstance(value, DirectoryDescriptor)
key, value = next(iter(value.contents.items()))
self.assertEqual(key, "srca")
self.assertEqual(value.ignore_metadata(), mutated_descriptor.ignore_metadata())
td.cleanup()