Merge 'scylla-sstable: add native S3 support' from Ernest Zaslavsky
scylla-sstable: Enable support for S3-stored sstables Minimal implementation of what was mentioned in this [issue](https://github.com/scylladb/scylladb/issues/20532) This update allows Scylla to work with sstables stored on AWS S3. Users can specify the fully qualified location of the sstable using the format: `s3://bucket/prefix/sstable_name`. One should have `object_storage_config_file` referenced in the `scylla.yaml` as described in docs/operating-scylla/admin.rst ref: https://github.com/scylladb/scylladb/issues/20532 fixes: https://github.com/scylladb/scylladb/issues/20535 No backport needed since the S3 functionality was never released Closes scylladb/scylladb#22321 * github.com:scylladb/scylladb: tests: Add Tests for Scylla-SSTable S3 Functionality docs: Update Scylla Tools Documentation for S3 SSTable Support scylla-sstable: Enable Support for S3 SSTables s3: Implement S3 Fully Qualified Name Manipulation Functions object_storage: Refactor `object_storage.yaml` parsing logic
This commit is contained in:
@@ -1038,6 +1038,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/s3/credentials_providers/instance_profile_credentials_provider.cc',
|
||||
'utils/s3/credentials_providers/sts_assume_role_credentials_provider.cc',
|
||||
'utils/s3/credentials_providers/aws_credentials_provider_chain.cc',
|
||||
'utils/s3/utils/manip_s3.cc',
|
||||
'utils/advanced_rpc_compressor.cc',
|
||||
'gms/version_generator.cc',
|
||||
'gms/versioned_value.cc',
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <variant>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "utils/s3/utils/manip_s3.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace seastar {
|
||||
@@ -63,6 +64,18 @@ inline storage_options make_local_options(std::filesystem::path dir) {
|
||||
return so;
|
||||
}
|
||||
|
||||
inline storage_options make_s3_options(const std::string& endpoint, const std::string& fqn) {
|
||||
std::string bucket;
|
||||
std::string object;
|
||||
s3::s3fqn_to_parts(fqn, bucket, object);
|
||||
object = std::filesystem::path(object).parent_path().string(); // remove the filename and trailing separator from the path
|
||||
storage_options so;
|
||||
so.value = storage_options::s3{.bucket = std::move(bucket), .endpoint = endpoint, .location = std::move(object)};
|
||||
|
||||
return so;
|
||||
}
|
||||
|
||||
|
||||
} // namespace data_dictionary
|
||||
|
||||
template <>
|
||||
|
||||
@@ -29,7 +29,8 @@ The command syntax is as follows:
|
||||
scylla sstable <operation> <path to SStable>
|
||||
|
||||
|
||||
You can specify more than one SStable.
|
||||
You can specify more than one SSTable. Additionally, the path to SSTable can point to an S3 fully qualified path in the form of s3://bucket-name/prefix/of/your/sstable/sstable-TOC.txt. To use this feature, you need to have AWS credentials set up in your environment. For more information, see :ref:`Configuring AWS S3 access <aws-s3-configuration>`. Additionally, you must ensure the tool is able to load the correct Scylla YAML file, which can be done using the --scylla-yaml-file parameter or by placing the YAML file in one of the default locations the tool checks.
|
||||
|
||||
|
||||
Schema
|
||||
------
|
||||
|
||||
@@ -146,6 +146,8 @@ in the same directory as ``scylla.yaml``. You can override this location using t
|
||||
|
||||
object_storage_config_file: object_storage.yaml
|
||||
|
||||
.. _aws-s3-configuration:
|
||||
|
||||
Configuring AWS S3 access
|
||||
-------------------------
|
||||
|
||||
|
||||
43
init.cc
43
init.cc
@@ -129,3 +129,46 @@ void service_set::add(std::any value) {
|
||||
std::any service_set::find(const std::type_info& type) const {
|
||||
return _impl->find(type);
|
||||
}
|
||||
|
||||
future<> read_object_storage_config(db::config& db_cfg) {
|
||||
sstring cfg_name;
|
||||
if (!db_cfg.object_storage_config_file().empty()) {
|
||||
cfg_name = db_cfg.object_storage_config_file();
|
||||
} else {
|
||||
cfg_name = db::config::get_conf_sub("object_storage.yaml").native();
|
||||
if (!co_await file_accessible(cfg_name, access_flags::exists)) {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
|
||||
auto cfg_file = co_await open_file_dma(cfg_name, open_flags::ro);
|
||||
sstring data;
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
auto sz = co_await cfg_file.size();
|
||||
data = seastar::to_sstring(co_await cfg_file.dma_read_exactly<char>(0, sz));
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await cfg_file.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(ex);
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, s3::endpoint_config> cfg;
|
||||
YAML::Node doc = YAML::Load(data.c_str());
|
||||
for (auto&& section : doc) {
|
||||
auto sec_name = section.first.as<std::string>();
|
||||
if (sec_name != "endpoints") {
|
||||
co_await coroutine::return_exception(std::runtime_error(fmt::format("While parsing object_storage config: section {} currently unsupported.", sec_name)));
|
||||
}
|
||||
|
||||
auto endpoints = section.second.as<std::vector<object_storage_endpoint_param>>();
|
||||
for (auto&& ep : endpoints) {
|
||||
cfg[ep.endpoint] = std::move(ep.config);
|
||||
}
|
||||
}
|
||||
|
||||
db_cfg.object_storage_config.set(std::move(cfg));
|
||||
}
|
||||
|
||||
23
init.hh
23
init.hh
@@ -14,8 +14,10 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "utils/log.hh"
|
||||
#include "utils/s3/creds.hh"
|
||||
#include "seastarx.hh"
|
||||
#include <boost/program_options.hpp>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
namespace db {
|
||||
class extensions;
|
||||
@@ -127,3 +129,24 @@ public:
|
||||
private:
|
||||
static void register_configurable(configurable &);
|
||||
};
|
||||
|
||||
future<> read_object_storage_config(db::config& db_cfg);
|
||||
|
||||
struct object_storage_endpoint_param {
|
||||
sstring endpoint;
|
||||
s3::endpoint_config config;
|
||||
};
|
||||
|
||||
namespace YAML {
|
||||
template <>
|
||||
struct convert<object_storage_endpoint_param> {
|
||||
static bool decode(const Node& node, ::object_storage_endpoint_param& ep) {
|
||||
ep.endpoint = node["name"].as<std::string>();
|
||||
ep.config.port = node["port"].as<unsigned>();
|
||||
ep.config.use_https = node["https"].as<bool>(false);
|
||||
ep.config.region = node["aws_region"] ? node["aws_region"].as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
|
||||
ep.config.role_arn = node["iam_role_arn"] ? node["iam_role_arn"].as<std::string>() : "";
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
64
main.cc
64
main.cc
@@ -8,7 +8,6 @@
|
||||
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
#include <fmt/ranges.h>
|
||||
|
||||
#include <seastar/util/closeable.hh>
|
||||
@@ -116,6 +115,7 @@
|
||||
#include "message/dictionary_service.hh"
|
||||
#include "utils/disk_space_monitor.hh"
|
||||
#include "utils/labels.hh"
|
||||
#include "tools/utils.hh"
|
||||
|
||||
|
||||
#define P11_KIT_FUTURE_UNSTABLE_API
|
||||
@@ -191,68 +191,6 @@ public:
|
||||
sharded<abort_source>& as_sharded_abort_source() { return _abort_sources; }
|
||||
};
|
||||
|
||||
struct object_storage_endpoint_param {
|
||||
sstring endpoint;
|
||||
s3::endpoint_config config;
|
||||
};
|
||||
|
||||
namespace YAML {
|
||||
template<>
|
||||
struct convert<::object_storage_endpoint_param> {
|
||||
static bool decode(const Node& node, ::object_storage_endpoint_param& ep) {
|
||||
ep.endpoint = node["name"].as<std::string>();
|
||||
ep.config.port = node["port"].as<unsigned>();
|
||||
ep.config.use_https = node["https"].as<bool>(false);
|
||||
ep.config.region = node["aws_region"] ? node["aws_region"].as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
|
||||
ep.config.role_arn = node["iam_role_arn"] ? node["iam_role_arn"].as<std::string>() : "";
|
||||
return true;
|
||||
}
|
||||
};
|
||||
} // namespace YAML
|
||||
|
||||
static future<> read_object_storage_config(db::config& db_cfg) {
|
||||
sstring cfg_name;
|
||||
if (!db_cfg.object_storage_config_file().empty()) {
|
||||
cfg_name = db_cfg.object_storage_config_file();
|
||||
} else {
|
||||
cfg_name = db::config::get_conf_sub("object_storage.yaml").native();
|
||||
if (!co_await file_accessible(cfg_name, access_flags::exists)) {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
|
||||
auto cfg_file = co_await open_file_dma(cfg_name, open_flags::ro);
|
||||
sstring data;
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
auto sz = co_await cfg_file.size();
|
||||
data = seastar::to_sstring(co_await cfg_file.dma_read_exactly<char>(0, sz));
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await cfg_file.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(ex);
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, s3::endpoint_config> cfg;
|
||||
YAML::Node doc = YAML::Load(data.c_str());
|
||||
for (auto&& section : doc) {
|
||||
auto sec_name = section.first.as<std::string>();
|
||||
if (sec_name != "endpoints") {
|
||||
co_await coroutine::return_exception(std::runtime_error(fmt::format("While parsing object_storage config: section {} currently unsupported.", sec_name)));
|
||||
}
|
||||
|
||||
auto endpoints = section.second.as<std::vector<object_storage_endpoint_param>>();
|
||||
for (auto&& ep : endpoints) {
|
||||
cfg[ep.endpoint] = std::move(ep.config);
|
||||
}
|
||||
}
|
||||
|
||||
db_cfg.object_storage_config.set(std::move(cfg));
|
||||
}
|
||||
|
||||
static future<>
|
||||
read_config(bpo::variables_map& opts, db::config& cfg) {
|
||||
sstring file;
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/s3/client.hh"
|
||||
#include "utils/s3/creds.hh"
|
||||
#include "utils/s3/utils/manip_s3.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
#include "utils/s3/credentials_providers/aws_credentials_provider_chain.hh"
|
||||
#include "utils/s3/credentials_providers/instance_profile_credentials_provider.hh"
|
||||
@@ -672,3 +673,51 @@ SEASTAR_THREAD_TEST_CASE(test_creds) {
|
||||
BOOST_REQUIRE_EQUAL(creds.secret_access_key, "");
|
||||
BOOST_REQUIRE_EQUAL(creds.session_token, "");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(s3_fqn_manipulation) {
|
||||
std::string bucket_name, object_name;
|
||||
// Empty input
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("", bucket_name, object_name), false);
|
||||
BOOST_REQUIRE(bucket_name.empty());
|
||||
BOOST_REQUIRE(object_name.empty());
|
||||
// Incorrect scheme
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("http://bucket/object", bucket_name, object_name), false);
|
||||
BOOST_REQUIRE(bucket_name.empty());
|
||||
BOOST_REQUIRE(object_name.empty());
|
||||
// No scheme
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("/bucket/object", bucket_name, object_name), false);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name.empty(), true);
|
||||
BOOST_REQUIRE_EQUAL(object_name.empty(), true);
|
||||
// Scheme without slashes
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3:", bucket_name, object_name), false);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name.empty(), true);
|
||||
BOOST_REQUIRE_EQUAL(object_name.empty(), true);
|
||||
// Scheme with single slash
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3:/", bucket_name, object_name), true);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name.empty(), true);
|
||||
BOOST_REQUIRE_EQUAL(object_name, "/");
|
||||
// Without bucket
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3://", bucket_name, object_name), true);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name.empty(), true);
|
||||
BOOST_REQUIRE_EQUAL(object_name, "/");
|
||||
// Bucket only, no prefix/object
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3://bucket/", bucket_name, object_name), true);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name, "bucket");
|
||||
BOOST_REQUIRE_EQUAL(object_name, "/");
|
||||
// Same as above, but without trailing slash
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3://bucket", bucket_name, object_name), true);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name, "bucket");
|
||||
BOOST_REQUIRE_EQUAL(object_name, "/");
|
||||
// Bucket and prefix without object
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3://bucket/object", bucket_name, object_name), true);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name, "bucket");
|
||||
BOOST_REQUIRE_EQUAL(object_name, "object");
|
||||
// Bucket and object with object
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3://bucket/prefix1/prefix2/foo.bar", bucket_name, object_name), true);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name, "bucket");
|
||||
BOOST_REQUIRE_EQUAL(object_name, "prefix1/prefix2/foo.bar");
|
||||
// Bucket and object with object and extra slashes
|
||||
BOOST_REQUIRE_EQUAL(s3::s3fqn_to_parts("s3://bucket///prefix1/prefix2//foo.bar", bucket_name, object_name), true);
|
||||
BOOST_REQUIRE_EQUAL(bucket_name, "bucket");
|
||||
BOOST_REQUIRE_EQUAL(object_name, "prefix1/prefix2/foo.bar");
|
||||
}
|
||||
|
||||
@@ -10,7 +10,9 @@
|
||||
# and automatically setting up the fixtures they need.
|
||||
|
||||
import pytest
|
||||
|
||||
import logging
|
||||
import pathlib
|
||||
import boto3
|
||||
from cassandra.cluster import Cluster, NoHostAvailable
|
||||
from cassandra.connection import DRIVER_NAME, DRIVER_VERSION
|
||||
import json
|
||||
@@ -22,7 +24,7 @@ import time
|
||||
import random
|
||||
|
||||
from .util import unique_name, new_test_keyspace, keyspace_has_tablets, cql_session, local_process_id, is_scylla, config_value_context
|
||||
|
||||
from test.pylib.minio_server import MinioServer
|
||||
|
||||
print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
|
||||
|
||||
@@ -42,6 +44,14 @@ def pytest_addoption(parser):
|
||||
# presence.
|
||||
parser.addoption('--omit-scylla-output', action='store_true',
|
||||
help='Omit scylla\'s output from the test output')
|
||||
parser.addoption('--no-minio', action="store_true", help="Signal to not run S3 related tests")
|
||||
s3_options = parser.getgroup("s3-server", description="S3 Server settings")
|
||||
s3_options.addoption('--s3-server-address')
|
||||
s3_options.addoption('--s3-server-port', type=int)
|
||||
s3_options.addoption('--aws-access-key')
|
||||
s3_options.addoption('--aws-secret-key')
|
||||
s3_options.addoption('--aws-region')
|
||||
s3_options.addoption('--s3-server-bucket')
|
||||
|
||||
# "cql" fixture: set up client object for communicating with the CQL API.
|
||||
# The host/port combination of the server are determined by the --host and
|
||||
@@ -278,3 +288,10 @@ def compact_storage(cql):
|
||||
# so the above may fail on cassandra.
|
||||
# This is fine since compact storage is enabled there by default.
|
||||
yield
|
||||
|
||||
# Skip tests that require a running Minio server if the --no-minio option is set, intended to be set from test/cqlpy/run
|
||||
# Otherwise, use the provided minio server to run all S3 related tests
|
||||
@pytest.fixture
|
||||
def skip_s3_tests(request):
|
||||
if request.config.getoption("--no-minio"):
|
||||
pytest.skip("Skipping S3 related tests being run from test/cqlpy/run")
|
||||
|
||||
@@ -54,7 +54,7 @@ run.wait_for_services(pid, [
|
||||
lambda: run.check_rest_api(ip),
|
||||
lambda: check_cql(ip)
|
||||
])
|
||||
success = run.run_pytest(sys.path[0], ['--host=' + ip] + sys.argv[1:])
|
||||
success = run.run_pytest(sys.path[0], ['--no-minio']+['--host=' + ip] + sys.argv[1:])
|
||||
|
||||
run.summary = 'Scylla tests pass' if success else 'Scylla tests failure'
|
||||
|
||||
|
||||
@@ -27,6 +27,8 @@ from . import util
|
||||
from typing import Iterable, Type, Union
|
||||
from cassandra.util import Duration
|
||||
|
||||
from test.cluster.object_store.conftest import s3_server, get_s3_resource
|
||||
|
||||
|
||||
def simple_no_clustering_table(cql, keyspace):
|
||||
table = util.unique_name()
|
||||
@@ -149,7 +151,7 @@ def get_sstables_for_table(data_dir, keyspace, table):
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def scylla_sstable(table_factory, cql, ks, data_dir):
|
||||
def scylla_sstable(table_factory, cql, ks, data_dir, s3_server=None, copy_to_s3=False, everywhere=False):
|
||||
table, schema = table_factory(cql, ks)
|
||||
|
||||
schema_file = os.path.join(data_dir, "..", "test_tools_schema.cql")
|
||||
@@ -158,6 +160,13 @@ def scylla_sstable(table_factory, cql, ks, data_dir):
|
||||
|
||||
sstables = get_sstables_for_table(data_dir, ks, table)
|
||||
|
||||
if copy_to_s3:
|
||||
sstable_path = os.path.dirname(sstables[0])
|
||||
if everywhere:
|
||||
sstables += upload_folder_to_s3(sstable_path, s3_server)
|
||||
else:
|
||||
sstables = upload_folder_to_s3(sstable_path, s3_server)
|
||||
|
||||
try:
|
||||
yield (table, schema_file, sstables)
|
||||
finally:
|
||||
@@ -175,6 +184,63 @@ def all_sstables(sstables):
|
||||
return sstables
|
||||
|
||||
|
||||
def upload_folder_to_s3(folder_path, s3_server):
|
||||
s3_resource = get_s3_resource(s3_server)
|
||||
bucket = s3_resource.Bucket(s3_server.bucket_name)
|
||||
prefix = "just/some/s3/prefix"
|
||||
sstables = []
|
||||
for root, dirs, files in os.walk(folder_path):
|
||||
for file in files:
|
||||
file_path = os.path.join(root, file)
|
||||
s3_key = os.path.join(prefix, os.path.relpath(file_path, folder_path))
|
||||
if file.endswith('-Data.db'):
|
||||
sstables.append(f"s3://{s3_server.bucket_name}/{s3_key}")
|
||||
bucket.upload_file(file_path, s3_key)
|
||||
return sstables
|
||||
|
||||
|
||||
@pytest.mark.parametrize("what", ["index", "compression-info", "summary", "statistics", "scylla-metadata"])
|
||||
@pytest.mark.parametrize("where", ["s3", "mixed"])
|
||||
def test_scylla_sstable_dump_component_with_s3(skip_s3_tests, cql, test_keyspace, scylla_path, scylla_data_dir,
|
||||
scylla_home_dir, what,
|
||||
where, s3_server):
|
||||
scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
|
||||
with open(scylla_yaml_file, "a") as f:
|
||||
f.write(f"\nobject_storage_config_file: {str(s3_server.config_file)}")
|
||||
|
||||
with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir, s3_server,
|
||||
False if where == "local" else True, True if where == "mixed" else False) as (
|
||||
_, schema_file, sstables):
|
||||
out = subprocess.check_output(
|
||||
[scylla_path, "sstable", f"dump-{what}", "--scylla-yaml-file", scylla_yaml_file, "--schema-file",
|
||||
schema_file] + all_sstables(sstables))
|
||||
|
||||
print(out)
|
||||
|
||||
assert out
|
||||
assert json.loads(out)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("where", ["s3", "mixed"])
|
||||
def test_scylla_sstable_dump_data_with_s3(skip_s3_tests, cql, test_keyspace, scylla_path, scylla_data_dir,
|
||||
scylla_home_dir, where,
|
||||
s3_server):
|
||||
scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
|
||||
with open(scylla_yaml_file, "a") as f:
|
||||
f.write(f"\nobject_storage_config_file: {str(s3_server.config_file)}")
|
||||
with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir, s3_server,
|
||||
False if where == "local" else True, True if where == "mixed" else False) as (
|
||||
_, schema_file, sstables):
|
||||
args = [scylla_path, "sstable", "dump-data", "--scylla-yaml-file", scylla_yaml_file, "--schema-file",
|
||||
schema_file, "--output-format", "json"]
|
||||
out = subprocess.check_output(args + sstables)
|
||||
|
||||
print(out)
|
||||
|
||||
assert out
|
||||
assert json.loads(out)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("what", ["index", "compression-info", "summary", "statistics", "scylla-metadata"])
|
||||
@pytest.mark.parametrize("which_sstables", [one_sstable, all_sstables])
|
||||
def test_scylla_sstable_dump_component(cql, test_keyspace, scylla_path, scylla_data_dir, what, which_sstables):
|
||||
|
||||
@@ -14,10 +14,12 @@
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/queue.hh>
|
||||
#include <seastar/core/units.hh>
|
||||
#include <seastar/http/short_streams.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include <seastar/core/queue.hh>
|
||||
|
||||
#include "init.hh"
|
||||
#include "compaction/compaction.hh"
|
||||
#include "compaction/compaction_strategy.hh"
|
||||
#include "compaction/compaction_strategy_state.hh"
|
||||
@@ -337,7 +339,7 @@ const std::vector<sstables::shared_sstable> load_sstables(schema_ptr schema, sst
|
||||
|
||||
parallel_for_each(sstable_names, [schema, &sst_man, &sstable_names, &sstables] (const sstring& sst_name) -> future<> {
|
||||
const auto i = std::distance(sstable_names.begin(), std::find(sstable_names.begin(), sstable_names.end(), sst_name));
|
||||
const auto sst_path = std::filesystem::canonical(std::filesystem::path(sst_name));
|
||||
auto sst_path = std::filesystem::path(sst_name);
|
||||
|
||||
if (const auto ftype_opt = co_await file_type(sst_path.c_str(), follow_symlink::yes)) {
|
||||
if (!ftype_opt) {
|
||||
@@ -349,10 +351,22 @@ const std::vector<sstables::shared_sstable> load_sstables(schema_ptr schema, sst
|
||||
}
|
||||
|
||||
|
||||
data_dictionary::storage_options options;
|
||||
auto ed = sstables::parse_path(sst_path, schema->ks_name(), schema->cf_name());
|
||||
const auto dir_path = sst_path.parent_path();
|
||||
auto local = data_dictionary::make_local_options(dir_path);
|
||||
auto sst = sst_man.make_sstable(schema, local, ed.generation, sstables::sstable_state::normal, ed.version, ed.format);
|
||||
|
||||
if (s3::is_s3_fqn(sst_path)) {
|
||||
if (sst_man.config().object_storage_config().empty()) {
|
||||
throw std::invalid_argument("Unable to open SSTable in S3: AWS object storage configuration missing. Please provide a --scylla-yaml-file with "
|
||||
"valid AWS object storage configuration.");
|
||||
}
|
||||
auto endpoint = sst_man.config().object_storage_config().begin()->first;
|
||||
options = data_dictionary::make_s3_options(endpoint, sst_path);
|
||||
} else {
|
||||
sst_path = std::filesystem::canonical(std::filesystem::path(sst_name));
|
||||
const auto dir_path = sst_path.parent_path();
|
||||
options = data_dictionary::make_local_options(dir_path);
|
||||
}
|
||||
auto sst = sst_man.make_sstable(schema, options, ed.generation, sstables::sstable_state::normal, ed.version, ed.format);
|
||||
|
||||
try {
|
||||
co_await sst->load(schema->get_sharder(), sstables::sstable_open_config{.load_first_and_last_position_metadata = false});
|
||||
@@ -3508,6 +3522,8 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
|
||||
}).get();
|
||||
dbcfg.setup_directories();
|
||||
sst_log.debug("Successfully read scylla.yaml from {} location of {}", scylla_yaml_path_source, scylla_yaml_path);
|
||||
read_object_storage_config(dbcfg).get();
|
||||
sst_log.debug("Successfully read object storage settings");
|
||||
} else {
|
||||
dbcfg.experimental_features.set(db::experimental_features_t::all());
|
||||
sst_log.debug("Failed to read scylla.yaml from {} location of {}, some functionality may be unavailable", scylla_yaml_path_source, scylla_yaml_path);
|
||||
@@ -3548,9 +3564,25 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
|
||||
cache_tracker tracker;
|
||||
sstables::directory_semaphore dir_sem(1);
|
||||
abort_source abort;
|
||||
sstables::sstables_manager sst_man("scylla_sstable", large_data_handler, dbcfg, feature_service, tracker,
|
||||
memory::stats().total_memory(), dir_sem,
|
||||
[host_id = locator::host_id::create_random_id()] { return host_id; }, abort);
|
||||
|
||||
sstables::storage_manager::config stm_cfg;
|
||||
stm_cfg.s3_clients_memory = 100_MiB;
|
||||
sharded<sstables::storage_manager> sstm;
|
||||
sstm.start(std::ref(dbcfg), stm_cfg).get();
|
||||
auto stop_sstm = defer([&sstm] { sstm.stop().get(); });
|
||||
|
||||
sstables::sstables_manager sst_man(
|
||||
"scylla_sstable",
|
||||
large_data_handler,
|
||||
dbcfg,
|
||||
feature_service,
|
||||
tracker,
|
||||
1_GiB,
|
||||
dir_sem,
|
||||
[host_id = locator::host_id::create_random_id()] { return host_id; },
|
||||
abort,
|
||||
current_scheduling_group(),
|
||||
&sstm.local());
|
||||
auto close_sst_man = deferred_close(sst_man);
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
|
||||
@@ -60,7 +60,8 @@ target_sources(utils
|
||||
s3/credentials_providers/environment_aws_credentials_provider.cc
|
||||
s3/credentials_providers/instance_profile_credentials_provider.cc
|
||||
s3/credentials_providers/sts_assume_role_credentials_provider.cc
|
||||
s3/credentials_providers/aws_credentials_provider_chain.cc)
|
||||
s3/credentials_providers/aws_credentials_provider_chain.cc
|
||||
s3/utils/manip_s3.cc)
|
||||
target_include_directories(utils
|
||||
PUBLIC
|
||||
${CMAKE_SOURCE_DIR}
|
||||
|
||||
70
utils/s3/utils/manip_s3.cc
Normal file
70
utils/s3/utils/manip_s3.cc
Normal file
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "manip_s3.hh"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace s3 {
|
||||
namespace fs = std::filesystem;
|
||||
constexpr std::string_view s3_fqn_prefix = "s3://";
|
||||
|
||||
fs::path s3_canonicalize(const fs::path& path) {
|
||||
if (!is_s3_fqn(path) || path.string().length() < s3_fqn_prefix.length() - 1) {
|
||||
return path;
|
||||
}
|
||||
// Canonicalizing the original "s3://" changes it to "s3:/". Trim and re-add the "s3://" prefix.
|
||||
auto canonical = path.lexically_normal().string().substr(s3_fqn_prefix.length() - 1);
|
||||
return std::string(s3_fqn_prefix) + canonical;
|
||||
}
|
||||
|
||||
bool is_s3_fqn(const fs::path& fqn) {
|
||||
if (fqn.empty())
|
||||
return false;
|
||||
|
||||
return *(fqn.begin()) == s3_fqn_prefix.substr(0, 3);
|
||||
}
|
||||
|
||||
bool s3fqn_to_parts(const fs::path& fqn, std::string& bucket_name, std::string& object_name) {
|
||||
if (!is_s3_fqn(fqn)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const auto canonical = s3_canonicalize(fqn);
|
||||
auto it = canonical.begin();
|
||||
|
||||
// Expect at least two components: the scheme (e.g., "s3:") and the bucket name.
|
||||
if (std::distance(it, canonical.end()) < 2) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Skip the scheme component.
|
||||
++it;
|
||||
|
||||
// The next component is the bucket name.
|
||||
bucket_name = it->string();
|
||||
|
||||
// Advance to check for object parts.
|
||||
++it;
|
||||
if (it == canonical.end()) {
|
||||
// No object parts – default to root.
|
||||
object_name = "/";
|
||||
return true;
|
||||
}
|
||||
|
||||
// Combine remaining parts into the object path.
|
||||
fs::path obj;
|
||||
for (; it != canonical.end(); ++it) {
|
||||
obj /= *it;
|
||||
}
|
||||
|
||||
object_name = obj.string().empty() ? "/" : obj.string();
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace s3
|
||||
18
utils/s3/utils/manip_s3.hh
Normal file
18
utils/s3/utils/manip_s3.hh
Normal file
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include <filesystem>
|
||||
|
||||
namespace s3 {
|
||||
|
||||
std::filesystem::path s3_canonicalize(const std::filesystem::path& path);
|
||||
bool is_s3_fqn(const std::filesystem::path& fqn);
|
||||
bool s3fqn_to_parts(const std::filesystem::path& fqn, std::string& bucket_name, std::string& object_name);
|
||||
|
||||
} // namespace s3
|
||||
Reference in New Issue
Block a user