Merge 'compaction: validate: validate the index too' from Botond Dénes
In addition to the data file itself. Currently validation avoids the index altogether, using the crawling reader which only relies on the data file and ignores the index+summary. This is because a corrupt sstable usually has a corrupt index too and using both at the same time might hide the corruption. This patch adds targeted validation of the index, independent of and in addition to the already existing data validation: it validates the order of index entries as well as whether the entry points to a complete partition in the data file. This will usually result in duplicate errors for out-of-order partitions: one for the data file and one for the index file. Fixes: #9611 Closes #11405 * github.com:scylladb/scylladb: test/cql-pytest: add test_sstable_validation.py test/cql-pytest: extract scylla_path,temp_workdir fixtures to conftest.py tools/scylla-sstables: write validation result to stdout sstables/sstable: validate(): delegate to mx validator for mx sstables sstables/mx/reader: add mx specific validator mutation/mutation_fragment_stream_validator: add validator() accessor to validating filter sstables/mx/reader: template data_consume_rows_context_m on the consumer sstables/mx/reader: move row_processing_result to namespace scope sstables/mx/reader: use data_consumer::proceed directly sstables/mx/reader.cc: extend namespace to end-of-file (cosmetic) compaction/compaction: remove now unused scrub_validate_mode_validate_reader() compaction/compaction: move away from scrub_validate_mode_validate_reader() tools/scylla-sstable: move away from scrub_validate_mode_validate_reader() test/boost/sstable_compaction_test: move away from scrub_validate_mode_validate_reader() sstables/sstable: add validate() method compaction/compaction: scrub_sstables_validate_mode(): validate sstables one-by-one compaction: scrub: use error messages from validator mutation_fragment_stream_validator: produce error messages in low-level validator
This commit is contained in:
@@ -1236,62 +1236,8 @@ public:
|
||||
|
||||
class scrub_compaction final : public regular_compaction {
|
||||
public:
|
||||
static void report_invalid_partition(compaction_type type, mutation_fragment_stream_validator& validator, const dht::decorated_key& new_key,
|
||||
std::string_view action = "") {
|
||||
const auto& schema = validator.schema();
|
||||
const auto& current_key = validator.previous_partition_key();
|
||||
clogger.error("[{} compaction {}.{}] Invalid partition {} ({}), partition is out-of-order compared to previous partition {} ({}){}{}",
|
||||
type,
|
||||
schema.ks_name(),
|
||||
schema.cf_name(),
|
||||
new_key.key().with_schema(schema),
|
||||
new_key,
|
||||
current_key.key().with_schema(schema),
|
||||
current_key,
|
||||
action.empty() ? "" : "; ",
|
||||
action);
|
||||
}
|
||||
static void report_invalid_partition_start(compaction_type type, mutation_fragment_stream_validator& validator, const dht::decorated_key& new_key,
|
||||
std::string_view action = "") {
|
||||
const auto& schema = validator.schema();
|
||||
const auto& current_key = validator.previous_partition_key();
|
||||
clogger.error("[{} compaction {}.{}] Invalid partition start for partition {} ({}), previous partition {} ({}) didn't end with a partition-end fragment{}{}",
|
||||
type,
|
||||
schema.ks_name(),
|
||||
schema.cf_name(),
|
||||
new_key.key().with_schema(schema),
|
||||
new_key,
|
||||
current_key.key().with_schema(schema),
|
||||
current_key,
|
||||
action.empty() ? "" : "; ",
|
||||
action);
|
||||
}
|
||||
static void report_invalid_mutation_fragment(compaction_type type, mutation_fragment_stream_validator& validator, const mutation_fragment_v2& mf,
|
||||
std::string_view action = "") {
|
||||
const auto& schema = validator.schema();
|
||||
const auto& key = validator.previous_partition_key();
|
||||
const auto prev_pos = validator.previous_position();
|
||||
clogger.error("[{} compaction {}.{}] Invalid {} fragment{} ({}) in partition {} ({}),"
|
||||
" fragment is out-of-order compared to previous {} fragment{} ({}){}{}",
|
||||
type,
|
||||
schema.ks_name(),
|
||||
schema.cf_name(),
|
||||
mf.mutation_fragment_kind(),
|
||||
mf.has_key() ? format(" with key {}", mf.key().with_schema(schema)) : "",
|
||||
mf.position(),
|
||||
key.key().with_schema(schema),
|
||||
key,
|
||||
prev_pos.region(),
|
||||
prev_pos.has_key() ? format(" with key {}", prev_pos.key().with_schema(schema)) : "",
|
||||
prev_pos,
|
||||
action.empty() ? "" : "; ",
|
||||
action);
|
||||
}
|
||||
static void report_invalid_end_of_stream(compaction_type type, mutation_fragment_stream_validator& validator, std::string_view action = "") {
|
||||
const auto& schema = validator.schema();
|
||||
const auto& key = validator.previous_partition_key();
|
||||
clogger.error("[{} compaction {}.{}] Invalid end-of-stream, last partition {} ({}) didn't end with a partition-end fragment{}{}",
|
||||
type, schema.ks_name(), schema.cf_name(), key.key().with_schema(schema), key, action.empty() ? "" : "; ", action);
|
||||
static void report_validation_error(compaction_type type, const ::schema& schema, sstring what, std::string_view action = "") {
|
||||
clogger.error("[{} compaction {}.{}] {}{}{}", type, schema.ks_name(), schema.cf_name(), what, action.empty() ? "" : "; ", action);
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -1314,9 +1260,9 @@ private:
|
||||
++_validation_errors;
|
||||
}
|
||||
|
||||
void on_unexpected_partition_start(const mutation_fragment_v2& ps) {
|
||||
auto report_fn = [this, &ps] (std::string_view action = "") {
|
||||
report_invalid_partition_start(compaction_type::Scrub, _validator, ps.as_partition_start().key(), action);
|
||||
void on_unexpected_partition_start(const mutation_fragment_v2& ps, sstring error) {
|
||||
auto report_fn = [this, error] (std::string_view action = "") {
|
||||
report_validation_error(compaction_type::Scrub, *_schema, error, action);
|
||||
};
|
||||
maybe_abort_scrub(report_fn);
|
||||
report_fn("Rectifying by adding assumed missing partition-end");
|
||||
@@ -1338,9 +1284,9 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
skip on_invalid_partition(const dht::decorated_key& new_key) {
|
||||
auto report_fn = [this, &new_key] (std::string_view action = "") {
|
||||
report_invalid_partition(compaction_type::Scrub, _validator, new_key, action);
|
||||
skip on_invalid_partition(const dht::decorated_key& new_key, sstring error) {
|
||||
auto report_fn = [this, error] (std::string_view action = "") {
|
||||
report_validation_error(compaction_type::Scrub, *_schema, error, action);
|
||||
};
|
||||
maybe_abort_scrub(report_fn);
|
||||
if (_scrub_mode == compaction_type_options::scrub::mode::segregate) {
|
||||
@@ -1354,9 +1300,9 @@ private:
|
||||
return skip::yes;
|
||||
}
|
||||
|
||||
skip on_invalid_mutation_fragment(const mutation_fragment_v2& mf) {
|
||||
auto report_fn = [this, &mf] (std::string_view action = "") {
|
||||
report_invalid_mutation_fragment(compaction_type::Scrub, _validator, mf, "");
|
||||
skip on_invalid_mutation_fragment(const mutation_fragment_v2& mf, sstring error) {
|
||||
auto report_fn = [this, error] (std::string_view action = "") {
|
||||
report_validation_error(compaction_type::Scrub, *_schema, error, action);
|
||||
};
|
||||
maybe_abort_scrub(report_fn);
|
||||
|
||||
@@ -1391,9 +1337,9 @@ private:
|
||||
return skip::yes;
|
||||
}
|
||||
|
||||
void on_invalid_end_of_stream() {
|
||||
auto report_fn = [this] (std::string_view action = "") {
|
||||
report_invalid_end_of_stream(compaction_type::Scrub, _validator, action);
|
||||
void on_invalid_end_of_stream(sstring error) {
|
||||
auto report_fn = [this, error] (std::string_view action = "") {
|
||||
report_validation_error(compaction_type::Scrub, *_schema, error, action);
|
||||
};
|
||||
maybe_abort_scrub(report_fn);
|
||||
// Handle missing partition_end
|
||||
@@ -1412,21 +1358,27 @@ private:
|
||||
// and shouldn't be verified. We know the last fragment the
|
||||
// validator saw is a partition-start, passing it another one
|
||||
// will confuse it.
|
||||
if (!_skip_to_next_partition && !_validator(mf)) {
|
||||
on_unexpected_partition_start(mf);
|
||||
if (!_skip_to_next_partition) {
|
||||
if (auto res = _validator(mf); !res) {
|
||||
on_unexpected_partition_start(mf, res.what());
|
||||
}
|
||||
// Continue processing this partition start.
|
||||
}
|
||||
_skip_to_next_partition = false;
|
||||
// Then check that the partition monotonicity stands.
|
||||
const auto& dk = mf.as_partition_start().key();
|
||||
if (!_validator(dk) && on_invalid_partition(dk) == skip::yes) {
|
||||
continue;
|
||||
if (auto res = _validator(dk); !res) {
|
||||
if (on_invalid_partition(dk, res.what()) == skip::yes) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else if (_skip_to_next_partition) {
|
||||
continue;
|
||||
} else {
|
||||
if (!_validator(mf) && on_invalid_mutation_fragment(mf) == skip::yes) {
|
||||
continue;
|
||||
if (auto res = _validator(mf); !res) {
|
||||
if (on_invalid_mutation_fragment(mf, res.what()) == skip::yes) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
push_mutation_fragment(std::move(mf));
|
||||
@@ -1435,8 +1387,8 @@ private:
|
||||
_end_of_stream = _reader.is_end_of_stream() && _reader.is_buffer_empty();
|
||||
|
||||
if (_end_of_stream) {
|
||||
if (!_validator.on_end_of_stream()) {
|
||||
on_invalid_end_of_stream();
|
||||
if (auto res = _validator.on_end_of_stream(); !res) {
|
||||
on_invalid_end_of_stream(res.what());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1717,81 +1669,29 @@ static std::unique_ptr<compaction> make_compaction(table_state& table_s, sstable
|
||||
return descriptor.options.visit(visitor_factory);
|
||||
}
|
||||
|
||||
future<uint64_t> scrub_validate_mode_validate_reader(flat_mutation_reader_v2 reader, const compaction_data& cdata) {
|
||||
auto schema = reader.schema();
|
||||
|
||||
uint64_t errors = 0;
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
auto validator = mutation_fragment_stream_validator(*schema);
|
||||
|
||||
while (auto mf_opt = co_await reader()) {
|
||||
if (cdata.is_stop_requested()) [[unlikely]] {
|
||||
// Compaction manager will catch this exception and re-schedule the compaction.
|
||||
throw compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested);
|
||||
}
|
||||
|
||||
const auto& mf = *mf_opt;
|
||||
|
||||
if (mf.is_partition_start()) {
|
||||
const auto& ps = mf.as_partition_start();
|
||||
if (!validator(mf)) {
|
||||
scrub_compaction::report_invalid_partition_start(compaction_type::Scrub, validator, ps.key());
|
||||
validator.reset(mf);
|
||||
++errors;
|
||||
}
|
||||
if (!validator(ps.key())) {
|
||||
scrub_compaction::report_invalid_partition(compaction_type::Scrub, validator, ps.key());
|
||||
validator.reset(ps.key());
|
||||
++errors;
|
||||
}
|
||||
} else {
|
||||
if (!validator(mf)) {
|
||||
scrub_compaction::report_invalid_mutation_fragment(compaction_type::Scrub, validator, mf);
|
||||
validator.reset(mf);
|
||||
++errors;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!validator.on_end_of_stream()) {
|
||||
scrub_compaction::report_invalid_end_of_stream(compaction_type::Scrub, validator);
|
||||
++errors;
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
co_await reader.close();
|
||||
|
||||
if (ex) {
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
}
|
||||
|
||||
co_return errors;
|
||||
}
|
||||
|
||||
static future<compaction_result> scrub_sstables_validate_mode(sstables::compaction_descriptor descriptor, compaction_data& cdata, table_state& table_s) {
|
||||
auto schema = table_s.schema();
|
||||
auto permit = table_s.make_compaction_reader_permit();
|
||||
|
||||
uint64_t validation_errors;
|
||||
|
||||
formatted_sstables_list sstables_list_msg;
|
||||
auto sstables = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(schema, false));
|
||||
for (const auto& sst : descriptor.sstables) {
|
||||
sstables_list_msg += sst;
|
||||
sstables->insert(sst);
|
||||
clogger.info("Scrubbing in validate mode {}", sst->get_filename());
|
||||
|
||||
validation_errors += co_await sst->validate(permit, descriptor.io_priority, cdata.abort, [&schema] (sstring what) {
|
||||
scrub_compaction::report_validation_error(compaction_type::Scrub, *schema, what);
|
||||
});
|
||||
// Did validation actually finish because aborted?
|
||||
if (cdata.is_stop_requested()) {
|
||||
// Compaction manager will catch this exception and re-schedule the compaction.
|
||||
throw compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested);
|
||||
}
|
||||
|
||||
clogger.info("Finished scrubbing in validate mode {} - sstable is {}", sst->get_filename(), validation_errors == 0 ? "valid" : "invalid");
|
||||
}
|
||||
|
||||
clogger.info("Scrubbing in validate mode {}", sstables_list_msg);
|
||||
|
||||
auto permit = table_s.make_compaction_reader_permit();
|
||||
auto reader = sstables->make_crawling_reader(schema, permit, descriptor.io_priority, nullptr);
|
||||
|
||||
const auto validation_errors = co_await scrub_validate_mode_validate_reader(std::move(reader), cdata);
|
||||
|
||||
clogger.info("Finished scrubbing in validate mode {} - sstable(s) are {}", sstables_list_msg, validation_errors == 0 ? "valid" : "invalid");
|
||||
|
||||
if (validation_errors != 0) {
|
||||
for (auto& sst : *sstables->all()) {
|
||||
for (auto& sst : descriptor.sstables) {
|
||||
co_await sst->change_state(sstables::quarantine_dir);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,7 +130,4 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
|
||||
// For tests, can drop after we virtualize sstables.
|
||||
flat_mutation_reader_v2 make_scrubbing_reader(flat_mutation_reader_v2 rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors);
|
||||
|
||||
// For tests, can drop after we virtualize sstables.
|
||||
future<uint64_t> scrub_validate_mode_validate_reader(flat_mutation_reader_v2 rd, const compaction_data& info);
|
||||
|
||||
}
|
||||
|
||||
@@ -23,6 +23,22 @@ enum class mutation_fragment_stream_validation_level {
|
||||
/// Tracks and validates the monotonicity of the passed in fragment kinds,
|
||||
/// position in partition, token or partition keys.
|
||||
class mutation_fragment_stream_validator {
|
||||
public:
|
||||
class validation_result {
|
||||
sstring _what;
|
||||
private:
|
||||
explicit validation_result() = default;
|
||||
explicit validation_result(sstring what) : _what(std::move(what)) { }
|
||||
public:
|
||||
static validation_result invalid(sstring what) { return validation_result(what); }
|
||||
static validation_result valid() { return validation_result(); }
|
||||
bool is_valid() const { return _what.empty(); }
|
||||
const sstring& what() const { return _what; }
|
||||
explicit operator bool() const { return is_valid(); }
|
||||
bool operator!() const { return !is_valid(); }
|
||||
};
|
||||
|
||||
private:
|
||||
const ::schema& _schema;
|
||||
mutation_fragment_v2::kind _prev_kind;
|
||||
position_in_partition _prev_pos;
|
||||
@@ -30,8 +46,8 @@ class mutation_fragment_stream_validator {
|
||||
tombstone _current_tombstone;
|
||||
|
||||
private:
|
||||
bool validate(dht::token t, const partition_key* pkey);
|
||||
bool validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
|
||||
validation_result validate(dht::token t, const partition_key* pkey);
|
||||
validation_result validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
|
||||
std::optional<tombstone> new_current_tombstone);
|
||||
public:
|
||||
explicit mutation_fragment_stream_validator(const schema& s);
|
||||
@@ -49,8 +65,8 @@ public:
|
||||
/// the current tombstone (range tombstone change fragments).
|
||||
///
|
||||
/// \returns true if the fragment kind is valid.
|
||||
bool operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone);
|
||||
bool operator()(mutation_fragment::kind kind);
|
||||
validation_result operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone);
|
||||
validation_result operator()(mutation_fragment::kind kind);
|
||||
|
||||
/// Validates the monotonicity of the mutation fragment kind and position.
|
||||
///
|
||||
@@ -64,8 +80,8 @@ public:
|
||||
/// the current tombstone (range tombstone change fragments).
|
||||
///
|
||||
/// \returns true if the mutation fragment kind is valid.
|
||||
bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
|
||||
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
|
||||
validation_result operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
|
||||
validation_result operator()(mutation_fragment::kind kind, position_in_partition_view pos);
|
||||
|
||||
/// Validates the monotonicity of the mutation fragment.
|
||||
///
|
||||
@@ -73,8 +89,8 @@ public:
|
||||
/// See said overload for more details.
|
||||
///
|
||||
/// \returns true if the mutation fragment kind is valid.
|
||||
bool operator()(const mutation_fragment_v2& mf);
|
||||
bool operator()(const mutation_fragment& mf);
|
||||
validation_result operator()(const mutation_fragment_v2& mf);
|
||||
validation_result operator()(const mutation_fragment& mf);
|
||||
|
||||
/// Validates the monotonicity of the token.
|
||||
///
|
||||
@@ -84,7 +100,7 @@ public:
|
||||
/// overload.
|
||||
///
|
||||
/// \returns true if the token is valid.
|
||||
bool operator()(dht::token t);
|
||||
validation_result operator()(dht::token t);
|
||||
|
||||
/// Validates the monotonicity of the partition.
|
||||
///
|
||||
@@ -94,7 +110,7 @@ public:
|
||||
/// overload.
|
||||
///
|
||||
/// \returns true if the partition key is valid.
|
||||
bool operator()(const dht::decorated_key& dk);
|
||||
validation_result operator()(const dht::decorated_key& dk);
|
||||
|
||||
/// Reset the state of the validator to the given partition
|
||||
///
|
||||
@@ -120,7 +136,7 @@ public:
|
||||
///
|
||||
/// \returns false if the last partition wasn't closed, i.e. the last
|
||||
/// fragment wasn't a `partition_end` fragment.
|
||||
bool on_end_of_stream();
|
||||
validation_result on_end_of_stream();
|
||||
|
||||
/// The previous valid fragment kind.
|
||||
mutation_fragment_v2::kind previous_mutation_fragment_kind() const {
|
||||
@@ -171,8 +187,6 @@ class mutation_fragment_stream_validating_filter {
|
||||
mutation_fragment_stream_validation_level _validation_level;
|
||||
|
||||
private:
|
||||
sstring full_name() const;
|
||||
|
||||
mutation_fragment_stream_validating_filter(const char* name_literal, sstring name_value, const schema& s, mutation_fragment_stream_validation_level level);
|
||||
|
||||
public:
|
||||
@@ -187,6 +201,8 @@ public:
|
||||
mutation_fragment_stream_validating_filter(mutation_fragment_stream_validating_filter&&) = delete;
|
||||
mutation_fragment_stream_validating_filter(const mutation_fragment_stream_validating_filter&) = delete;
|
||||
|
||||
sstring full_name() const;
|
||||
|
||||
bool operator()(const dht::decorated_key& dk);
|
||||
bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
|
||||
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
|
||||
@@ -198,4 +214,5 @@ public:
|
||||
/// Equivalent to `operator()(partition_end{})`
|
||||
bool on_end_of_partition();
|
||||
void on_end_of_stream();
|
||||
mutation_fragment_stream_validator& validator() { return _validator; }
|
||||
};
|
||||
|
||||
@@ -41,13 +41,31 @@ mutation_fragment_stream_validator::mutation_fragment_stream_validator(const ::s
|
||||
, _prev_partition_key(dht::minimum_token(), partition_key::make_empty()) {
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::validate(dht::token t, const partition_key* pkey) {
|
||||
static sstring
|
||||
format_partition_key(const schema& s, const dht::decorated_key& pkey, const char* prefix = "") {
|
||||
if (pkey.key().is_empty()) {
|
||||
return "";
|
||||
}
|
||||
return format("{}{} ({})", prefix, pkey.key().with_schema(s), pkey);
|
||||
}
|
||||
|
||||
static mutation_fragment_stream_validator::validation_result
|
||||
ooo_key_result(const schema& s, dht::token t, const partition_key* pkey, dht::decorated_key prev_key) {
|
||||
return mutation_fragment_stream_validator::validation_result::invalid(format("out-of-order {} {}, previous {} was {}",
|
||||
pkey ? "partition key" : "token",
|
||||
pkey ? format("{} ({{key: {}, token: {}}})", pkey->with_schema(s), *pkey, t) : format("{}", t),
|
||||
prev_key.key().is_empty() ? "token" : "partition key",
|
||||
prev_key.key().is_empty() ? format("{}", prev_key.token()) : format_partition_key(s, prev_key)));
|
||||
}
|
||||
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::validate(dht::token t, const partition_key* pkey) {
|
||||
if (_prev_partition_key.token() > t) {
|
||||
return false;
|
||||
return ooo_key_result(_schema, t, pkey, _prev_partition_key);
|
||||
}
|
||||
partition_key::tri_compare cmp(_schema);
|
||||
if (_prev_partition_key.token() == t && pkey && cmp(_prev_partition_key.key(), *pkey) >= 0) {
|
||||
return false;
|
||||
return ooo_key_result(_schema, t, pkey, _prev_partition_key);
|
||||
}
|
||||
_prev_partition_key._token = t;
|
||||
if (pkey) {
|
||||
@@ -60,22 +78,26 @@ bool mutation_fragment_stream_validator::validate(dht::token t, const partition_
|
||||
_prev_partition_key._key = partition_key::make_empty();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return validation_result::valid();
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::operator()(const dht::decorated_key& dk) {
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(const dht::decorated_key& dk) {
|
||||
return validate(dk.token(), &dk.key());
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::operator()(dht::token t) {
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(dht::token t) {
|
||||
return validate(t, nullptr);
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
|
||||
std::optional<tombstone> new_current_tombstone) {
|
||||
// Check for unclosed range tombstone on partition end
|
||||
if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
|
||||
return false;
|
||||
return validation_result::invalid(format("invalid partition-end, partition {} has an active range tombstone {}",
|
||||
format_partition_key(_schema, _prev_partition_key), _current_tombstone));
|
||||
}
|
||||
|
||||
auto valid = true;
|
||||
@@ -96,7 +118,10 @@ bool mutation_fragment_stream_validator::validate(mutation_fragment_v2::kind kin
|
||||
break;
|
||||
}
|
||||
if (!valid) {
|
||||
return false;
|
||||
return validation_result::invalid(format("out-of-order mutation fragment {}{}, previous mutation fragment was {}",
|
||||
kind,
|
||||
format_partition_key(_schema, _prev_partition_key, " in partition "),
|
||||
_prev_kind));
|
||||
}
|
||||
|
||||
if (pos && _prev_kind != mutation_fragment_v2::kind::partition_end) {
|
||||
@@ -108,7 +133,12 @@ bool mutation_fragment_stream_validator::validate(mutation_fragment_v2::kind kin
|
||||
valid = res < 0;
|
||||
}
|
||||
if (!valid) {
|
||||
return false;
|
||||
return validation_result::invalid(format("out-of-order {} at position {}{}, previous clustering element was {} at position {}",
|
||||
kind,
|
||||
*pos,
|
||||
format_partition_key(_schema, _prev_partition_key, " in partition "),
|
||||
_prev_pos,
|
||||
_prev_kind));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,34 +168,46 @@ bool mutation_fragment_stream_validator::validate(mutation_fragment_v2::kind kin
|
||||
if (new_current_tombstone) {
|
||||
_current_tombstone = *new_current_tombstone;
|
||||
}
|
||||
return true;
|
||||
return validation_result::valid();
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
|
||||
std::optional<tombstone> new_current_tombstone) {
|
||||
return validate(kind, pos, new_current_tombstone);
|
||||
}
|
||||
bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
|
||||
return validate(to_mutation_fragment_kind_v2(kind), pos, {});
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
|
||||
return validate(mf.mutation_fragment_kind(), mf.position(),
|
||||
mf.is_range_tombstone_change() ? std::optional(mf.as_range_tombstone_change().tombstone()) : std::nullopt);
|
||||
}
|
||||
bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
|
||||
return validate(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position(), {});
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone) {
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone) {
|
||||
return validate(kind, {}, new_current_tombstone);
|
||||
}
|
||||
bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind) {
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind) {
|
||||
return validate(to_mutation_fragment_kind_v2(kind), {}, {});
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::on_end_of_stream() {
|
||||
return _prev_kind == mutation_fragment_v2::kind::partition_end;
|
||||
mutation_fragment_stream_validator::validation_result
|
||||
mutation_fragment_stream_validator::on_end_of_stream() {
|
||||
if (_prev_kind == mutation_fragment_v2::kind::partition_end) {
|
||||
return validation_result::valid();
|
||||
}
|
||||
return validation_result::invalid(format("invalid end-of-stream, last partition{} was not closed, last fragment was {}",
|
||||
format_partition_key(_schema, _prev_partition_key, " "),
|
||||
_prev_kind));
|
||||
}
|
||||
|
||||
void mutation_fragment_stream_validator::reset(dht::decorated_key dk) {
|
||||
@@ -191,9 +233,9 @@ void mutation_fragment_stream_validator::reset(const mutation_fragment& mf) {
|
||||
|
||||
namespace {
|
||||
|
||||
[[noreturn]] void on_validation_error(seastar::logger& l, const seastar::sstring& reason) {
|
||||
[[noreturn]] void on_validation_error(seastar::logger& l, const mutation_fragment_stream_validating_filter& zis, mutation_fragment_stream_validator::validation_result res) {
|
||||
try {
|
||||
on_internal_error(l, reason);
|
||||
on_internal_error(l, format("[validator {} for {}] {}", fmt::ptr(&zis), zis.full_name(), res.what()));
|
||||
} catch (std::runtime_error& e) {
|
||||
throw invalid_mutation_fragment_stream(e);
|
||||
}
|
||||
@@ -206,17 +248,15 @@ bool mutation_fragment_stream_validating_filter::operator()(const dht::decorated
|
||||
return true;
|
||||
}
|
||||
if (_validation_level == mutation_fragment_stream_validation_level::token) {
|
||||
if (_validator(dk.token())) {
|
||||
return true;
|
||||
if (auto res = _validator(dk.token()); !res) {
|
||||
on_validation_error(mrlog, *this, res);
|
||||
}
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected token: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_token(), dk.token()));
|
||||
return true;
|
||||
} else {
|
||||
if (_validator(dk)) {
|
||||
return true;
|
||||
if (auto res = _validator(dk); !res) {
|
||||
on_validation_error(mrlog, *this, res);
|
||||
}
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected partition key: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_partition_key(), dk));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,34 +311,18 @@ mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_
|
||||
|
||||
bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
|
||||
std::optional<tombstone> new_current_tombstone) {
|
||||
if (_validation_level < mutation_fragment_stream_validation_level::partition_region) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool valid = false;
|
||||
std::optional<mutation_fragment_stream_validator::validation_result> res;
|
||||
|
||||
mrlog.debug("[validator {}] {}:{} new_current_tombstone: {}", static_cast<void*>(this), kind, pos, new_current_tombstone);
|
||||
|
||||
if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
|
||||
valid = _validator(kind, pos, new_current_tombstone);
|
||||
res = _validator(kind, pos, new_current_tombstone);
|
||||
} else {
|
||||
valid = _validator(kind, new_current_tombstone);
|
||||
res = _validator(kind, new_current_tombstone);
|
||||
}
|
||||
|
||||
if (__builtin_expect(!valid, false)) {
|
||||
if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}:{}, current {}:{}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_partition_key(), _validator.previous_mutation_fragment_kind(), _validator.previous_position(), kind, pos));
|
||||
} else if (_validation_level >= mutation_fragment_stream_validation_level::partition_key) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_partition_key(), _validator.previous_mutation_fragment_kind(), kind));
|
||||
} else if (kind == mutation_fragment_v2::kind::partition_end && _validator.current_tombstone()) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Partition ended with active tombstone: {}",
|
||||
static_cast<void*>(this), full_name(), _validator.current_tombstone()));
|
||||
} else {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_mutation_fragment_kind(), kind));
|
||||
}
|
||||
if (__builtin_expect(!res->is_valid(), false)) {
|
||||
on_validation_error(mrlog, *this, *res);
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -345,9 +369,8 @@ void mutation_fragment_stream_validating_filter::on_end_of_stream() {
|
||||
return;
|
||||
}
|
||||
mrlog.debug("[validator {}] EOS", static_cast<const void*>(this));
|
||||
if (!_validator.on_end_of_stream()) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Stream ended with unclosed partition: {}", static_cast<const void*>(this), full_name(),
|
||||
_validator.previous_mutation_fragment_kind()));
|
||||
if (auto res = _validator.on_end_of_stream(); !res) {
|
||||
on_validation_error(mrlog, *this, res);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -56,5 +56,15 @@ flat_mutation_reader_v2 make_crawling_reader(
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& monitor);
|
||||
|
||||
// Validate the content of the sstable with the mutation_fragment_stream_valdiator,
|
||||
// additionally cross checking that the content is laid out as expected by the
|
||||
// index and promoted index respectively.
|
||||
future<uint64_t> validate(
|
||||
shared_sstable sstable,
|
||||
reader_permit permit,
|
||||
const io_priority_class& pc,
|
||||
abort_source& abort,
|
||||
std::function<void(sstring)> error_handler);
|
||||
|
||||
} // namespace mx
|
||||
} // namespace sstables
|
||||
|
||||
@@ -1840,6 +1840,66 @@ sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partition
|
||||
return sstable_writer(*this, s, estimated_partitions, cfg, enc_stats, pc, shard);
|
||||
}
|
||||
|
||||
future<uint64_t> sstable::validate(reader_permit permit, const io_priority_class& pc, abort_source& abort,
|
||||
std::function<void(sstring)> error_handler) {
|
||||
if (_version >= sstable_version_types::mc) {
|
||||
co_return co_await mx::validate(shared_from_this(), std::move(permit), pc, abort, std::move(error_handler));
|
||||
}
|
||||
|
||||
auto reader = make_crawling_reader(_schema, permit, pc, nullptr);
|
||||
|
||||
uint64_t errors = 0;
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
auto validator = mutation_fragment_stream_validator(*_schema);
|
||||
|
||||
while (auto mf_opt = co_await reader()) {
|
||||
if (abort.abort_requested()) [[unlikely]] {
|
||||
break;
|
||||
}
|
||||
|
||||
const auto& mf = *mf_opt;
|
||||
|
||||
if (auto res = validator(mf); !res) {
|
||||
error_handler(res.what());
|
||||
validator.reset(mf);
|
||||
++errors;
|
||||
}
|
||||
|
||||
if (mf.is_partition_start()) {
|
||||
const auto& ps = mf.as_partition_start();
|
||||
if (auto res = validator(ps.key()); !res) {
|
||||
error_handler(res.what());
|
||||
validator.reset(ps.key());
|
||||
++errors;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (auto res = validator.on_end_of_stream(); !res) {
|
||||
error_handler(res.what());
|
||||
++errors;
|
||||
}
|
||||
} catch (const malformed_sstable_exception& e) {
|
||||
try {
|
||||
error_handler(format("unrecoverable error: {}", e));
|
||||
++errors;
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
co_await reader.close();
|
||||
|
||||
if (ex) {
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
}
|
||||
|
||||
co_return errors;
|
||||
}
|
||||
|
||||
// Encoding stats for compaction are based on the sstable's stats metadata
|
||||
// since, in contract to the mc-format encoding_stats that are evaluated
|
||||
// before the sstable data is written, the stats metadata is updated during
|
||||
|
||||
@@ -263,6 +263,16 @@ public:
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
shard_id shard = this_shard_id());
|
||||
|
||||
// Validates the content of the sstable.
|
||||
// Reports all errors via the provided error handler.
|
||||
// Returns the count of all validation errors found.
|
||||
// Can be aborted via the abort-source parameter.
|
||||
// If aborted, either via the abort-source or via unrecoverable errors
|
||||
// (e.g. parse error), it will return with validation error count seen up to
|
||||
// the abort. In the latter case it will call the error-handler before doing so.
|
||||
future<uint64_t> validate(reader_permit permit, const io_priority_class& pc, abort_source& abort,
|
||||
std::function<void(sstring)> error_handler);
|
||||
|
||||
encoding_stats get_encoding_stats_for_compaction() const;
|
||||
|
||||
future<> seal_sstable(bool backup);
|
||||
|
||||
@@ -457,7 +457,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
|
||||
bool valid = true;
|
||||
for (const auto& mf : mfs) {
|
||||
testlog.trace("validate fragment [{}] {} @ {}", i, mf.mutation_fragment_kind(), mf.position());
|
||||
valid &= validator(mf);
|
||||
valid &= bool(validator(mf));
|
||||
if (expect_valid) {
|
||||
if (!valid) {
|
||||
BOOST_FAIL(fmt::format("Unexpected invalid fragment {} @ {}", mf.mutation_fragment_kind(), mf.position()));
|
||||
@@ -470,7 +470,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
|
||||
++i;
|
||||
}
|
||||
if (expect_valid || i <= at) {
|
||||
valid &= validator.on_end_of_stream();
|
||||
valid &= bool(validator.on_end_of_stream());
|
||||
BOOST_REQUIRE(valid == expect_valid);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2184,7 +2184,8 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) {
|
||||
}, test_cfg);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(scrub_validate_mode_validate_reader_test) {
|
||||
SEASTAR_TEST_CASE(sstable_validate_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto schema = schema_builder("ks", get_name())
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
@@ -2195,6 +2196,8 @@ SEASTAR_THREAD_TEST_CASE(scrub_validate_mode_validate_reader_test) {
|
||||
|
||||
std::deque<mutation_fragment_v2> frags;
|
||||
|
||||
abort_source abort;
|
||||
|
||||
const auto ts = api::timestamp_type{1};
|
||||
auto local_keys = tests::generate_partition_keys(5, schema);
|
||||
|
||||
@@ -2224,8 +2227,26 @@ SEASTAR_THREAD_TEST_CASE(scrub_validate_mode_validate_reader_test) {
|
||||
clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r)));
|
||||
};
|
||||
|
||||
auto make_sst = [&] (std::deque<mutation_fragment_v2> frags) {
|
||||
auto rd = make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags));
|
||||
auto config = env.manager().configure_writer();
|
||||
config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose
|
||||
auto sst = env.make_sstable(schema);
|
||||
sst->write_components(std::move(rd), local_keys.size(), schema, config, encoding_stats{}).get();
|
||||
sst->load().get();
|
||||
return sst;
|
||||
};
|
||||
|
||||
auto info = make_lw_shared<compaction_data>();
|
||||
|
||||
struct error_handler {
|
||||
uint64_t& count;
|
||||
void operator()(sstring what) {
|
||||
++count;
|
||||
testlog.trace("validation error: ", what);
|
||||
}
|
||||
};
|
||||
|
||||
BOOST_TEST_MESSAGE("valid");
|
||||
{
|
||||
frags.emplace_back(make_partition_start(0));
|
||||
@@ -2236,8 +2257,11 @@ SEASTAR_THREAD_TEST_CASE(scrub_validate_mode_validate_reader_test) {
|
||||
frags.emplace_back(make_partition_start(2));
|
||||
frags.emplace_back(make_partition_end());
|
||||
|
||||
const auto errors = scrub_validate_mode_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
|
||||
uint64_t count = 0;
|
||||
auto sst = make_sst(std::move(frags));
|
||||
const auto errors = sst->validate(permit, default_priority_class(), abort, error_handler{count}).get();
|
||||
BOOST_REQUIRE_EQUAL(errors, 0);
|
||||
BOOST_REQUIRE_EQUAL(errors, count);
|
||||
}
|
||||
|
||||
BOOST_TEST_MESSAGE("out-of-order clustering row");
|
||||
@@ -2247,52 +2271,31 @@ SEASTAR_THREAD_TEST_CASE(scrub_validate_mode_validate_reader_test) {
|
||||
frags.emplace_back(make_clustering_row(0));
|
||||
frags.emplace_back(make_partition_end());
|
||||
|
||||
const auto errors = scrub_validate_mode_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
|
||||
BOOST_REQUIRE_NE(errors, 0);
|
||||
}
|
||||
|
||||
BOOST_TEST_MESSAGE("out-of-order static row");
|
||||
{
|
||||
frags.emplace_back(make_partition_start(0));
|
||||
frags.emplace_back(make_clustering_row(0));
|
||||
frags.emplace_back(make_static_row());
|
||||
frags.emplace_back(make_partition_end());
|
||||
|
||||
const auto errors = scrub_validate_mode_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
|
||||
BOOST_REQUIRE_NE(errors, 0);
|
||||
}
|
||||
|
||||
BOOST_TEST_MESSAGE("out-of-order partition start");
|
||||
{
|
||||
frags.emplace_back(make_partition_start(0));
|
||||
frags.emplace_back(make_clustering_row(1));
|
||||
frags.emplace_back(make_partition_start(2));
|
||||
frags.emplace_back(make_partition_end());
|
||||
|
||||
const auto errors = scrub_validate_mode_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
|
||||
uint64_t count = 0;
|
||||
auto sst = make_sst(std::move(frags));
|
||||
const auto errors = sst->validate(permit, default_priority_class(), abort, error_handler{count}).get();
|
||||
BOOST_REQUIRE_NE(errors, 0);
|
||||
BOOST_REQUIRE_EQUAL(errors, count);
|
||||
}
|
||||
|
||||
BOOST_TEST_MESSAGE("out-of-order partition");
|
||||
{
|
||||
frags.emplace_back(make_partition_start(0));
|
||||
frags.emplace_back(make_clustering_row(0));
|
||||
frags.emplace_back(make_partition_end());
|
||||
frags.emplace_back(make_partition_start(2));
|
||||
frags.emplace_back(make_clustering_row(0));
|
||||
frags.emplace_back(make_partition_end());
|
||||
frags.emplace_back(make_partition_start(0));
|
||||
frags.emplace_back(make_partition_start(1));
|
||||
frags.emplace_back(make_partition_end());
|
||||
|
||||
const auto errors = scrub_validate_mode_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
|
||||
BOOST_REQUIRE_NE(errors, 0);
|
||||
}
|
||||
|
||||
BOOST_TEST_MESSAGE("missing end-of-partition at EOS");
|
||||
{
|
||||
frags.emplace_back(make_partition_start(0));
|
||||
frags.emplace_back(make_clustering_row(0));
|
||||
|
||||
const auto errors = scrub_validate_mode_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
|
||||
uint64_t count = 0;
|
||||
auto sst = make_sst(std::move(frags));
|
||||
const auto errors = sst->validate(permit, default_priority_class(), abort, error_handler{count}).get();
|
||||
BOOST_REQUIRE_NE(errors, 0);
|
||||
BOOST_REQUIRE_EQUAL(errors, count);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
|
||||
|
||||
@@ -13,11 +13,14 @@ import pytest
|
||||
|
||||
from cassandra.cluster import Cluster, NoHostAvailable
|
||||
from cassandra.connection import DRIVER_NAME, DRIVER_VERSION
|
||||
import os
|
||||
import ssl
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import random
|
||||
|
||||
from util import unique_name, new_test_table, cql_session
|
||||
from util import unique_name, new_test_table, cql_session, local_process_id
|
||||
|
||||
|
||||
print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
|
||||
@@ -167,3 +170,34 @@ def random_seed():
|
||||
|
||||
# TODO: use new_test_table and "yield from" to make shared test_table
|
||||
# fixtures with some common schemas.
|
||||
|
||||
# To run the Scylla tools, we need to run Scylla executable itself, so we
|
||||
# need to find the path of the executable that was used to run Scylla for
|
||||
# this test. We do this by trying to find a local process which is listening
|
||||
# to the address and port to which our our CQL connection is connected.
|
||||
# If such a process exists, we verify that it is Scylla, and return the
|
||||
# executable's path. If we can't find the Scylla executable we use
|
||||
# pytest.skip() to skip tests relying on this executable.
|
||||
@pytest.fixture(scope="session")
|
||||
def scylla_path(cql):
|
||||
pid = local_process_id(cql)
|
||||
if not pid:
|
||||
pytest.skip("Can't find local Scylla process")
|
||||
# Now that we know the process id, use /proc to find the executable.
|
||||
try:
|
||||
path = os.readlink(f'/proc/{pid}/exe')
|
||||
except:
|
||||
pytest.skip("Can't find local Scylla executable")
|
||||
# Confirm that this executable is a real tool-providing Scylla by trying
|
||||
# to run it with the "--list-tools" option
|
||||
try:
|
||||
subprocess.check_output([path, '--list-tools'])
|
||||
except:
|
||||
pytest.skip("Local server isn't Scylla")
|
||||
return path
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def temp_workdir():
|
||||
""" Creates a temporary work directory, for the scope of a single test. """
|
||||
with tempfile.TemporaryDirectory() as workdir:
|
||||
yield workdir
|
||||
|
||||
313
test/cql-pytest/test_sstable_validation.py
Normal file
313
test/cql-pytest/test_sstable_validation.py
Normal file
@@ -0,0 +1,313 @@
|
||||
# Copyright 2023-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
#############################################################################
|
||||
# Tests for sstable validation.
|
||||
#
|
||||
# Namely, detecting discrepancies between index and data files.
|
||||
# These tests produce pairs of sstables, that differ only slightly, mixes the
|
||||
# index/data from the two of them, then runs `sstable validate` expecting it to
|
||||
# find the discrepancies.
|
||||
#############################################################################
|
||||
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def schema1_file():
|
||||
"""Create a schema.cql for the schema1"""
|
||||
with tempfile.NamedTemporaryFile("w+t") as f:
|
||||
f.write("CREATE TABLE ks.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))")
|
||||
f.flush()
|
||||
yield f.name
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def schema2_file():
|
||||
"""Create a schema.cql for the schema2"""
|
||||
with tempfile.NamedTemporaryFile("w+t") as f:
|
||||
f.write("CREATE TABLE ks.tbl (pk int, ck int, v text, s text STATIC, PRIMARY KEY (pk, ck))")
|
||||
f.flush()
|
||||
yield f.name
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def large_rows():
|
||||
"""Create a set of large rows"""
|
||||
rows = []
|
||||
v_1k = 'v' * 1024
|
||||
for ck in range(128):
|
||||
ck_raw = "0004{:08x}".format(ck)
|
||||
rows.append({ "type": "clustering-row", "key": { "raw": ck_raw }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": v_1k } } })
|
||||
return rows
|
||||
|
||||
|
||||
def find_component(generation, component_type, sst_dir):
|
||||
comps = glob.glob(os.path.join(sst_dir, f"*-{str(generation)}-big-{component_type}.db"))
|
||||
assert len(comps) == 1
|
||||
return comps[0]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def sstable_cache(scylla_path):
|
||||
"""Content-addressable sstable cache"""
|
||||
class cache:
|
||||
def __init__(self, scylla_path, workdir):
|
||||
self._scylla_path = scylla_path
|
||||
self._in_json = os.path.join(workdir, "input.json")
|
||||
self._store_dir = os.path.join(workdir, "sst_store_dir")
|
||||
self._cache = {}
|
||||
self._next_generation = 0
|
||||
|
||||
os.mkdir(self._store_dir)
|
||||
|
||||
@property
|
||||
def dir(self):
|
||||
return self._store_dir
|
||||
|
||||
def get_generation(self, sst, schema_file):
|
||||
json_str = json.dumps(sst)
|
||||
generation = self._cache.get(json_str)
|
||||
if not generation is None:
|
||||
return generation
|
||||
with open(self._in_json, "w") as f:
|
||||
f.write(json_str)
|
||||
generation = self._next_generation
|
||||
self._cache[json_str] = generation
|
||||
self._next_generation = self._next_generation + 1
|
||||
subprocess.check_call([self._scylla_path, "sstable", "write", "--schema-file", schema_file, "--output-dir", self._store_dir, "--generation", str(generation), "--input-file", self._in_json])
|
||||
return generation
|
||||
|
||||
def copy_sstable_to(self, generation, target_dir):
|
||||
for f in glob.glob(os.path.join(self._store_dir, f"*-{str(generation)}-big-*.*")):
|
||||
shutil.copy(f, target_dir)
|
||||
|
||||
with tempfile.TemporaryDirectory() as workdir:
|
||||
yield cache(scylla_path, workdir)
|
||||
|
||||
|
||||
def validate_mixed_sstable_pair(ssta, sstb, scylla_path, sst_cache, sst_work_dir, schema_file, error_message):
|
||||
"""Validate an sstable, created by mixing the index and data from two different sstables.
|
||||
|
||||
Check that the validation has the expected result (`error_message`).
|
||||
"""
|
||||
generation_a = sst_cache.get_generation(ssta, schema_file)
|
||||
generation_b = sst_cache.get_generation(sstb, schema_file)
|
||||
shutil.rmtree(sst_work_dir)
|
||||
os.mkdir(sst_work_dir)
|
||||
sst_cache.copy_sstable_to(generation_a, sst_work_dir)
|
||||
shutil.copyfile(find_component(generation_b, "Index", sst_cache.dir), find_component(generation_a, "Index", sst_work_dir))
|
||||
|
||||
res = subprocess.run([scylla_path, "sstable", "validate", "--schema-file", schema_file, find_component(generation_a, "Data", sst_work_dir)],
|
||||
check=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
out = res.stdout.decode('utf-8')
|
||||
err = res.stderr.decode('utf-8')
|
||||
valid = out.split(":")[1].strip() == "valid"
|
||||
if error_message:
|
||||
assert not valid
|
||||
assert error_message in err
|
||||
else:
|
||||
print(err)
|
||||
assert valid
|
||||
|
||||
|
||||
def make_partition(pk, ck, v):
|
||||
pks = [ "000400000001", "000400000000", "000400000002", "000400000003" ]
|
||||
r = { "type": "clustering-row", "key": { "raw": "0004{:08x}".format(ck) }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": v } } }
|
||||
return { "key": { "raw": pks[pk] }, "clustering_elements": [ r ] }
|
||||
|
||||
|
||||
def make_large_partition(pk, rows, with_static_row = False):
|
||||
pks = [ "000400000001", "000400000000", "000400000002", "000400000003" ]
|
||||
if with_static_row:
|
||||
return {
|
||||
"key": { "raw": pks[pk] },
|
||||
"static_row": { "s": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": "s" * 128 } },
|
||||
"clustering_elements": rows
|
||||
}
|
||||
else:
|
||||
return { "key": { "raw": pks[pk] }, "clustering_elements": rows }
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_ok1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_partition(0, 0, 'v')],
|
||||
[make_partition(0, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "")
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_ok2(cql, scylla_path, temp_workdir, schema1_file, sstable_cache):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_partition(0, 0, 'v'), make_partition(1, 0, 'v'), make_partition(2, 0, 'v')],
|
||||
[make_partition(0, 0, 'v'), make_partition(1, 0, 'v'), make_partition(2, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "")
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_mismatching_partition(cql, scylla_path, temp_workdir, schema1_file, sstable_cache):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_partition(0, 0, 'v')],
|
||||
[make_partition(1, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: partition mismatch")
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_mismatching_position(cql, scylla_path, temp_workdir, schema1_file, sstable_cache):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_partition(0, 0, 'v'), make_partition(2, 0, 'v')],
|
||||
[make_partition(0, 0, 'vv'), make_partition(2, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: position mismatch")
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_index_ends_before_data(cql, scylla_path, temp_workdir, schema1_file, sstable_cache):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_partition(0, 0, 'v'), make_partition(2, 0, 'v')],
|
||||
[make_partition(0, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: index is at EOF, but data file has more data")
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="index's EOF definition depends on data file size")
|
||||
def test_scylla_sstable_validate_index_ends_before_data1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_partition(0, 0, 'v')],
|
||||
[make_partition(0, 0, 'vvvvvvvvvvvvvvvvvvvvvv'), make_partition(2, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: data is at EOF, but index has more data")
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="index's EOF definition depends on data file size")
|
||||
def test_scylla_sstable_validate_index_ends_before_data2(cql, scylla_path, temp_workdir, schema1_file, sstable_cache):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_partition(0, 0, 'v')],
|
||||
[make_partition(0, 0, 'v'), make_partition(2, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: data is at EOF, but index has more data")
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_ok1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, large_rows)],
|
||||
[make_large_partition(0, large_rows)],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "")
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_ok2(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, large_rows)],
|
||||
[make_large_partition(0, large_rows)],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema2_file,
|
||||
error_message = "")
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_ok3(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, large_rows, True)],
|
||||
[make_large_partition(0, large_rows, True)],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema2_file,
|
||||
error_message = "")
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_mismatching_position1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows):
|
||||
row_0x = { "type": "clustering-row", "key": { "raw": "000400000000" }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": "v" } } }
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, large_rows)],
|
||||
[make_large_partition(0, [row_0x] + large_rows[1:])],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: position mismatch: promoted index:")
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_mismatching_position2(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, large_rows, True)],
|
||||
[make_large_partition(0, large_rows)],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema2_file,
|
||||
error_message = "mismatching index/data: position mismatch: promoted index:")
|
||||
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_mismatching_position3(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, large_rows)],
|
||||
[make_large_partition(0, large_rows, True)],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema2_file,
|
||||
error_message = "mismatching index/data: position mismatch: promoted index:")
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_mismathing_row(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, [large_rows[1]] + large_rows[4:])],
|
||||
[make_large_partition(0, [large_rows[2]] + large_rows[4:])],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: clustering element")
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_end_of_pi_not_end_of_rows(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows):
|
||||
row_0x = { "type": "clustering-row", "key": { "raw": "000400000000" }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": "v" } } }
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, [row_0x]), make_partition(2, 0, 'v')],
|
||||
[make_large_partition(0, large_rows), make_partition(2, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: promoted index has more blocks, but it is end of partition")
|
||||
|
||||
def test_scylla_sstable_validate_large_rows_end_of_rows_not_end_of_pi(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows):
|
||||
validate_mixed_sstable_pair(
|
||||
[make_large_partition(0, large_rows), make_partition(2, 0, 'v')],
|
||||
[make_large_partition(0, large_rows[:96]), make_partition(2, 0, 'v')],
|
||||
scylla_path,
|
||||
sstable_cache,
|
||||
temp_workdir,
|
||||
schema_file = schema1_file,
|
||||
error_message = "mismatching index/data: promoted index has no more blocks, but partition")
|
||||
@@ -18,30 +18,6 @@ import random
|
||||
import shutil
|
||||
import util
|
||||
|
||||
# To run the Scylla tools, we need to run Scylla executable itself, so we
|
||||
# need to find the path of the executable that was used to run Scylla for
|
||||
# this test. We do this by trying to find a local process which is listening
|
||||
# to the address and port to which our our CQL connection is connected.
|
||||
# If such a process exists, we verify that it is Scylla, and return the
|
||||
# executable's path. If we can't find the Scylla executable we use
|
||||
# pytest.skip() to skip tests relying on this executable.
|
||||
@pytest.fixture(scope="module")
|
||||
def scylla_path(cql):
|
||||
pid = util.local_process_id(cql)
|
||||
if not pid:
|
||||
pytest.skip("Can't find local Scylla process")
|
||||
# Now that we know the process id, use /proc to find the executable.
|
||||
try:
|
||||
path = os.readlink(f'/proc/{pid}/exe')
|
||||
except:
|
||||
pytest.skip("Can't find local Scylla executable")
|
||||
# Confirm that this executable is a real tool-providing Scylla by trying
|
||||
# to run it with the "--list-tools" option
|
||||
try:
|
||||
subprocess.check_output([path, '--list-tools'])
|
||||
except:
|
||||
pytest.skip("Local server isn't Scylla")
|
||||
return path
|
||||
|
||||
# A fixture for finding Scylla's data directory. We get it using the CQL
|
||||
# interface to Scylla's configuration. Note that if the server is remote,
|
||||
@@ -541,13 +517,6 @@ def test_scylla_sstable_script(cql, test_keyspace, scylla_path, scylla_data_dir,
|
||||
assert slice_lua_json == cxx_json
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def temp_workdir():
|
||||
""" Creates a temporary work directory, for the scope of a single test. """
|
||||
with tempfile.TemporaryDirectory() as workdir:
|
||||
yield workdir
|
||||
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def system_scylla_local_sstable_prepared(cql, scylla_data_dir):
|
||||
""" Prepares the system.scylla_local table for the needs of the schema loading tests.
|
||||
|
||||
@@ -945,16 +945,12 @@ void validate_operation(schema_ptr schema, reader_permit permit, const std::vect
|
||||
if (sstables.empty()) {
|
||||
throw std::runtime_error("error: no sstables specified on the command line");
|
||||
}
|
||||
const auto merge = vm.count("merge");
|
||||
sstables::compaction_data info;
|
||||
consume_sstables(schema, permit, sstables, merge, true, [&info] (flat_mutation_reader_v2& rd, sstables::sstable* sst) {
|
||||
if (sst) {
|
||||
sst_log.info("validating {}", sst->get_filename());
|
||||
}
|
||||
const auto errors = sstables::scrub_validate_mode_validate_reader(std::move(rd), info).get();
|
||||
sst_log.info("validated {}: {}", sst ? sst->get_filename() : "the stream", errors == 0 ? "valid" : "invalid");
|
||||
return stop_iteration::no;
|
||||
});
|
||||
|
||||
abort_source abort;
|
||||
for (const auto& sst : sstables) {
|
||||
const auto errors = sst->validate(permit, default_priority_class(), abort, [] (sstring what) { sst_log.info("{}", what); }).get();
|
||||
fmt::print("{}: {}\n", sst->get_filename(), errors == 0 ? "valid" : "invalid");
|
||||
}
|
||||
}
|
||||
|
||||
void dump_index_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
|
||||
@@ -2713,7 +2709,7 @@ validation will happen on the fragment level.
|
||||
See https://docs.scylladb.com/operating-scylla/admin-tools/scylla-sstable#validate
|
||||
for more information on this operation.
|
||||
)",
|
||||
{"merge"},
|
||||
{},
|
||||
validate_operation},
|
||||
{"validate-checksums",
|
||||
"Validate the checksums of the sstable(s)",
|
||||
|
||||
Reference in New Issue
Block a user