Revert "Merge 'Unify configuration of object storage endpoints' from Pavel Emelyanov"

This reverts commit 1bb897c7ca, reversing
changes made to 954f2cbd2f. It makes
incompatible changes to the object storage configuration format, breaking
tests [1]. It's likely that it doesn't break any production configuration,
but we can't be sure.

Fixes #27966

Closes scylladb/scylladb#27969
This commit is contained in:
Avi Kivity
2026-01-04 16:22:11 +02:00
committed by Botond Dénes
parent 1ef6ac5439
commit 0df85c8ae8
15 changed files with 72 additions and 107 deletions

View File

@@ -12,6 +12,8 @@
#include <yaml-cpp/yaml.h>
#include <boost/lexical_cast.hpp>
#include "utils/s3/creds.hh"
#include "object_storage_endpoint_param.hh"
using namespace std::string_literals;
@@ -19,6 +21,9 @@ using namespace std::string_literals;
db::object_storage_endpoint_param::object_storage_endpoint_param(s3_storage s)
: _data(std::move(s))
{}
db::object_storage_endpoint_param::object_storage_endpoint_param(std::string endpoint, s3::endpoint_config config)
: object_storage_endpoint_param(s3_storage{std::move(endpoint), std::move(config)})
{}
db::object_storage_endpoint_param::object_storage_endpoint_param(gs_storage s)
: _data(std::move(s))
{}
@@ -27,8 +32,8 @@ db::object_storage_endpoint_param::object_storage_endpoint_param() = default;
db::object_storage_endpoint_param::object_storage_endpoint_param(const object_storage_endpoint_param&) = default;
std::string db::object_storage_endpoint_param::s3_storage::to_json_string() const {
return fmt::format("{{ \"type\": \"s3\", \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
region, iam_role_arn
return fmt::format("{{ \"port\": {}, \"use_https\": {}, \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
config.port, config.use_https, config.region, config.role_arn
);
}
@@ -94,6 +99,8 @@ const std::string& db::object_storage_endpoint_param::type() const {
db::object_storage_endpoint_param db::object_storage_endpoint_param::decode(const YAML::Node& node) {
auto name = node["name"];
auto aws_region = node["aws_region"];
auto iam_role_arn = node["iam_role_arn"];
auto type = node["type"];
auto get_opt = [](auto& node, const std::string& key, auto def) {
@@ -101,19 +108,13 @@ db::object_storage_endpoint_param db::object_storage_endpoint_param::decode(cons
return tmp ? tmp.template as<std::decay_t<decltype(def)>>() : def;
};
// aws s3 endpoint.
if (!type || type.as<std::string>() == s3_type ) {
if (!type || type.as<std::string>() == s3_type || aws_region || iam_role_arn) {
s3_storage ep;
ep.endpoint = name.as<std::string>();
auto aws_region = node["aws_region"];
ep.region = aws_region ? aws_region.as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
ep.iam_role_arn = get_opt(node, "iam_role_arn", ""s);
if (maybe_legacy_endpoint_name(ep.endpoint)) {
// Support legacy config for a while
auto port = node["port"].as<unsigned>();
auto use_https = node["https"].as<bool>(false);
ep.endpoint = fmt::format("http{}://{}:{}", use_https ? "s" : "", ep.endpoint, port);
}
ep.config.port = node["port"].as<unsigned>();
ep.config.use_https = node["https"].as<bool>(false);
ep.config.region = aws_region ? aws_region.as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
ep.config.role_arn = iam_role_arn ? iam_role_arn.as<std::string>() : "";
return object_storage_endpoint_param{std::move(ep)};
}

View File

@@ -13,6 +13,7 @@
#include <variant>
#include <compare>
#include <fmt/core.h>
#include "utils/s3/creds.hh"
namespace YAML {
class Node;
@@ -24,8 +25,7 @@ class object_storage_endpoint_param {
public:
struct s3_storage {
std::string endpoint;
std::string region;
std::string iam_role_arn;
s3::endpoint_config config;
std::strong_ordering operator<=>(const s3_storage&) const = default;
std::string to_json_string() const;
@@ -43,6 +43,7 @@ public:
object_storage_endpoint_param();
object_storage_endpoint_param(const object_storage_endpoint_param&);
object_storage_endpoint_param(s3_storage);
object_storage_endpoint_param(std::string endpoint, s3::endpoint_config config);
object_storage_endpoint_param(gs_storage);
std::strong_ordering operator<=>(const object_storage_endpoint_param&) const;
@@ -76,7 +77,3 @@ template <>
struct fmt::formatter<db::object_storage_endpoint_param> : fmt::formatter<std::string_view> {
auto format(const db::object_storage_endpoint_param&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
inline bool maybe_legacy_endpoint_name(std::string_view ep) noexcept {
return !(ep.starts_with("http://") || ep.starts_with("https://"));
}

View File

@@ -20,7 +20,9 @@ command line option when launchgin scylla.
You can define endpoint details in the `scylla.yaml` file. For example:
```yaml
object_storage_endpoints:
- name: https://s3.us-east-1.amazonaws.com:443
- name: s3.us-east-1.amazonaws.com
port: 443
https: true
aws_region: us-east-1
```
@@ -76,7 +78,9 @@ The examples above are intended for development or local environments. You shoul
For the EC2 Instance Metadata Service to function correctly, no additional configuration is required. However, STS requires the IAM Role ARN to be defined in the `scylla.yaml` file, as shown below:
```yaml
object_storage_endpoints:
- name: https://s3.us-east-1.amazonaws.com:443
- name: s3.us-east-1.amazonaws.com
port: 443
https: true
aws_region: us-east-1
iam_role_arn: arn:aws:iam::123456789012:instance-profile/my-instance-instance-profile
```
@@ -96,7 +100,9 @@ in `scylla.yaml`:
```yaml
object_storage_endpoints:
- name: https://s3.us-east-2.amazonaws.com:443
- name: s3.us-east-2.amazonaws.com
port: 443
https: true
aws_region: us-east-2
```

View File

@@ -111,7 +111,9 @@ should follow this format:
.. code-block:: yaml
object_storage_endpoints:
- name: https://<endpoint_address_or_domain_name>[:<port_number>]
- name: <endpoint_address_or_domain_name>
port: <port_number>
https: <true_or_false> # optional
aws_region: <region_name> # optional, e.g. us-east-1
iam_role_arn: <iam_role> # optional
@@ -121,7 +123,9 @@ Example:
.. code:: yaml
object_storage_endpoints:
- name: https://s3.us-east-1.amazonaws.com
- name: s3.us-east-1.amazonaws.com
port: 443
https: true
aws_region: us-east-1
iam_role_arn: arn:aws:iam::123456789012:instance-profile/my-instance-instance-profile

View File

@@ -14,6 +14,7 @@
#include <seastar/core/sharded.hh>
#include <seastar/core/abort_source.hh>
#include "utils/log.hh"
#include "utils/s3/creds.hh"
#include "seastarx.hh"
#include <boost/program_options.hpp>
#include <yaml-cpp/yaml.h>

View File

@@ -95,6 +95,7 @@ class reconcilable_result;
namespace bi = boost::intrusive;
namespace tracing { class trace_state_ptr; }
namespace s3 { struct endpoint_config; }
namespace lang { class manager; }

View File

@@ -60,17 +60,12 @@ fmt::formatter<sstables::object_name>::format(const sstables::object_name& n, fm
return fmt::format_to(ctx.out(), "{}", n.str());
}
static shared_ptr<s3::client> make_s3_client(const db::object_storage_endpoint_param& ep, semaphore& memory, std::function<shared_ptr<s3::client>(std::string)> factory) {
auto& epc = ep.get_s3_storage();
return s3::client::make(epc.endpoint, epc.region, epc.iam_role_arn, memory, std::move(factory));
}
class s3_client_wrapper : public sstables::object_storage_client {
shared_ptr<s3::client> _client;
shard_client_factory _cf;
public:
s3_client_wrapper(const db::object_storage_endpoint_param& ep, semaphore& memory, shard_client_factory cf)
: _client(make_s3_client(ep, memory, std::bind_front(&s3_client_wrapper::shard_client, this)))
s3_client_wrapper(const std::string& host, s3::endpoint_config_ptr cfg, semaphore& memory, shard_client_factory cf)
: _client(s3::client::make(host, cfg, memory, std::bind_front(&s3_client_wrapper::shard_client, this)))
, _cf(std::move(cf))
{}
shared_ptr<s3::client> shard_client(std::string host) const {
@@ -103,8 +98,8 @@ public:
return _client->upload_file(std::move(path), name.str(), up, as);
}
future<> update_config(const db::object_storage_endpoint_param& ep) override {
auto& epc = ep.get_s3_storage();
return _client->update_config(epc.region, epc.iam_role_arn);
auto s3_cfg = make_lw_shared<s3::endpoint_config>(ep.get_s3_storage().config);
return _client->update_config(std::move(s3_cfg));
}
future<> close() override {
return _client->close();
@@ -295,7 +290,9 @@ public:
shared_ptr<object_storage_client> sstables::make_object_storage_client(const db::object_storage_endpoint_param& ep, semaphore& memory, shard_client_factory cf) {
if (ep.is_s3_storage()) {
return seastar::make_shared<s3_client_wrapper>(ep, memory, std::move(cf));
auto& epc = ep.get_s3_storage();
auto s3_cfg = make_lw_shared<s3::endpoint_config>(epc.config);
return seastar::make_shared<s3_client_wrapper>(epc.endpoint, std::move(s3_cfg), memory, std::move(cf));
}
if (ep.is_gs_storage()) {
return seastar::make_shared<gs_client_wrapper>(ep, memory, std::move(cf));

View File

@@ -21,7 +21,6 @@
#include "gms/feature.hh"
#include "gms/feature_service.hh"
#include "utils/assert.hh"
#include "utils/http.hh"
#include "exceptions/exceptions.hh"
namespace sstables {
@@ -139,16 +138,6 @@ future<> storage_manager::update_config(const db::config& cfg) {
auto storage_manager::get_endpoint(const sstring& endpoint) -> object_storage_endpoint& {
auto found = _object_storage_endpoints.find(endpoint);
if (found == _object_storage_endpoints.end() && maybe_legacy_endpoint_name(endpoint)) {
found = _object_storage_endpoints.begin();
while (found != _object_storage_endpoints.end()) {
auto uri = utils::http::parse_simple_url(found->first);
if (uri.host == endpoint) {
break;
}
found++;
}
}
if (found == _object_storage_endpoints.end()) {
smlogger.error("unable to find {} in configured object-storage endpoints", endpoint);
throw std::invalid_argument(format("endpoint {} not found", endpoint));
@@ -171,20 +160,7 @@ sstring storage_manager::get_endpoint_type(sstring endpoint) {
}
bool storage_manager::is_known_endpoint(sstring endpoint) const {
if (_object_storage_endpoints.contains(endpoint)) {
return true;
}
if (maybe_legacy_endpoint_name(endpoint)) {
for (auto ep : _object_storage_endpoints) {
auto uri = utils::http::parse_simple_url(ep.first);
if (uri.host == endpoint) {
return true;
}
}
}
return false;
return _object_storage_endpoints.contains(endpoint);
}
std::vector<sstring> storage_manager::endpoints(sstring type) const noexcept {

View File

@@ -775,7 +775,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
auto& db_cfg = *db_cfg_ptr;
db_cfg.enable_user_defined_functions({true}, db::config::config_source::CommandLine);
db_cfg.experimental_features({experimental_features_t::feature::UDF, experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS}, db::config::config_source::CommandLine);
db_cfg.object_storage_endpoints({db::object_storage_endpoint_param::s3_storage{.endpoint = "http://localhost"}}, db::config::config_source::CommandLine);
db_cfg.object_storage_endpoints({db::object_storage_endpoint_param{"localhost", s3::endpoint_config{}}}, db::config::config_source::CommandLine);
if (regenerate) {
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
} else {

View File

@@ -33,28 +33,27 @@ def format_tuples(tuples=None, **kwargs):
class S3_Server:
def __init__(self, tempdir: str, address: str, port: int, acc_key: str, secret_key: str, region: str, bucket_name):
self.tempdir = tempdir
self.ip = address
self.address = address
self.port = port
self.address = f'http://{self.ip}:{self.port}'
self.acc_key = acc_key
self.secret_key = secret_key
self.region = region
self.bucket_name = bucket_name
def __repr__(self):
return f"[unknown] {self.address}/{self.bucket_name}"
return f"[unknown] {self.address}:{self.port}/{self.bucket_name}"
@property
def type(self):
return 'S3'
def create_endpoint_conf(self):
return MinioServer.create_conf(self.address, self.region)
return MinioServer.create_conf(self.address, self.port, self.region)
def get_resource(self):
"""Creates boto3.resource object that can be used to communicate to the given server"""
return boto3.resource('s3',
endpoint_url=self.address,
endpoint_url=f'http://{self.address}:{self.port}',
aws_access_key_id=self.acc_key,
aws_secret_access_key=self.secret_key,
aws_session_token=None,
@@ -74,16 +73,15 @@ class MinioWrapper(S3_Server):
'127.0.0.1',
logging.getLogger('minio'))
self.tempdir = tempdir
self.ip = self.server.address
self.address = self.server.address
self.port = self.server.port
self.address = f'http://{self.ip}:{self.port}'
self.acc_key = self.server.access_key
self.secret_key = self.server.access_key
self.region = MinioServer.DEFAULT_REGION
self.bucket_name = self.server.bucket_name
def create_endpoint_conf(self):
return MinioServer.create_conf(self.address, self.region)
return MinioServer.create_conf(self.address, self.port, self.region)
async def start(self):
return self.server.start()
@@ -269,4 +267,4 @@ async def s3_storage(pytestconfig, tmpdir):
await server.start()
yield server
finally:
await server.stop()
await server.stop()

View File

@@ -252,9 +252,8 @@ async def test_memtable_flush_retries(manager: ManagerClient, tmpdir, object_sto
@pytest.mark.asyncio
async def test_get_object_store_endpoints(manager: ManagerClient, object_storage):
a_ep_name='http://a'
objconf = object_storage.create_endpoint_conf()
badconf = MinioServer.create_conf(a_ep_name, 'bad_region')
badconf = MinioServer.create_conf('a', 123, 'bad_region')
cfg = {'object_storage_endpoints': objconf + badconf}
print('Scylla returns the object storage endpoints')
@@ -265,26 +264,25 @@ async def test_get_object_store_endpoints(manager: ManagerClient, object_storage
del objconf[0]['name']
del badconf[0]['name']
assert json.loads(endpoints[object_storage.address]) == objconf[0]
assert json.loads(endpoints[a_ep_name]) == badconf[0]
assert json.loads(endpoints['a']) == badconf[0]
print('Check that system.config contains the object storage endpoints')
cql = manager.get_cql()
res = json.loads(cql.execute("SELECT value FROM system.config WHERE name = 'object_storage_endpoints';").one().value)
assert object_storage.address in res and a_ep_name in res
assert object_storage.address in res and 'a' in res
assert json.loads(res[object_storage.address]) == objconf[0]
assert json.loads(res[a_ep_name]) == badconf[0]
assert json.loads(res['a']) == badconf[0]
b_ep_name = 'http://b'
print('Update config with a new endpoint and SIGHUP Scylla to reload configuration')
new_endpoint = MinioServer.create_conf(b_ep_name, 'good_region')
new_endpoint = MinioServer.create_conf('b', 456, 'good_region')
await manager.server_update_config(server.server_id, 'object_storage_endpoints', new_endpoint)
await wait_for_config(manager, server, 'object_storage_endpoints', {b_ep_name: '{ "type": "s3", "aws_region": "good_region", "iam_role_arn": "" }'})
await wait_for_config(manager, server, 'object_storage_endpoints', {'b': '{ "port": 456, "use_https": false, "aws_region": "good_region", "iam_role_arn": "" }'})
print('Trying to create a keyspace with an endpoint not configured in object_storage_endpoints should trip storage_manager::is_known_endpoint()')
replication_opts = format_tuples({'class': 'NetworkTopologyStrategy',
'replication_factor': '1'})
storage_opts = format_tuples(type=f'{object_storage.type}',
endpoint=a_ep_name,
endpoint='a',
bucket=object_storage.bucket_name)
with pytest.raises(ConfigurationException):
cql.execute((f'CREATE KEYSPACE random_ks WITH'
@@ -292,7 +290,7 @@ async def test_get_object_store_endpoints(manager: ManagerClient, object_storage
print('Passing a known endpoint will make the CREATE KEYSPACE stmt to succeed')
storage_opts = format_tuples(type=f'{object_storage.type}',
endpoint=b_ep_name,
endpoint='b',
bucket=object_storage.bucket_name)
cql.execute((f'CREATE KEYSPACE random_ks WITH'
f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};'))

View File

@@ -187,11 +187,11 @@ std::vector<db::object_storage_endpoint_param> make_storage_options_config(const
},
[&endpoints] (const data_dictionary::storage_options::object_storage& os) mutable -> void {
if (os.type == data_dictionary::storage_options::S3_NAME) {
auto reg = ::getenv("AWS_DEFAULT_REGION");
auto endpoint = fmt::format("{}://{}:{}", reg != nullptr ? "https" : "http", os.endpoint, tests::getenv_safe("S3_SERVER_PORT_FOR_TEST"));
endpoints.emplace_back(db::object_storage_endpoint_param::s3_storage{
.endpoint = std::move(endpoint),
.region = std::string(reg != nullptr ? reg : "local"),
endpoints.emplace_back(os.endpoint,
s3::endpoint_config {
.port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
.use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
.region = tests::getenv_or_default("AWS_DEFAULT_REGION", "local"),
});
}
if (os.type == data_dictionary::storage_options::GS_NAME) {

View File

@@ -151,8 +151,9 @@ class MinioServer:
yield random.randint(min_port, max_port)
@staticmethod
def create_conf(address: str, region: str):
def create_conf(address: str, port: int, region: str):
endpoint = {'name': address,
'port': port,
# don't put credentials here. We're exporing env vars, which should
# be picked up properly by scylla.
# https://github.com/scylladb/scylla-pkg/issues/3845
@@ -160,7 +161,7 @@ class MinioServer:
#'aws_secret_access_key': secret_key,
'aws_region': region,
'iam_role_arn': '',
'type': 's3'
'use_https': False
}
return [endpoint]

View File

@@ -126,14 +126,11 @@ client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global
}
}
future<> client::update_config(std::string region, std::string ira) {
endpoint_config new_cfg = {
.port = _cfg->port,
.use_https = _cfg->use_https,
.region = std::move(region),
.role_arn = std::move(ira),
};
_cfg = make_lw_shared<endpoint_config>(std::move(new_cfg));
future<> client::update_config(endpoint_config_ptr cfg) {
if (_cfg->port != cfg->port || _cfg->use_https != cfg->use_https) {
throw std::runtime_error("Updating port and/or https usage is not possible");
}
_cfg = std::move(cfg);
auto units = co_await get_units(_creds_sem, 1);
_creds_provider_chain.invalidate_credentials();
_credentials = {};
@@ -144,17 +141,6 @@ shared_ptr<client> client::make(std::string endpoint, endpoint_config_ptr cfg, s
return seastar::make_shared<client>(std::move(endpoint), std::move(cfg), mem, std::move(gf), private_tag{});
}
shared_ptr<client> client::make(std::string ep, std::string region, std::string iam_role_arn, semaphore& memory, global_factory gf) {
auto url = utils::http::parse_simple_url(ep);
endpoint_config cfg = {
.port = url.port,
.use_https = url.is_https(),
.region = std::move(region),
.role_arn = std::move(iam_role_arn),
};
return make(url.host, make_lw_shared<endpoint_config>(std::move(cfg)), memory, gf);
}
future<> client::update_credentials_and_rearm() {
_credentials = co_await _creds_provider_chain.get_aws_credentials();
_creds_invalidation_timer.rearm(_credentials.expires_at);

View File

@@ -178,7 +178,6 @@ public:
client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag, std::unique_ptr<seastar::http::experimental::retry_strategy> rs = nullptr);
static shared_ptr<client> make(std::string endpoint, endpoint_config_ptr cfg, semaphore& memory, global_factory gf = {});
static shared_ptr<client> make(std::string url, std::string region, std::string iam_role_arn, semaphore& memory, global_factory gf = {});
future<uint64_t> get_object_size(sstring object_name, seastar::abort_source* = nullptr);
future<stats> get_object_stats(sstring object_name, seastar::abort_source* = nullptr);
@@ -212,7 +211,7 @@ public:
upload_progress& up,
seastar::abort_source* = nullptr);
future<> update_config(std::string reg, std::string ira);
future<> update_config(endpoint_config_ptr);
struct handle {
std::string _host;