Merge "Validate token monotonicity on the sstable write path" from Botond

"
We have recently seen out-of-order partitions getting into sstables
causing major disruption later on. Given the damage caused, it was again
raised that we should enable partition key monotonicity validation
unconditionally in the sstable write path. This was also raised in the
past but dismissed as key validation was suspected (but not measured) to
add considerable per-fragment overhead. One of the problems was that the
key monotonicity validation was all or nothing. It either validated all
(clustering and partition) key monotonicity or none of it.
This series takes a second look at this and solves the all-or-nothing
problem by making the configuration of the key monotonicity check more
fine grained, allowing for enabling just token monotonicity validation
separately, then enables it unconditionally.

Refs: #7623

Tests: unit(release)
"

* 'sstable-writer-validate-partition-keys-unconditionally/v3' of https://github.com/denesb/scylla:
  sstables: enable token monotonicity validation by default
  mutation_fragment_stream_validator: add token validation level
  mutation_fragment_stream_validating_filter: make validation levels more fine-grained
This commit is contained in:
Avi Kivity
2021-03-01 11:23:51 +02:00
6 changed files with 87 additions and 21 deletions

View File

@@ -969,6 +969,14 @@ bool mutation_fragment_stream_validator::operator()(const dht::decorated_key& dk
return false;
}
bool mutation_fragment_stream_validator::operator()(dht::token t) {
if (_prev_partition_key.token() <= t) {
_prev_partition_key._token = t;
return true;
}
return false;
}
bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
if (_prev_kind == mutation_fragment::kind::partition_end) {
const bool valid = (kind == mutation_fragment::kind::partition_start);
@@ -1036,22 +1044,48 @@ namespace {
}
bool mutation_fragment_stream_validating_filter::operator()(const dht::decorated_key& dk) {
if (_compare_keys) {
if (!_validator(dk)) {
on_validation_error(fmr_logger, format("[validator {} for {}] Unexpected partition key: previous {}, current {}",
static_cast<void*>(this), _name, _validator.previous_partition_key(), dk));
}
if (_validation_level < mutation_fragment_stream_validation_level::token) {
return true;
}
if (_validation_level == mutation_fragment_stream_validation_level::token) {
if (_validator(dk.token())) {
return true;
}
on_validation_error(fmr_logger, format("[validator {} for {}] Unexpected token: previous {}, current {}",
static_cast<void*>(this), _name, _validator.previous_token(), dk.token()));
} else {
if (_validator(dk)) {
return true;
}
on_validation_error(fmr_logger, format("[validator {} for {}] Unexpected partition key: previous {}, current {}",
static_cast<void*>(this), _name, _validator.previous_partition_key(), dk));
}
return true;
}
mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_filter(sstring_view name, const schema& s, bool compare_keys)
mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_filter(sstring_view name, const schema& s,
mutation_fragment_stream_validation_level level)
: _validator(s)
, _name(format("{} ({}.{} {})", name, s.ks_name(), s.cf_name(), s.id()))
, _compare_keys(compare_keys)
, _validation_level(level)
{
fmr_logger.debug("[validator {} for {}] Will validate {} monotonicity.", static_cast<void*>(this), _name,
compare_keys ? "keys" : "only partition regions");
if (fmr_logger.level() <= log_level::debug) {
std::string_view what;
switch (_validation_level) {
case mutation_fragment_stream_validation_level::partition_region:
what = "partition region";
break;
case mutation_fragment_stream_validation_level::token:
what = "partition region and token";
break;
case mutation_fragment_stream_validation_level::partition_key:
what = "partition region and partition key";
break;
case mutation_fragment_stream_validation_level::clustering_key:
what = "partition region, partition key and clustering key";
break;
}
fmr_logger.debug("[validator {} for {}] Will validate {} monotonicity.", static_cast<void*>(this), _name, what);
}
}
bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
@@ -1059,14 +1093,14 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment::k
fmr_logger.debug("[validator {}] {}:{}", static_cast<void*>(this), kind, pos);
if (_compare_keys) {
if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
valid = _validator(kind, pos);
} else {
valid = _validator(kind);
}
if (__builtin_expect(!valid, false)) {
if (_compare_keys) {
if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
on_validation_error(fmr_logger, format("[validator {} for {}] Unexpected mutation fragment: previous {}:{}, current {}:{}",
static_cast<void*>(this), _name, _validator.previous_mutation_fragment_kind(), _validator.previous_position(), kind, pos));
} else {

View File

@@ -23,11 +23,19 @@
#include "mutation_fragment.hh"
enum class mutation_fragment_stream_validation_level {
partition_region, // fragment kind
token,
partition_key,
clustering_key,
};
/// Low level fragment stream validator.
///
/// Tracks and validates the monotonicity of the passed in fragment kinds,
/// position in partition and partition keys. Any subset of these can be
/// used, but what is used have to be consistent across the entire stream.
/// position in partition, token or partition keys. Any subset of these
/// can be used, but what is used have to be consistent across the entire
/// stream.
class mutation_fragment_stream_validator {
const schema& _schema;
mutation_fragment::kind _prev_kind;
@@ -67,11 +75,23 @@ public:
/// \returns true if the mutation fragment kind is valid.
bool operator()(const mutation_fragment& mf);
/// Validates the monotonicity of the token.
///
/// Does not check fragment level monotonicity.
/// Advances the previous token, but only if the validation passes.
/// Cannot be used in parallel with the `dht::decorated_key`
/// overload.
///
/// \returns true if the token is valid.
bool operator()(dht::token t);
/// Validates the monotonicity of the partition.
///
/// Does not check fragment level monotonicity.
/// Advances the previous partition-key, but only if the validation passes.
//
/// Cannot be used in parallel with the `dht::token`
/// overload.
///
/// \returns true if the partition key is valid.
bool operator()(const dht::decorated_key& dk);
@@ -92,6 +112,15 @@ public:
return _prev_pos;
}
/// The previous valid partition key.
///
/// Only valid if `operator()(const dht::decorated_key&)` or
/// `operator()(dht::token)` was used.
dht::token previous_token() const {
return _prev_partition_key.token();
}
/// The previous valid partition key.
///
/// Only valid if `operator()(const dht::decorated_key&)` was used.
const dht::decorated_key& previous_partition_key() const {
return _prev_partition_key;
}
@@ -110,7 +139,7 @@ struct invalid_mutation_fragment_stream : public std::runtime_error {
class mutation_fragment_stream_validating_filter {
mutation_fragment_stream_validator _validator;
sstring _name;
bool _compare_keys;
mutation_fragment_stream_validation_level _validation_level;
public:
/// Constructor.
@@ -118,7 +147,7 @@ public:
/// \arg name is used in log messages to identify the validator, the
/// schema identity is added automatically
/// \arg compare_keys enable validating clustering key monotonicity
mutation_fragment_stream_validating_filter(sstring_view name, const schema& s, bool compare_keys = false);
mutation_fragment_stream_validating_filter(sstring_view name, const schema& s, mutation_fragment_stream_validation_level level);
bool operator()(const dht::decorated_key& dk);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);

View File

@@ -62,6 +62,7 @@
#include "sstables/shareable_components.hh"
#include "sstables/open_info.hh"
#include "query-request.hh"
#include "mutation_fragment_stream_validator.hh"
#include <seastar/util/optimized_optional.hh>
#include <boost/intrusive/list.hpp>
@@ -113,7 +114,7 @@ struct sstable_writer_config {
uint64_t max_sstable_size = std::numeric_limits<uint64_t>::max();
bool backup = false;
bool leave_unsealed = false;
bool validate_keys;
mutation_fragment_stream_validation_level validation_level;
std::optional<db::replay_position> replay_position;
std::optional<int> sstable_level;
write_monitor* monitor = &default_write_monitor();

View File

@@ -56,7 +56,9 @@ sstable_writer_config sstables_manager::configure_writer(sstring origin) const {
sstable_writer_config cfg;
cfg.promoted_index_block_size = _db_config.column_index_size_in_kb() * 1024;
cfg.validate_keys = _db_config.enable_sstable_key_validation();
cfg.validation_level = _db_config.enable_sstable_key_validation()
? mutation_fragment_stream_validation_level::clustering_key
: mutation_fragment_stream_validation_level::token;
cfg.summary_byte_cost = summary_byte_cost(_db_config.sstable_summary_ratio());
cfg.correctly_serialize_non_compound_range_tombstones = true;

View File

@@ -46,7 +46,7 @@ struct sstable_writer::writer_impl {
, _pc(pc)
, _cfg(cfg)
, _collector(_schema, sst.get_filename())
, _validator(format("sstable writer {}", _sst.get_filename()), _schema, _cfg.validate_keys)
, _validator(format("sstable writer {}", _sst.get_filename()), _schema, _cfg.validation_level)
{}
virtual void consume_new_partition(const dht::decorated_key& dk) = 0;

View File

@@ -5308,7 +5308,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
auto local_keys = make_local_keys(3, schema);
auto config = env.manager().configure_writer();
config.validate_keys = false; // this test violates key order on purpose
config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose
auto writer = sst->get_writer(*schema, local_keys.size(), config, encoding_stats{});
auto make_static_row = [&, schema, ts] {