Merge 'Enhance s3 client perf test with "uploading" facility and related tunables' from Pavel Emelyanov

The existing test measures latencies of object GET-s. That's nice (though incomplete), but we want to measure upload performance. Here it is.

refs: #22460

Closes scylladb/scylladb#22480

* github.com:scylladb/scylladb:
  test/perf/s3: Add --part-size-mb option for upload test
  test/perf/s3: Add uploading test
  test/perf/s3: Some renames not to be download-centric
  test/perf/s3: Make object/file name configurable
  test/perf/s3: Configure maximum number of sockets
  test/perf/s3: Remove parallelizm
  s3/client: Make http client connections limit configurable
This commit is contained in:
Botond Dénes
2025-02-17 09:46:10 +02:00
3 changed files with 55 additions and 26 deletions

View File

@@ -20,19 +20,21 @@ seastar::logger plog("perf");
class tester {
std::chrono::seconds _duration;
unsigned _parallel;
std::string _object_name;
size_t _object_size;
semaphore _mem;
shared_ptr<s3::client> _client;
utils::estimated_histogram _reads_hist;
utils::estimated_histogram _latencies;
unsigned _errors = 0;
unsigned _part_size_mb;
bool _remove_file;
static s3::endpoint_config_ptr make_config() {
static s3::endpoint_config_ptr make_config(unsigned sockets) {
s3::endpoint_config cfg;
cfg.port = 443;
cfg.use_https = true;
cfg.region = tests::getenv_safe("AWS_DEFAULT_REGION");
cfg.max_connections = sockets;
return make_lw_shared<s3::endpoint_config>(std::move(cfg));
}
@@ -42,16 +44,20 @@ class tester {
std::chrono::steady_clock::time_point now() const { return std::chrono::steady_clock::now(); }
public:
tester(std::chrono::seconds dur, unsigned prl, size_t obj_size)
tester(std::chrono::seconds dur, unsigned sockets, unsigned part_size, sstring object_name, size_t obj_size)
: _duration(dur)
, _parallel(prl)
, _object_name(fmt::format("/{}/perfobject-{}-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid(), this_shard_id()))
, _object_name(std::move(object_name))
, _object_size(obj_size)
, _mem(memory::stats().total_memory())
, _client(s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_config(), _mem))
, _client(s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_config(sockets), _mem))
, _part_size_mb(part_size)
, _remove_file(false)
{}
future<> start() {
private:
future<> make_temporary_file() {
_object_name = fmt::format("/{}/perfobject-{}-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid(), this_shard_id());
_remove_file = true;
plog.debug("Creating {} of {} bytes", _object_name, _object_size);
auto out = output_stream<char>(_client->make_upload_sink(_object_name));
@@ -74,9 +80,15 @@ public:
}
}
private:
public:
future<> start() {
if (_object_name.empty()) {
co_await make_temporary_file();
}
}
future<> do_run() {
future<> run_download() {
plog.info("Downloading");
auto until = now() + _duration;
uint64_t off = 0;
do {
@@ -84,25 +96,30 @@ private:
try {
co_await _client->get_object_contiguous(_object_name, s3::range{off, chunk_size});
off = (off + chunk_size) % (_object_size - chunk_size);
_reads_hist.add(std::chrono::duration_cast<std::chrono::milliseconds>(now() - start).count());
_latencies.add(std::chrono::duration_cast<std::chrono::milliseconds>(now() - start).count());
} catch (...) {
_errors++;
}
} while (now() < until);
}
public:
future<> run() {
co_await coroutine::parallel_for_each(std::views::iota(0u, _parallel), [this] (auto fnr) -> future<> {
plog.debug("Running {} fiber", fnr);
co_await seastar::sleep(std::chrono::milliseconds(fnr)); // make some discrepancy
co_await do_run();
});
future<> run_upload() {
plog.info("Uploading");
auto file_name = fs::path(_object_name);
auto sz = co_await seastar::file_size(file_name.native());
_object_name = fmt::format("/{}/{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), file_name.filename().native());
_remove_file = true;
auto start = now();
co_await _client->upload_file(file_name, _object_name, {}, _part_size_mb << 20);
auto time = std::chrono::duration_cast<std::chrono::duration<double>>(now() - start);
plog.info("Uploaded {}MB in {}s, speed {}MB/s", sz >> 20, time.count(), (sz >> 20) / time.count());
}
future<> stop() {
plog.debug("Removing {}", _object_name);
co_await _client->delete_object(_object_name);
if (_remove_file) {
plog.debug("Removing {}", _object_name);
co_await _client->delete_object(_object_name);
}
co_await _client->close();
auto print_percentiles = [] (const utils::estimated_histogram& hist) {
@@ -115,7 +132,7 @@ public:
hist.percentile(1.0)
);
};
plog.info("reads total: {:5}, errors: {:5}; latencies: {}", _reads_hist._count, _errors, print_percentiles(_reads_hist));
plog.info("requests total: {:5}, errors: {:5}; latencies: {}", _latencies._count, _errors, print_percentiles(_latencies));
}
};
@@ -123,23 +140,33 @@ int main(int argc, char** argv) {
namespace bpo = boost::program_options;
app_template app;
app.add_options()
("upload", "test file upload")
("duration", bpo::value<unsigned>()->default_value(10), "seconds to run")
("parallel", bpo::value<unsigned>()->default_value(1), "number of parallel fibers")
("sockets", bpo::value<unsigned>()->default_value(1), "maximum number of socket for http client")
("part_size_mb", bpo::value<unsigned>()->default_value(5), "part size")
("object_name", bpo::value<sstring>()->default_value(""), "use given object/file name")
("object_size", bpo::value<size_t>()->default_value(1 << 20), "size of test object")
;
return app.run(argc, argv, [&app] () -> future<> {
auto dur = std::chrono::seconds(app.configuration()["duration"].as<unsigned>());
auto prl = app.configuration()["parallel"].as<unsigned>();
auto sks = app.configuration()["sockets"].as<unsigned>();
auto part_size = app.configuration()["part_size_mb"].as<unsigned>();
auto oname = app.configuration()["object_name"].as<sstring>();
auto osz = app.configuration()["object_size"].as<size_t>();
auto upload = app.configuration().contains("upload");
sharded<tester> test;
plog.info("Creating");
co_await test.start(dur, prl, osz);
co_await test.start(dur, sks, part_size, oname, osz);
plog.info("Starting");
co_await test.invoke_on_all(&tester::start);
try {
plog.info("Running");
co_await test.invoke_on_all(&tester::run);
if (upload) {
co_await test.invoke_on_all(&tester::run_upload);
} else {
co_await test.invoke_on_all(&tester::run_download);
}
} catch (...) {
plog.error("Error running: {}", std::current_exception());
}

View File

@@ -237,7 +237,7 @@ client::group_client& client::find_or_create_client() {
// Limit the maximum number of connections this group's http client
// may have proportional to its shares. Shares are typically in the
// range of 100...1000, thus resulting in 1..10 connections
auto max_connections = std::max((unsigned)(sg.get_shares() / 100), 1u);
unsigned max_connections = _cfg->max_connections.has_value() ? *_cfg->max_connections : std::max((unsigned)(sg.get_shares() / 100), 1u);
it = _https.emplace(std::piecewise_construct,
std::forward_as_tuple(sg),
std::forward_as_tuple(std::move(factory), max_connections)

View File

@@ -32,6 +32,8 @@ struct endpoint_config {
std::string region;
// Amazon Resource Names (ARNs) to access AWS resources
std::string role_arn;
std::optional<unsigned> max_connections;
std::strong_ordering operator<=>(const endpoint_config& o) const = default;
};