Merge 'Make s3 upload sink PUT small objects' from Pavel Emelyanov

When the upload sink is flushed, it may notice that the multipart upload has not yet been started and fall back to a plain PUT in that case. This makes uploading small files much nicer, because a multipart upload would take 3 API calls (start, part, complete) in this case.

fixes: #13014

Closes scylladb/scylladb#15824

* github.com:scylladb/scylladb:
  test: Add s3_client test for upload PUT fallback
  s3/client: Add PUT fallback to upload sink
This commit is contained in:
Botond Dénes
2023-10-25 10:03:46 +03:00
2 changed files with 35 additions and 0 deletions

View File

@@ -162,6 +162,34 @@ SEASTAR_THREAD_TEST_CASE(test_client_multipart_copy_upload) {
do_test_client_multipart_upload(true);
}
// Checks that flushing an upload sink whose upload has not been started yet
// stores the data with a plain PUT, and that the object read back afterwards
// equals what was written.
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback) {
    const sstring name(fmt::format("/{}/testfbobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));

    testlog.info("Make client");
    // The client is given a broken semaphore so that any attempt to use it throws.
    semaphore mem(0);
    mem.broken();
    auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
    auto close_client = deferred_close(*cln);

    testlog.info("Upload object");
    auto out = output_stream<char>(cln->make_upload_sink(name));
    auto close = seastar::deferred_close(out);
    temporary_buffer<char> data = sstring("1A3B5C7890").release();
    out.write(data.begin(), data.size()).get();

    testlog.info("Flush upload");
    out.flush().get(); // if it tries to do regular flush, memory claim would throw
    // Registered only after the flush: this guard removes the test object on scope exit.
    auto delete_object = deferred_delete_object(cln, name);

    testlog.info("Closing");
    close.close_now();

    testlog.info("Get object content");
    temporary_buffer<char> res = cln->get_object_contiguous(name).get0();
    BOOST_REQUIRE_EQUAL(to_sstring(std::move(res)), to_sstring(std::move(data)));
}
SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
const sstring name(fmt::format("/{}/testroobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));

View File

@@ -678,6 +678,13 @@ public:
virtual future<> flush() override {
if (_bufs.size() != 0) {
// This is handy for small objects that are uploaded via the sink. It makes
// upload happen in one REST call, instead of three (create + PUT + wrap-up)
if (!upload_started()) {
s3l.trace("Sink fallback to plain PUT for {}", _object_name);
co_return co_await _client->put_object(_object_name, std::move(_bufs));
}
co_await upload_part(std::move(_bufs));
}
if (upload_started()) {