Merge 'sstables: prepare for uuid-based generation_type' from Benny Halevy

Preparing for #10459, this series defines sstables::generation_type::int_t
as `int64_t` at the moment and use that instead of naked `int64_t` variables
so it can be changed in the future to hold e.g. a `std::variant<int64_t, sstables::generation_id>`.

sstables::new_generation was defined to generation new, unique generations.
Currently it is based on incrementing a counter, but it can be extended in the future
to manufacture UUIDs.

The unit tests are cleaned up in this series to minimize their dependency on numeric generations.
Basically, they should be used for loading sstables with hard coded generation numbers stored under `test/resource/sstables`.

For all the rest, the tests should use existing and mechanisms introduced in this series such as generation_factory, sst_factory and smart make_sstable methods in sstable_test_env and table_for_tests to generate new sstables with a unique generation, and use the abstract sst->generation() method to get their generation if needed, without resorting the the actual value it may hold.

Closes #12994

* github.com:scylladb/scylladb:
  everywhere: use sstables::generation_type
  test: sstable_test_env: use make_new_generation
  sstable_directory::components_lister::process: fixup indentation
  sstables: make highest_generation_seen return optional generation
  replica: table: add make_new_generation function
  replica: table: move sstable generation related functions out of line
  test: sstables: use generation_type::int_t
  sstables: generation_type: define int_t
This commit is contained in:
Botond Dénes
2023-03-30 17:05:06 +03:00
27 changed files with 545 additions and 404 deletions

View File

@@ -432,7 +432,12 @@ private:
// Ensures that concurrent updates to sstable set will work correctly
seastar::named_semaphore _sstable_set_mutation_sem = {1, named_semaphore_exception_factory{"sstable set mutation"}};
mutable row_cache _cache; // Cache covers only sstables.
std::optional<int64_t> _sstable_generation = {};
// FIXME: until we use uuid for sstables generation (#10459)
// _sstable_generation keeps track of the highest
// generation number in the table so that newly created sstables
// will use numeric generation numbers larger than this one.
// Initialized when the table is populated via update_sstables_known_generation.
std::optional<sstables::generation_type> _sstable_generation = {};
db::replay_position _highest_rp;
db::replay_position _flush_rp;
@@ -532,6 +537,8 @@ public:
const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& old_sstables);
};
static sstables::generation_type make_new_generation(std::optional<sstables::generation_type> prev = std::nullopt);
private:
using compaction_group_ptr = std::unique_ptr<compaction_group>;
std::vector<std::unique_ptr<compaction_group>> make_compaction_groups();
@@ -564,20 +571,11 @@ private:
future<> update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts);
struct merge_comparator;
// update the sstable generation, making sure that new new sstables don't overwrite this one.
void update_sstables_known_generation(sstables::generation_type generation) {
if (!_sstable_generation) {
_sstable_generation = 1;
}
_sstable_generation = std::max<uint64_t>(*_sstable_generation, sstables::generation_value(generation) / smp::count + 1);
}
// update the sstable generation, making sure (in calculate_generation_for_new_table)
// that new new sstables don't overwrite this one.
void update_sstables_known_generation(std::optional<sstables::generation_type> generation);
sstables::generation_type calculate_generation_for_new_table() {
assert(_sstable_generation);
// FIXME: better way of ensuring we don't attempt to
// overwrite an existing table.
return sstables::generation_from_value((*_sstable_generation)++ * smp::count + this_shard_id());
}
sstables::generation_type calculate_generation_for_new_table();
private:
void rebuild_statistics();

View File

@@ -282,13 +282,6 @@ distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<r
co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop);
}
future<sstables::generation_type>
highest_generation_seen(sharded<sstables::sstable_directory>& directory) {
return directory.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_generation_seen), sstables::generation_from_value(0), [] (sstables::generation_type a, sstables::generation_type b) {
return std::max(a, b);
});
}
future<sstables::sstable::version_types>
highest_version_seen(sharded<sstables::sstable_directory>& dir, sstables::sstable_version_types system_version) {
using version = sstables::sstable_version_types;
@@ -408,9 +401,9 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
co_return new_sstables.size();
}
sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, int64_t generation_value) {
sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstables::generation_type generation_value) {
auto& sstm = table.get_sstables_manager();
return sstm.make_sstable(table.schema(), dir.native(), sstables::generation_from_value(generation_value), sstm.get_highest_supported_format(), sstables::sstable_format_types::big, gc_clock::now(), &error_handler_gen_for_upload_dir);
return sstm.make_sstable(table.schema(), dir.native(), generation_value, sstm.get_highest_supported_format(), sstables::sstable_format_types::big, gc_clock::now(), &error_handler_gen_for_upload_dir);
}
future<>
@@ -442,24 +435,27 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
process_sstable_dir(directory, flags).get();
auto generation = highest_generation_seen(directory).get0();
auto shard_generation_base = sstables::generation_value(generation) / smp::count + 1;
auto shard_generation_base = generation.value_or(replica::table::make_new_generation()).value() / smp::count + 1;
// We still want to do our best to keep the generation numbers shard-friendly.
// Each destination shard will manage its own generation counter.
std::vector<std::atomic<int64_t>> shard_gen(smp::count);
std::vector<std::atomic<sstables::generation_type::int_t>> shard_gen(smp::count);
for (shard_id s = 0; s < smp::count; ++s) {
shard_gen[s].store(shard_generation_base * smp::count + s, std::memory_order_relaxed);
}
reshard(directory, db, ks, cf, [&global_table, upload, &shard_gen] (shard_id shard) mutable {
// we need generation calculated by instance of cf at requested shard
// we need generation calculated by instance of cf at requested shard
auto new_generation_for_shard = [&] (shard_id shard) {
auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed);
return make_sstable(*global_table, upload, gen);
return sstables::generation_type(gen);
};
reshard(directory, db, ks, cf, [&] (shard_id shard) mutable {
return make_sstable(*global_table, upload, new_generation_for_shard(shard));
}, service::get_local_streaming_priority()).get();
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [global_table, upload, &shard_gen] (shard_id shard) {
auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed);
return make_sstable(*global_table, upload, gen);
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [&] (shard_id shard) {
return make_sstable(*global_table, upload, new_generation_for_shard(shard));
}, [] (const sstables::shared_sstable&) { return true; }, service::get_local_streaming_priority()).get();
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
@@ -557,7 +553,7 @@ class table_populator {
fs::path _base_path;
std::unordered_map<sstring, lw_shared_ptr<sharded<sstables::sstable_directory>>> _sstable_directories;
sstables::sstable_version_types _highest_version = sstables::oldest_writable_sstable_format;
sstables::generation_type _highest_generation = sstables::generation_from_value(0);
std::optional<sstables::generation_type> _highest_generation;
public:
table_populator(distributed<replica::database>& db, sstring ks, sstring cf)
@@ -655,7 +651,11 @@ future<> table_populator::start_subdir(sstring subdir) {
auto generation = co_await highest_generation_seen(directory);
_highest_version = std::max(sst_version, _highest_version);
_highest_generation = std::max(generation, _highest_generation);
if (generation) {
_highest_generation = _highest_generation ?
std::max(*generation, *_highest_generation) :
*generation;
}
}
sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstables::generation_type generation, sstables::sstable_version_types v) {

View File

@@ -66,6 +66,33 @@ static seastar::metrics::label keyspace_label("ks");
using namespace std::chrono_literals;
sstables::generation_type table::make_new_generation(std::optional<sstables::generation_type> prev) {
auto prev_value = prev.value_or(sstables::generation_type(0)).value();
auto next_value = prev_value - prev_value % smp::count + smp::count + this_shard_id();
tlogger.trace("new_generation {} -> {}", prev_value, next_value);
return sstables::generation_type(next_value);
}
void table::update_sstables_known_generation(std::optional<sstables::generation_type> generation) {
auto gen = generation.value_or(sstables::generation_type(0)).value();
auto normalized_generation = gen - gen % smp::count + this_shard_id();
if (!_sstable_generation || normalized_generation > _sstable_generation->value()) {
_sstable_generation.emplace(normalized_generation);
tlogger.debug("{}.{} updated highest known generation to {}", schema()->ks_name(), schema()->cf_name(), *_sstable_generation);
}
}
sstables::generation_type table::calculate_generation_for_new_table() {
// FIXME: better way of ensuring we don't attempt to
// overwrite an existing table.
// See https://github.com/scylladb/scylladb/issues/10459
// for uuid-based sstable generation
auto ret = make_new_generation(_sstable_generation);
tlogger.debug("{}.{} new sstable generation {}", schema()->ks_name(), schema()->cf_name(), ret);
_sstable_generation = ret;
return ret;
}
flat_mutation_reader_v2
table::make_sstable_reader(schema_ptr s,
reader_permit permit,

View File

@@ -13,47 +13,67 @@
#include <compare>
#include <limits>
#include <iostream>
#include <boost/range/adaptors.hpp>
#include <seastar/core/sstring.hh>
namespace sstables {
class generation_type {
int64_t _value;
public:
using int_t = int64_t;
private:
int_t _value;
public:
generation_type() = delete;
explicit constexpr generation_type(int64_t value) noexcept: _value(value) {}
constexpr int64_t value() const noexcept { return _value; }
explicit constexpr generation_type(int_t value) noexcept: _value(value) {}
constexpr int_t value() const noexcept { return _value; }
constexpr bool operator==(const generation_type& other) const noexcept { return _value == other._value; }
constexpr std::strong_ordering operator<=>(const generation_type& other) const noexcept { return _value <=> other._value; }
};
constexpr generation_type generation_from_value(int64_t value) {
constexpr generation_type generation_from_value(generation_type::int_t value) {
return generation_type{value};
}
constexpr int64_t generation_value(generation_type generation) {
constexpr generation_type::int_t generation_value(generation_type generation) {
return generation.value();
}
template <std::ranges::range Range, typename Target = std::vector<sstables::generation_type>>
Target generations_from_values(const Range& values) {
return boost::copy_range<Target>(values | boost::adaptors::transformed([] (auto value) {
return generation_type(value);
}));
}
template <typename Target = std::vector<sstables::generation_type>>
Target generations_from_values(std::initializer_list<generation_type::int_t> values) {
return boost::copy_range<Target>(values | boost::adaptors::transformed([] (auto value) {
return generation_type(value);
}));
}
} //namespace sstables
namespace std {
template <>
struct hash<sstables::generation_type> {
size_t operator()(const sstables::generation_type& generation) const noexcept {
return hash<int64_t>{}(generation.value());
return hash<sstables::generation_type::int_t>{}(generation.value());
}
};
// for min_max_tracker
template <>
struct numeric_limits<sstables::generation_type> : public numeric_limits<int64_t> {
struct numeric_limits<sstables::generation_type> : public numeric_limits<sstables::generation_type::int_t> {
static constexpr sstables::generation_type min() noexcept {
return sstables::generation_type{numeric_limits<int64_t>::min()};
return sstables::generation_type{numeric_limits<sstables::generation_type::int_t>::min()};
}
static constexpr sstables::generation_type max() noexcept {
return sstables::generation_type{numeric_limits<int64_t>::max()};
return sstables::generation_type{numeric_limits<sstables::generation_type::int_t>::max()};
}
};
} //namespace std

View File

@@ -173,7 +173,7 @@ sstring sstable_directory::sstable_filename(const sstables::entry_descriptor& de
return sstable::filename(_sstable_dir.native(), _schema->ks_name(), _schema->cf_name(), desc.version, desc.generation, desc.format, component_type::Data);
}
generation_type
std::optional<generation_type>
sstable_directory::highest_generation_seen() const {
return _max_generation_seen;
}
@@ -236,12 +236,23 @@ future<> sstable_directory::components_lister::process(sstable_directory& direct
_state->descriptors.erase(desc.generation);
}
directory._max_generation_seen = boost::accumulate(_state->generations_found | boost::adaptors::map_keys, generation_from_value(0), [] (generation_type a, generation_type b) {
return std::max<generation_type>(a, b);
});
auto msg = format("After {} scanned, {} descriptors found, {} different files found",
location, _state->descriptors.size(), _state->generations_found.size());
dirlog.debug("After {} scanned, seen generation {}. {} descriptors found, {} different files found ",
location, directory._max_generation_seen, _state->descriptors.size(), _state->generations_found.size());
if (!_state->generations_found.empty()) {
// FIXME: for now set _max_generation_seen is any generation were found
// With https://github.com/scylladb/scylladb/issues/10459,
// We should do that only if any _numeric_ generations were found
directory._max_generation_seen = boost::accumulate(_state->generations_found | boost::adaptors::map_keys, sstables::generation_type(0), [] (generation_type a, generation_type b) {
return std::max<generation_type>(a, b);
});
msg = format("{}, highest generation seen: {}", msg, *directory._max_generation_seen);
} else {
msg = format("{}, no numeric generation was seen", msg);
}
dirlog.debug("{}", msg);
// _descriptors is everything with a TOC. So after we remove this, what's left is
// SSTables for which a TOC was not found.
@@ -510,4 +521,20 @@ future<> sstable_directory::replay_pending_delete_log(fs::path pending_delete_lo
}
}
future<std::optional<sstables::generation_type>>
highest_generation_seen(sharded<sstables::sstable_directory>& directory) {
auto highest = co_await directory.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_generation_seen), sstables::generation_type(0), [] (std::optional<sstables::generation_type> a, std::optional<sstables::generation_type> b) {
if (a && b) {
return std::max(*a, *b);
} else if (a) {
return *a;
} else if (b) {
return *b;
} else {
return sstables::generation_type(0);
}
});
co_return highest.value() ? std::make_optional(highest): std::nullopt;
}
}

View File

@@ -110,7 +110,7 @@ private:
io_error_handler_gen _error_handler_gen;
std::unique_ptr<components_lister> _lister;
generation_type _max_generation_seen = generation_from_value(0);
std::optional<generation_type> _max_generation_seen;
sstables::sstable_version_types _max_version_seen = sstables::sstable_version_types::ka;
// SSTables that are unshared and belong to this shard. They are already stored as an
@@ -169,7 +169,7 @@ public:
future<> move_foreign_sstables(sharded<sstable_directory>& source_directory);
// returns what is the highest generation seen in this directory.
generation_type highest_generation_seen() const;
std::optional<generation_type> highest_generation_seen() const;
// returns what is the highest version seen in this directory.
sstables::sstable_version_types highest_version_seen() const;
@@ -233,4 +233,6 @@ public:
static bool compare_sstable_storage_prefix(const sstring& a, const sstring& b) noexcept;
};
future<std::optional<sstables::generation_type>> highest_generation_seen(sharded<sstables::sstable_directory>& directory);
}

View File

@@ -2414,7 +2414,7 @@ static entry_descriptor make_entry_descriptor(sstring sstdir, sstring fname, sst
} else {
throw malformed_sstable_exception(seastar::format("invalid version for file {}. Name doesn't match any known version.", fname));
}
return entry_descriptor(sstdir, ks, cf, generation_from_value(boost::lexical_cast<unsigned long>(generation)), version, format_from_string(format), sstable::component_from_sstring(version, component));
return entry_descriptor(sstdir, ks, cf, generation_from_value(boost::lexical_cast<sstables::generation_type::int_t>(generation)), version, format_from_string(format), sstable::component_from_sstring(version, component));
}
entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname) {

View File

@@ -27,7 +27,7 @@ struct my_consumer {
};
}
static future<> broken_sst(sstring dir, unsigned long generation, schema_ptr s, sstring msg, std::optional<sstring> sst_name,
static future<> broken_sst(sstring dir, sstables::generation_type::int_t generation, schema_ptr s, sstring msg, std::optional<sstring> sst_name,
sstable_version_types version = la) {
return sstables::test_env::do_with_async([=] (sstables::test_env& env) {
try {
@@ -46,7 +46,7 @@ static future<> broken_sst(sstring dir, unsigned long generation, schema_ptr s,
});
}
static future<> broken_sst(sstring dir, unsigned long generation, sstring msg, std::optional<sstring> sst_name = std::nullopt) {
static future<> broken_sst(sstring dir, sstables::generation_type::int_t generation, sstring msg, std::optional<sstring> sst_name = std::nullopt) {
// Using an empty schema for this function, which is only about loading
// a malformed component and checking that it fails.
auto s = make_shared_schema({}, "ks", "cf", {}, {}, {}, {}, utf8_type);

View File

@@ -391,12 +391,12 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
require_exist(file_name, true);
};
auto component_basename = [&ks, &cf] (int64_t gen, component_type ctype) {
return sst::component_basename(ks, cf, sstables::get_highest_sstable_version(), generation_from_value(gen), sst::format_types::big, ctype);
auto component_basename = [&ks, &cf] (sstables::generation_type gen, component_type ctype) {
return sst::component_basename(ks, cf, sstables::get_highest_sstable_version(), gen, sst::format_types::big, ctype);
};
auto gen_filename = [&sst_dir, &ks, &cf] (int64_t gen, component_type ctype) {
return sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(gen), sst::format_types::big, ctype);
auto gen_filename = [&sst_dir, &ks, &cf] (sstables::generation_type gen, component_type ctype) {
return sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), gen, sst::format_types::big, ctype);
};
touch_dir(pending_delete_dir);
@@ -409,33 +409,42 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
const sstring toc_text = "TOC.txt\nData.db\n";
std::optional<sstables::generation_type> prev_gen;
std::vector<sstables::generation_type> gen;
size_t num_gens = 9;
gen.reserve(num_gens);
for (auto i = 0; i < num_gens; i++) {
prev_gen = replica::table::make_new_generation(prev_gen);
gen.emplace_back(*prev_gen);
}
// Regular log file with single entry
write_file(gen_filename(2, component_type::TOC), toc_text);
touch_file(gen_filename(2, component_type::Data));
write_file(gen_filename(gen[2], component_type::TOC), toc_text);
touch_file(gen_filename(gen[2], component_type::Data));
write_file(pending_delete_dir + "/sstables-2-2.log",
component_basename(2, component_type::TOC) + "\n");
component_basename(gen[2], component_type::TOC) + "\n");
// Temporary log file with single entry
write_file(pending_delete_dir + "/sstables-3-3.log.tmp",
component_basename(3, component_type::TOC) + "\n");
component_basename(gen[3], component_type::TOC) + "\n");
// Regular log file with multiple entries
write_file(gen_filename(4, component_type::TOC), toc_text);
touch_file(gen_filename(4, component_type::Data));
write_file(gen_filename(5, component_type::TOC), toc_text);
touch_file(gen_filename(5, component_type::Data));
write_file(gen_filename(gen[4], component_type::TOC), toc_text);
touch_file(gen_filename(gen[4], component_type::Data));
write_file(gen_filename(gen[5], component_type::TOC), toc_text);
touch_file(gen_filename(gen[5], component_type::Data));
write_file(pending_delete_dir + "/sstables-4-5.log",
component_basename(4, component_type::TOC) + "\n" +
component_basename(5, component_type::TOC) + "\n");
component_basename(gen[4], component_type::TOC) + "\n" +
component_basename(gen[5], component_type::TOC) + "\n");
// Regular log file with multiple entries and some deleted sstables
write_file(gen_filename(6, component_type::TemporaryTOC), toc_text);
touch_file(gen_filename(6, component_type::Data));
write_file(gen_filename(7, component_type::TemporaryTOC), toc_text);
write_file(gen_filename(gen[6], component_type::TemporaryTOC), toc_text);
touch_file(gen_filename(gen[6], component_type::Data));
write_file(gen_filename(gen[7], component_type::TemporaryTOC), toc_text);
write_file(pending_delete_dir + "/sstables-6-8.log",
component_basename(6, component_type::TOC) + "\n" +
component_basename(7, component_type::TOC) + "\n" +
component_basename(8, component_type::TOC) + "\n");
component_basename(gen[6], component_type::TOC) + "\n" +
component_basename(gen[7], component_type::TOC) + "\n" +
component_basename(gen[8], component_type::TOC) + "\n");
do_with_cql_env_and_compaction_groups([&] (cql_test_env& e) {
// Empty log file
@@ -445,24 +454,24 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
require_exist(pending_delete_dir + "/sstables-1-1.log.tmp", false);
// Regular log file with single entry
require_exist(gen_filename(2, component_type::TOC), false);
require_exist(gen_filename(2, component_type::Data), false);
require_exist(gen_filename(gen[2], component_type::TOC), false);
require_exist(gen_filename(gen[2], component_type::Data), false);
require_exist(pending_delete_dir + "/sstables-2-2.log", false);
// Temporary log file with single entry
require_exist(pending_delete_dir + "/sstables-3-3.log.tmp", false);
// Regular log file with multiple entries
require_exist(gen_filename(4, component_type::TOC), false);
require_exist(gen_filename(4, component_type::Data), false);
require_exist(gen_filename(5, component_type::TOC), false);
require_exist(gen_filename(5, component_type::Data), false);
require_exist(gen_filename(gen[4], component_type::TOC), false);
require_exist(gen_filename(gen[4], component_type::Data), false);
require_exist(gen_filename(gen[5], component_type::TOC), false);
require_exist(gen_filename(gen[5], component_type::Data), false);
require_exist(pending_delete_dir + "/sstables-4-5.log", false);
// Regular log file with multiple entries and some deleted sstables
require_exist(gen_filename(6, component_type::TemporaryTOC), false);
require_exist(gen_filename(6, component_type::Data), false);
require_exist(gen_filename(7, component_type::TemporaryTOC), false);
require_exist(gen_filename(gen[6], component_type::TemporaryTOC), false);
require_exist(gen_filename(gen[6], component_type::Data), false);
require_exist(gen_filename(gen[7], component_type::TemporaryTOC), false);
require_exist(pending_delete_dir + "/sstables-6-8.log", false);
}, db_cfg_ptr).get();
}

View File

@@ -17,6 +17,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/util/closeable.hh>
#include "sstables/generation_type.hh"
#include "test/lib/scylla_test_case.hh"
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/mutation_assertions.hh"
@@ -3825,14 +3826,14 @@ static future<> do_test_clustering_order_merger_sstable_set(bool reversed) {
auto pr = dht::partition_range::make_singular(dht::ring_position(g._pk));
auto make_tested = [&env, query_schema, pk = g._pk, &pr, &query_slice, reversed]
(const time_series_sstable_set& sst_set,
const std::unordered_set<int64_t>& included_gens, streamed_mutation::forwarding fwd) {
const std::unordered_set<sstables::generation_type>& included_gens, streamed_mutation::forwarding fwd) {
auto permit = env.make_reader_permit();
auto q = sst_set.make_position_reader_queue(
[query_schema, &pr, &query_slice, fwd, permit] (sstable& sst) {
return sst.make_reader(query_schema, permit, pr,
query_slice, seastar::default_priority_class(), nullptr, fwd);
},
[included_gens] (const sstable& sst) { return included_gens.contains(generation_value(sst.generation())); },
[included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); },
pk.key(), query_schema, permit, fwd, reversed);
return make_clustering_combined_reader(query_schema, permit, fwd, std::move(q));
};
@@ -3855,7 +3856,7 @@ static future<> do_test_clustering_order_merger_sstable_set(bool reversed) {
time_series_sstable_set sst_set(table_schema);
mutation merged(table_schema, g._pk);
std::unordered_set<int64_t> included_gens;
std::unordered_set<sstables::generation_type> included_gens;
auto sst_factory = env.make_sst_factory(table_schema);
for (auto& mb: scenario.readers_data) {
sstables::shared_sstable sst;
@@ -3874,7 +3875,7 @@ static future<> do_test_clustering_order_merger_sstable_set(bool reversed) {
}
if (dist(engine)) {
included_gens.insert(sst->generation().value());
included_gens.insert(sst->generation());
if (mb.m) {
merged += *mb.m;
}

View File

@@ -1333,7 +1333,7 @@ memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_
return seastar::async([&] {
while (num_sstables != target_num_sstables) {
++num_sstables;
auto sst = sst_man.make_sstable(s, sstables_dir.path().string(), sstables::generation_type{num_sstables});
auto sst = tbl.make_sstable();
auto writer_cfg = sst_man.configure_writer("test");
sst->write_components(
make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), mut, s->full_slice()),

View File

@@ -8,6 +8,7 @@
#include <boost/test/unit_test.hpp>
#include "sstables/generation_type.hh"
#include "test/lib/scylla_test_case.hh"
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/thread.hh>
@@ -23,9 +24,7 @@ using namespace std::chrono_literals;
SEASTAR_TEST_CASE(test_schema_changes) {
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
int gen = 1;
std::map<std::tuple<sstables::sstable::version_types, schema_ptr>, std::tuple<shared_sstable, int>> cache;
std::map<std::tuple<sstables::sstable::version_types, schema_ptr>, shared_sstable> cache;
for_each_schema_change([&] (schema_ptr base, const std::vector<mutation>& base_mutations,
schema_ptr changed, const std::vector<mutation>& changed_mutations) {
for (auto version : writable_sstable_versions) {
@@ -39,20 +38,15 @@ SEASTAR_TEST_CASE(test_schema_changes) {
mt->apply(m);
}
created_with_base_schema = make_sstable_easy(env, mt, env.manager().configure_writer(), gen, version, base_mutations.size());
created_with_base_schema = make_sstable_containing(env.make_sstable(base), mt);
created_with_changed_schema = env.make_sstable(changed, gen, version);
created_with_changed_schema->load().get();
cache.emplace(std::tuple { version, base }, std::tuple { created_with_base_schema, gen });
gen++;
cache.emplace(std::tuple { version, base }, created_with_base_schema);
} else {
created_with_base_schema = std::get<shared_sstable>(it->second);
created_with_changed_schema = env.make_sstable(changed, std::get<int>(it->second), version);
created_with_changed_schema->load().get();
created_with_base_schema = it->second;
}
created_with_changed_schema = env.reusable_sst(changed, created_with_base_schema).get();
const auto pr = dht::partition_range::make_open_ended_both_sides();
auto mr = assert_that(created_with_base_schema->as_mutation_source()

View File

@@ -52,11 +52,11 @@ class sstable_assertions final {
test_env& _env;
shared_sstable _sst;
public:
sstable_assertions(test_env& env, schema_ptr schema, const sstring& path, sstable_version_types version = sstable_version_types::mc, int generation = 1)
sstable_assertions(test_env& env, schema_ptr schema, const sstring& path, sstable_version_types version = sstable_version_types::mc, sstables::generation_type::int_t generation = 1)
: _env(env)
, _sst(_env.make_sstable(std::move(schema),
path,
generation,
sstables::generation_type(generation),
version,
sstable_format_types::big,
1))
@@ -3005,11 +3005,11 @@ SEASTAR_TEST_CASE(test_uncompressed_collections_read) {
});
}
static sstables::shared_sstable open_sstable(test_env& env, schema_ptr schema, sstring dir, unsigned long generation) {
static sstables::shared_sstable open_sstable(test_env& env, schema_ptr schema, sstring dir, sstables::generation_type generation) {
return env.reusable_sst(std::move(schema), dir, generation, sstables::sstable::version_types::mc).get0();
}
static std::vector<sstables::shared_sstable> open_sstables(test_env& env, schema_ptr s, sstring dir, std::vector<unsigned long> generations) {
static std::vector<sstables::shared_sstable> open_sstables(test_env& env, schema_ptr s, sstring dir, std::vector<sstables::generation_type> generations) {
std::vector<sstables::shared_sstable> result;
for(auto generation: generations) {
result.push_back(open_sstable(env, s, dir, generation));
@@ -3019,7 +3019,8 @@ static std::vector<sstables::shared_sstable> open_sstables(test_env& env, schema
// Must be called in a seastar thread.
static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_ptr s,
sstring table_name, std::vector<unsigned long> generations) {
sstring table_name, std::vector<sstables::generation_type::int_t> gen_values) {
auto generations = generations_from_values(gen_values);
auto cm = make_lw_shared<compaction_manager_for_testing>(false);
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto tracker = make_lw_shared<cache_tracker>();
@@ -3028,12 +3029,11 @@ static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_pt
lw_shared_ptr<replica::memtable> mt = make_lw_shared<replica::memtable>(s);
auto sstables = open_sstables(env, s, format("test/resource/sstables/3.x/uncompressed/{}", table_name), generations);
auto new_generation = generations.back() + 1;
sstables::shared_sstable compacted_sst;
auto desc = sstables::compaction_descriptor(std::move(sstables), default_priority_class());
desc.creator = [&] (shard_id dummy) {
compacted_sst = env.make_sstable(s, new_generation);
compacted_sst = env.make_sstable(s);
return compacted_sst;
};
desc.replacer = replacer_fn_no_op();
@@ -4585,7 +4585,7 @@ static std::unique_ptr<index_reader> get_index_reader(shared_sstable sst, reader
tracing::trace_state_ptr(), use_caching::yes);
}
shared_sstable make_test_sstable(test_env& env, schema_ptr schema, const sstring& table_name, int64_t gen = 1) {
shared_sstable make_test_sstable(test_env& env, schema_ptr schema, const sstring& table_name, sstables::generation_type::int_t gen = 1) {
return env.reusable_sst(schema, get_read_index_test_path(table_name), gen).get0();
}
@@ -5118,7 +5118,7 @@ SEASTAR_TEST_CASE(test_sstable_reader_on_unknown_column) {
auto _ = env.tempdir().make_sweeper();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = index_block_size;
auto sst = make_sstable_easy(env, mt, cfg, 1, version);
auto sst = make_sstable_easy(env, mt, cfg, version);
BOOST_REQUIRE_EXCEPTION(
assert_that(sst->make_reader(read_schema, env.make_reader_permit(), query::full_partition_range, read_schema->full_slice()))

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <iterator>
#include <seastar/core/sstring.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/align.hh>
@@ -93,10 +94,10 @@ atomic_cell make_atomic_cell(data_type dt, bytes_view value, uint32_t ttl = 0, u
// open_sstables() opens several generations of the same sstable, returning,
// after all the tables have been open, their vector.
static future<std::vector<sstables::shared_sstable>> open_sstables(test_env& env, schema_ptr s, sstring dir, std::vector<unsigned long> generations) {
static future<std::vector<sstables::shared_sstable>> open_sstables(test_env& env, schema_ptr s, sstring dir, std::vector<sstables::generation_type> generations) {
return do_with(std::vector<sstables::shared_sstable>(),
[&env, dir = std::move(dir), generations = std::move(generations), s] (auto& ret) mutable {
return parallel_for_each(generations, [&env, &ret, &dir, s] (unsigned long generation) {
return parallel_for_each(generations, [&env, &ret, &dir, s] (auto generation) {
return env.reusable_sst(s, dir, generation).then([&ret] (sstables::shared_sstable sst) {
ret.push_back(std::move(sst));
});
@@ -106,6 +107,11 @@ static future<std::vector<sstables::shared_sstable>> open_sstables(test_env& env
});
}
static future<std::vector<sstables::shared_sstable>> open_sstables(test_env& env, schema_ptr s, sstring dir, std::vector<sstables::generation_type::int_t> gen_values) {
auto generations = generations_from_values(gen_values);
return open_sstables(env, std::move(s), std::move(dir), std::move(generations));
}
// mutation_reader for sstable keeping all the required objects alive.
static flat_mutation_reader_v2 sstable_reader(shared_sstable sst, schema_ptr s, reader_permit permit) {
return sst->as_mutation_source().make_reader_v2(s, std::move(permit), query::full_partition_range, s->full_slice());
@@ -211,7 +217,7 @@ SEASTAR_TEST_CASE(compact) {
auto sstables = open_sstables(env, s, "test/resource/sstables/compaction", {1,2,3}).get();
std::vector<shared_sstable> new_sstables;
auto new_sstable = [&] {
auto sst = env.make_sstable(s);
auto sst = cf.make_sstable();
new_sstables.push_back(sst);
return sst;
};
@@ -224,7 +230,7 @@ SEASTAR_TEST_CASE(compact) {
// john | 20 | deleted
// nadav - deleted partition
BOOST_REQUIRE_EQUAL(new_sstables.size(), 1);
auto sst = env.reusable_sst(s, new_sstables[0]->generation().value()).get();
auto sst = env.reusable_sst(s, new_sstables[0]).get();
auto reader = sstable_reader(sst, s, env.make_reader_permit());
auto close_reader = deferred_close(reader);
auto verify_mutation = [&] (std::function<void(mutation_opt)> verify) {
@@ -296,39 +302,37 @@ static std::vector<sstables::shared_sstable> get_candidates_for_leveled_strategy
return candidates;
}
struct compact_sstables_result {
std::vector<sstables::shared_sstable> input_sstables;
std::vector<sstables::shared_sstable> output_sstables;
};
// Return vector of sstables generated by compaction. Only relevant for leveled one.
static future<std::vector<unsigned long>> compact_sstables(test_env& env, std::vector<unsigned long> generations_to_compact,
unsigned long new_generation, bool create_sstables, uint64_t min_sstable_size, compaction_strategy_type strategy) {
static future<compact_sstables_result> compact_sstables(test_env& env, std::vector<sstables::shared_sstable> sstables_to_compact, size_t create_sstables,
uint64_t min_sstable_size, compaction_strategy_type strategy) {
BOOST_REQUIRE(smp::count == 1);
return seastar::async(
[&env, generations = std::move(generations_to_compact), new_generation, create_sstables, min_sstable_size, strategy] () mutable {
[&env, sstables = std::move(sstables_to_compact), create_sstables, min_sstable_size, strategy] () mutable {
schema_builder builder(make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type));
builder.set_compressor_params(compression_parameters::no_compression());
builder.set_min_compaction_threshold(4);
auto s = builder.build(schema_builder::compact_storage::no);
auto sst_gen = [&, generation = make_lw_shared<unsigned long>(create_sstables ? generations[0] : new_generation)] () mutable {
return env.make_sstable(s, (*generation)++);
};
auto sst_gen = env.make_sst_factory(s);
auto cf = env.make_table_for_tests(s);
auto stop_cf = deferred_stop(cf);
std::vector<sstables::shared_sstable> sstables;
std::vector<unsigned long> created;
std::vector<sstables::shared_sstable> created;
if (!create_sstables) {
auto opened_sstables = open_sstables(env, s, env.tempdir().path().native(), generations).get();
for (auto& sst : opened_sstables) {
sstables.push_back(sst);
}
} else {
for (auto generation : generations) {
if (create_sstables) {
for (auto i = 0; i < create_sstables; i++) {
auto mt = make_lw_shared<replica::memtable>(s);
const column_definition& r1_col = *s->get_column_definition("r1");
sstring k = "key" + to_sstring(generation);
auto sst = sst_gen();
sstring k = "key" + to_sstring(sst->generation());
auto key = partition_key::from_exploded(*s, {to_bytes(k)});
auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});
@@ -336,18 +340,16 @@ static future<std::vector<unsigned long>> compact_sstables(test_env& env, std::v
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(utf8_type, bytes(min_sstable_size, 'a')));
mt->apply(std::move(m));
auto sst = make_sstable_containing(sst_gen, mt);
sst = make_sstable_containing(sst, mt);
sstables.push_back(sst);
}
}
auto new_sstable = [&] {
auto sst = sst_gen();
created.push_back(sst->generation().value());
created.push_back(sst);
return sst;
};
// We must have opened at least all original candidates.
BOOST_REQUIRE(generations.size() == sstables.size());
if (strategy == compaction_strategy_type::size_tiered) {
// Calling function that will return a list of sstables to compact based on size-tiered strategy.
@@ -381,51 +383,61 @@ static future<std::vector<unsigned long>> compact_sstables(test_env& env, std::v
throw std::runtime_error("unexpected strategy");
}
return created;
return compact_sstables_result{std::move(sstables), std::move(created)};
});
}
static future<> compact_sstables(test_env& env, std::vector<unsigned long> generations_to_compact, unsigned long new_generation, bool create_sstables = true) {
static future<compact_sstables_result> create_and_compact_sstables(test_env& env, size_t create_sstables) {
uint64_t min_sstable_size = 50;
return compact_sstables(env, std::move(generations_to_compact), new_generation, create_sstables, min_sstable_size,
compaction_strategy_type::size_tiered).then([new_generation] (auto ret) {
// size tiered compaction will output at most one sstable, let's assert that.
BOOST_REQUIRE(ret.size() == 1);
BOOST_REQUIRE(ret[0] == new_generation);
return make_ready_future<>();
});
auto res = co_await compact_sstables(env, {}, create_sstables, min_sstable_size, compaction_strategy_type::size_tiered);
// size tiered compaction will output at most one sstable, let's assert that.
BOOST_REQUIRE(res.output_sstables.size() == 1);
co_return res;
}
static future<> check_compacted_sstables(test_env& env, unsigned long generation, std::vector<unsigned long> compacted_generations) {
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type);
static future<compact_sstables_result> compact_sstables(test_env& env, std::vector<sstables::shared_sstable> sstables_to_compact) {
uint64_t min_sstable_size = 50;
auto res = co_await compact_sstables(env, std::move(sstables_to_compact), 0, min_sstable_size, compaction_strategy_type::size_tiered);
// size tiered compaction will output at most one sstable, let's assert that.
BOOST_REQUIRE(res.output_sstables.size() == 1);
co_return res;
}
auto generations = make_lw_shared<std::vector<unsigned long>>(std::move(compacted_generations));
static future<> check_compacted_sstables(test_env& env, compact_sstables_result res) {
return seastar::async([&env, res = std::move(res)] {
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type);
return env.reusable_sst(s, generation).then([&env, s, generations] (shared_sstable sst) {
BOOST_REQUIRE_EQUAL(res.output_sstables.size(), 1);
auto sst = env.reusable_sst(s, res.output_sstables[0]).get();
auto reader = sstable_reader(sst, s, env.make_reader_permit()); // reader holds sst and s alive.
auto keys = make_lw_shared<std::vector<partition_key>>();
auto close_reader = deferred_close(reader);
std::vector<partition_key> keys;
return with_closeable(std::move(reader), [generations, s, keys] (flat_mutation_reader_v2& reader) {
return do_for_each(*generations, [&reader, keys] (unsigned long generation) mutable {
return read_mutation_from_flat_mutation_reader(reader).then([keys] (mutation_opt m) {
BOOST_REQUIRE(m);
keys->push_back(m->key());
});
}).then([s, keys, generations] {
// keys from compacted sstable aren't ordered lexographically,
// thus we must read all keys into a vector, sort the vector
// lexographically, then proceed with the comparison.
std::sort(keys->begin(), keys->end(), partition_key::less_compare(*s));
BOOST_REQUIRE(keys->size() == generations->size());
auto i = 0;
for (auto& k : *keys) {
sstring original_k = "key" + to_sstring((*generations)[i++]);
BOOST_REQUIRE(k.equal(*s, partition_key::from_singular(*s, data_value(original_k))));
while (auto m = read_mutation_from_flat_mutation_reader(reader).get()) {
keys.push_back(m->key());
}
// keys from compacted sstable aren't ordered lexographically,
// thus we must read all keys into a vector, sort the vector
// lexographically, then proceed with the comparison.
std::sort(keys.begin(), keys.end(), partition_key::less_compare(*s));
BOOST_REQUIRE_EQUAL(keys.size(), res.input_sstables.size());
auto generations = boost::copy_range<std::vector<sstables::generation_type>>(res.input_sstables |
boost::adaptors::transformed([] (const sstables::shared_sstable& sst) { return sst->generation(); }));
for (auto& k : keys) {
bool found = false;
for (auto it = generations.begin(); it != generations.end(); ++it) {
sstring original_k = "key" + to_sstring(*it);
found = k.equal(*s, partition_key::from_singular(*s, data_value(original_k)));
if (found) {
generations.erase(it);
break;
}
return make_ready_future<>();
});
});
}
BOOST_REQUIRE(found);
}
});
}
@@ -441,21 +453,30 @@ SEASTAR_TEST_CASE(compact_02) {
// strategy algorithm that selects candidates for compaction.
return test_env::do_with_async([] (test_env& env) {
// Compact 4 sstables into 1 using size-tiered strategy to select sstables.
// E.g.: generations 18, 19, 20 and 21 will be compacted into generation 22.
compact_sstables(env, { 18, 19, 20, 21 }, 22).get();
// Check that generation 22 contains all keys of generations 18, 19, 20 and 21.
check_compacted_sstables(env, 22, { 18, 19, 20, 21 }).get();
compact_sstables(env, { 23, 24, 25, 26 }, 27).get();
check_compacted_sstables(env, 27, { 23, 24, 25, 26 }).get();
compact_sstables(env, { 28, 29, 30, 31 }, 32).get();
check_compacted_sstables(env, 32, { 28, 29, 30, 31 }).get();
compact_sstables(env, { 33, 34, 35, 36 }, 37).get();
check_compacted_sstables(env, 37, { 33, 34, 35, 36 }).get();
std::vector<sstables::shared_sstable> all_input_sstables;
std::vector<sstables::shared_sstable> compacted;
auto compact_and_verify = [&] (size_t count) mutable{
// Compact `count` sstables into 1 using size-tiered strategy to select sstables.
// E.g.: generations 18, 19, 20 and 21 will be compacted into generation 22.
auto res = create_and_compact_sstables(env, count).get();
std::copy(res.input_sstables.begin(), res.input_sstables.end(), std::back_inserter(all_input_sstables));
compacted.emplace_back(res.output_sstables[0]);
// Check that generation 22 contains all keys of generations 18, 19, 20 and 21.
check_compacted_sstables(env, std::move(res)).get();
};
static constexpr size_t num_rounds = 4;
static constexpr size_t sstables_in_round = 4;
for (auto i = 0; i < num_rounds; ++i) {
compact_and_verify(sstables_in_round);
}
// In this step, we compact 4 compacted sstables.
compact_sstables(env, { 22, 27, 32, 37 }, 38, false).get();
auto res = compact_sstables(env, std::move(compacted)).get();
res.input_sstables = std::move(all_input_sstables);
// Check that the compacted sstable contains all keys.
check_compacted_sstables(env, 38, { 18, 19, 20, 21, 23, 24, 25, 26, 28, 29, 30, 31, 33, 34, 35, 36 }).get();
check_compacted_sstables(env, std::move(res)).get();
});
}
@@ -714,15 +735,14 @@ SEASTAR_TEST_CASE(leveled_04) {
SEASTAR_TEST_CASE(leveled_05) {
// NOTE: Generations from 48 to 51 are used here.
return test_env::do_with_async([] (test_env& env) {
// Check compaction code with leveled strategy. In this test, two sstables of level 0 will be created.
auto generations = compact_sstables(env, { 48, 49 }, 50, true, 1024*1024, compaction_strategy_type::leveled).get();
BOOST_REQUIRE(generations.size() == 2);
BOOST_REQUIRE(generations[0] == 50);
BOOST_REQUIRE(generations[1] == 51);
static constexpr size_t sstables_in_round = 2;
for (auto gen : generations) {
auto fname = sstable::filename(env.tempdir().path().native(), "ks", "cf", sstables::get_highest_sstable_version(), generation_from_value(gen), big, component_type::Data);
BOOST_REQUIRE(file_size(fname).get0() >= 1024*1024);
// Check compaction code with leveled strategy. In this test, two sstables of level 0 will be created.
auto res = compact_sstables(env, {}, sstables_in_round, 1024*1024, compaction_strategy_type::leveled).get();
BOOST_REQUIRE_EQUAL(res.input_sstables.size(), sstables_in_round);
for (const auto& sst : res.output_sstables) {
BOOST_REQUIRE(sst->data_size() >= 1024*1024);
}
});
}
@@ -1217,7 +1237,7 @@ SEASTAR_TEST_CASE(test_sstable_max_local_deletion_time_2) {
auto sst2 = get_usable_sst(mt);
BOOST_REQUIRE(now.time_since_epoch().count() == sst2->get_stats_metadata().max_local_deletion_time);
auto creator = [&] { return cf.make_sstable(version); };
auto creator = sst_gen;
auto info = compact_sstables(sstables::compaction_descriptor({sst1, sst2}, default_priority_class()), cf, creator).get0();
BOOST_REQUIRE(info.new_sstables.size() == 1);
BOOST_REQUIRE(((now + gc_clock::duration(100)).time_since_epoch().count()) ==
@@ -1702,7 +1722,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test_2) {
auto sst2 = make_sstable_containing(sst_gen, mt);
check_min_max_column_names(sst2, {"9ck101"}, {"9ck298"});
auto creator = [&] { return cf.make_sstable(version); };
auto creator = sst_gen;
auto info = compact_sstables(sstables::compaction_descriptor({sst, sst2}, default_priority_class()), cf, creator).get0();
BOOST_REQUIRE(info.new_sstables.size() == 1);
check_min_max_column_names(info.new_sstables.front(), {"0ck100"}, {"9ck298"});
@@ -1720,7 +1740,7 @@ SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) {
int max_threshold = cf->schema()->max_compaction_threshold();
candidates.reserve(max_threshold+1);
for (auto i = 0; i < (max_threshold+1); i++) { // (max_threshold+1) sstables of similar size
auto sst = env.make_sstable(cf.schema());
auto sst = cf.make_sstable();
sstables::test(sst).set_data_file_size(1);
candidates.push_back(std::move(sst));
}
@@ -2871,14 +2891,14 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
BOOST_REQUIRE(old_sstables.size() == 1);
BOOST_REQUIRE(new_sstables.size() == 1);
// check that sstable replacement follows token order
BOOST_REQUIRE(*expected_sst == generation_value(old_sstables.front()->generation()));
BOOST_REQUIRE(*expected_sst == old_sstables.front()->generation());
expected_sst++;
// check that previously released sstables were already closed
if (generation_value(old_sstables.front()->generation()) % 4 == 0) {
// Due to performance reasons, sstables are not released immediately, but in batches.
// At the time of writing, mutation_reader_merger releases it's sstable references
// in batches of 4. That's why we only perform this check every 4th sstable.
BOOST_REQUIRE(*closed_sstables_tracker == generation_value(old_sstables.front()->generation()));
BOOST_REQUIRE(*closed_sstables_tracker == old_sstables.front()->generation());
}
do_replace(old_sstables, new_sstables);
@@ -2906,8 +2926,8 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
});
BOOST_REQUIRE(desc.sstables.size() == expected_input);
auto sstable_run = boost::copy_range<std::set<int64_t>>(desc.sstables
| boost::adaptors::transformed([] (auto& sst) { return generation_value(sst->generation()); }));
auto sstable_run = boost::copy_range<std::set<sstables::generation_type>>(desc.sstables
| boost::adaptors::transformed([] (auto& sst) { return sst->generation(); }));
auto expected_sst = sstable_run.begin();
auto closed_sstables_tracker = sstable_run.begin();
auto replacer = [&] (sstables::compaction_completion_desc desc) {
@@ -3064,15 +3084,15 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
auto partial_sstable_run_sst = make_sstable_easy(env, make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), { std::move(mut) }), sst_cfg);
column_family_test(cf).add_sstable(partial_sstable_run_sst).get();
column_family_test::update_sstables_known_generation(*cf, generation_value(partial_sstable_run_sst->generation()));
column_family_test::update_sstables_known_generation(*cf, partial_sstable_run_sst->generation());
auto generation_exists = [&cf] (int64_t generation) {
auto generation_exists = [&cf] (sstables::generation_type generation) {
auto sstables = cf->get_sstables();
auto entry = boost::range::find_if(*sstables, [generation] (shared_sstable sst) { return generation == generation_value(sst->generation()); });
auto entry = boost::range::find_if(*sstables, [generation] (shared_sstable sst) { return generation == sst->generation(); });
return entry != sstables->end();
};
BOOST_REQUIRE(generation_exists(generation_value(partial_sstable_run_sst->generation())));
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
// register partial sstable run
auto cm_test = compaction_manager_test(cf.get_compaction_manager());
@@ -3081,7 +3101,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
}).get();
// make sure partial sstable run has none of its fragments compacted.
BOOST_REQUIRE(generation_exists(generation_value(partial_sstable_run_sst->generation())));
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
});
}
@@ -3667,7 +3687,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) {
{
std::vector <shared_sstable> sstables;
for (auto i = 0; i < 256; i++) {
auto sst = env.make_sstable(s, "", i + 1);
auto sst = env.make_sstable(s);
auto key = keys[i].key();
sstables::test(sst).set_values_for_leveled_strategy(1 /* size */, 0 /* level */, 0 /* max ts */, key, key);
sstables.push_back(std::move(sst));
@@ -3679,7 +3699,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) {
{
std::vector <shared_sstable> sstables;
for (auto i = 0; i < 256; i++) {
auto sst = env.make_sstable(s, "", i + 1);
auto sst = env.make_sstable(s);
auto key = keys[0].key();
sstables::test(sst).set_values_for_leveled_strategy(1 /* size */, 0 /* level */, 0 /* max ts */, key, key);
sstables.push_back(std::move(sst));
@@ -3689,7 +3709,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) {
}
// single sstable
{
auto sst = env.make_sstable(s, "", 1);
auto sst = env.make_sstable(s);
auto key = keys[0].key();
sstables::test(sst).set_values_for_leveled_strategy(1 /* size */, 0 /* level */, 0 /* max ts */, key, key);
@@ -3969,7 +3989,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
mutations_for_big_files.push_back(make_row(i, std::chrono::hours(1)));
}
std::unordered_set<int64_t> generations_for_small_files;
std::unordered_set<sstables::generation_type> generations_for_small_files;
std::vector<sstables::shared_sstable> sstables;
sstables.reserve(64);
@@ -3981,7 +4001,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
//
if (i % 2 == 0) {
sst = make_sstable_containing(sst_gen, mutations_for_small_files);
generations_for_small_files.insert(generation_value(sst->generation()));
generations_for_small_files.insert(sst->generation());
} else {
sst = make_sstable_containing(sst_gen, mutations_for_big_files);
}
@@ -3993,7 +4013,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
BOOST_REQUIRE_EQUAL(ret.sstables.size(), uint64_t(s->max_compaction_threshold()));
// fail if any file doesn't belong to set of small files
bool has_big_sized_files = boost::algorithm::any_of(ret.sstables, [&] (const sstables::shared_sstable& sst) {
return !generations_for_small_files.contains(generation_value(sst->generation()));
return !generations_for_small_files.contains(sst->generation());
});
BOOST_REQUIRE(!has_big_sized_files);
};
@@ -4626,9 +4646,7 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
auto cf = env.make_table_for_tests(s);
auto close_cf = deferred_stop(cf);
auto sst_gen = [&]() mutable {
return cf.make_sstable();
};
auto sst_gen = cf.make_sst_factory();
using namespace std::chrono;
auto now = gc_clock::now().time_since_epoch() + duration_cast<microseconds>(seconds(tests::random::get_int(0, 3600*24)));

View File

@@ -477,9 +477,9 @@ SEASTAR_TEST_CASE(test_counter_write) {
});
}
static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen,
static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema,
const partition_key& first_key, const partition_key& last_key, uint32_t level = 0) {
auto sst = env.make_sstable(schema, "", gen);
auto sst = env.make_sstable(schema);
sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, first_key, last_key);
return sst;
}
@@ -1761,7 +1761,7 @@ SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) {
for (auto&& mf : fragments) {
mut.apply(mf);
}
auto sst = make_sstable_easy(env, make_flat_mutation_reader_from_mutations_v2(table.schema(), std::move(permit), { std::move(mut) }), cfg, 1, version);
auto sst = make_sstable_easy(env, make_flat_mutation_reader_from_mutations_v2(table.schema(), std::move(permit), { std::move(mut) }), cfg, version);
auto ms = as_mutation_source(sst);
for (uint32_t i = 3; i < seq; i++) {
@@ -1810,7 +1810,7 @@ SEASTAR_TEST_CASE(test_skipping_using_index) {
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1; // So that every fragment is indexed
cfg.promoted_index_auto_scale_threshold = 0; // disable auto-scaling
auto sst = make_sstable_easy(env, make_flat_mutation_reader_from_mutations_v2(table.schema(), env.make_reader_permit(), partitions), cfg, 1, version);
auto sst = make_sstable_easy(env, make_flat_mutation_reader_from_mutations_v2(table.schema(), env.make_reader_permit(), partitions), cfg, version);
auto ms = as_mutation_source(sst);
auto rd = ms.make_reader_v2(table.schema(),
@@ -1936,51 +1936,69 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
const auto decorated_keys = tests::generate_partition_keys(8, s);
auto check = [] (sstable_set::incremental_selector& selector, const dht::decorated_key& key, std::unordered_set<int64_t> expected_gens) {
auto new_sstable = [&] (sstable_set& set, size_t k0, size_t k1, uint32_t level) {
auto key0 = decorated_keys[k0];
auto tok0 = key0.token();
auto key1 = decorated_keys[k1];
auto tok1 = key1.token();
testlog.debug("creating sstable with k[{}] token={} k[{}] token={} level={}", k0, tok0, k1, tok1, level);
auto sst = sstable_for_overlapping_test(env, s, key0.key(), key1.key(), level);
set.insert(sst);
return sst;
};
auto check = [&] (sstable_set::incremental_selector& selector, size_t k, std::unordered_set<shared_sstable> expected_ssts) {
const dht::decorated_key& key = decorated_keys[k];
auto sstables = selector.select(key).sstables;
BOOST_REQUIRE_EQUAL(sstables.size(), expected_gens.size());
testlog.debug("checking sstables for key[{}] token={} found={} expected={}", k, decorated_keys[k].token(), sstables.size(), expected_ssts.size());
BOOST_REQUIRE_EQUAL(sstables.size(), expected_ssts.size());
for (auto& sst : sstables) {
BOOST_REQUIRE(expected_gens.contains(generation_value(sst->generation())));
BOOST_REQUIRE(expected_ssts.contains(sst));
expected_ssts.erase(sst);
}
BOOST_REQUIRE(expected_ssts.empty());
};
{
sstable_set set = cs.make_sstable_set(s);
set.insert(sstable_for_overlapping_test(env, s, 1, decorated_keys[0].key(), decorated_keys[1].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 2, decorated_keys[0].key(), decorated_keys[1].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 3, decorated_keys[3].key(), decorated_keys[4].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 4, decorated_keys[4].key(), decorated_keys[4].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 5, decorated_keys[4].key(), decorated_keys[5].key(), 1));
std::vector<shared_sstable> ssts;
ssts.push_back(new_sstable(set, 0, 1, 1));
ssts.push_back(new_sstable(set, 0, 1, 1));
ssts.push_back(new_sstable(set, 3, 4, 1));
ssts.push_back(new_sstable(set, 4, 4, 1));
ssts.push_back(new_sstable(set, 4, 5, 1));
sstable_set::incremental_selector sel = set.make_incremental_selector();
check(sel, decorated_keys[0], {1, 2});
check(sel, decorated_keys[1], {1, 2});
check(sel, decorated_keys[2], {});
check(sel, decorated_keys[3], {3});
check(sel, decorated_keys[4], {3, 4, 5});
check(sel, decorated_keys[5], {5});
check(sel, decorated_keys[6], {});
check(sel, decorated_keys[7], {});
check(sel, 0, std::unordered_set<shared_sstable>{ssts[0], ssts[1]});
check(sel, 1, std::unordered_set<shared_sstable>{ssts[0], ssts[1]});
check(sel, 2, std::unordered_set<shared_sstable>{});
check(sel, 3, std::unordered_set<shared_sstable>{ssts[2]});
check(sel, 4, std::unordered_set<shared_sstable>{ssts[2], ssts[3], ssts[4]});
check(sel, 5, std::unordered_set<shared_sstable>{ssts[4]});
check(sel, 6, std::unordered_set<shared_sstable>{});
check(sel, 7, std::unordered_set<shared_sstable>{});
}
{
sstable_set set = cs.make_sstable_set(s);
set.insert(sstable_for_overlapping_test(env, s, 0, decorated_keys[0].key(), decorated_keys[1].key(), 0));
set.insert(sstable_for_overlapping_test(env, s, 1, decorated_keys[0].key(), decorated_keys[1].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 2, decorated_keys[0].key(), decorated_keys[1].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 3, decorated_keys[3].key(), decorated_keys[4].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 4, decorated_keys[4].key(), decorated_keys[4].key(), 1));
set.insert(sstable_for_overlapping_test(env, s, 5, decorated_keys[4].key(), decorated_keys[5].key(), 1));
std::unordered_map<dht::token, std::unordered_set<shared_sstable>> map;
std::vector<shared_sstable> ssts;
ssts.push_back(new_sstable(set, 0, 1, 0));
ssts.push_back(new_sstable(set, 0, 1, 1));
ssts.push_back(new_sstable(set, 0, 1, 1));
ssts.push_back(new_sstable(set, 3, 4, 1));
ssts.push_back(new_sstable(set, 4, 4, 1));
ssts.push_back(new_sstable(set, 4, 5, 1));
sstable_set::incremental_selector sel = set.make_incremental_selector();
check(sel, decorated_keys[0], {0, 1, 2});
check(sel, decorated_keys[1], {0, 1, 2});
check(sel, decorated_keys[2], {0});
check(sel, decorated_keys[3], {0, 3});
check(sel, decorated_keys[4], {0, 3, 4, 5});
check(sel, decorated_keys[5], {0, 5});
check(sel, decorated_keys[6], {0});
check(sel, decorated_keys[7], {0});
check(sel, 0, std::unordered_set<shared_sstable>{ssts[0], ssts[1], ssts[2]});
check(sel, 1, std::unordered_set<shared_sstable>{ssts[0], ssts[1], ssts[2]});
check(sel, 2, std::unordered_set<shared_sstable>{ssts[0]});
check(sel, 3, std::unordered_set<shared_sstable>{ssts[0], ssts[3]});
check(sel, 4, std::unordered_set<shared_sstable>{ssts[0], ssts[3], ssts[4], ssts[5]});
check(sel, 5, std::unordered_set<shared_sstable>{ssts[0], ssts[5]});
check(sel, 6, std::unordered_set<shared_sstable>{ssts[0]});
check(sel, 7, std::unordered_set<shared_sstable>{ssts[0]});
}
return make_ready_future<>();
@@ -1998,12 +2016,12 @@ SEASTAR_TEST_CASE(sstable_set_erase) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
sstable_set set = cs.make_sstable_set(s);
auto sst = sstable_for_overlapping_test(env, s, 0, key, key, 0);
auto sst = sstable_for_overlapping_test(env, s, key, key, 0);
set.insert(sst);
assert_sstable_set_size(set, 1);
auto unleveled_sst = sstable_for_overlapping_test(env, s, 1, key, key, 0);
auto leveled_sst = sstable_for_overlapping_test(env, s, 2, key, key, 1);
auto unleveled_sst = sstable_for_overlapping_test(env, s, key, key, 0);
auto leveled_sst = sstable_for_overlapping_test(env, s, key, key, 1);
set.erase(unleveled_sst);
set.erase(leveled_sst);
assert_sstable_set_size(set, 1);
@@ -2016,12 +2034,12 @@ SEASTAR_TEST_CASE(sstable_set_erase) {
// triggers use-after-free, described in #4572, by operating on interval that relies on info of a destroyed sstable object.
{
auto sst = sstable_for_overlapping_test(env, s, 0, key, key, 1);
auto sst = sstable_for_overlapping_test(env, s, key, key, 1);
set.insert(sst);
assert_sstable_set_size(set, 1);
}
auto sst2 = sstable_for_overlapping_test(env, s, 0, key, key, 1);
auto sst2 = sstable_for_overlapping_test(env, s, key, key, 1);
set.insert(sst2);
assert_sstable_set_size(set, 2);
@@ -2032,11 +2050,11 @@ SEASTAR_TEST_CASE(sstable_set_erase) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options());
sstable_set set = cs.make_sstable_set(s);
auto sst = sstable_for_overlapping_test(env, s, 0, key, key, 0);
auto sst = sstable_for_overlapping_test(env, s, key, key, 0);
set.insert(sst);
assert_sstable_set_size(set, 1);
auto sst2 = sstable_for_overlapping_test(env, s, 1, key, key, 0);
auto sst2 = sstable_for_overlapping_test(env, s, key, key, 0);
set.erase(sst2);
assert_sstable_set_size(set, 1);
BOOST_REQUIRE(set.all()->contains(sst));
@@ -2111,7 +2129,6 @@ SEASTAR_TEST_CASE(sstable_owner_shards) {
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 1);
return m;
};
auto gen = make_lw_shared<unsigned>(1);
auto make_shared_sstable = [&] (std::unordered_set<unsigned> shards, unsigned ignore_msb, unsigned smp_count) {
auto key_schema = schema_builder(s).with_sharder(smp_count, ignore_msb).build();
auto mut = [&] (auto shard) {
@@ -2119,9 +2136,9 @@ SEASTAR_TEST_CASE(sstable_owner_shards) {
};
auto muts = boost::copy_range<std::vector<mutation>>(shards
| boost::adaptors::transformed([&] (auto shard) { return mut(shard); }));
auto sst_gen = [&env, s, gen, ignore_msb] () mutable {
auto sst_gen = [&] () mutable {
auto schema = schema_builder(s).with_sharder(1, ignore_msb).build();
auto sst = env.make_sstable(std::move(schema), (*gen)++);
auto sst = env.make_sstable(std::move(schema));
return sst;
};
auto sst = make_sstable_containing(sst_gen, std::move(muts));
@@ -2303,7 +2320,7 @@ SEASTAR_TEST_CASE(test_broken_promoted_index_is_skipped) {
.with_column("v", int32_type)
.build(schema_builder::compact_storage::yes);
auto sst = env.make_sstable(s, get_test_dir("broken_non_compound_pi_and_range_tombstone", s), 1, version);
auto sst = env.make_sstable(s, get_test_dir("broken_non_compound_pi_and_range_tombstone", s), sstables::generation_type(1), version);
try {
sst->load().get();
} catch (...) {
@@ -2497,13 +2514,11 @@ SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
auto s = ss.schema();
const auto keys = tests::generate_partition_keys(6, s);
auto next_gen = [gen = make_lw_shared<unsigned>(1)] { return (*gen)++; };
sstables::sstable_run run;
auto insert = [&] (int first_key_idx, int last_key_idx) {
auto sst = sstable_for_overlapping_test(env, s, next_gen(),
keys[first_key_idx].key(), keys[last_key_idx].key());
auto sst = sstable_for_overlapping_test(env, s, keys[first_key_idx].key(), keys[last_key_idx].key());
return run.insert(sst);
};
@@ -2666,7 +2681,7 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
auto mr = make_flat_mutation_reader_from_mutations_v2(ss.schema(), env.make_reader_permit(), {mut});
sstable_writer_config cfg = env.manager().configure_writer();
auto sst = make_sstable_easy(env, std::move(mr), cfg, 0, version, 0);
auto sst = make_sstable_easy(env, std::move(mr), cfg, version, 0);
auto sst_mr = sst->as_mutation_source().make_reader_v2(s, env.make_reader_permit(), query::full_partition_range, s->full_slice());
auto close_mr = deferred_close(sst_mr);
@@ -2738,7 +2753,7 @@ SEASTAR_TEST_CASE(test_missing_partition_end_fragment) {
auto mr = make_flat_mutation_reader_from_fragments(s, env.make_reader_permit(), std::move(frags));
auto close_mr = deferred_close(mr);
auto sst = env.make_sstable(s, 0, version);
auto sst = env.make_sstable(s, version);
sstable_writer_config cfg = env.manager().configure_writer();
try {
@@ -2760,7 +2775,6 @@ SEASTAR_TEST_CASE(test_sstable_origin) {
const auto pk = tests::generate_partition_key(s);
auto mut = mutation(s, pk);
ss.add_row(mut, ss.make_ckey(0), "val");
int gen = 1;
for (const auto version : all_sstable_versions) {
if (version < sstable_version_types::mc) {
@@ -2770,14 +2784,14 @@ SEASTAR_TEST_CASE(test_sstable_origin) {
// Test empty sstable_origin.
auto mr = make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), {mut});
sstable_writer_config cfg = env.manager().configure_writer("");
auto sst = make_sstable_easy(env, std::move(mr), cfg, gen++, version, 0);
auto sst = make_sstable_easy(env, std::move(mr), cfg, version, 0);
BOOST_REQUIRE_EQUAL(sst->get_origin(), "");
// Test that a random sstable_origin is stored and retrieved properly.
mr = make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), {mut});
sstring origin = fmt::format("test-{}", tests::random::get_sstring());
cfg = env.manager().configure_writer(origin);
sst = make_sstable_easy(env, std::move(mr), cfg, gen++, version, 0);
sst = make_sstable_easy(env, std::move(mr), cfg, version, 0);
BOOST_REQUIRE_EQUAL(sst->get_origin(), origin);
}
});
@@ -2794,9 +2808,9 @@ SEASTAR_TEST_CASE(compound_sstable_set_basic_test) {
lw_shared_ptr<sstables::sstable_set> compound = make_lw_shared(sstables::make_compound_sstable_set(s, {set1, set2}));
const auto keys = tests::generate_partition_keys(2, s);
set1->insert(sstable_for_overlapping_test(env, s, 1, keys[0].key(), keys[1].key(), 0));
set2->insert(sstable_for_overlapping_test(env, s, 2, keys[0].key(), keys[1].key(), 0));
set2->insert(sstable_for_overlapping_test(env, s, 3, keys[0].key(), keys[1].key(), 0));
set1->insert(sstable_for_overlapping_test(env, s, keys[0].key(), keys[1].key(), 0));
set2->insert(sstable_for_overlapping_test(env, s, keys[0].key(), keys[1].key(), 0));
set2->insert(sstable_for_overlapping_test(env, s, keys[0].key(), keys[1].key(), 0));
BOOST_REQUIRE(boost::accumulate(*compound->all() | boost::adaptors::transformed([] (const sstables::shared_sstable& sst) { return generation_value(sst->generation()); }), unsigned(0)) == 6);
{
@@ -2875,8 +2889,6 @@ SEASTAR_TEST_CASE(test_validate_checksums) {
const std::map<sstring, sstring> no_compression_params = {};
const std::map<sstring, sstring> lz4_compression_params = {{compression_parameters::SSTABLE_COMPRESSION, "LZ4Compressor"}};
int gen = 0;
for (const auto version : writable_sstable_versions) {
testlog.info("version={}", version);
for (const auto& compression_params : {no_compression_params, lz4_compression_params}) {
@@ -2886,7 +2898,7 @@ SEASTAR_TEST_CASE(test_validate_checksums) {
auto mr = make_flat_mutation_reader_from_mutations_v2(schema, permit, muts);
auto close_mr = deferred_close(mr);
auto sst = env.make_sstable(sst_schema, gen++, version);
auto sst = env.make_sstable(sst_schema, version);
sstable_writer_config cfg = env.manager().configure_writer();
auto wr = sst->get_writer(*sst_schema, 1, cfg, encoding_stats{}, default_priority_class());

View File

@@ -59,6 +59,22 @@ schema_ptr test_table_schema() {
using namespace sstables;
class generation_for_sharded_test {
std::optional<sstables::generation_type> _gen;
shard_id _shard = this_shard_id();
public:
generation_for_sharded_test(std::optional<sstables::generation_type> gen = std::nullopt) noexcept : _gen(std::move(gen)) {};
// Must be called from a seastar thread.
sstables::generation_type regenerate() noexcept {
return smp::submit_to(_shard, [&] {
_gen = replica::table::make_new_generation(_gen);
return *_gen;
}).get();
}
};
// Must be called from a seastar thread.
sstables::shared_sstable
make_sstable_for_this_shard(std::function<sstables::shared_sstable()> sst_factory) {
auto s = test_table_schema();
@@ -73,7 +89,7 @@ make_sstable_for_this_shard(std::function<sstables::shared_sstable()> sst_factor
/// Arguments passed to the function are passed to table::make_sstable
template <typename... Args>
sstables::shared_sstable
make_sstable_for_all_shards(replica::database& db, replica::table& table, fs::path sstdir, int64_t generation) {
make_sstable_for_all_shards(replica::database& db, replica::table& table, fs::path sstdir, sstables::generation_type::int_t generation) {
// Unlike the previous helper, we'll assume we're in a thread here. It's less flexible
// but the users are usually in a thread, and rewrite_toc_without_scylla_component requires
// a thread. We could fix that, but deferring that for now.
@@ -85,17 +101,18 @@ make_sstable_for_all_shards(replica::database& db, replica::table& table, fs::pa
m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0));
mt->apply(std::move(m));
}
auto sst = table.get_sstables_manager().make_sstable(s, sstdir.native(), generation_from_value(generation++));
auto sst = table.get_sstables_manager().make_sstable(s, sstdir.native(), sstables::generation_type(generation));
write_memtable_to_sstable(*mt, sst, table.get_sstables_manager().configure_writer("test")).get();
mt->clear_gently().get();
// We can't write an SSTable with bad sharding, so pretend
// it came from Cassandra
testlog.debug("make_sstable_for_all_shards: {}: rewriting TOC", sst->get_filename());
sstables::test(sst).remove_component(sstables::component_type::Scylla).get();
sstables::test(sst).rewrite_toc_without_scylla_component();
return sst;
}
sstables::shared_sstable new_sstable(sstables::test_env& env, fs::path dir, int64_t gen) {
sstables::shared_sstable new_sstable(sstables::test_env& env, fs::path dir, sstables::generation_type gen) {
testlog.debug("new_sstable: dir={} gen={}", dir, gen);
return env.make_sstable(test_table_schema(), dir.native(), gen);
}
@@ -105,22 +122,6 @@ sstables::shared_sstable new_env_sstable(sstables::test_env& env) {
return env.make_sstable(test_table_schema());
}
sstables::shared_sstable new_env_sstable_with_gen(sstables::test_env& env, int64_t gen) {
testlog.debug("new_env_sstable_with_gen: gen={} dir={}", gen, env.tempdir().path());
return env.make_sstable(test_table_schema(), gen);
}
// there is code for this in distributed_loader.cc but this is so simple it is not worth polluting
// the public namespace for it. Repeat it here.
inline future<int64_t>
highest_generation_seen(sharded<sstables::sstable_directory>& dir) {
return dir.map_reduce0(std::mem_fn(&sstable_directory::highest_generation_seen), generation_from_value(0), [] (generation_type a, generation_type b) {
return std::max<generation_type>(a, b);
}).then([] (generation_type gen) {
return generation_value(gen);
});
}
class wrapped_test_env {
std::function<sstables::sstables_manager* ()> _get_mgr;
std::optional<tmpdir> tmpdir_opt;
@@ -192,9 +193,9 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) {
with_sstable_directory(env, [] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, {}).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
auto max_generation_seen = highest_generation_seen(sstdir).get0();
// No generation found on empty directory.
BOOST_REQUIRE_EQUAL(max_generation_seen, 0);
BOOST_REQUIRE(!max_generation_seen);
});
});
}
@@ -311,15 +312,26 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
// Test that we see the right generation during the scan. Temporary files are skipped
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
sstables::test_env::do_with_sharded_async([] (sharded<test_env>& env) {
make_sstable_for_this_shard(std::bind(new_env_sstable_with_gen, std::ref(env.local()), 3333));
auto sst = make_sstable_for_this_shard(std::bind(new_env_sstable_with_gen, std::ref(env.local()), 6666));
rename_file(test::filename(*sst, sstables::component_type::TOC).native(), test::filename(*sst, sstables::component_type::TemporaryTOC).native()).get();
auto sst1 = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local())));
auto sst2 = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local())));
rename_file(test::filename(*sst2, sstables::component_type::TOC).native(), test::filename(*sst2, sstables::component_type::TemporaryTOC).native()).get();
with_sstable_directory(env, [] (sharded<sstables::sstable_directory>& sstdir) {
std::vector<bool> gen1_seen;
gen1_seen.resize(smp::count);
with_sstable_directory(env, [&] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
BOOST_REQUIRE_EQUAL(max_generation_seen, 3333);
sstdir.invoke_on_all([&] (sstables::sstable_directory& sstdir) {
return seastar::async([&] {
sstdir.do_for_each_sstable([&] (const shared_sstable& sst) {
BOOST_REQUIRE(sst->generation() == sst1->generation());
BOOST_REQUIRE(!gen1_seen[this_shard_id()]);
gen1_seen[this_shard_id()] = true;
return make_ready_future<>();
}).get();
});
}).get();
});
BOOST_REQUIRE_EQUAL(std::count(gen1_seen.begin(), gen1_seen.end(), true), 1);
}).get();
}
@@ -351,7 +363,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_gene
// this is why it is annoying for the internal functions in the test infrastructure to
// assume threaded execution
return seastar::async([dir, i, &env] {
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir, i));
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir, sstables::generation_type(i)));
});
}).get();
}
@@ -374,7 +386,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_ge
// this is why it is annoying for the internal functions in the test infrastructure to
// assume threaded execution
return seastar::async([dir, i, &env] {
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir, i + 1));
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir, sstables::generation_type(i + 1)));
});
}).get();
}
@@ -486,13 +498,13 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++);
}
with_sstable_directory(upload_path, e, [&e, upload_path] (sharded<sstables::sstable_directory>& sstdir) {
with_sstable_directory(upload_path, e, [&] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<int64_t> generation_for_test = {};
generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed);
auto max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<sstables::generation_type::int_t> generation_for_test = {};
generation_for_test.store(max_generation_seen->value() + 1, std::memory_order_relaxed);
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) {
auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed);
@@ -530,9 +542,9 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<int64_t> generation_for_test = {};
generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed);
auto max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<sstables::generation_type::int_t> generation_for_test = {};
generation_for_test.store(max_generation_seen->value() + 1, std::memory_order_relaxed);
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) {
auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed);
@@ -570,9 +582,9 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<int64_t> generation_for_test = {};
generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed);
auto max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<sstables::generation_type::int_t> generation_for_test = {};
generation_for_test.store(max_generation_seen->value() + 1, std::memory_order_relaxed);
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) {
auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed);

View File

@@ -19,7 +19,7 @@ using namespace sstables;
namespace fs = std::filesystem;
// Must be called from a seastar thread
static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schema_ptr schema_ptr, fs::path src_path, unsigned long gen) {
static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schema_ptr schema_ptr, fs::path src_path, sstables::generation_type::int_t gen) {
auto sst = env.reusable_sst(schema_ptr, src_path.native(), gen).get0();
auto dst_path = tmp_path / src_path.filename() / format("gen-{}", gen);
recursive_touch_directory(dst_path.native()).get();
@@ -35,7 +35,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
auto env = test_env();
auto stop_env = defer([&env] { env.stop().get(); });
int64_t gen = 1;
sstables::generation_type::int_t gen = 1;
auto [ sst, cur_dir ] = copy_sst_to_tmpdir(tmp.path(), env, uncompressed_schema(), fs::path(uncompressed_dir()), gen);
for (auto i = 0; i < 2; i++) {
@@ -76,7 +76,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
// Returns true when done
//
// Must be called from a seastar thread
static bool partial_create_links(sstable_ptr sst, fs::path dst_path, int64_t gen, int count) {
static bool partial_create_links(sstable_ptr sst, fs::path dst_path, sstables::generation_type::int_t gen, int count) {
auto schema = sst->get_schema();
auto tmp_toc = sstable::filename(dst_path.native(), schema->ks_name(), schema->cf_name(), sst->get_version(), generation_from_value(gen), sstable_format_types::big, component_type::TemporaryTOC);
link_file(test::filename(*sst, component_type::TOC).native(), tmp_toc).get();
@@ -101,7 +101,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
auto env = test_env();
auto stop_env = defer([&env] { env.stop().get(); });
int64_t gen = 1;
sstables::generation_type::int_t gen = 1;
auto [ sst, cur_dir ] = copy_sst_to_tmpdir(tmp.path(), env, uncompressed_schema(), fs::path(uncompressed_dir()), gen);
bool done;
@@ -122,7 +122,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_exists_failure) {
auto env = test_env();
auto stop_env = defer([&env] { env.stop().get(); });
int64_t gen = 1;
sstables::generation_type::int_t gen = 1;
auto [ src_sst, cur_dir ] = copy_sst_to_tmpdir(tmp.path(), env, uncompressed_schema(), fs::path(uncompressed_dir()), gen);
auto [ dst_sst, new_dir ] = copy_sst_to_tmpdir(tmp.path(), env, uncompressed_schema(), fs::path(uncompressed_dir()), ++gen);

View File

@@ -9,6 +9,7 @@
#include <boost/test/unit_test.hpp>
#include <seastar/net/inet_address.hh>
#include "sstables/generation_type.hh"
#include "test/lib/scylla_test_case.hh"
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/closeable.hh>
@@ -559,8 +560,8 @@ static schema_ptr tombstone_overlap_schema() {
}
static future<sstable_ptr> ka_sst(sstables::test_env& env, schema_ptr schema, sstring dir, unsigned long generation) {
auto sst = env.make_sstable(std::move(schema), dir, generation, sstables::sstable::version_types::ka, big);
static future<sstable_ptr> ka_sst(sstables::test_env& env, schema_ptr schema, sstring dir, sstables::generation_type::int_t generation) {
auto sst = env.make_sstable(std::move(schema), dir, sstables::generation_from_value(generation), sstables::sstable::version_types::ka, big);
auto fut = sst->load();
return std::move(fut).then([sst = std::move(sst)] {
return make_ready_future<sstable_ptr>(std::move(sst));
@@ -1021,7 +1022,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
auto sst = make_sstable_easy(env, mt, cfg, 1, version);
auto sst = make_sstable_easy(env, mt, cfg, version);
{
assert_that(get_index_reader(sst, env.make_reader_permit())).has_monotonic_positions(*s);
@@ -1071,7 +1072,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
auto sst = make_sstable_easy(env, mt, cfg, 1, version);
auto sst = make_sstable_easy(env, mt, cfg, version);
{
assert_that(get_index_reader(sst, env.make_reader_permit())).has_monotonic_positions(*s);
@@ -1118,7 +1119,7 @@ SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
auto sst = make_sstable_easy(env, mt, cfg, generation, version);
auto sst = make_sstable_easy(env, mt, cfg, version);
{
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck})).build();
@@ -1154,7 +1155,7 @@ SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound
mt->apply(m);
sstable_writer_config cfg = env.manager().configure_writer();
auto sst = make_sstable_easy(env, mt, cfg, 1, version);
auto sst = make_sstable_easy(env, mt, cfg, version);
{
auto slice = partition_slice_builder(*s).build();
@@ -1185,7 +1186,7 @@ SEASTAR_TEST_CASE(test_promoted_index_is_absent_for_schemas_without_clustering_k
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
auto sst = make_sstable_easy(env, mt, cfg, 1, version);
auto sst = make_sstable_easy(env, mt, cfg, version);
assert_that(get_index_reader(sst, env.make_reader_permit())).is_empty(*s);
}
});
@@ -1223,7 +1224,7 @@ SEASTAR_TEST_CASE(test_writing_combined_stream_with_tombstones_at_the_same_posit
auto combined_permit = env.make_reader_permit();
auto mr = make_combined_reader(s, combined_permit,
mt1->make_flat_reader(s, combined_permit), mt2->make_flat_reader(s, combined_permit));
auto sst = make_sstable_easy(env, std::move(mr), env.manager().configure_writer(), 1, version);
auto sst = make_sstable_easy(env, std::move(mr), env.manager().configure_writer(), version);
assert_that(sst->as_mutation_source().make_reader_v2(s, env.make_reader_permit()))
.produces(m1 + m2)
@@ -1292,7 +1293,7 @@ SEASTAR_TEST_CASE(test_key_count_estimation) {
}
auto _ = env.tempdir().make_sweeper();
shared_sstable sst = make_sstable_easy(env, mt, env.manager().configure_writer(), 1, version, pks.size());
shared_sstable sst = make_sstable_easy(env, mt, env.manager().configure_writer(), version, pks.size());
auto max_est = sst->get_estimated_key_count();
testlog.trace("count = {}", count);
@@ -1453,16 +1454,17 @@ SEASTAR_TEST_CASE(test_reading_serialization_header) {
auto m1ow = md1_overwrite.build(s);
mt->apply(m1ow);
std::optional<sstables::generation_type> gen;
{
// SSTable class has way too many responsibilities. In particular, it mixes the reading and
// writting parts. Let's use a separate objects for writing and reading to ensure that nothing
// carries over that wouldn't normally be read from disk.
auto sst = env.make_sstable(s, 1);
auto sst = env.make_sstable(s);
gen.emplace(sst->generation());
sst->write_components(mt->make_flat_reader(s, env.make_reader_permit()), 2, s, env.manager().configure_writer(), mt->get_encoding_stats()).get();
}
auto sst = env.make_sstable(s, 1);
sst->load().get();
auto sst = env.reusable_sst(s, *gen).get();
auto hdr = sst->get_serialization_header();
BOOST_CHECK_EQUAL(hdr.static_columns.elements.size(), 1);
@@ -1549,7 +1551,7 @@ SEASTAR_TEST_CASE(test_counter_header_size) {
mt->apply(m);
for (const auto version : writable_sstable_versions) {
auto sst = make_sstable_easy(env, mt, env.manager().configure_writer(), 1, version);
auto sst = make_sstable_easy(env, mt, env.manager().configure_writer(), version);
assert_that(sst->as_mutation_source().make_reader_v2(s, env.make_reader_permit()))
.produces(m)
.produces_end_of_stream();

View File

@@ -35,18 +35,17 @@ SEASTAR_TEST_CASE(test_sstables_sstable_set_read_modify_write) {
auto pk = tests::generate_partition_key(s);
auto mut = mutation(s, pk);
ss.add_row(mut, ss.make_ckey(0), "val");
int gen = 1;
auto mr = make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), {mut});
sstable_writer_config cfg = env.manager().configure_writer("");
auto sst1 = make_sstable_easy(env, std::move(mr), cfg, gen++);
auto sst1 = make_sstable_easy(env, std::move(mr), cfg);
auto ss1 = make_lw_shared<sstables::sstable_set>(make_sstable_set(ss.schema(), make_lw_shared<sstable_list>({sst1})));
BOOST_REQUIRE_EQUAL(ss1->all()->size(), 1);
// Test that a random sstable_origin is stored and retrieved properly.
mr = make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), {mut});
auto sst2 = make_sstable_easy(env, std::move(mr), cfg, gen++);
auto sst2 = make_sstable_easy(env, std::move(mr), cfg);
auto ss2 = make_lw_shared<sstables::sstable_set>(*ss1);
ss2->insert(sst2);
@@ -63,11 +62,10 @@ SEASTAR_TEST_CASE(test_time_series_sstable_set_read_modify_write) {
auto pk = tests::generate_partition_key(s);
auto mut = mutation(s, pk);
ss.add_row(mut, ss.make_ckey(0), "val");
int gen = 1;
sstable_writer_config cfg = env.manager().configure_writer("");
auto mr = make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), {mut});
auto sst1 = make_sstable_easy(env, std::move(mr), cfg, gen++);
auto sst1 = make_sstable_easy(env, std::move(mr), cfg);
auto ss1 = make_lw_shared<time_series_sstable_set>(ss.schema());
ss1->insert(sst1);
@@ -75,7 +73,7 @@ SEASTAR_TEST_CASE(test_time_series_sstable_set_read_modify_write) {
// Test that a random sstable_origin is stored and retrieved properly.
mr = make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), {mut});
auto sst2 = make_sstable_easy(env, std::move(mr), cfg, gen++);
auto sst2 = make_sstable_easy(env, std::move(mr), cfg);
auto ss2 = make_lw_shared<time_series_sstable_set>(*ss1);
ss2->insert(sst2);

View File

@@ -39,7 +39,7 @@ bytes as_bytes(const sstring& s) {
return { reinterpret_cast<const int8_t*>(s.data()), s.size() };
}
future<> test_using_working_sst(schema_ptr s, sstring dir, int64_t gen) {
future<> test_using_working_sst(schema_ptr s, sstring dir, sstables::generation_type::int_t gen) {
return test_env::do_with([s = std::move(s), dir = std::move(dir), gen] (test_env& env) {
return env.reusable_sst(std::move(s), std::move(dir), gen).discard_result();
});
@@ -64,9 +64,9 @@ SEASTAR_TEST_CASE(composite_index) {
template<typename Func>
inline auto
test_using_reusable_sst(schema_ptr s, sstring dir, unsigned long gen, Func&& func) {
test_using_reusable_sst(schema_ptr s, sstring dir, sstables::generation_type::int_t gen, Func&& func) {
return test_env::do_with([s = std::move(s), dir = std::move(dir), gen, func = std::move(func)] (test_env& env) {
return env.reusable_sst(std::move(s), std::move(dir), gen).then([&env, func = std::move(func)] (sstable_ptr sst) mutable {
return env.reusable_sst(std::move(s), std::move(dir), generation_from_value(gen)).then([&env, func = std::move(func)] (sstable_ptr sst) mutable {
return func(env, std::move(sst));
});
});
@@ -94,7 +94,7 @@ SEASTAR_TEST_CASE(composite_index_read) {
}
template<uint64_t Position, uint64_t EntryPosition, uint64_t EntryKeySize>
future<> summary_query(schema_ptr schema, sstring path, int generation) {
future<> summary_query(schema_ptr schema, sstring path, sstables::generation_type::int_t generation) {
return test_using_reusable_sst(std::move(schema), path, generation, [] (test_env& env, sstable_ptr ptr) {
return sstables::test(ptr).read_summary_entry(Position).then([ptr] (auto entry) {
BOOST_REQUIRE(entry.position == EntryPosition);
@@ -105,7 +105,7 @@ future<> summary_query(schema_ptr schema, sstring path, int generation) {
}
template<uint64_t Position, uint64_t EntryPosition, uint64_t EntryKeySize>
future<> summary_query_fail(schema_ptr schema, sstring path, int generation) {
future<> summary_query_fail(schema_ptr schema, sstring path, sstables::generation_type::int_t generation) {
return summary_query<Position, EntryPosition, EntryKeySize>(std::move(schema), path, generation).then_wrapped([] (auto fut) {
try {
fut.get();
@@ -171,9 +171,9 @@ SEASTAR_TEST_CASE(missing_summary_first_last_sane) {
});
}
static future<sstable_ptr> do_write_sst(test_env& env, schema_ptr schema, sstring load_dir, sstring write_dir, unsigned long generation) {
static future<sstable_ptr> do_write_sst(test_env& env, schema_ptr schema, sstring load_dir, sstring write_dir, sstables::generation_type generation) {
return env.reusable_sst(std::move(schema), load_dir, generation).then([write_dir, generation] (sstable_ptr sst) {
sstables::test(sst).change_generation_number(generation + 1);
sstables::test(sst).change_generation_number(replica::table::make_new_generation(generation));
sstables::test(sst).change_dir(write_dir);
auto fut = sstables::test(sst).store();
return std::move(fut).then([sst = std::move(sst)] {
@@ -182,9 +182,9 @@ static future<sstable_ptr> do_write_sst(test_env& env, schema_ptr schema, sstrin
});
}
static future<> write_sst_info(schema_ptr schema, sstring load_dir, sstring write_dir, unsigned long generation) {
static future<> write_sst_info(schema_ptr schema, sstring load_dir, sstring write_dir, sstables::generation_type::int_t generation) {
return test_env::do_with([schema = std::move(schema), load_dir = std::move(load_dir), write_dir = std::move(write_dir), generation] (test_env& env) {
return do_write_sst(env, std::move(schema), load_dir, write_dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
return do_write_sst(env, std::move(schema), load_dir, write_dir, sstables::generation_type(generation)).then([] (auto ptr) { return make_ready_future<>(); });
});
}
@@ -208,13 +208,23 @@ static future<std::pair<bufptr_t, size_t>> read_file(sstring file_path)
struct sstdesc {
sstring dir;
int64_t gen;
sstables::generation_type gen;
sstdesc(sstring dir, sstables::generation_type gen)
: dir(std::move(dir))
, gen(gen)
{}
sstdesc(sstring dir, sstables::generation_type::int_t gen_val)
: dir(std::move(dir))
, gen(gen_val)
{}
};
static future<> compare_files(sstdesc file1, sstdesc file2, component_type component) {
auto file_path = sstable::filename(file1.dir, "ks", "cf", la, generation_from_value(file1.gen), big, component);
auto file_path = sstable::filename(file1.dir, "ks", "cf", la, file1.gen, big, component);
return read_file(file_path).then([component, file2] (auto ret) {
auto file_path = sstable::filename(file2.dir, "ks", "cf", la, generation_from_value(file2.gen), big, component);
auto file_path = sstable::filename(file2.dir, "ks", "cf", la, file2.gen, big, component);
return read_file(file_path).then([ret = std::move(ret)] (auto ret2) {
// assert that both files have the same size.
BOOST_REQUIRE(ret.second == ret2.second);
@@ -242,11 +252,10 @@ SEASTAR_TEST_CASE(check_compressed_info_func) {
future<>
write_and_validate_sst(schema_ptr s, sstring dir, noncopyable_function<future<> (shared_sstable sst1, shared_sstable sst2)> func) {
return test_env::do_with([s = std::move(s), dir = std::move(dir), func = std::move(func)] (test_env& env) mutable {
return do_write_sst(env, s, dir, env.tempdir().path().string(), 1).then([&env, s = std::move(s), func = std::move(func)] (auto sst1) {
auto sst2 = env.make_sstable(s, 2, sst1->get_version());
return func(std::move(sst1), std::move(sst2));
});
return test_env::do_with_async([s = std::move(s), dir = std::move(dir), func = std::move(func)] (test_env& env) mutable {
auto sst1 = do_write_sst(env, s, dir, env.tempdir().path().native(), env.new_generation()).get();
auto sst2 = env.make_sstable(s, sst1->get_version());
func(std::move(sst1), std::move(sst2)).get();
});
}
@@ -469,7 +478,7 @@ SEASTAR_TEST_CASE(wrong_range) {
}
static future<>
test_sstable_exists(sstring dir, unsigned long generation, bool exists) {
test_sstable_exists(sstring dir, sstables::generation_type::int_t generation, bool exists) {
auto file_path = sstable::filename(dir, "ks", "cf", la, generation_from_value(generation), big, component_type::Data);
return open_file_dma(file_path, open_flags::ro).then_wrapped([exists] (future<file> f) {
if (exists) {

View File

@@ -42,12 +42,12 @@ public:
return table_s.on_compaction_completion(sstables::compaction_completion_desc{ .old_sstables = sstables_to_remove, .new_sstables = new_sstables }, sstables::offstrategy::no);
}
static void update_sstables_known_generation(replica::column_family& cf, unsigned generation) {
cf.update_sstables_known_generation(generation_from_value(generation));
static void update_sstables_known_generation(replica::column_family& cf, sstables::generation_type generation) {
cf.update_sstables_known_generation(generation);
}
static uint64_t calculate_generation_for_new_table(replica::column_family& cf) {
return generation_value(cf.calculate_generation_for_new_table());
static sstables::generation_type calculate_generation_for_new_table(replica::column_family& cf) {
return cf.calculate_generation_for_new_table();
}
};

View File

@@ -18,6 +18,7 @@
#include "gms/feature_service.hh"
#include "sstables/version.hh"
#include "sstables/sstable_directory.hh"
#include "replica/database.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/test_services.hh"
@@ -56,14 +57,16 @@ class test_env {
db::nop_large_data_handler nop_ld_handler;
test_env_sstables_manager mgr;
reader_concurrency_semaphore semaphore;
unsigned long generation = 1;
std::optional<sstables::generation_type> generation;
impl(test_env_config cfg);
impl(impl&&) = delete;
impl(const impl&) = delete;
unsigned long new_generation() noexcept {
return generation++;
sstables::generation_type new_generation() noexcept {
auto ret = replica::table::make_new_generation(generation);
generation = ret;
return ret;
}
};
std::unique_ptr<impl> _impl;
@@ -77,7 +80,7 @@ public:
});
}
unsigned long new_generation() noexcept {
sstables::generation_type new_generation() noexcept {
return _impl->new_generation();
}
@@ -87,29 +90,18 @@ public:
return _impl->mgr.make_sstable(std::move(schema), dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
}
shared_sstable make_sstable(schema_ptr schema, sstring dir, unsigned long gen_value,
sstable::version_types v = sstables::get_highest_sstable_version(), sstable::format_types f = sstable::format_types::big,
size_t buffer_size = default_sstable_buffer_size, gc_clock::time_point now = gc_clock::now()) {
return make_sstable(std::move(schema), std::move(dir), generation_from_value(gen_value), v, f, buffer_size, now);
}
shared_sstable make_sstable(schema_ptr schema, sstring dir, sstable::version_types v = sstables::get_highest_sstable_version()) {
return make_sstable(std::move(schema), std::move(dir), _impl->generation++, std::move(v));
return make_sstable(std::move(schema), std::move(dir), new_generation(), std::move(v));
}
shared_sstable make_sstable(schema_ptr schema, unsigned long generation,
shared_sstable make_sstable(schema_ptr schema, sstables::generation_type generation,
sstable::version_types v = sstables::get_highest_sstable_version(), sstable::format_types f = sstable::format_types::big,
size_t buffer_size = default_sstable_buffer_size, gc_clock::time_point now = gc_clock::now()) {
return make_sstable(std::move(schema), _impl->dir.path().native(), generation, std::move(v), std::move(f), buffer_size, now);
}
shared_sstable make_sstable(schema_ptr schema, sstable::version_types v = sstables::get_highest_sstable_version()) {
return make_sstable(std::move(schema), _impl->generation++, std::move(v));
}
shared_sstable make_sstable(schema_ptr schema, unsigned long gen_value, sstable::version_types v, size_t buffer_size,
gc_clock::time_point now = gc_clock::now()) {
return make_sstable(std::move(schema), gen_value, v, sstable::format_types::big, buffer_size, now);
return make_sstable(std::move(schema), _impl->dir.path().native(), std::move(v));
}
std::function<shared_sstable()> make_sst_factory(schema_ptr s) {
@@ -125,7 +117,7 @@ public:
}
struct sst_not_found : public std::runtime_error {
sst_not_found(const sstring& dir, unsigned long generation)
sst_not_found(const sstring& dir, sstables::generation_type generation)
: std::runtime_error(format("no versions of sstable generation {} found in {}", generation, dir))
{}
};
@@ -136,7 +128,7 @@ public:
// therefore may block. The future value is a shared sstable - a reference-
// counting pointer to an sstable - allowing for the returned handle to
// be passed around until no longer needed.
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, unsigned long generation,
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation,
sstable::version_types version, sstable::format_types f = sstable::format_types::big) {
auto sst = make_sstable(std::move(schema), dir, generation, version, f);
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
@@ -144,27 +136,41 @@ public:
return make_ready_future<shared_sstable>(std::move(sst));
});
}
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type::int_t gen_value,
sstable::version_types version, sstable::format_types f = sstable::format_types::big) {
return reusable_sst(std::move(schema), std::move(dir), sstables::generation_type(gen_value), version, f);
}
future<shared_sstable> reusable_sst(schema_ptr schema, unsigned long generation,
future<shared_sstable> reusable_sst(schema_ptr schema, sstables::generation_type generation,
sstable::version_types version, sstable::format_types f = sstable::format_types::big) {
return reusable_sst(std::move(schema), _impl->dir.path().native(), std::move(generation), std::move(version), std::move(f));
}
// looks up the sstable in the given dir
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, unsigned long generation);
future<shared_sstable> reusable_sst(schema_ptr schema, unsigned long generation) {
return reusable_sst(std::move(schema), _impl->dir.path().native(), generation);
future<shared_sstable> reusable_sst(schema_ptr schema, sstables::generation_type::int_t gen_value,
sstable::version_types version, sstable::format_types f = sstable::format_types::big) {
return reusable_sst(std::move(schema), sstables::generation_type(gen_value), std::move(version), std::move(f));
}
future<shared_sstable> reusable_sst(schema_ptr schema, shared_sstable sst) {
return reusable_sst(std::move(schema), sst->get_storage().prefix(), sst->generation().value(), sst->get_version());
return reusable_sst(std::move(schema), sst->get_storage().prefix(), sst->generation(), sst->get_version());
}
future<shared_sstable> reusable_sst(shared_sstable sst) {
return reusable_sst(sst->get_schema(), std::move(sst));
}
// looks up the sstable in the given dir
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation);
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type::int_t gen_value) {
return reusable_sst(std::move(schema), std::move(dir), sstables::generation_type(gen_value));
}
future<shared_sstable> reusable_sst(schema_ptr schema, sstables::generation_type generation) {
return reusable_sst(std::move(schema), _impl->dir.path().native(), generation);
}
future<shared_sstable> reusable_sst(schema_ptr schema, sstables::generation_type::int_t gen_value) {
return reusable_sst(std::move(schema), sstables::generation_type(gen_value));
}
test_env_sstables_manager& manager() { return _impl->mgr; }
reader_concurrency_semaphore& semaphore() { return _impl->semaphore; }
db::config& db_config() { return *_impl->db_config; }

View File

@@ -92,7 +92,7 @@ shared_sstable make_sstable(sstables::test_env& env, schema_ptr s, sstring dir,
mt->apply(m);
}
auto sst = env.make_sstable(s, dir_path.string(), 1, version, sstable_format_types::big, default_sstable_buffer_size, query_time);
auto sst = env.make_sstable(s, dir_path.string(), env.new_generation(), version, sstable_format_types::big, default_sstable_buffer_size, query_time);
auto mr = mt->make_flat_reader(s, env.make_reader_permit());
sst->write_components(std::move(mr), mutations.size(), s, cfg, mt->get_encoding_stats()).get();
sst->load().get();
@@ -100,16 +100,16 @@ shared_sstable make_sstable(sstables::test_env& env, schema_ptr s, sstring dir,
}
shared_sstable make_sstable_easy(test_env& env, flat_mutation_reader_v2 rd, sstable_writer_config cfg,
int64_t generation, const sstables::sstable::version_types version, int expected_partition) {
sstables::generation_type gen, const sstables::sstable::version_types version, int expected_partition) {
auto s = rd.schema();
auto sst = env.make_sstable(s, generation, version, sstable_format_types::big);
auto sst = env.make_sstable(s, gen, version, sstable_format_types::big);
sst->write_components(std::move(rd), expected_partition, s, cfg, encoding_stats{}).get();
sst->load().get();
return sst;
}
shared_sstable make_sstable_easy(test_env& env, lw_shared_ptr<replica::memtable> mt, sstable_writer_config cfg,
unsigned long gen, const sstable::version_types v, int estimated_partitions, gc_clock::time_point query_time) {
sstables::generation_type gen, const sstable::version_types v, int estimated_partitions, gc_clock::time_point query_time) {
schema_ptr s = mt->schema();
auto sst = env.make_sstable(s, gen, v, sstable_format_types::big, default_sstable_buffer_size, query_time);
auto mr = mt->make_flat_reader(s, env.make_reader_permit());
@@ -137,12 +137,12 @@ future<compaction_result> compact_sstables(compaction_manager& cm, sstables::com
co_return ret;
}
static sstring toc_filename(const sstring& dir, schema_ptr schema, unsigned int generation, sstable_version_types v) {
return sstable::filename(dir, schema->ks_name(), schema->cf_name(), v, generation_from_value(generation),
static sstring toc_filename(const sstring& dir, schema_ptr schema, sstables::generation_type generation, sstable_version_types v) {
return sstable::filename(dir, schema->ks_name(), schema->cf_name(), v, generation,
sstable_format_types::big, component_type::TOC);
}
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, unsigned long generation) {
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation) {
for (auto v : boost::adaptors::reverse(all_sstable_versions)) {
if (co_await file_exists(toc_filename(dir, schema, generation, v))) {
co_return co_await reusable_sst(schema, dir, generation, v);

View File

@@ -138,8 +138,8 @@ public:
return sstables::binary_search(p, entries, sk);
}
void change_generation_number(int64_t generation) {
_sst->_generation = generation_from_value(generation);
void change_generation_number(sstables::generation_type generation) {
_sst->_generation = generation;
}
void change_dir(sstring dir) {
@@ -303,6 +303,16 @@ future<compaction_result> compact_sstables(compaction_manager& cm, sstables::com
can_purge_tombstones can_purge = can_purge_tombstones::yes);
shared_sstable make_sstable_easy(test_env& env, flat_mutation_reader_v2 rd, sstable_writer_config cfg,
int64_t generation = 1, const sstables::sstable::version_types version = sstables::get_highest_sstable_version(), int expected_partition = 1);
sstables::generation_type gen, const sstables::sstable::version_types version = sstables::get_highest_sstable_version(), int expected_partition = 1);
shared_sstable make_sstable_easy(test_env& env, lw_shared_ptr<replica::memtable> mt, sstable_writer_config cfg,
unsigned long gen = 1, const sstable::version_types v = sstables::get_highest_sstable_version(), int estimated_partitions = 1, gc_clock::time_point = gc_clock::now());
sstables::generation_type gen, const sstable::version_types v = sstables::get_highest_sstable_version(), int estimated_partitions = 1, gc_clock::time_point = gc_clock::now());
inline shared_sstable make_sstable_easy(test_env& env, flat_mutation_reader_v2 rd, sstable_writer_config cfg,
const sstables::sstable::version_types version = sstables::get_highest_sstable_version(), int expected_partition = 1) {
return make_sstable_easy(env, std::move(rd), std::move(cfg), env.new_generation(), version, expected_partition);
}
inline shared_sstable make_sstable_easy(test_env& env, lw_shared_ptr<replica::memtable> mt, sstable_writer_config cfg,
const sstable::version_types version = sstables::get_highest_sstable_version(), int estimated_partitions = 1, gc_clock::time_point query_time = gc_clock::now()) {
return make_sstable_easy(env, std::move(mt), std::move(cfg), env.new_generation(), version, estimated_partitions, query_time);
}

View File

@@ -205,11 +205,7 @@ static sizes calculate_sizes(cache_tracker& tracker, const mutation_settings& se
tmpdir sstable_dir;
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
for (auto v : sstables::all_sstable_versions) {
auto sst = env.make_sstable(s,
sstable_dir.path().string(),
1 /* generation */,
v,
sstables::sstable::format_types::big);
auto sst = env.make_sstable(s, v);
auto mt2 = make_lw_shared<replica::memtable>(s);
mt2->apply(*mt, env.make_reader_permit()).get();
write_memtable_to_sstable_for_test(*mt2, sst).get();

View File

@@ -179,7 +179,7 @@ public:
}
future<> load_sstables(unsigned iterations) {
_sst.push_back(_env.make_sstable(s, this->dir(), 0));
_sst.push_back(_env.make_sstable(s, this->dir()));
return _sst.back()->load();
}
@@ -195,7 +195,7 @@ public:
size_t partitions = _mt->partition_count();
test_setup::create_empty_test_dir(dir()).get();
auto sst = _env.make_sstable(s, dir(), idx, sstables::get_highest_sstable_version(), sstable::format_types::big, _cfg.buffer_size);
auto sst = _env.make_sstable(s, dir(), sstables::generation_type(idx), sstables::get_highest_sstable_version(), sstable::format_types::big, _cfg.buffer_size);
auto start = perf_sstable_test_env::now();
write_memtable_to_sstable_for_test(*_mt, sst).get();
@@ -209,10 +209,10 @@ public:
}
future<double> compaction(int idx) {
return test_setup::create_empty_test_dir(dir()).then([this, idx] {
return sstables::test_env::do_with_async_returning<double>([this, idx] (sstables::test_env& env) {
auto sst_gen = [this, gen = make_lw_shared<unsigned>(idx)] () mutable {
return _env.make_sstable(s, dir(), (*gen)++, sstables::get_highest_sstable_version(), sstable::format_types::big, _cfg.buffer_size);
return test_setup::create_empty_test_dir(dir()).then([this] {
return sstables::test_env::do_with_async_returning<double>([this] (sstables::test_env& env) {
auto sst_gen = [this] () mutable {
return _env.make_sstable(s, dir(), _env.new_generation(), sstables::get_highest_sstable_version(), sstable::format_types::big, _cfg.buffer_size);
};
std::vector<shared_sstable> ssts;

View File

@@ -2561,7 +2561,7 @@ const std::vector<option> all_options {
typed_option<std::string>("output-format", "json", "the output-format, one of (text, json)"),
typed_option<std::string>("input-file", "the file containing the input"),
typed_option<std::string>("output-dir", ".", "directory to place the output files to"),
typed_option<int64_t>("generation", "generation of generated sstable"),
typed_option<sstables::generation_type::int_t>("generation", "generation of generated sstable"),
typed_option<std::string>("validation-level", "clustering_key", "degree of validation on the output, one of (partition_region, token, partition_key, clustering_key)"),
typed_option<std::string>("script-file", "script file to load and execute"),
typed_option<program_options::string_map>("script-arg", {}, "parameter(s) for the script"),