sstables::object_storage_client: Add multi-upload support for GS

Uses file splitting + object merge to facilitate parallel, resumable
upload of files with known size.
This commit is contained in:
Calle Wilund
2025-09-30 04:26:47 +00:00
parent bd1304972c
commit 5e4e5b1f4a
4 changed files with 168 additions and 13 deletions

View File

@@ -526,6 +526,7 @@ scylla_tests = set([
'test/boost/mutation_test',
'test/boost/mvcc_test',
'test/boost/nonwrapping_interval_test',
'test/boost/object_storage_upload_test',
'test/boost/observable_test',
'test/boost/partitioner_test',
'test/boost/pretty_printers_test',

View File

@@ -210,21 +210,73 @@ public:
future<> upload_file(std::filesystem::path path, object_name name, utils::upload_progress& up, seastar::abort_source* as) override {
auto f = co_await open_file_dma(path.string(), open_flags::ro);
auto s = co_await f.stat();
up.total += s.st_size;
uint64_t pos = 0;
auto sink = make_upload_sink(std::move(name), as);
for (;;) {
auto buf = co_await f.dma_read_bulk<char>(pos, 64*1024);
auto n = buf.size();
if (n == 0) {
break;
uint64_t size = s.st_size;
up.total += size;
auto upload_one = [this, as, &up, &f](object_name name, uint64_t offset, uint64_t size) -> future<> {
uint64_t pos = offset;
auto sink = make_upload_sink(std::move(name), as);
while (size > 0) {
auto rem = std::min(size, size_t(64*1024));
auto buf = co_await f.dma_read_bulk<char>(pos, rem);
auto n = buf.size();
if (n == 0) {
break;
}
co_await sink.put(std::move(buf));
up.uploaded += n;
pos += n;
size -= n;
}
co_await sink.put(std::move(buf));
up.uploaded += n;
pos += n;
co_await sink.flush();
co_await sink.close();
};
constexpr size_t chunk_size = 8*1024*1024;
if (size <= chunk_size) {
co_await upload_one(std::move(name), 0, size);
co_await f.close();
} else {
size_t cc = chunk_size;
while ((cc * 32) < size) {
cc <<= 1;
}
struct part {
std::string name;
uint64_t off, size;
};
std::vector<part> ranges;
auto object = name.object();
auto bucket = name.bucket();
for (uint64_t off = 0; off < size;) {
auto rem = std::min(size - off, cc);
auto subname = fmt::format("{}-temp-{}-{}", object, off, off+rem);
ranges.emplace_back(part{subname, off, rem});
off += rem;
}
auto existing = (co_await _client->list_objects(bucket, fmt::format("{}-temp-", object)))
| std::views::transform([](auto& info) { return info.name; })
| std::ranges::to<std::unordered_set<std::string>>()
;
co_await parallel_for_each(ranges, [bucket, &upload_one, &existing](const part& p) -> future<> {
if (!existing.count(p.name)) {
co_await upload_one(object_name(bucket, p.name), p.off, p.size);
}
});
co_await f.close();
auto names = ranges | std::views::transform([](auto& p) { return p.name; }) | std::ranges::to<std::vector<std::string>>();
co_await _client->merge_objects(bucket, object, std::move(names), {}, as);
co_await parallel_for_each(names, [this, bucket](auto& name) -> future<> {
co_await _client->delete_object(bucket, name);
});
}
co_await sink.flush();
co_await sink.close();
}
future<> update_config(const db::object_storage_endpoint_param& ep) override {
auto client = std::exchange(_client, make_gcs_client(ep));

View File

@@ -306,6 +306,8 @@ add_scylla_test(wrapping_interval_test
KIND BOOST)
add_scylla_test(address_map_test
KIND SEASTAR)
add_scylla_test(object_storage_upload_test
KIND SEASTAR)
add_scylla_test(combined_tests
KIND SEASTAR

View File

@@ -0,0 +1,100 @@
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/assert.hh"
#include <fmt/ranges.h>
#include <seastar/core/future.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/test_fixture.hh>
#include "db/config.hh"
#include "sstables/object_storage_client.hh"
#include "sstables/storage.hh"
#include "utils/upload_progress.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/gcs_fixture.hh"
namespace fs = std::filesystem;
using namespace sstables;
using namespace tests;
#if 1
static future<> create_file_of_size(fs::path file, size_t dest_size) {
auto f = co_await seastar::open_file_dma(file.string(), open_flags::wo|open_flags::create);
auto os = co_await make_file_output_stream(std::move(f));
size_t done = 0;
while (done < dest_size) {
auto rem = dest_size - done;
auto len = std::min(rem, size_t(8*1024));
auto rnd = tests::random::get_bytes(len);
for (size_t i = 0; i < rnd.size(); ++i) {
//rnd[i] = "ABCDEFGHIJKLMNO"[i % 16];
}
auto data = reinterpret_cast<char*>(rnd.data());
co_await os.write(data, len);
done += len;
}
co_await os.flush();
co_await os.close();
}
#endif
static future<> compare_streams(input_stream<char>& is1, input_stream<char>& is2, size_t total) {
uint64_t read = 0;
while (!is1.eof()) {
auto buf = co_await is1.read();
if (buf.empty()) {
break;
}
auto buf2 = co_await is2.read_exactly(buf.size());
BOOST_REQUIRE_EQUAL(buf, buf2);
read += buf.size();
}
BOOST_REQUIRE((co_await is2.read()).empty());
BOOST_REQUIRE_EQUAL(read, total);
}
future<> test_file_upload(test_env_config cfg, size_t size) {
auto bucket = cfg.storage.to_map()["bucket"];
return test_env::do_with_async([size, bucket] (test_env& env) {
auto ep = env.db_config().object_storage_endpoints().front().key();
auto client = env.manager().get_endpoint_client(ep);
tmpdir tmp;
auto path = tmp.path() / "testfile";
object_name name(bucket, "testfile");
utils::upload_progress up;
create_file_of_size(path, size).get();
client->upload_file(path, name, up).get();
auto source = client->make_download_source(name);
auto is1 = make_file_input_stream(seastar::open_file_dma(path.string(), open_flags::ro).get());
auto is2 = input_stream<char>(std::move(source));
compare_streams(is1, is2, size).get();
is1.close().get();
is2.close().get();
}, std::move(cfg));
}
constexpr auto large_size = 256 * 1024 * 1024 + 351;
SEASTAR_TEST_CASE(test_large_file_upload_s3, *boost::unit_test::precondition(tests::has_scylla_test_env)) {
return test_file_upload(test_env_config{ .storage = make_test_object_storage_options("S3") }, large_size);
}
SEASTAR_FIXTURE_TEST_CASE(test_large_file_upload_gs, gcs_fixture, *check_run_test_decorator("ENABLE_GCP_STORAGE_TEST", true)) {
return test_file_upload(test_env_config{ .storage = make_test_object_storage_options("GS") }, large_size);
}