s3/client: add tagging ops
with tagging ops, we will be able to attach kv pairs to an object. this will allow us to mark sstable components with taggings, and filter them based on them. * test/pylib/minio_server.py: enable anonymous user to perform more actions. because the tagging related ops are not enabled by "mc anonymous set public", we have to enable them using "set-json" subcommand. * utils/s3/client: add methods to manipulate taggings. * test/boost/s3_test: add a simple test accordingly. Signed-off-by: Kefu Chai <kefu.chai@scylladb.com> Closes #14486
This commit is contained in:
@@ -205,3 +205,30 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
|
||||
f.close().get();
|
||||
cln->close().get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}",
|
||||
tests::getenv_safe("S3_PUBLIC_BUCKET_FOR_TEST"), ::getpid()));
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config());
|
||||
auto data = sstring("1234567890ABCDEF").release();
|
||||
client->put_object(name, std::move(data)).get();
|
||||
{
|
||||
auto tagset = client->get_object_tagging(name).get0();
|
||||
BOOST_CHECK(tagset.empty());
|
||||
}
|
||||
{
|
||||
s3::tag_set expected_tagset{{"1", "one"}, {"2", "two"}};
|
||||
client->put_object_tagging(name, expected_tagset).get();
|
||||
auto actual_tagset = client->get_object_tagging(name).get0();
|
||||
std::ranges::sort(actual_tagset);
|
||||
std::ranges::sort(expected_tagset);
|
||||
BOOST_CHECK(actual_tagset == expected_tagset);
|
||||
}
|
||||
{
|
||||
client->delete_object_tagging(name).get();
|
||||
auto tagset = client->get_object_tagging(name).get0();
|
||||
BOOST_CHECK(tagset.empty());
|
||||
}
|
||||
client->delete_object(name).get();
|
||||
client->close().get();
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import argparse
|
||||
import asyncio
|
||||
from asyncio.subprocess import Process
|
||||
from typing import Optional
|
||||
import json
|
||||
import logging
|
||||
import pathlib
|
||||
import subprocess
|
||||
@@ -79,6 +80,54 @@ class MinioServer:
|
||||
else:
|
||||
break
|
||||
|
||||
def _anonymous_public_policy(self):
|
||||
# the default anonymous public policy does not allow us to access
|
||||
# the taggings, so let's add the tagging actions manually.
|
||||
#
|
||||
# the original access policy is dumped using:
|
||||
# mc anonymous set public local/testbucket
|
||||
# mc anonymous get-json local/testbucket
|
||||
#
|
||||
# we added following actions to the policy for accessing objects in the
|
||||
# bucket created for testing:
|
||||
# - GetObjectTagging
|
||||
# - PutObjectTagging
|
||||
# - DeleteObjectTagging
|
||||
#
|
||||
# the full list of actions can be found at
|
||||
# https://docs.aws.amazon.com/AmazonS3/latest/API/API_Operations.html
|
||||
bucket_actions = [
|
||||
"s3:ListBucket",
|
||||
"s3:ListBucketMultipartUploads",
|
||||
"s3:GetBucketLocation",
|
||||
]
|
||||
object_actions = [
|
||||
"s3:AbortMultipartUpload",
|
||||
"s3:DeleteObject",
|
||||
"s3:GetObject",
|
||||
"s3:ListMultipartUploadParts",
|
||||
"s3:PutObject",
|
||||
"s3:GetObjectTagging",
|
||||
"s3:PutObjectTagging",
|
||||
"s3:DeleteObjectTagging"
|
||||
]
|
||||
statement = [
|
||||
{
|
||||
'Action': bucket_actions,
|
||||
'Effect': 'Allow',
|
||||
'Principal': {'AWS': ['*']},
|
||||
'Resource': [ f'arn:aws:s3:::{self.bucket_name}' ]
|
||||
},
|
||||
{
|
||||
'Action': object_actions,
|
||||
'Effect': 'Allow',
|
||||
'Principal': {'AWS': ['*']},
|
||||
'Resource': [ f'arn:aws:s3:::{self.bucket_name}/*' ]
|
||||
}
|
||||
]
|
||||
return {'Statement': statement,
|
||||
'Version': '2012-10-17'}
|
||||
|
||||
async def start(self):
|
||||
if self.srv_exe is None:
|
||||
self.logger.info("Minio not installed, get it from https://dl.minio.io/server/minio/release/linux-amd64/minio and put into PATH")
|
||||
@@ -119,7 +168,10 @@ class MinioServer:
|
||||
await self.mc('config', 'host', 'add', alias, f'http://{self.address}:{self.port}', self.default_user, self.default_pass, timeout=30)
|
||||
self.log_to_file(f'Configuring bucket {self.bucket_name}')
|
||||
await self.mc('mb', f'{alias}/{self.bucket_name}')
|
||||
await self.mc('anonymous', 'set', 'public', f'{alias}/{self.bucket_name}')
|
||||
with tempfile.NamedTemporaryFile(mode='w', encoding='UTF-8', suffix='.json') as policy_file:
|
||||
json.dump(self._anonymous_public_policy(), policy_file, indent=2)
|
||||
policy_file.flush()
|
||||
await self.mc('anonymous', 'set-json', policy_file.name, f'{alias}/{self.bucket_name}')
|
||||
|
||||
except Exception as e:
|
||||
self.logger.info(f'MC failed: {e}')
|
||||
|
||||
@@ -6,7 +6,9 @@
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include <initializer_list>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <rapidxml.h>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
@@ -28,6 +30,15 @@
|
||||
#include "db_clock.hh"
|
||||
#include "log.hh"
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<s3::tag> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const s3::tag& tag, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
|
||||
tag.key, tag.value);
|
||||
}
|
||||
};
|
||||
|
||||
namespace utils {
|
||||
|
||||
inline size_t iovec_len(const std::vector<iovec>& iov)
|
||||
@@ -223,6 +234,92 @@ future<client::stats> client::get_object_stats(sstring object_name) {
|
||||
co_return st;
|
||||
}
|
||||
|
||||
static rapidxml::xml_node<>* first_node_of(rapidxml::xml_node<>* root,
|
||||
std::initializer_list<std::string_view> names) {
|
||||
assert(root);
|
||||
auto* node = root;
|
||||
for (auto name : names) {
|
||||
node = node->first_node(name.data(), name.size());
|
||||
if (!node) {
|
||||
throw std::runtime_error(fmt::format("'{}' is not found", name));
|
||||
}
|
||||
}
|
||||
return node;
|
||||
}
|
||||
|
||||
static tag_set parse_tagging(sstring& body) {
|
||||
auto doc = std::make_unique<rapidxml::xml_document<>>();
|
||||
try {
|
||||
doc->parse<0>(body.data());
|
||||
} catch (const rapidxml::parse_error& e) {
|
||||
s3l.warn("cannnot parse tagging response: {}", e.what());
|
||||
throw std::runtime_error("cannot parse tagging response");
|
||||
}
|
||||
tag_set tags;
|
||||
auto tagset_node = first_node_of(doc.get(), {"Tagging", "TagSet"});
|
||||
for (auto tag_node = tagset_node->first_node("Tag"); tag_node; tag_node = tag_node->next_sibling()) {
|
||||
auto key = tag_node->first_node("Key")->value();
|
||||
auto value = tag_node->first_node("Value")->value();
|
||||
tags.emplace_back(tag{key, value});
|
||||
}
|
||||
return tags;
|
||||
}
|
||||
|
||||
future<tag_set> client::get_object_tagging(sstring object_name) {
|
||||
// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectTagging.html
|
||||
auto req = http::request::make("GET", _host, object_name);
|
||||
req.query_parameters["tagging"] = "";
|
||||
s3l.trace("GET {} tagging", object_name);
|
||||
tag_set tags;
|
||||
co_await make_request(std::move(req),
|
||||
[&tags] (const http::reply& reply, input_stream<char>&& in) mutable -> future<> {
|
||||
auto& retval = tags;
|
||||
auto input = std::move(in);
|
||||
auto body = co_await util::read_entire_stream_contiguous(input);
|
||||
retval = parse_tagging(body);
|
||||
});
|
||||
co_return tags;
|
||||
}
|
||||
|
||||
static auto dump_tagging(const tag_set& tags) {
|
||||
// print the tags as an XML as defined by the API definition.
|
||||
fmt::memory_buffer body;
|
||||
fmt::format_to(fmt::appender(body), "<Tagging><TagSet>{}</TagSet></Tagging>", fmt::join(tags, ""));
|
||||
return body;
|
||||
}
|
||||
|
||||
future<> client::put_object_tagging(sstring object_name, tag_set tagging) {
|
||||
// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
|
||||
auto req = http::request::make("PUT", _host, object_name);
|
||||
req.query_parameters["tagging"] = "";
|
||||
s3l.trace("PUT {} tagging", object_name);
|
||||
auto body = dump_tagging(tagging);
|
||||
size_t body_size = body.size();
|
||||
req.write_body("xml", body_size, [body=std::move(body)] (output_stream<char>&& out) -> future<> {
|
||||
auto output = std::move(out);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await output.write(body.data(), body.size());
|
||||
co_await output.flush();
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await output.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
});
|
||||
co_await make_request(std::move(req));
|
||||
}
|
||||
|
||||
future<> client::delete_object_tagging(sstring object_name) {
|
||||
// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjectTagging.html
|
||||
auto req = http::request::make("DELETE", _host, object_name);
|
||||
req.query_parameters["tagging"] = "";
|
||||
s3l.trace("DELETE {} tagging", object_name);
|
||||
co_await make_request(std::move(req), ignore_reply, http::reply::status_type::no_content);
|
||||
}
|
||||
|
||||
future<temporary_buffer<char>> client::get_object_contiguous(sstring object_name, std::optional<range> range) {
|
||||
auto req = http::request::make("GET", _host, object_name);
|
||||
http::reply::status_type expected = http::reply::status_type::ok;
|
||||
|
||||
@@ -22,6 +22,13 @@ struct range {
|
||||
size_t len;
|
||||
};
|
||||
|
||||
struct tag {
|
||||
std::string key;
|
||||
std::string value;
|
||||
auto operator<=>(const tag&) const = default;
|
||||
};
|
||||
using tag_set = std::vector<tag>;
|
||||
|
||||
future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_);
|
||||
|
||||
class client : public enable_shared_from_this<client> {
|
||||
@@ -51,6 +58,9 @@ public:
|
||||
std::time_t last_modified;
|
||||
};
|
||||
future<stats> get_object_stats(sstring object_name);
|
||||
future<tag_set> get_object_tagging(sstring object_name);
|
||||
future<> put_object_tagging(sstring object_name, tag_set tagging);
|
||||
future<> delete_object_tagging(sstring object_name);
|
||||
future<temporary_buffer<char>> get_object_contiguous(sstring object_name, std::optional<range> range = {});
|
||||
future<> put_object(sstring object_name, temporary_buffer<char> buf);
|
||||
future<> put_object(sstring object_name, ::memory_data_sink_buffers bufs);
|
||||
|
||||
Reference in New Issue
Block a user