Merge 'Maintain sstable state explicitly' from Pavel Emelyanov

An sstable can be in one of several states -- normal, quarantined, staging, uploading. Right now this "state" is hard-wired into sstable's path, e.g. quarantined sstable would sit in e.g. /var/lib/data/ks-cf-012345/quarantine/ directory. Respectively, there's a bunch of directory names constexprs in sstables.hh defining each "state". Other than being confusing, this approach doesn't work well with S3 backend. Additionally, there's snapshot subdir that adds to the confusion, because snapshot is not quite a state.

This PR converts "state" from constexpr char* directories names into a enum class and patches the sstable creation, opening and state-changing API to use that enum instead of parsing the path.

refs: #13017
refs: #12707

Closes #14152

* github.com:scylladb/scylladb:
  sstable/storage: Make filesystem storage with initial state
  sstable: Maintain state
  sstable: Make .change_state() accept state, not directory string
  sstable: Construct it with state
  sstables_manager: Remove state-less make_sstable()
  table: Make sstables with required state
  test: Make sstables with upload state in some cases
  tools: Make sstables with normal state
  table: Open-code sstables making streaming helpers
  tests: Make sstables with normal state by default
  sstable_directory: Make sstable with required state
  sstable_directory: Construct with state
  distributed_loader: Make sstable with desired state when populating
  distributed_loader: Make sstable with upload state when uploading
  sstable: Introduce state enum
  sstable_directory: Merge verify and g.c. calls
  distributed_loader: Merge verify and gc invocations
  sstable/filesystem: Put underscores to dir members
  sstable/s3: Mark make_s3_object_name() const
  sstable: Remove filename(dir, ...) method
This commit is contained in:
Avi Kivity
2023-08-15 17:44:06 +03:00
20 changed files with 203 additions and 178 deletions

View File

@@ -1735,7 +1735,7 @@ static future<compaction_result> scrub_sstables_validate_mode(sstables::compacti
if (validation_errors != 0) {
for (auto& sst : descriptor.sstables) {
co_await sst->change_state(sstables::quarantine_dir);
co_await sst->change_state(sstables::sstable_state::quarantine);
}
}

View File

@@ -92,6 +92,7 @@ class feature_service;
namespace sstables {
enum class sstable_state;
class sstable;
class compaction_descriptor;
class compaction_completion_desc;
@@ -499,7 +500,7 @@ private:
db_clock::time_point _truncated_at = db_clock::time_point::min();
bool _is_bootstrap_or_replace = false;
sstables::shared_sstable make_sstable(sstring dir);
sstables::shared_sstable make_sstable(sstables::sstable_state state);
public:
void deregister_metrics();
@@ -717,7 +718,7 @@ public:
flat_mutation_reader_v2 make_nonpopulating_cache_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, tracing::trace_state_ptr ts);
sstables::shared_sstable make_streaming_sstable_for_write(std::optional<sstring> subdir = {});
sstables::shared_sstable make_streaming_sstable_for_write();
sstables::shared_sstable make_streaming_staging_sstable();
mutation_source as_mutation_source() const;

View File

@@ -33,7 +33,6 @@
#include <unordered_map>
#include <boost/range/adaptor/map.hpp>
#include "db/view/view_update_generator.hh"
#include "utils/directories.hh"
extern logging::logger dblog;
@@ -85,26 +84,10 @@ io_error_handler error_handler_gen_for_upload_dir(disk_error_signal_type& dummy)
future<>
distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags) {
// verify owner and mode on the sstables directory
// and all its subdirectories, except for "snapshots"
// as there could be a race with scylla-manager that might
// delete snapshots concurrently
co_await dir.invoke_on(0, [] (const sstables::sstable_directory& d) -> future<> {
fs::path sstable_dir = d.sstable_dir();
co_await utils::directories::verify_owner_and_mode(sstable_dir, utils::directories::recursive::no);
co_await lister::scan_dir(sstable_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [] (fs::path dir, directory_entry de) -> future<> {
if (de.name != sstables::snapshots_dir) {
co_await utils::directories::verify_owner_and_mode(dir / de.name, utils::directories::recursive::yes);
}
});
co_await dir.invoke_on(0, [flags] (sstables::sstable_directory& d) -> future<> {
co_await d.prepare(flags);
});
if (flags.garbage_collect) {
co_await dir.invoke_on(0, [] (sstables::sstable_directory& d) {
return d.garbage_collect();
});
}
co_await dir.invoke_on_all([&dir, flags] (sstables::sstable_directory& d) -> future<> {
// Supposed to be called with the node either down or on behalf of maintenance tasks
// like nodetool refresh
@@ -164,7 +147,7 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
co_await dir.do_for_each_sstable([&table, needs_view_update, &new_sstables] (sstables::shared_sstable sst) -> future<> {
auto gen = table.calculate_generation_for_new_table();
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), needs_view_update ? "staging" : "base", gen);
co_await sst->pick_up_from_upload(!needs_view_update ? sstables::normal_dir : sstables::staging_dir, gen);
co_await sst->pick_up_from_upload(!needs_view_update ? sstables::sstable_state::normal : sstables::sstable_state::staging, gen);
// When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
sst->set_sstable_level(0);
new_sstables.push_back(std::move(sst));
@@ -205,13 +188,12 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
auto stop_erms = deferred_stop(erms);
sharded<sstables::sstable_directory> directory;
auto upload = fs::path(global_table->dir()) / sstables::upload_dir;
directory.start(
sharded_parameter([&global_table] { return std::ref(global_table->get_sstables_manager()); }),
sharded_parameter([&global_table] { return global_table->schema(); }),
sharded_parameter([&global_table, &erms] { return std::ref(erms.local()->get_sharder(*global_table->schema())); }),
sharded_parameter([&global_table] { return global_table->get_storage_options_ptr(); }),
upload, &error_handler_gen_for_upload_dir
global_table->dir(), sstables::sstable_state::upload, &error_handler_gen_for_upload_dir
).get();
auto stop_directory = deferred_stop(directory);
@@ -235,8 +217,8 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
auto generation = sharded_gen.invoke_on(shard, [uuid_sstable_identifiers] (auto& gen) {
return gen(sstables::uuid_identifiers{uuid_sstable_identifiers});
}).get();
return sstm.make_sstable(global_table->schema(), global_table->get_storage_options(),
upload.native(), generation, sstm.get_highest_supported_format(),
return sstm.make_sstable(global_table->schema(), global_table->dir(), global_table->get_storage_options(),
generation, sstables::sstable_state::upload, sstm.get_highest_supported_format(),
sstables::sstable_format_types::big, gc_clock::now(), &error_handler_gen_for_upload_dir);
};
// Pass owned_ranges_ptr to reshard to piggy-back cleanup on the resharding compaction.
@@ -271,7 +253,6 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&
auto global_table = get_table_on_all_shards(db, ks, cf).get0();
sharded<sstables::sstable_directory> directory;
auto table_id = global_table->schema()->id();
auto upload = fs::path(global_table->dir()) / sstables::upload_dir;
sharded<locator::effective_replication_map_ptr> erms;
erms.start(sharded_parameter([&global_table] {
@@ -284,7 +265,7 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&
sharded_parameter([&global_table] { return global_table->schema(); }),
sharded_parameter([&global_table, &erms] { return std::ref(erms.local()->get_sharder(*global_table->schema())); }),
sharded_parameter([&global_table] { return global_table->get_storage_options_ptr(); }),
upload, &error_handler_gen_for_upload_dir
global_table->dir(), sstables::sstable_state::upload, &error_handler_gen_for_upload_dir
).get();
auto stop = deferred_stop(directory);
@@ -340,8 +321,8 @@ public:
return _global_table->get_effective_replication_map();
}));
for (auto subdir : { "", sstables::staging_dir, sstables::quarantine_dir }) {
co_await start_subdir(subdir);
for (auto state : { sstables::sstable_state::normal, sstables::sstable_state::staging, sstables::sstable_state::quarantine }) {
co_await start_subdir(state);
}
co_await smp::invoke_on_all([this] {
@@ -349,9 +330,9 @@ public:
return _global_table->disable_auto_compaction();
});
co_await populate_subdir(sstables::staging_dir, allow_offstrategy_compaction::no);
co_await populate_subdir(sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no);
co_await populate_subdir("", allow_offstrategy_compaction::yes);
co_await populate_subdir(sstables::sstable_state::staging, allow_offstrategy_compaction::no);
co_await populate_subdir(sstables::sstable_state::quarantine, allow_offstrategy_compaction::no, must_exist::no);
co_await populate_subdir(sstables::sstable_state::normal, allow_offstrategy_compaction::yes);
co_await smp::invoke_on_all([this] {
_global_table->mark_ready_for_writes();
@@ -372,12 +353,13 @@ private:
using allow_offstrategy_compaction = bool_class<struct allow_offstrategy_compaction_tag>;
using must_exist = bool_class<struct must_exist_tag>;
future<> populate_subdir(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes);
future<> populate_subdir(sstables::sstable_state state, allow_offstrategy_compaction, must_exist = must_exist::yes);
future<> start_subdir(sstring subdir);
future<> start_subdir(sstables::sstable_state state);
};
future<> table_populator::start_subdir(sstring subdir) {
future<> table_populator::start_subdir(sstables::sstable_state state) {
auto subdir = sstables::state_to_dir(state);
sstring sstdir = get_path(subdir).native();
if (!co_await file_exists(sstdir)) {
co_return;
@@ -392,7 +374,7 @@ future<> table_populator::start_subdir(sstring subdir) {
sharded_parameter([&global_table] { return global_table->schema(); }),
sharded_parameter([this] { return std::ref(_erms.local()->get_sharder(*_global_table->schema())); }),
sharded_parameter([&global_table] { return global_table->get_storage_options_ptr(); }),
fs::path(sstdir),
global_table->dir(), state,
default_io_error_handler_gen()
);
@@ -422,11 +404,12 @@ future<> table_populator::start_subdir(sstring subdir) {
_highest_generation = std::max(generation, _highest_generation);
}
sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstables::generation_type generation, sstables::sstable_version_types v) {
return table.get_sstables_manager().make_sstable(table.schema(), table.get_storage_options(), dir.native(), generation, v, sstables::sstable_format_types::big);
sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_state state, sstables::generation_type generation, sstables::sstable_version_types v) {
return table.get_sstables_manager().make_sstable(table.schema(), table.dir(), table.get_storage_options(), generation, state, v, sstables::sstable_format_types::big);
}
future<> table_populator::populate_subdir(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
future<> table_populator::populate_subdir(sstables::sstable_state state, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
auto subdir = state_to_dir(state);
auto sstdir = get_path(subdir);
dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", _ks, _cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist);
@@ -439,12 +422,12 @@ future<> table_populator::populate_subdir(sstring subdir, allow_offstrategy_comp
auto& directory = *_sstable_directories.at(subdir);
co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, sstdir] (shard_id shard) mutable {
co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, state] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [this] () {
return _global_table->calculate_generation_for_new_table();
}).get0();
return make_sstable(*_global_table, sstdir, gen, _highest_version);
return make_sstable(*_global_table, state, gen, _highest_version);
});
// The node is offline at this point so we are very lenient with what we consider
@@ -459,9 +442,9 @@ future<> table_populator::populate_subdir(sstring subdir, allow_offstrategy_comp
return sst->get_origin() != sstables::repair_origin;
};
co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, sstdir] (shard_id shard) {
co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, state] (shard_id shard) {
auto gen = _global_table->calculate_generation_for_new_table();
return make_sstable(*_global_table, sstdir, gen, _highest_version);
return make_sstable(*_global_table, state, gen, _highest_version);
}, eligible_for_reshape_on_boot);
co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> {

View File

@@ -278,18 +278,16 @@ table::make_reader_v2(schema_ptr s,
return rd;
}
sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional<sstring> subdir) {
sstring dir = _config.datadir;
if (subdir) {
dir += "/" + *subdir;
}
auto newtab = make_sstable(dir);
tlogger.debug("Created sstable for streaming: ks={}, cf={}, dir={}", schema()->ks_name(), schema()->cf_name(), dir);
sstables::shared_sstable table::make_streaming_sstable_for_write() {
auto newtab = make_sstable(sstables::sstable_state::normal);
tlogger.debug("Created sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
return newtab;
}
sstables::shared_sstable table::make_streaming_staging_sstable() {
return make_streaming_sstable_for_write(sstables::staging_dir);
auto newtab = make_sstable(sstables::sstable_state::staging);
tlogger.debug("Created staging sstable for streaming: ks={}, cf={}", schema()->ks_name(), schema()->cf_name());
return newtab;
}
static flat_mutation_reader_v2 maybe_compact_for_streaming(flat_mutation_reader_v2 underlying, const compaction_manager& cm, gc_clock::time_point compaction_time, bool compaction_enabled) {
@@ -436,13 +434,13 @@ static bool belongs_to_other_shard(const std::vector<shard_id>& shards) {
return shards.size() != size_t(belongs_to_current_shard(shards));
}
sstables::shared_sstable table::make_sstable(sstring dir) {
sstables::shared_sstable table::make_sstable(sstables::sstable_state state) {
auto& sstm = get_sstables_manager();
return sstm.make_sstable(_schema, *_storage_opts, dir, calculate_generation_for_new_table(), sstm.get_highest_supported_format(), sstables::sstable::format_types::big);
return sstm.make_sstable(_schema, _config.datadir, *_storage_opts, calculate_generation_for_new_table(), state, sstm.get_highest_supported_format(), sstables::sstable::format_types::big);
}
sstables::shared_sstable table::make_sstable() {
return make_sstable(_config.datadir);
return make_sstable(sstables::sstable_state::normal);
}
void table::notify_bootstrap_or_replace_start() {
@@ -2587,7 +2585,7 @@ future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable>
// completed first.
// The _sstable_deletion_sem prevents list update on off-strategy completion and move_sstables_from_staging()
// from stepping on each other's toe.
co_await sst->change_state(sstables::normal_dir, &delay_commit);
co_await sst->change_state(sstables::sstable_state::normal, &delay_commit);
auto& cg = compaction_group_for_sstable(sst);
if (get_compaction_manager().requires_cleanup(cg.as_table_state(), sst)) {
compaction_groups_to_notify.insert(&cg);

View File

@@ -20,6 +20,7 @@
#include "sstable_directory.hh"
#include "utils/lister.hh"
#include "utils/overloaded_functor.hh"
#include "utils/directories.hh"
#include "replica/database.hh"
#include "db/system_keyspace.hh"
@@ -64,12 +65,15 @@ sstable_directory::sstable_directory(sstables_manager& manager,
schema_ptr schema,
const dht::sharder& sharder,
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
fs::path sstable_dir,
sstring table_dir,
sstable_state state,
io_error_handler_gen error_handler_gen)
: _manager(manager)
, _schema(std::move(schema))
, _storage_opts(std::move(storage_opts))
, _sstable_dir(std::move(sstable_dir))
, _table_dir(std::move(table_dir))
, _state(state)
, _sstable_dir(make_path(_table_dir, _state))
, _error_handler_gen(error_handler_gen)
, _lister(make_components_lister())
, _sharder(sharder)
@@ -126,7 +130,7 @@ void sstable_directory::validate(sstables::shared_sstable sst, process_flags fla
}
future<sstables::shared_sstable> sstable_directory::load_sstable(sstables::entry_descriptor desc, sstables::sstable_open_config cfg) const {
auto sst = _manager.make_sstable(_schema, *_storage_opts, _sstable_dir.native(), desc.generation, desc.version, desc.format, gc_clock::now(), _error_handler_gen);
auto sst = _manager.make_sstable(_schema, _table_dir, *_storage_opts, desc.generation, _state, desc.version, desc.format, gc_clock::now(), _error_handler_gen);
co_await sst->load(_sharder, cfg);
co_return sst;
}
@@ -156,7 +160,7 @@ sstable_directory::process_descriptor(sstables::entry_descriptor desc, process_f
}
future<std::vector<shard_id>> sstable_directory::get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const {
auto sst = _manager.make_sstable(_schema, *_storage_opts, _sstable_dir.native(), desc.generation, desc.version, desc.format, gc_clock::now(), _error_handler_gen);
auto sst = _manager.make_sstable(_schema, _table_dir, *_storage_opts, desc.generation, _state, desc.version, desc.format, gc_clock::now(), _error_handler_gen);
co_await sst->load_owner_shards(_sharder);
validate(sst, flags);
co_return sst->get_shards_for_this_sstable();
@@ -198,6 +202,22 @@ sstable_directory::highest_version_seen() const {
return _max_version_seen;
}
future<> sstable_directory::prepare(process_flags flags) {
// verify owner and mode on the sstables directory
// and all its subdirectories, except for "snapshots"
// as there could be a race with scylla-manager that might
// delete snapshots concurrently
co_await utils::directories::verify_owner_and_mode(_sstable_dir, utils::directories::recursive::no);
co_await lister::scan_dir(_sstable_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [] (fs::path dir, directory_entry de) -> future<> {
if (de.name != sstables::snapshots_dir) {
co_await utils::directories::verify_owner_and_mode(dir / de.name, utils::directories::recursive::yes);
}
});
if (flags.garbage_collect) {
co_await garbage_collect();
}
}
future<> sstable_directory::process_sstable_dir(process_flags flags) {
dirlog.debug("Start processing directory {} for SSTables (storage {})", _sstable_dir, _storage_opts->type_string());
return _lister->process(*this, flags);
@@ -342,7 +362,7 @@ sstable_directory::move_foreign_sstables(sharded<sstable_directory>& source_dire
}
future<shared_sstable> sstable_directory::load_foreign_sstable(foreign_sstable_open_info& info) {
auto sst = _manager.make_sstable(_schema, *_storage_opts, _sstable_dir.native(), info.generation, info.version, info.format, gc_clock::now(), _error_handler_gen);
auto sst = _manager.make_sstable(_schema, _table_dir, *_storage_opts, info.generation, _state, info.version, info.format, gc_clock::now(), _error_handler_gen);
co_await sst->load(std::move(info));
co_return sst;
}

View File

@@ -32,6 +32,7 @@ namespace db { class system_keyspace; }
namespace sstables {
enum class sstable_state;
class sstables_manager;
bool manifest_json_filter(const std::filesystem::path&, const directory_entry& entry);
@@ -133,7 +134,9 @@ private:
sstables_manager& _manager;
schema_ptr _schema;
lw_shared_ptr<const data_dictionary::storage_options> _storage_opts;
std::filesystem::path _sstable_dir;
sstring _table_dir;
sstable_state _state;
std::filesystem::path _sstable_dir; // FIXME -- remove eventually
io_error_handler_gen _error_handler_gen;
std::unique_ptr<components_lister> _lister;
const dht::sharder& _sharder;
@@ -188,7 +191,8 @@ public:
schema_ptr schema,
const dht::sharder& sharder,
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
std::filesystem::path sstable_dir,
sstring table_dir,
sstable_state state,
io_error_handler_gen error_handler_gen);
std::vector<sstables::shared_sstable>& get_unsorted_sstables() {
@@ -206,6 +210,8 @@ public:
// returns what is the highest version seen in this directory.
sstables::sstable_version_types highest_version_seen() const;
future<> prepare(process_flags flags);
// scans a directory containing SSTables. Every generation that is believed to belong to this
// shard is processed, the ones that are not are skipped. Potential pertinence is decided as
// generation % smp::count.
@@ -250,10 +256,6 @@ public:
using can_be_remote = bool_class<struct can_be_remote_tag>;
future<> collect_output_unshared_sstables(std::vector<sstables::shared_sstable> resharded_sstables, can_be_remote);
std::filesystem::path sstable_dir() const noexcept {
return _sstable_dir;
}
// When we compact sstables, we have to atomically instantiate the new
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
// and if A contained some data that was tombstoned by B, and if B was

View File

@@ -72,7 +72,6 @@
#include "sstables/partition_index_cache.hh"
#include "utils/UUID_gen.hh"
#include "sstables_manager.hh"
#include <boost/algorithm/string/predicate.hpp>
#include "tracing/traced_file.hh"
#include "kl/reader.hh"
#include "mx/reader.hh"
@@ -1976,18 +1975,6 @@ std::vector<sstring> sstable::component_filenames() const {
return res;
}
bool sstable::requires_view_building() const {
return boost::algorithm::ends_with(_storage->prefix(), staging_dir);
}
bool sstable::is_quarantined() const noexcept {
return boost::algorithm::ends_with(_storage->prefix(), quarantine_dir);
}
bool sstable::is_uploaded() const noexcept {
return boost::algorithm::ends_with(_storage->prefix(), upload_dir);
}
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
format_types format, sstring component) {
sstring v = fmt::to_string(version);
@@ -2038,13 +2025,15 @@ future<> sstable::snapshot(const sstring& dir) const {
return _storage->snapshot(*this, dir, storage::absolute_path::yes);
}
future<> sstable::change_state(sstring to, delayed_commit_changes* delay_commit) {
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {
co_await _storage->change_state(*this, to, _generation, delay_commit);
_state = to;
}
future<> sstable::pick_up_from_upload(sstring to, generation_type new_generation) {
future<> sstable::pick_up_from_upload(sstable_state to, generation_type new_generation) {
co_await _storage->change_state(*this, to, new_generation, nullptr);
_generation = std::move(new_generation);
_state = to;
}
future<> delayed_commit_changes::commit() {
@@ -2943,9 +2932,10 @@ mutation_source sstable::as_mutation_source() {
}
sstable::sstable(schema_ptr schema,
sstring table_dir,
const data_dictionary::storage_options& storage,
sstring dir,
generation_type generation,
sstable_state state,
version_types v,
format_types f,
db::large_data_handler& large_data_handler,
@@ -2956,7 +2946,8 @@ sstable::sstable(schema_ptr schema,
: sstable_buffer_size(buffer_size)
, _schema(std::move(schema))
, _generation(generation)
, _storage(make_storage(manager, storage, std::move(dir)))
, _state(state)
, _storage(make_storage(manager, storage, std::move(table_dir), _state))
, _version(v)
, _format(f)
, _index_cache(std::make_unique<partition_index_cache>(

View File

@@ -134,6 +134,35 @@ constexpr auto table_subdirectories = std::to_array({
pending_delete_dir,
});
enum class sstable_state {
normal,
staging,
quarantine,
upload,
};
inline sstring state_to_dir(sstable_state state) {
switch (state) {
case sstable_state::normal:
return normal_dir;
case sstable_state::staging:
return staging_dir;
case sstable_state::quarantine:
return quarantine_dir;
case sstable_state::upload:
return upload_dir;
}
}
// FIXME -- temporary, move to fs storage after patching the rest
inline fs::path make_path(std::string_view table_dir, sstable_state state) {
fs::path ret(table_dir);
if (state != sstable_state::normal) {
ret /= state_to_dir(state).c_str();
}
return ret;
}
constexpr const char* repair_origin = "repair";
class delayed_commit_changes {
@@ -151,9 +180,10 @@ public:
using manager_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
public:
sstable(schema_ptr schema,
sstring table_dir,
const data_dictionary::storage_options& storage,
sstring dir,
generation_type generation,
sstable_state state,
version_types v,
format_types f,
db::large_data_handler& large_data_handler,
@@ -214,11 +244,11 @@ public:
//
// Known states are normal, staging, upload and quarantine.
// It's up to the storage driver how to implement this.
future<> change_state(sstring to, delayed_commit_changes* delay = nullptr);
future<> change_state(sstable_state to, delayed_commit_changes* delay = nullptr);
// Filesystem-specific call to grab an sstable from upload dir and
// put it into the desired destination assigning the given generation
future<> pick_up_from_upload(sstring to, generation_type new_generation);
future<> pick_up_from_upload(sstable_state to, generation_type new_generation);
generation_type generation() const {
return _generation;
@@ -376,11 +406,11 @@ public:
return filename(component_type::Index);
}
bool requires_view_building() const;
bool requires_view_building() const noexcept { return _state == sstable_state::staging; }
bool is_quarantined() const noexcept;
bool is_quarantined() const noexcept { return _state == sstable_state::quarantine; }
bool is_uploaded() const noexcept;
bool is_uploaded() const noexcept { return _state == sstable_state::upload; }
std::vector<std::pair<component_type, sstring>> all_components() const;
@@ -469,10 +499,7 @@ public:
private:
sstring filename(component_type f) const {
return filename(_storage->prefix(), f);
}
sstring filename(const sstring& dir, component_type f) const {
auto dir = _storage->prefix();
return filename(dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
}
@@ -515,6 +542,7 @@ private:
schema_ptr _schema;
generation_type _generation{0};
sstable_state _state;
std::unique_ptr<storage> _storage;

View File

@@ -102,15 +102,16 @@ bool sstables_manager::uuid_sstable_identifiers() const {
}
shared_sstable sstables_manager::make_sstable(schema_ptr schema,
sstring table_dir,
const data_dictionary::storage_options& storage,
sstring dir,
generation_type generation,
sstable_state state,
sstable_version_types v,
sstable_format_types f,
gc_clock::time_point now,
io_error_handler_gen error_handler_gen,
size_t buffer_size) {
return make_lw_shared<sstable>(std::move(schema), storage, std::move(dir), generation, v, f, get_large_data_handler(), *this, now, std::move(error_handler_gen), buffer_size);
return make_lw_shared<sstable>(std::move(schema), std::move(table_dir), storage, generation, state, v, f, get_large_data_handler(), *this, now, std::move(error_handler_gen), buffer_size);
}
sstable_writer_config sstables_manager::configure_writer(sstring origin) const {

View File

@@ -104,11 +104,10 @@ public:
explicit sstables_manager(db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker&, size_t available_memory, directory_semaphore& dir_sem, storage_manager* shared = nullptr);
virtual ~sstables_manager();
// Constructs a shared sstable
shared_sstable make_sstable(schema_ptr schema,
const data_dictionary::storage_options& storage, // FIXME -- move dir on options
sstring dir,
shared_sstable make_sstable(schema_ptr schema, sstring table_dir,
const data_dictionary::storage_options& storage,
generation_type generation,
sstable_state state = sstable_state::normal,
sstable_version_types v = get_highest_sstable_version(),
sstable_format_types f = sstable_format_types::big,
gc_clock::time_point now = gc_clock::now(),

View File

@@ -35,8 +35,8 @@ namespace sstables {
// cannot define these classes in an anonymous namespace, as we need to
// declare these storage classes as "friend" of class sstable
class filesystem_storage final : public sstables::storage {
sstring dir;
std::optional<sstring> temp_dir; // Valid while the sstable is being created, until sealed
sstring _dir;
std::optional<sstring> _temp_dir; // Valid while the sstable is being created, until sealed
private:
using mark_for_removal = bool_class<class mark_for_removal_tag>;
@@ -51,15 +51,17 @@ private:
future<> rename_new_file(const sstable& sst, sstring from_name, sstring to_name) const;
virtual void change_dir_for_test(sstring nd) override {
dir = std::move(nd);
_dir = std::move(nd);
}
public:
explicit filesystem_storage(sstring dir_) : dir(std::move(dir_)) {}
explicit filesystem_storage(sstring dir, sstable_state state)
: _dir(make_path(dir, state).native())
{}
virtual future<> seal(const sstable& sst) override;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const override;
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) override;
virtual future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
@@ -71,7 +73,7 @@ public:
return sstable_directory::delete_with_pending_deletion_log;
}
virtual sstring prefix() const override { return dir; }
virtual sstring prefix() const override { return _dir; }
};
future<data_sink> filesystem_storage::make_data_or_index_sink(sstable& sst, component_type type) {
@@ -107,8 +109,8 @@ future<> filesystem_storage::rename_new_file(const sstable& sst, sstring from_na
future<file> filesystem_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
auto create_flags = open_flags::create | open_flags::exclusive;
auto readonly = (flags & create_flags) != create_flags;
auto tgt_dir = !readonly && temp_dir ? *temp_dir : dir;
auto name = sst.filename(tgt_dir, type);
auto tgt_dir = !readonly && _temp_dir ? *_temp_dir : _dir;
auto name = tgt_dir + "/" + sst.component_basename(type);
auto f = open_sstable_component_file_non_checked(name, flags, options, check_integrity);
@@ -148,14 +150,14 @@ void filesystem_storage::open(sstable& sst) {
// Flushing parent directory to guarantee that temporary TOC file reached
// the disk.
sst.sstable_write_io_check(sync_directory, dir).get();
sst.sstable_write_io_check(sync_directory, _dir).get();
}
future<> filesystem_storage::seal(const sstable& sst) {
// SSTable sealing is about renaming temporary TOC file after guaranteeing
// that each component reached the disk safely.
co_await remove_temp_dir();
auto dir_f = co_await open_checked_directory(sst._write_error_handler, dir);
auto dir_f = co_await open_checked_directory(sst._write_error_handler, _dir);
// Guarantee that every component of this sstable reached the disk.
co_await sst.sstable_write_io_check([&] { return dir_f.flush(); });
// Rename TOC because it's no longer temporary.
@@ -167,28 +169,28 @@ future<> filesystem_storage::seal(const sstable& sst) {
}
future<> filesystem_storage::touch_temp_dir(const sstable& sst) {
if (temp_dir) {
if (_temp_dir) {
co_return;
}
auto tmp = fmt::format("{}/{}{}", dir, sst._generation, tempdir_extension);
auto tmp = fmt::format("{}/{}{}", _dir, sst._generation, tempdir_extension);
sstlog.debug("Touching temp_dir={}", tmp);
co_await sst.sstable_touch_directory_io_check(tmp);
temp_dir = std::move(tmp);
_temp_dir = std::move(tmp);
}
future<> filesystem_storage::remove_temp_dir() {
if (!temp_dir) {
if (!_temp_dir) {
co_return;
}
sstlog.debug("Removing temp_dir={}", temp_dir);
sstlog.debug("Removing temp_dir={}", _temp_dir);
try {
co_await remove_file(*temp_dir);
co_await remove_file(*_temp_dir);
} catch (...) {
sstlog.error("Could not remove temporary directory: {}", std::current_exception());
throw;
}
temp_dir.reset();
_temp_dir.reset();
}
static bool is_same_file(const seastar::stat_data& sd1, const seastar::stat_data& sd2) noexcept {
@@ -230,7 +232,7 @@ future<> filesystem_storage::check_create_links_replay(const sstable& sst, const
const std::vector<std::pair<sstables::component_type, sstring>>& comps) const {
return parallel_for_each(comps, [this, &sst, &dst_dir, dst_gen] (const auto& p) mutable {
auto comp = p.second;
auto src = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, comp);
auto src = sstable::filename(_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, comp);
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, dst_gen, sst._format, comp);
return do_with(std::move(src), std::move(dst), [this] (const sstring& src, const sstring& dst) mutable {
return file_exists(dst).then([&, this] (bool exists) mutable {
@@ -247,7 +249,7 @@ future<> filesystem_storage::check_create_links_replay(const sstable& sst, const
if (!same) {
auto msg = format("Error while linking SSTable: {} to {}: File exists", src, dst);
sstlog.error("{}", msg);
return make_exception_future<>(malformed_sstable_exception(msg, dir));
return make_exception_future<>(malformed_sstable_exception(msg, _dir));
}
return make_ready_future<>();
});
@@ -304,7 +306,7 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
co_await sst.sstable_write_io_check(idempotent_link_file, sst.filename(component_type::TOC), std::move(dst));
co_await sst.sstable_write_io_check(sync_directory, dst_dir);
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation] (auto p) {
auto src = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, p.second);
auto src = sstable::filename(_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, p.second);
auto dst = sstable::filename(dst_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, generation, sst._format, p.second);
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
});
@@ -313,9 +315,9 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
if (mark_for_removal) {
// Now that the source sstable is linked to new_dir, mark the source links for
// deletion by leaving a TemporaryTOC file in the source directory.
auto src_temp_toc = sstable::filename(dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, component_type::TemporaryTOC);
auto src_temp_toc = sstable::filename(_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, sst._generation, sst._format, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
co_await sst.sstable_write_io_check(sync_directory, dir);
co_await sst.sstable_write_io_check(sync_directory, _dir);
} else {
// Now that the source sstable is linked to dir, remove
// the TemporaryTOC file at the destination.
@@ -331,7 +333,7 @@ future<> filesystem_storage::create_links(const sstable& sst, const sstring& dir
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs) const {
if (!abs) {
dir = this->dir + "/" + dir + "/";
dir = _dir + "/" + dir + "/";
}
co_await sst.sstable_touch_directory_io_check(dir);
co_await create_links(sst, dir);
@@ -339,11 +341,11 @@ future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_
future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
co_await touch_directory(new_dir);
sstring old_dir = dir;
sstring old_dir = _dir;
sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal::yes);
dir = new_dir;
_dir = new_dir;
generation_type old_generation = sst._generation;
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
return sst.sstable_write_io_check(remove_file, sstable::filename(old_dir, sst._schema->ks_name(), sst._schema->cf_name(), sst._version, old_generation, sst._format, p.second));
@@ -358,8 +360,9 @@ future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generatio
}
}
future<> filesystem_storage::change_state(const sstable& sst, sstring to, generation_type new_generation, delayed_commit_changes* delay_commit) {
auto path = fs::path(dir);
future<> filesystem_storage::change_state(const sstable& sst, sstable_state state, generation_type new_generation, delayed_commit_changes* delay_commit) {
auto to = state_to_dir(state);
auto path = fs::path(_dir);
auto current = path.filename().native();
// Moving between states means moving between basedir/state subdirectories.
@@ -407,12 +410,12 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, std::current_exception());
}
if (temp_dir) {
if (_temp_dir) {
try {
co_await recursive_remove_directory(fs::path(*temp_dir));
temp_dir.reset();
co_await recursive_remove_directory(fs::path(*_temp_dir));
_temp_dir.reset();
} catch (...) {
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *temp_dir, std::current_exception());
sstlog.warn("Exception when deleting temporary sstable directory {}: {}", *_temp_dir, std::current_exception());
}
}
}
@@ -427,7 +430,7 @@ class s3_storage : public sstables::storage {
static constexpr auto status_sealed = "sealed";
static constexpr auto status_removing = "removing";
sstring make_s3_object_name(const sstable& sst, component_type type);
sstring make_s3_object_name(const sstable& sst, component_type type) const;
future<> ensure_remote_prefix(const sstable& sst);
@@ -443,7 +446,7 @@ public:
virtual future<> seal(const sstable& sst) override;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const override;
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) override;
virtual future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
@@ -460,7 +463,7 @@ public:
virtual sstring prefix() const override { return _location; }
};
sstring s3_storage::make_s3_object_name(const sstable& sst, component_type type) {
sstring s3_storage::make_s3_object_name(const sstable& sst, component_type type) const {
return format("/{}/{}/{}", _bucket, *_remote_prefix, sstable_version_constants::get_component_map(sst.get_version()).at(type));
}
@@ -511,7 +514,7 @@ future<> s3_storage::seal(const sstable& sst) {
co_await sst.manager().system_keyspace().sstables_registry_update_entry_status(_location, sst.generation(), status_sealed);
}
future<> s3_storage::change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) {
future<> s3_storage::change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) {
// FIXME -- this "move" means changing sstable state, e.g. move from staging
// or upload to base. To make this work the "status" part of the entry location
// must be detached from the entry location itself, see PR#12707
@@ -546,10 +549,10 @@ future<> s3_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs
co_await coroutine::return_exception(std::runtime_error("Snapshotting S3 objects not implemented"));
}
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstring dir) {
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstring dir, sstable_state state) {
return std::visit(overloaded_functor {
[dir] (const data_dictionary::storage_options::local& loc) mutable -> std::unique_ptr<sstables::storage> {
return std::make_unique<sstables::filesystem_storage>(std::move(dir));
[dir, state] (const data_dictionary::storage_options::local& loc) mutable -> std::unique_ptr<sstables::storage> {
return std::make_unique<sstables::filesystem_storage>(std::move(dir), state);
},
[dir, &manager] (const data_dictionary::storage_options::s3& os) mutable -> std::unique_ptr<sstables::storage> {
return std::make_unique<sstables::s3_storage>(manager.get_endpoint_client(os.endpoint), os.bucket, std::move(dir));

View File

@@ -26,6 +26,7 @@ class storage_options;
namespace sstables {
enum class sstable_state;
class delayed_commit_changes;
class sstable;
class sstables_manager;
@@ -52,7 +53,7 @@ public:
virtual future<> seal(const sstable& sst) = 0;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const = 0;
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) = 0;
virtual future<> change_state(const sstable& sst, sstable_state to, generation_type generation, delayed_commit_changes* delay) = 0;
// runs in async context
virtual void open(sstable& sst) = 0;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept = 0;
@@ -65,6 +66,6 @@ public:
virtual sstring prefix() const = 0;
};
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstring dir);
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstring table_dir, sstable_state state);
} // namespace sstables

View File

@@ -1250,7 +1250,7 @@ SEASTAR_TEST_CASE(populate_from_quarantine_works) {
auto idx = tests::random::get_int<size_t>(0, sstables.size() - 1);
testlog.debug("Moving sstable #{} out of {} to quarantine", idx, sstables.size());
auto sst = sstables[idx];
co_await sst->change_state(sstables::quarantine_dir);
co_await sst->change_state(sstables::sstable_state::quarantine);
found |= true;
});
co_return found;
@@ -1304,7 +1304,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
}
auto idx = tests::random::get_int<size_t>(0, sstables.size() - 1);
auto sst = sstables[idx];
co_await sst->change_state(sstables::quarantine_dir);
co_await sst->change_state(sstables::sstable_state::quarantine);
});
});
}

View File

@@ -5172,7 +5172,7 @@ SEASTAR_TEST_CASE(test_sstables_excluding_staging_correctness) {
auto sst_gen = env.make_sst_factory(s);
auto staging_sst = make_sstable_containing(sst_gen, {*sorted_muts.begin()});
staging_sst->change_state(sstables::staging_dir).get();
staging_sst->change_state(sstables::sstable_state::staging).get();
BOOST_REQUIRE(staging_sst->requires_view_building());
auto regular_sst = make_sstable_containing(sst_gen, {*sorted_muts.rbegin()});

View File

@@ -75,7 +75,7 @@ make_sstable_for_this_shard(std::function<sstables::shared_sstable()> sst_factor
/// Arguments passed to the function are passed to table::make_sstable
template <typename... Args>
sstables::shared_sstable
make_sstable_for_all_shards(replica::database& db, replica::table& table, fs::path sstdir, sstables::generation_type generation) {
make_sstable_for_all_shards(replica::database& db, replica::table& table, sstables::sstable_state state, sstables::generation_type generation) {
// Unlike the previous helper, we'll assume we're in a thread here. It's less flexible
// but the users are usually in a thread, and rewrite_toc_without_scylla_component requires
// a thread. We could fix that, but deferring that for now.
@@ -88,7 +88,7 @@ make_sstable_for_all_shards(replica::database& db, replica::table& table, fs::pa
mt->apply(std::move(m));
}
data_dictionary::storage_options local;
auto sst = table.get_sstables_manager().make_sstable(s, local, sstdir.native(), generation);
auto sst = table.get_sstables_manager().make_sstable(s, table.dir(), local, generation, state);
write_memtable_to_sstable(*mt, sst, table.get_sstables_manager().configure_writer("test")).get();
mt->clear_gently().get();
// We can't write an SSTable with bad sharding, so pretend
@@ -136,6 +136,7 @@ public:
// Called from a seastar thread
static void with_sstable_directory(
fs::path path,
sstables::sstable_state state,
wrapped_test_env env_wrap,
noncopyable_function<void (sharded<sstable_directory>&)> func) {
@@ -159,7 +160,7 @@ static void with_sstable_directory(
seastar::sharded_parameter([] { return test_table_schema(); }),
seastar::sharded_parameter([] { return std::ref(test_table_schema()->get_sharder()); }),
seastar::sharded_parameter([] { return make_lw_shared<data_dictionary::storage_options>(); }),
path.native(), default_io_error_handler_gen()).get();
path.native(), state, default_io_error_handler_gen()).get();
func(sstdir);
}
@@ -167,7 +168,7 @@ static void with_sstable_directory(
static void with_sstable_directory(
wrapped_test_env env_wrap,
noncopyable_function<void (sharded<sstable_directory>&)> func) {
with_sstable_directory(env_wrap.tmpdir_path(), std::move(env_wrap), std::move(func));
with_sstable_directory(env_wrap.tmpdir_path(), sstables::sstable_state::normal, std::move(env_wrap), std::move(func));
}
SEASTAR_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) {
@@ -359,7 +360,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_gene
}).get();
}
with_sstable_directory(dir.path(), env, [] (sharded<sstables::sstable_directory>& sstdir) {
with_sstable_directory(dir.path(), sstables::sstable_state::normal, env, [] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
});
@@ -391,7 +392,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_ge
}).get();
}
with_sstable_directory(dir.path(), env, [] (sharded<sstables::sstable_directory>& sstdir) {
with_sstable_directory(dir.path(), sstables::sstable_state::normal, env, [] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
});
@@ -416,7 +417,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) {
return cf.flush();
}).get();
with_sstable_directory(path, e, [&] (sharded<sstable_directory>& sstdir) {
with_sstable_directory(path, sstables::sstable_state::normal, e, [&] (sharded<sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, {}).get();
// Collect all sstable file names
@@ -485,7 +486,6 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto& cf = e.local_db().find_column_family("ks", "cf");
auto upload_path = fs::path(cf.dir()) / sstables::upload_dir;
e.db().invoke_on_all([] (replica::database& db) {
auto& cf = db.find_column_family("ks", "cf");
@@ -502,10 +502,10 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) {
return gen(sstables::uuid_identifiers::no);
}).get();
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation);
make_sstable_for_all_shards(e.db().local(), cf, sstables::sstable_state::upload, generation);
}
with_sstable_directory(upload_path, e, [&] (sharded<sstables::sstable_directory>& sstdir) {
with_sstable_directory(fs::path(cf.dir()), sstables::sstable_state::upload, e, [&] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
@@ -514,13 +514,13 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
sharded_gen.start(max_generation_seen.as_int()).get();
auto stop_generator = deferred_stop(sharded_gen);
auto make_sstable = [&e, upload_path, &sharded_gen] (shard_id shard) {
auto make_sstable = [&e, &sharded_gen] (shard_id shard) {
auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) {
return gen(sstables::uuid_identifiers::no);
}).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
data_dictionary::storage_options local;
return cf.get_sstables_manager().make_sstable(cf.schema(), local, upload_path.native(), generation);
return cf.get_sstables_manager().make_sstable(cf.schema(), cf.dir(), local, generation, sstables::sstable_state::upload);
};
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable)).get();
verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get();
@@ -555,10 +555,10 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly_with_owned
auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) {
return gen(sstables::uuid_identifiers::no);
}).get();
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation);
make_sstable_for_all_shards(e.db().local(), cf, sstables::sstable_state::upload, generation);
}
with_sstable_directory(upload_path, e, [&] (sharded<sstables::sstable_directory>& sstdir) {
with_sstable_directory(fs::path(cf.dir()), sstables::sstable_state::upload, e, [&] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
@@ -573,7 +573,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly_with_owned
}).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
data_dictionary::storage_options local;
return cf.get_sstables_manager().make_sstable(cf.schema(), local, upload_path.native(), generation);
return cf.get_sstables_manager().make_sstable(cf.schema(), cf.dir(), local, generation, sstables::sstable_state::upload);
};
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(e.db().local().get_keyspace_local_ranges("ks"));
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable), std::move(owned_ranges_ptr)).get();
@@ -591,7 +591,6 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto& cf = e.local_db().find_column_family("ks", "cf");
auto upload_path = fs::path(cf.dir()) / sstables::upload_dir;
e.db().invoke_on_all([] (replica::database& db) {
auto& cf = db.find_column_family("ks", "cf");
@@ -609,10 +608,10 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
auto generation = sharded_gen.invoke_on(0, [] (auto& gen) {
return gen(sstables::uuid_identifiers::no);
}).get();
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation);
make_sstable_for_all_shards(e.db().local(), cf, sstables::sstable_state::upload, generation);
}
with_sstable_directory(upload_path, e, [&e, upload_path] (sharded<sstables::sstable_directory>& sstdir) {
with_sstable_directory(fs::path(cf.dir()), sstables::sstable_state::upload, e, [&e] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
@@ -621,13 +620,13 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
sharded_gen.start(max_generation_seen.as_int()).get();
auto stop_generator = deferred_stop(sharded_gen);
auto make_sstable = [&e, upload_path, &sharded_gen] (shard_id shard) {
auto make_sstable = [&e, &sharded_gen] (shard_id shard) {
auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) {
return gen(sstables::uuid_identifiers::no);
}).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
data_dictionary::storage_options local;
return cf.get_sstables_manager().make_sstable(cf.schema(), local, upload_path.native(), generation);
return cf.get_sstables_manager().make_sstable(cf.schema(), cf.dir(), local, generation, sstables::sstable_state::upload);
};
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable)).get();
verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get();
@@ -644,7 +643,6 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto& cf = e.local_db().find_column_family("ks", "cf");
auto upload_path = fs::path(cf.dir()) / sstables::upload_dir;
e.db().invoke_on_all([] (replica::database& db) {
auto& cf = db.find_column_family("ks", "cf");
@@ -661,10 +659,10 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) {
return gen(sstables::uuid_identifiers::no);
}).get();
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation);
make_sstable_for_all_shards(e.db().local(), cf, sstables::sstable_state::upload, generation);
}
with_sstable_directory(upload_path, e, [&, upload_path] (sharded<sstables::sstable_directory>& sstdir) {
with_sstable_directory(fs::path(cf.dir()), sstables::sstable_state::upload, e, [&] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
@@ -673,13 +671,13 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
sharded_gen.start(max_generation_seen.as_int()).get();
auto stop_generator = deferred_stop(sharded_gen);
auto make_sstable = [&e, upload_path, &sharded_gen] (shard_id shard) {
auto make_sstable = [&e, &sharded_gen] (shard_id shard) {
auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) {
return gen(sstables::uuid_identifiers::no);
}).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
data_dictionary::storage_options local;
return cf.get_sstables_manager().make_sstable(cf.schema(), local, upload_path.native(), generation);
return cf.get_sstables_manager().make_sstable(cf.schema(), cf.dir(), local, generation, sstables::sstable_state::upload);
};
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable)).get();
verify_that_all_sstables_are_local(sstdir, 2 * smp::count * smp::count).get();

View File

@@ -63,8 +63,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
auto [ sst, cur_dir ] = copy_sst_to_tmpdir(tmp.path(), env, uncompressed_schema(), fs::path(uncompressed_dir()), gen_generator());
sstring old_path = sst->get_storage().prefix();
touch_directory(format("{}/{}", old_path, sstables::staging_dir)).get();
sst->change_state(sstables::staging_dir).get();
sst->change_state(sstables::normal_dir).get();
sst->change_state(sstables::sstable_state::staging).get();
sst->change_state(sstables::sstable_state::normal).get();
BOOST_REQUIRE(sstable_directory::compare_sstable_storage_prefix(old_path, sst->get_storage().prefix()));
sst->close_files().get();

View File

@@ -90,7 +90,7 @@ public:
shared_sstable make_sstable(schema_ptr schema, sstring dir, sstables::generation_type generation,
sstable::version_types v = sstables::get_highest_sstable_version(), sstable::format_types f = sstable::format_types::big,
size_t buffer_size = default_sstable_buffer_size, gc_clock::time_point now = gc_clock::now()) {
return _impl->mgr.make_sstable(std::move(schema), _impl->storage, dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
return _impl->mgr.make_sstable(std::move(schema), dir, _impl->storage, generation, sstables::sstable_state::normal, v, f, now, default_io_error_handler_gen(), buffer_size);
}
shared_sstable make_sstable(schema_ptr schema, sstring dir, sstable::version_types v = sstables::get_highest_sstable_version()) {

View File

@@ -77,13 +77,13 @@ struct table_for_tests {
sstables::shared_sstable make_sstable() {
auto& table = *_data->cf;
auto& sstables_manager = table.get_sstables_manager();
return sstables_manager.make_sstable(_data->s, _data->storage, _data->cfg.datadir, table.calculate_generation_for_new_table());
return sstables_manager.make_sstable(_data->s, _data->cfg.datadir, _data->storage, table.calculate_generation_for_new_table());
}
sstables::shared_sstable make_sstable(sstables::sstable_version_types version) {
auto& table = *_data->cf;
auto& sstables_manager = table.get_sstables_manager();
return sstables_manager.make_sstable(_data->s, _data->storage, _data->cfg.datadir, table.calculate_generation_for_new_table(), version);
return sstables_manager.make_sstable(_data->s, _data->cfg.datadir, _data->storage, table.calculate_generation_for_new_table(), sstables::sstable_state::normal, version);
}
std::function<sstables::shared_sstable()> make_sst_factory() {

View File

@@ -356,7 +356,7 @@ mutation_opt read_schema_table_mutation(sharded<sstable_manager_service>& sst_ma
sharded_parameter([&schema_factory] { return schema_factory(); }),
sharded_parameter([&] { return std::ref(schema_factory()->get_sharder()); }),
sharded_parameter([] { return make_lw_shared<const data_dictionary::storage_options>(); }),
schema_table_data_path,
schema_table_data_path.native(), sstables::sstable_state::normal,
sharded_parameter([] { return default_io_error_handler_gen(); })).get();
auto stop_sst_dirs = deferred_stop(sst_dirs);

View File

@@ -258,7 +258,7 @@ const std::vector<sstables::shared_sstable> load_sstables(schema_ptr schema, sst
auto ed = sstables::entry_descriptor::make_descriptor(dir_path.c_str(), sst_filename.c_str(), schema->ks_name(), schema->cf_name());
data_dictionary::storage_options local;
auto sst = sst_man.make_sstable(schema, local, dir_path.c_str(), ed.generation, ed.version, ed.format);
auto sst = sst_man.make_sstable(schema, dir_path.c_str(), local, ed.generation, sstables::sstable_state::normal, ed.version, ed.format);
co_await sst->load(schema->get_sharder(), sstables::sstable_open_config{.load_first_and_last_position_metadata = false});
@@ -813,7 +813,7 @@ private:
throw std::runtime_error(fmt::format("cannot create output sstable {}, file already exists", sst_name));
}
data_dictionary::storage_options local;
return _sst_man.make_sstable(_schema, local, _output_dir, generation, version, format);
return _sst_man.make_sstable(_schema, _output_dir, local, generation, sstables::sstable_state::normal, version, format);
}
sstables::sstable_writer_config do_configure_writer(sstring origin) const {
return _sst_man.configure_writer(std::move(origin));
@@ -2433,7 +2433,7 @@ void write_operation(schema_ptr schema, reader_permit permit, const std::vector<
auto writer_cfg = manager.configure_writer("scylla-sstable");
writer_cfg.validation_level = validation_level;
data_dictionary::storage_options local;
auto sst = manager.make_sstable(schema, local, output_dir, generation, version, format);
auto sst = manager.make_sstable(schema, output_dir, local, generation, sstables::sstable_state::normal, version, format);
sst->write_components(std::move(reader), 1, schema, writer_cfg, encoding_stats{}).get();
}