backup_task: remove a component once it is uploaded

Previously, during backup, SSTable components are preserved in the
snapshot directory even after being uploaded. This leads to redundant
uploads in case of failed backups or restarts, wasting time and
resources (S3 API calls).

This change

- adds an optional query parameter named "move_files" to
  "/storage_service/backup" API. if it is set to "true", SSTable
  components are removed once they are backed up to object storage.
- conditionally removes SSTable components from the snapshot directory once
  they are successfully uploaded to the target location. This prevents
  re-uploading the same files and reduces disk usage.

This change only "Refs" #20655, because, we can move further optimize
the backup process, consider:

- Sending HEAD requests to S3 to check for existing files before uploading.
- Implementing support for resuming partially uploaded files.

Fixes #21799
Refs #20655

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
Kefu Chai
2025-01-20 15:12:36 +08:00
parent 32d22371b9
commit 8080658df7
8 changed files with 79 additions and 9 deletions

View File

@@ -813,6 +813,14 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"move_files",
"description":"Move component files instead of copying them",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}

View File

@@ -1799,13 +1799,14 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto bucket = req->get_query_param("bucket");
auto prefix = req->get_query_param("prefix");
auto snapshot_name = req->get_query_param("snapshot");
auto move_files = req_param<bool>(*req, "move_files", false);
if (snapshot_name.empty()) {
// TODO: If missing, snapshot should be taken by scylla, then removed
throw httpd::bad_param_exception("The snapshot name must be specified");
}
auto& ctl = snap_ctl.local();
auto task_id = co_await ctl.start_backup(std::move(endpoint), std::move(bucket), std::move(prefix), std::move(keyspace), std::move(table), std::move(snapshot_name));
auto task_id = co_await ctl.start_backup(std::move(endpoint), std::move(bucket), std::move(prefix), std::move(keyspace), std::move(table), std::move(snapshot_name), move_files);
co_return json::json_return_type(fmt::to_string(task_id));
});

View File

@@ -137,10 +137,10 @@ future<int64_t> snapshot_ctl::true_snapshots_size() {
}));
}
future<tasks::task_id> snapshot_ctl::start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name) {
future<tasks::task_id> snapshot_ctl::start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name, bool move_files) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&](auto& local) {
return local.start_backup(endpoint, bucket, prefix, keyspace, table, snapshot_name);
return local.start_backup(endpoint, bucket, prefix, keyspace, table, snapshot_name, move_files);
});
}
@@ -175,7 +175,7 @@ future<tasks::task_id> snapshot_ctl::start_backup(sstring endpoint, sstring buck
sstables::snapshots_dir /
std::string_view(snapshot_name));
auto task = co_await _task_manager_module->make_and_start_task<::db::snapshot::backup_task_impl>(
{}, *this, std::move(cln), std::move(bucket), std::move(prefix), keyspace, dir);
{}, *this, std::move(cln), std::move(bucket), std::move(prefix), keyspace, dir, move_files);
co_return task->id();
}

View File

@@ -105,7 +105,7 @@ public:
*/
future<> clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name);
future<tasks::task_id> start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name);
future<tasks::task_id> start_backup(sstring endpoint, sstring bucket, sstring prefix, sstring keyspace, sstring table, sstring snapshot_name, bool move_files);
future<std::unordered_map<sstring, db_snapshot_details>> get_snapshot_details();

View File

@@ -30,13 +30,15 @@ backup_task_impl::backup_task_impl(tasks::task_manager::module_ptr module,
sstring bucket,
sstring prefix,
sstring ks,
std::filesystem::path snapshot_dir) noexcept
std::filesystem::path snapshot_dir,
bool move_files) noexcept
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
, _snap_ctl(ctl)
, _client(std::move(client))
, _bucket(std::move(bucket))
, _prefix(std::move(prefix))
, _snapshot_dir(std::move(snapshot_dir)) {
, _snapshot_dir(std::move(snapshot_dir))
, _remove_on_uploaded(move_files) {
_status.progress_units = "bytes ('total' may grow along the way)";
}
@@ -79,6 +81,22 @@ future<> backup_task_impl::upload_component(sstring name) {
snap_log.error("Error uploading {}: {}", component_name.native(), std::current_exception());
throw;
}
if (!_remove_on_uploaded) {
co_return;
}
// Delete the uploaded component to:
// 1. Free up disk space immediately
// 2. Avoid costly S3 existence checks on future backup attempts
try {
co_await remove_file(component_name.native());
} catch (...) {
// If deletion of an uploaded file fails, the backup process will continue.
// While this doesn't halt the backup, it may indicate filesystem permissions
// issues or system constraints that should be investigated.
snap_log.warn("Failed to remove {}: {}", component_name, std::current_exception());
}
}
future<> backup_task_impl::do_backup() {

View File

@@ -24,6 +24,7 @@ class backup_task_impl : public tasks::task_manager::task::impl {
sstring _bucket;
sstring _prefix;
std::filesystem::path _snapshot_dir;
bool _remove_on_uploaded;
s3::upload_progress _progress = {};
future<> do_backup();
@@ -39,7 +40,8 @@ public:
sstring bucket,
sstring prefix,
sstring ks,
std::filesystem::path snapshot_dir) noexcept;
std::filesystem::path snapshot_dir,
bool move_files) noexcept;
virtual std::string type() const override;
virtual tasks::is_internal is_internal() const noexcept override;

View File

@@ -89,6 +89,40 @@ async def test_simple_backup(manager: ManagerClient, s3_server):
assert len(res) == 1 and res[0][1].group(1) == 'strm'
@pytest.mark.asyncio
@pytest.mark.parametrize("move_files", [False, True])
async def test_backup_move(manager: ManagerClient, s3_server, move_files):
'''check that backing up a snapshot by _moving_ sstable to object storage'''
cfg = {'enable_user_defined_functions': False,
'object_storage_config_file': str(s3_server.config_file),
'experimental_features': ['keyspace-storage-options'],
'task_ttl_in_seconds': 300
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
server = await manager.server_add(config=cfg, cmdline=cmd)
ks, cf = await prepare_snapshot_for_backup(manager, server)
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
assert len(files) > 0
print('Backup snapshot')
prefix = f'{cf}/backup'
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix,
move_files=move_files)
print(f'Started task {tid}')
status = await manager.api.get_task_status(server.ip_addr, tid)
print(f'Status: {status}, waiting to finish')
status = await manager.api.wait_task(server.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
assert (status['progress_total'] > 0) and (status['progress_completed'] == status['progress_total'])
# all components in the "backup" snapshot should have been moved into bucket if move_files
assert len(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup')) == 0 if move_files else len(files)
@pytest.mark.asyncio
async def test_backup_to_non_existent_bucket(manager: ManagerClient, s3_server):
'''backup should fail if the destination bucket does not exist'''

View File

@@ -318,7 +318,7 @@ class ScyllaRESTAPIClient():
"""Flush all keyspaces"""
await self.client.post(f"/storage_service/flush", host=node_ip)
async def backup(self, node_ip: str, ks: str, table: str, tag: str, dest: str, bucket: str, prefix: str) -> str:
async def backup(self, node_ip: str, ks: str, table: str, tag: str, dest: str, bucket: str, prefix: str, **kwargs) -> str:
"""Backup keyspace's snapshot"""
params = {"keyspace": ks,
"table": table,
@@ -326,6 +326,13 @@ class ScyllaRESTAPIClient():
"bucket": bucket,
"prefix": prefix,
"snapshot": tag}
# add optional args. for instance, "move_files".
for key, value in kwargs.items():
if isinstance(value, bool):
params[key] = 'true' if value else 'false'
else:
assert any(isinstance(value, t) for t in (str, int, float))
params[key] = value
return await self.client.post_json(f"/storage_service/backup", host=node_ip, params=params)
async def restore(self, node_ip: str, ks: str, cf: str, dest: str, bucket: str, prefix: str, sstables: list[str], scope: str = None) -> str: