Merge 'Add repair unit tests/v1' from Mikołaj Sielużycki
This patch series splits up parts of repair pipeline to allow unit testing various bits of code without having to run full dtest suite. The reason why repair pipeline has no unit tests is that by definition repair requires multiple nodes, while unit test environment works only for a single node. However, it is possible to explicitly define interfaces between various parts of the pipeline, inject dependencies and test them individually. This patch series is focused on taking repair_rows_on_wire (frozen mutation representation of changes coming from another node) and flushing them to an sstable. The commits are split into the following parts: - pulling out classes to separate headers so that they can be included (potentially indirectly) from the test, - pulling out repair_meta::to_repair_rows_list and part of repair_meta::flush_rows_in_working_row_buf so that they can be tested, - refactoring repair_writer so that the actual writing logic can be injected as dependency, - creating the unit test. tests: unit(dev), dtest(incremental_repair_test, read_repair_test, repair_additional_test, repair_test) Closes #10345 * github.com:scylladb/scylla: repair: Add unit test for flushing repair_rows_on_wire to disk. repair: Extract mutation_fragment_queue and repair_writer::impl interfaces. repair: Make parts of repair_writer interface private. repair: Rename inputs to flush_rows. repair: Make repair_meta::flush_rows a free function. repair: Split flush_rows_in_working_row_buf to two functions and make one static. repair: Rename inputs to to_repair_rows_list. repair: Make to_repair_rows_list a free function. repair: Make repair_meta::to_repair_rows_list a static function repair: Fix indentation in repair_writer. repair: Move repair_writer to separate header. repair: Move repair_row to a separate header. repair: Move repair_sync_boundary to a separate header. repair: Move decorated_key_with_hash to separate header. repair: Move row_repair hashing logic to separate class and file.
This commit is contained in:
@@ -451,6 +451,7 @@ scylla_tests = set([
|
||||
'test/boost/range_tombstone_list_test',
|
||||
'test/boost/reusable_buffer_test',
|
||||
'test/boost/restrictions_test',
|
||||
'test/boost/repair_test',
|
||||
'test/boost/role_manager_test',
|
||||
'test/boost/row_cache_test',
|
||||
'test/boost/schema_change_test',
|
||||
|
||||
25
repair/decorated_key_with_hash.hh
Normal file
25
repair/decorated_key_with_hash.hh
Normal file
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include "xx_hasher.hh"
|
||||
#include "repair/hash.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
|
||||
class decorated_key_with_hash {
|
||||
public:
|
||||
dht::decorated_key dk;
|
||||
repair_hash hash;
|
||||
decorated_key_with_hash(const schema& s, dht::decorated_key key, uint64_t seed)
|
||||
: dk(key) {
|
||||
xx_hasher h(seed);
|
||||
feed_hash(h, dk.key(), s);
|
||||
hash = repair_hash(h.finalize_uint64());
|
||||
}
|
||||
};
|
||||
|
||||
51
repair/hash.hh
Normal file
51
repair/hash.hh
Normal file
@@ -0,0 +1,51 @@
|
||||
#pragma once
|
||||
#include <absl/container/btree_set.h>
|
||||
#include <cstdint>
|
||||
#include <ostream>
|
||||
#include "schema.hh"
|
||||
|
||||
class decorated_key_with_hash;
|
||||
class mutation_fragment;
|
||||
|
||||
// Hash of a repair row
|
||||
class repair_hash {
|
||||
public:
|
||||
uint64_t hash = 0;
|
||||
repair_hash() = default;
|
||||
explicit repair_hash(uint64_t h) : hash(h) {
|
||||
}
|
||||
void clear() {
|
||||
hash = 0;
|
||||
}
|
||||
void add(const repair_hash& other) {
|
||||
hash ^= other.hash;
|
||||
}
|
||||
bool operator==(const repair_hash& x) const {
|
||||
return x.hash == hash;
|
||||
}
|
||||
bool operator!=(const repair_hash& x) const {
|
||||
return x.hash != hash;
|
||||
}
|
||||
bool operator<(const repair_hash& x) const {
|
||||
return x.hash < hash;
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const repair_hash& x) {
|
||||
return os << x.hash;
|
||||
}
|
||||
};
|
||||
|
||||
using repair_hash_set = absl::btree_set<repair_hash>;
|
||||
|
||||
class repair_hasher {
|
||||
uint64_t _seed;
|
||||
schema_ptr _schema;
|
||||
public:
|
||||
repair_hasher(uint64_t seed, schema_ptr s)
|
||||
: _seed(seed)
|
||||
, _schema(std::move(s))
|
||||
{}
|
||||
|
||||
repair_hash do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf);
|
||||
};
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@
|
||||
#include "utils/hash.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "repair/hash.hh"
|
||||
#include "repair/sync_boundary.hh"
|
||||
|
||||
class flat_mutation_reader;
|
||||
|
||||
@@ -256,58 +258,6 @@ public:
|
||||
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,
|
||||
const sstring& cf, const dht::token_range& range);
|
||||
|
||||
// Represent a position of a mutation_fragment read from a flat mutation
|
||||
// reader. Repair nodes negotiate a small range identified by two
|
||||
// repair_sync_boundary to work on in each round.
|
||||
struct repair_sync_boundary {
|
||||
dht::decorated_key pk;
|
||||
position_in_partition position;
|
||||
class tri_compare {
|
||||
dht::ring_position_comparator _pk_cmp;
|
||||
position_in_partition::tri_compare _position_cmp;
|
||||
public:
|
||||
tri_compare(const schema& s) : _pk_cmp(s), _position_cmp(s) { }
|
||||
std::strong_ordering operator()(const repair_sync_boundary& a, const repair_sync_boundary& b) const {
|
||||
auto ret = _pk_cmp(a.pk, b.pk);
|
||||
if (ret == 0) {
|
||||
ret = _position_cmp(a.position, b.position);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const repair_sync_boundary& x) {
|
||||
return os << "{ " << x.pk << "," << x.position << " }";
|
||||
}
|
||||
};
|
||||
|
||||
// Hash of a repair row
|
||||
class repair_hash {
|
||||
public:
|
||||
uint64_t hash = 0;
|
||||
repair_hash() = default;
|
||||
explicit repair_hash(uint64_t h) : hash(h) {
|
||||
}
|
||||
void clear() {
|
||||
hash = 0;
|
||||
}
|
||||
void add(const repair_hash& other) {
|
||||
hash ^= other.hash;
|
||||
}
|
||||
bool operator==(const repair_hash& x) const {
|
||||
return x.hash == hash;
|
||||
}
|
||||
bool operator!=(const repair_hash& x) const {
|
||||
return x.hash != hash;
|
||||
}
|
||||
bool operator<(const repair_hash& x) const {
|
||||
return x.hash < hash;
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const repair_hash& x) {
|
||||
return os << x.hash;
|
||||
}
|
||||
};
|
||||
|
||||
using repair_hash_set = absl::btree_set<repair_hash>;
|
||||
|
||||
enum class repair_row_level_start_status: uint8_t {
|
||||
ok,
|
||||
|
||||
101
repair/row.hh
Normal file
101
repair/row.hh
Normal file
@@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include <optional>
|
||||
#include "frozen_mutation.hh"
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "repair/decorated_key_with_hash.hh"
|
||||
#include "repair/hash.hh"
|
||||
#include "repair/sync_boundary.hh"
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
using is_dirty_on_master = bool_class<class is_dirty_on_master_tag>;
|
||||
class decorated_key_with_hash;
|
||||
class repair_hash;
|
||||
|
||||
class repair_row {
|
||||
std::optional<frozen_mutation_fragment> _fm;
|
||||
lw_shared_ptr<const decorated_key_with_hash> _dk_with_hash;
|
||||
std::optional<repair_sync_boundary> _boundary;
|
||||
std::optional<repair_hash> _hash;
|
||||
is_dirty_on_master _dirty_on_master;
|
||||
lw_shared_ptr<mutation_fragment> _mf;
|
||||
public:
|
||||
repair_row() = default;
|
||||
repair_row(std::optional<frozen_mutation_fragment> fm,
|
||||
std::optional<position_in_partition> pos,
|
||||
lw_shared_ptr<const decorated_key_with_hash> dk_with_hash,
|
||||
std::optional<repair_hash> hash,
|
||||
is_dirty_on_master dirty_on_master,
|
||||
lw_shared_ptr<mutation_fragment> mf = {})
|
||||
: _fm(std::move(fm))
|
||||
, _dk_with_hash(std::move(dk_with_hash))
|
||||
, _boundary(pos ? std::optional<repair_sync_boundary>(repair_sync_boundary{_dk_with_hash->dk, std::move(*pos)}) : std::nullopt)
|
||||
, _hash(std::move(hash))
|
||||
, _dirty_on_master(dirty_on_master)
|
||||
, _mf(std::move(mf))
|
||||
{ }
|
||||
lw_shared_ptr<mutation_fragment>& get_mutation_fragment_ptr() { return _mf; }
|
||||
mutation_fragment& get_mutation_fragment() {
|
||||
if (!_mf) {
|
||||
throw std::runtime_error("empty mutation_fragment");
|
||||
}
|
||||
return *_mf;
|
||||
}
|
||||
frozen_mutation_fragment& get_frozen_mutation() {
|
||||
if (!_fm) {
|
||||
throw std::runtime_error("empty frozen_mutation_fragment");
|
||||
}
|
||||
return *_fm;
|
||||
}
|
||||
const frozen_mutation_fragment& get_frozen_mutation() const {
|
||||
if (!_fm) {
|
||||
throw std::runtime_error("empty frozen_mutation_fragment");
|
||||
}
|
||||
return *_fm;
|
||||
}
|
||||
const lw_shared_ptr<const decorated_key_with_hash>& get_dk_with_hash() const {
|
||||
return _dk_with_hash;
|
||||
}
|
||||
size_t size() const {
|
||||
if (!_fm) {
|
||||
throw std::runtime_error("empty size due to empty frozen_mutation_fragment");
|
||||
}
|
||||
return _fm->representation().size();
|
||||
}
|
||||
const repair_sync_boundary& boundary() const {
|
||||
if (!_boundary) {
|
||||
throw std::runtime_error("empty repair_sync_boundary");
|
||||
}
|
||||
return *_boundary;
|
||||
}
|
||||
const repair_hash& hash() const {
|
||||
if (!_hash) {
|
||||
throw std::runtime_error("empty hash");
|
||||
}
|
||||
return *_hash;
|
||||
}
|
||||
is_dirty_on_master dirty_on_master() const {
|
||||
return _dirty_on_master;
|
||||
}
|
||||
future<> clear_gently() noexcept {
|
||||
if (_fm) {
|
||||
co_await _fm->clear_gently();
|
||||
_fm.reset();
|
||||
}
|
||||
_dk_with_hash = {};
|
||||
_boundary.reset();
|
||||
_hash.reset();
|
||||
_mf = {};
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -47,6 +47,11 @@
|
||||
#include "readers/empty.hh"
|
||||
#include "readers/evictable.hh"
|
||||
#include "readers/queue.hh"
|
||||
#include "repair/hash.hh"
|
||||
#include "repair/decorated_key_with_hash.hh"
|
||||
#include "repair/row.hh"
|
||||
#include "repair/writer.hh"
|
||||
#include "xx_hasher.hh"
|
||||
|
||||
extern logging::logger rlogger;
|
||||
|
||||
@@ -239,98 +244,13 @@ static uint64_t get_random_seed() {
|
||||
return random_dist(random_engine);
|
||||
}
|
||||
|
||||
class decorated_key_with_hash {
|
||||
public:
|
||||
dht::decorated_key dk;
|
||||
repair_hash hash;
|
||||
decorated_key_with_hash(const schema& s, dht::decorated_key key, uint64_t seed)
|
||||
: dk(key) {
|
||||
xx_hasher h(seed);
|
||||
feed_hash(h, dk.key(), s);
|
||||
hash = repair_hash(h.finalize_uint64());
|
||||
}
|
||||
};
|
||||
repair_hash repair_hasher::do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) {
|
||||
xx_hasher h(_seed);
|
||||
feed_hash(h, mf, *_schema);
|
||||
feed_hash(h, dk_with_hash.hash.hash);
|
||||
return repair_hash(h.finalize_uint64());
|
||||
}
|
||||
|
||||
using is_dirty_on_master = bool_class<class is_dirty_on_master_tag>;
|
||||
|
||||
class repair_row {
|
||||
std::optional<frozen_mutation_fragment> _fm;
|
||||
lw_shared_ptr<const decorated_key_with_hash> _dk_with_hash;
|
||||
std::optional<repair_sync_boundary> _boundary;
|
||||
std::optional<repair_hash> _hash;
|
||||
is_dirty_on_master _dirty_on_master;
|
||||
lw_shared_ptr<mutation_fragment> _mf;
|
||||
public:
|
||||
repair_row() = default;
|
||||
repair_row(std::optional<frozen_mutation_fragment> fm,
|
||||
std::optional<position_in_partition> pos,
|
||||
lw_shared_ptr<const decorated_key_with_hash> dk_with_hash,
|
||||
std::optional<repair_hash> hash,
|
||||
is_dirty_on_master dirty_on_master,
|
||||
lw_shared_ptr<mutation_fragment> mf = {})
|
||||
: _fm(std::move(fm))
|
||||
, _dk_with_hash(std::move(dk_with_hash))
|
||||
, _boundary(pos ? std::optional<repair_sync_boundary>(repair_sync_boundary{_dk_with_hash->dk, std::move(*pos)}) : std::nullopt)
|
||||
, _hash(std::move(hash))
|
||||
, _dirty_on_master(dirty_on_master)
|
||||
, _mf(std::move(mf)) {
|
||||
}
|
||||
lw_shared_ptr<mutation_fragment>& get_mutation_fragment_ptr () {
|
||||
return _mf;
|
||||
}
|
||||
mutation_fragment& get_mutation_fragment() {
|
||||
if (!_mf) {
|
||||
throw std::runtime_error("empty mutation_fragment");
|
||||
}
|
||||
return *_mf;
|
||||
}
|
||||
frozen_mutation_fragment& get_frozen_mutation() {
|
||||
if (!_fm) {
|
||||
throw std::runtime_error("empty frozen_mutation_fragment");
|
||||
}
|
||||
return *_fm;
|
||||
}
|
||||
const frozen_mutation_fragment& get_frozen_mutation() const {
|
||||
if (!_fm) {
|
||||
throw std::runtime_error("empty frozen_mutation_fragment");
|
||||
}
|
||||
return *_fm;
|
||||
}
|
||||
const lw_shared_ptr<const decorated_key_with_hash>& get_dk_with_hash() const {
|
||||
return _dk_with_hash;
|
||||
}
|
||||
size_t size() const {
|
||||
if (!_fm) {
|
||||
throw std::runtime_error("empty size due to empty frozen_mutation_fragment");
|
||||
}
|
||||
return _fm->representation().size();
|
||||
}
|
||||
const repair_sync_boundary& boundary() const {
|
||||
if (!_boundary) {
|
||||
throw std::runtime_error("empty repair_sync_boundary");
|
||||
}
|
||||
return *_boundary;
|
||||
}
|
||||
const repair_hash& hash() const {
|
||||
if (!_hash) {
|
||||
throw std::runtime_error("empty hash");
|
||||
}
|
||||
return *_hash;
|
||||
}
|
||||
is_dirty_on_master dirty_on_master() const {
|
||||
return _dirty_on_master;
|
||||
}
|
||||
future<> clear_gently() noexcept {
|
||||
if (_fm) {
|
||||
co_await _fm->clear_gently();
|
||||
_fm.reset();
|
||||
}
|
||||
_dk_with_hash = {};
|
||||
_boundary.reset();
|
||||
_hash.reset();
|
||||
_mf = {};
|
||||
}
|
||||
};
|
||||
|
||||
class repair_reader {
|
||||
public:
|
||||
@@ -474,47 +394,48 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class repair_writer : public enable_lw_shared_from_this<repair_writer> {
|
||||
class repair_writer_impl : public repair_writer::impl {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
uint64_t _estimated_partitions;
|
||||
std::optional<future<>> _writer_done;
|
||||
std::optional<queue_reader_handle> _mq;
|
||||
// Current partition written to disk
|
||||
lw_shared_ptr<const decorated_key_with_hash> _current_dk_written_to_sstable;
|
||||
// Is current partition still open. A partition is opened when a
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
bool _partition_opened;
|
||||
mutation_fragment_queue _mq;
|
||||
sharded<replica::database>& _db;
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
sharded<db::view::view_update_generator>& _view_update_generator;
|
||||
streaming::stream_reason _reason;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
flat_mutation_reader _queue_reader;
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
uint64_t estimated_partitions,
|
||||
streaming::stream_reason reason)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _estimated_partitions(estimated_partitions)
|
||||
, _reason(reason) {
|
||||
repair_writer_impl(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
uint64_t estimated_partitions,
|
||||
sharded<replica::database>& db,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator,
|
||||
streaming::stream_reason reason,
|
||||
mutation_fragment_queue queue,
|
||||
flat_mutation_reader queue_reader)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _estimated_partitions(estimated_partitions)
|
||||
, _mq(std::move(queue))
|
||||
, _db(db)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _view_update_generator(view_update_generator)
|
||||
, _reason(reason)
|
||||
, _queue_reader(std::move(queue_reader))
|
||||
{}
|
||||
|
||||
virtual void create_writer(lw_shared_ptr<repair_writer> writer) override;
|
||||
|
||||
virtual mutation_fragment_queue& queue() override {
|
||||
return _mq;
|
||||
}
|
||||
|
||||
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
_current_dk_written_to_sstable = dk;
|
||||
if (mf.is_partition_start()) {
|
||||
return _mq->push(std::move(mf)).then([this] {
|
||||
_partition_opened = true;
|
||||
});
|
||||
} else {
|
||||
auto start = mutation_fragment(*_schema, _permit, partition_start(dk->dk, tombstone()));
|
||||
return _mq->push(std::move(start)).then([this, mf = std::move(mf)] () mutable {
|
||||
_partition_opened = true;
|
||||
return _mq->push(std::move(mf));
|
||||
});
|
||||
}
|
||||
};
|
||||
virtual future<> wait_for_writer_done() override;
|
||||
|
||||
private:
|
||||
static sstables::offstrategy is_offstrategy_supported(streaming::stream_reason reason) {
|
||||
static const std::unordered_set<streaming::stream_reason> operations_supported = {
|
||||
streaming::stream_reason::bootstrap,
|
||||
@@ -526,94 +447,140 @@ public:
|
||||
};
|
||||
return sstables::offstrategy(operations_supported.contains(reason));
|
||||
}
|
||||
};
|
||||
|
||||
void create_writer(sharded<replica::database>& db, sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<db::view::view_update_generator>& view_update_gen) {
|
||||
if (_writer_done) {
|
||||
return;
|
||||
}
|
||||
replica::table& t = db.local().find_column_family(_schema->id());
|
||||
auto [queue_reader, queue_handle] = make_queue_reader(_schema, _permit);
|
||||
_mq = std::move(queue_handle);
|
||||
auto writer = shared_from_this();
|
||||
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, upgrade_to_v2(std::move(queue_reader)),
|
||||
streaming::make_streaming_consumer(sstables::repair_origin, db, sys_dist_ks, view_update_gen, _estimated_partitions, _reason, is_offstrategy_supported(_reason)),
|
||||
t.stream_in_progress()).then([writer] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
writer->_schema->ks_name(), writer->_schema->cf_name(), partitions);
|
||||
}).handle_exception([writer] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
writer->_schema->ks_name(), writer->_schema->cf_name(), ep);
|
||||
writer->_mq->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
future<> repair_writer::write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
_current_dk_written_to_sstable = dk;
|
||||
if (mf.is_partition_start()) {
|
||||
return _mq->push(std::move(mf)).then([this] {
|
||||
_partition_opened = true;
|
||||
});
|
||||
} else {
|
||||
auto start = mutation_fragment(*_schema, _permit, partition_start(dk->dk, tombstone()));
|
||||
return _mq->push(std::move(start)).then([this, mf = std::move(mf)] () mutable {
|
||||
_partition_opened = true;
|
||||
return _mq->push(std::move(mf));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
future<> write_partition_end() {
|
||||
if (_partition_opened) {
|
||||
return _mq->push(mutation_fragment(*_schema, _permit, partition_end())).then([this] {
|
||||
_partition_opened = false;
|
||||
class queue_reader_handle_adapter : public mutation_fragment_queue::impl {
|
||||
queue_reader_handle _handle;
|
||||
public:
|
||||
queue_reader_handle_adapter(queue_reader_handle handle)
|
||||
: _handle(std::move(handle))
|
||||
{}
|
||||
|
||||
virtual future<> push(mutation_fragment mf) override {
|
||||
return _handle.push(std::move(mf));
|
||||
}
|
||||
|
||||
virtual void abort(std::exception_ptr ep) override {
|
||||
_handle.abort(std::move(ep));
|
||||
}
|
||||
|
||||
virtual void push_end_of_stream() override {
|
||||
_handle.push_end_of_stream();
|
||||
}
|
||||
};
|
||||
|
||||
mutation_fragment_queue make_mutation_fragment_queue(queue_reader_handle handle) {
|
||||
return mutation_fragment_queue(std::make_unique<queue_reader_handle_adapter>(std::move(handle)));
|
||||
}
|
||||
|
||||
void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
|
||||
if (_writer_done) {
|
||||
return;
|
||||
}
|
||||
replica::table& t = _db.local().find_column_family(_schema->id());
|
||||
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, upgrade_to_v2(std::move(_queue_reader)),
|
||||
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, _estimated_partitions, _reason, is_offstrategy_supported(_reason)),
|
||||
t.stream_in_progress()).then([w] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
|
||||
}).handle_exception([w] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
w->schema()->ks_name(), w->schema()->cf_name(), ep);
|
||||
w->queue().abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
lw_shared_ptr<repair_writer> make_repair_writer(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
uint64_t estimated_partitions,
|
||||
streaming::stream_reason reason,
|
||||
sharded<replica::database>& db,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator) {
|
||||
auto [queue_reader, queue_handle] = make_queue_reader(schema, permit);
|
||||
auto queue = mutation_fragment_queue(std::make_unique<queue_reader_handle_adapter>(std::move(queue_handle)));
|
||||
auto i = std::make_unique<repair_writer_impl>(schema, permit, estimated_partitions, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader));
|
||||
return make_lw_shared<repair_writer>(schema, permit, std::move(i));
|
||||
}
|
||||
|
||||
future<> repair_writer::write_partition_end() {
|
||||
if (_partition_opened) {
|
||||
return _mq->push(mutation_fragment(*_schema, _permit, partition_end())).then([this] {
|
||||
_partition_opened = false;
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> repair_writer::do_write(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
if (_current_dk_written_to_sstable) {
|
||||
const auto cmp_res = _current_dk_written_to_sstable->dk.tri_compare(*_schema, dk->dk);
|
||||
if (cmp_res > 0) {
|
||||
on_internal_error(rlogger, format("repair_writer::do_write(): received out-of-order partition, current: {}, next: {}", _current_dk_written_to_sstable->dk, dk->dk));
|
||||
} else if (cmp_res == 0) {
|
||||
return _mq->push(std::move(mf));
|
||||
} else {
|
||||
return write_partition_end().then([this,
|
||||
dk = std::move(dk), mf = std::move(mf)] () mutable {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf));
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
} else {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf));
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_write(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
if (_current_dk_written_to_sstable) {
|
||||
const auto cmp_res = _current_dk_written_to_sstable->dk.tri_compare(*_schema, dk->dk);
|
||||
if (cmp_res > 0) {
|
||||
on_internal_error(rlogger, format("repair_writer::do_write(): received out-of-order partition, current: {}, next: {}", _current_dk_written_to_sstable->dk, dk->dk));
|
||||
} else if (cmp_res == 0) {
|
||||
return _mq->push(std::move(mf));
|
||||
} else {
|
||||
return write_partition_end().then([this,
|
||||
dk = std::move(dk), mf = std::move(mf)] () mutable {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf));
|
||||
});
|
||||
}
|
||||
} else {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf));
|
||||
}
|
||||
}
|
||||
|
||||
future<> write_end_of_stream() {
|
||||
if (_mq) {
|
||||
return with_semaphore(_sem, 1, [this] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end().then([this] () mutable {
|
||||
_mq->push_end_of_stream();
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
_mq->abort(ep);
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_wait_for_writer_done() {
|
||||
if (_writer_done) {
|
||||
return std::move(*(_writer_done));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> wait_for_writer_done() {
|
||||
return when_all_succeed(write_end_of_stream(), do_wait_for_writer_done()).discard_result().handle_exception(
|
||||
[this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
future<> repair_writer::write_end_of_stream() {
|
||||
if (_created_writer) {
|
||||
return with_semaphore(_sem, 1, [this] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end().then([this] () mutable {
|
||||
_mq->push_end_of_stream();
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
_mq->abort(ep);
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
future<> repair_writer_impl::wait_for_writer_done() {
|
||||
if (_writer_done) {
|
||||
return std::move(*(_writer_done));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
future<> repair_writer::wait_for_writer_done() {
|
||||
return when_all_succeed(write_end_of_stream(), _impl->wait_for_writer_done()).discard_result().handle_exception(
|
||||
[this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
class repair_meta;
|
||||
class repair_meta_tracker;
|
||||
@@ -622,10 +589,86 @@ class row_level_repair;
|
||||
static void add_to_repair_meta_for_masters(repair_meta& rm);
|
||||
static void add_to_repair_meta_for_followers(repair_meta& rm);
|
||||
|
||||
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr s, uint64_t seed, repair_master is_master, reader_permit permit, repair_hasher hasher) {
|
||||
return do_with(std::move(rows), std::list<repair_row>(), lw_shared_ptr<const decorated_key_with_hash>(), lw_shared_ptr<mutation_fragment>(), position_in_partition::tri_compare(*s),
|
||||
[s, seed, is_master, permit, hasher] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable {
|
||||
return do_for_each(rows, [&dk_ptr, &row_list, &last_mf, &cmp, s, seed, is_master, permit, hasher] (partition_key_and_mutation_fragments& x) mutable {
|
||||
dht::decorated_key dk = dht::decorate_key(*s, x.get_key());
|
||||
if (!(dk_ptr && dk_ptr->dk.equal(*s, dk))) {
|
||||
dk_ptr = make_lw_shared<const decorated_key_with_hash>(*s, dk, seed);
|
||||
}
|
||||
if (is_master) {
|
||||
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, s, permit, hasher] (frozen_mutation_fragment& fmf) mutable {
|
||||
_metrics.rx_row_nr += 1;
|
||||
_metrics.rx_row_bytes += fmf.representation().size();
|
||||
// Keep the mutation_fragment in repair_row as an
|
||||
// optimization to avoid unfreeze again when
|
||||
// mutation_fragment is needed by _repair_writer.do_write()
|
||||
// to apply the repair_row to disk
|
||||
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
|
||||
auto hash = hasher.do_hash_for_mf(*dk_ptr, *mf);
|
||||
position_in_partition pos(mf->position());
|
||||
row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
|
||||
});
|
||||
} else {
|
||||
last_mf = {};
|
||||
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, &last_mf, &cmp, s, permit] (frozen_mutation_fragment& fmf) mutable {
|
||||
_metrics.rx_row_nr += 1;
|
||||
_metrics.rx_row_bytes += fmf.representation().size();
|
||||
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
|
||||
// If the mutation_fragment has the same position as
|
||||
// the last mutation_fragment, it means they are the
|
||||
// same row with different contents. We can not feed
|
||||
// such rows into the sstable writer. Instead we apply
|
||||
// the mutation_fragment into the previous one.
|
||||
if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) {
|
||||
last_mf->apply(*s, std::move(*mf));
|
||||
} else {
|
||||
last_mf = mf;
|
||||
// On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used.
|
||||
row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf)));
|
||||
}
|
||||
});
|
||||
}
|
||||
}).then([&row_list] {
|
||||
return std::move(row_list);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer) {
|
||||
auto cmp = position_in_partition::tri_compare(*s);
|
||||
lw_shared_ptr<mutation_fragment> last_mf;
|
||||
lw_shared_ptr<const decorated_key_with_hash> last_dk;
|
||||
for (auto& r : rows) {
|
||||
thread::maybe_yield();
|
||||
if (!r.dirty_on_master()) {
|
||||
continue;
|
||||
}
|
||||
writer->create_writer();
|
||||
auto mf = r.get_mutation_fragment_ptr();
|
||||
const auto& dk = r.get_dk_with_hash()->dk;
|
||||
if (last_mf && last_dk &&
|
||||
cmp(last_mf->position(), mf->position()) == 0 &&
|
||||
dk.tri_compare(*s, last_dk->dk) == 0 &&
|
||||
last_mf->mergeable_with(*mf)) {
|
||||
last_mf->apply(*s, std::move(*mf));
|
||||
} else {
|
||||
if (last_mf && last_dk) {
|
||||
writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
|
||||
}
|
||||
last_mf = mf;
|
||||
last_dk = r.get_dk_with_hash();
|
||||
}
|
||||
}
|
||||
if (last_mf && last_dk) {
|
||||
writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
|
||||
}
|
||||
}
|
||||
|
||||
class repair_meta {
|
||||
friend repair_meta_tracker;
|
||||
public:
|
||||
using repair_master = bool_class<class repair_master_tag>;
|
||||
using update_working_row_buf = bool_class<class update_working_row_buf_tag>;
|
||||
using update_peer_row_hash_sets = bool_class<class update_peer_row_hash_sets_tag>;
|
||||
using needs_all_rows_t = bool_class<class needs_all_rows_tag>;
|
||||
@@ -685,6 +728,7 @@ private:
|
||||
std::vector<repair_node_state> _all_node_states;
|
||||
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
|
||||
std::optional<shared_promise<>> _stop_promise;
|
||||
repair_hasher _repair_hasher;
|
||||
public:
|
||||
std::vector<repair_node_state>& all_nodes() {
|
||||
return _all_node_states;
|
||||
@@ -771,7 +815,7 @@ public:
|
||||
_seed,
|
||||
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
|
||||
)
|
||||
, _repair_writer(make_lw_shared<repair_writer>(_schema, _permit, _estimated_partitions, _reason))
|
||||
, _repair_writer(make_repair_writer(_schema, _permit, _estimated_partitions, _reason, _db, _sys_dist_ks, _view_update_generator))
|
||||
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
|
||||
[&rs] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
|
||||
return rs.get_messaging().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
|
||||
@@ -785,6 +829,7 @@ public:
|
||||
return rs.get_messaging().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, addr);
|
||||
})
|
||||
, _row_level_repair_ptr(row_level_repair_ptr)
|
||||
, _repair_hasher(_seed, _schema)
|
||||
{
|
||||
if (master) {
|
||||
add_to_repair_meta_for_masters(*this);
|
||||
@@ -975,13 +1020,6 @@ private:
|
||||
});
|
||||
}
|
||||
|
||||
repair_hash do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) {
|
||||
xx_hasher h(_seed);
|
||||
feed_hash(h, mf, *_schema);
|
||||
feed_hash(h, dk_with_hash.hash.hash);
|
||||
return repair_hash(h.finalize_uint64());
|
||||
}
|
||||
|
||||
stop_iteration handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
if (mf.is_partition_start()) {
|
||||
auto& start = mf.as_partition_start();
|
||||
@@ -994,7 +1032,7 @@ private:
|
||||
_repair_reader.clear_current_dk();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
auto hash = do_hash_for_mf(*_repair_reader.get_current_dk(), mf);
|
||||
auto hash = _repair_hasher.do_hash_for_mf(*_repair_reader.get_current_dk(), mf);
|
||||
repair_row r(freeze(*_schema, mf), position_in_partition(mf.position()), _repair_reader.get_current_dk(), hash, is_dirty_on_master::no);
|
||||
rlogger.trace("Reading: r.boundary={}, r.hash={}", r.boundary(), r.hash());
|
||||
_metrics.row_from_disk_nr++;
|
||||
@@ -1187,7 +1225,7 @@ private:
|
||||
future<> do_apply_rows(std::list<repair_row>&& row_diff, update_working_row_buf update_buf) {
|
||||
return do_with(std::move(row_diff), [this, update_buf] (std::list<repair_row>& row_diff) {
|
||||
return with_semaphore(_repair_writer->sem(), 1, [this, update_buf, &row_diff] {
|
||||
_repair_writer->create_writer(_db, _sys_dist_ks, _view_update_generator);
|
||||
_repair_writer->create_writer();
|
||||
return repeat([this, update_buf, &row_diff] () mutable {
|
||||
if (row_diff.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
@@ -1217,7 +1255,7 @@ private:
|
||||
if (rows.empty()) {
|
||||
return;
|
||||
}
|
||||
auto row_diff = to_repair_rows_list(std::move(rows)).get0();
|
||||
auto row_diff = to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher).get0();
|
||||
auto sz = get_repair_rows_size(row_diff).get0();
|
||||
stats().rx_row_bytes += sz;
|
||||
stats().rx_row_nr += row_diff.size();
|
||||
@@ -1249,33 +1287,7 @@ public:
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
auto cmp = position_in_partition::tri_compare(*_schema);
|
||||
lw_shared_ptr<mutation_fragment> last_mf;
|
||||
lw_shared_ptr<const decorated_key_with_hash> last_dk;
|
||||
for (auto& r : _working_row_buf) {
|
||||
thread::maybe_yield();
|
||||
if (!r.dirty_on_master()) {
|
||||
continue;
|
||||
}
|
||||
_repair_writer->create_writer(_db, _sys_dist_ks, _view_update_generator);
|
||||
auto mf = r.get_mutation_fragment_ptr();
|
||||
const auto& dk = r.get_dk_with_hash()->dk;
|
||||
if (last_mf && last_dk &&
|
||||
cmp(last_mf->position(), mf->position()) == 0 &&
|
||||
dk.tri_compare(*_schema, last_dk->dk) == 0 &&
|
||||
last_mf->mergeable_with(*mf)) {
|
||||
last_mf->apply(*_schema, std::move(*mf));
|
||||
} else {
|
||||
if (last_mf && last_dk) {
|
||||
_repair_writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
|
||||
}
|
||||
last_mf = mf;
|
||||
last_dk = r.get_dk_with_hash();
|
||||
}
|
||||
}
|
||||
if (last_mf && last_dk) {
|
||||
_repair_writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
|
||||
}
|
||||
flush_rows(_schema, _working_row_buf, _repair_writer);
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -1284,7 +1296,7 @@ private:
|
||||
if (rows.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return to_repair_rows_list(std::move(rows)).then([this] (std::list<repair_row> row_diff) {
|
||||
return to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_apply_rows(std::move(row_diff), update_working_row_buf::no);
|
||||
});
|
||||
}
|
||||
@@ -1320,53 +1332,6 @@ private:
|
||||
});
|
||||
};
|
||||
|
||||
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows) {
|
||||
return do_with(std::move(rows), std::list<repair_row>(), lw_shared_ptr<const decorated_key_with_hash>(), lw_shared_ptr<mutation_fragment>(), position_in_partition::tri_compare(*_schema),
|
||||
[this] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable {
|
||||
return do_for_each(rows, [this, &dk_ptr, &row_list, &last_mf, &cmp] (partition_key_and_mutation_fragments& x) mutable {
|
||||
dht::decorated_key dk = dht::decorate_key(*_schema, x.get_key());
|
||||
if (!(dk_ptr && dk_ptr->dk.equal(*_schema, dk))) {
|
||||
dk_ptr = make_lw_shared<const decorated_key_with_hash>(*_schema, dk, _seed);
|
||||
}
|
||||
if (_repair_master) {
|
||||
return do_for_each(x.get_mutation_fragments(), [this, &dk_ptr, &row_list] (frozen_mutation_fragment& fmf) mutable {
|
||||
_metrics.rx_row_nr += 1;
|
||||
_metrics.rx_row_bytes += fmf.representation().size();
|
||||
// Keep the mutation_fragment in repair_row as an
|
||||
// optimization to avoid unfreeze again when
|
||||
// mutation_fragment is needed by _repair_writer.do_write()
|
||||
// to apply the repair_row to disk
|
||||
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
|
||||
auto hash = do_hash_for_mf(*dk_ptr, *mf);
|
||||
position_in_partition pos(mf->position());
|
||||
row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
|
||||
});
|
||||
} else {
|
||||
last_mf = {};
|
||||
return do_for_each(x.get_mutation_fragments(), [this, &dk_ptr, &row_list, &last_mf, &cmp] (frozen_mutation_fragment& fmf) mutable {
|
||||
_metrics.rx_row_nr += 1;
|
||||
_metrics.rx_row_bytes += fmf.representation().size();
|
||||
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
|
||||
// If the mutation_fragment has the same position as
|
||||
// the last mutation_fragment, it means they are the
|
||||
// same row with different contents. We can not feed
|
||||
// such rows into the sstable writer. Instead we apply
|
||||
// the mutation_fragment into the previous one.
|
||||
if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) {
|
||||
last_mf->apply(*_schema, std::move(*mf));
|
||||
} else {
|
||||
last_mf = mf;
|
||||
// On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used.
|
||||
row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf)));
|
||||
}
|
||||
});
|
||||
}
|
||||
}).then([&row_list] {
|
||||
return std::move(row_list);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
// RPC API
|
||||
// Return the hashes of the rows in _working_row_buf
|
||||
@@ -2783,7 +2748,7 @@ public:
|
||||
algorithm,
|
||||
max_row_buf_size,
|
||||
_seed,
|
||||
repair_meta::repair_master::yes,
|
||||
repair_master::yes,
|
||||
repair_meta_id,
|
||||
_ri.reason,
|
||||
std::move(master_node_shard_config),
|
||||
@@ -3114,7 +3079,7 @@ repair_service::insert_repair_meta(
|
||||
algo,
|
||||
max_row_buf_size,
|
||||
seed,
|
||||
repair_meta::repair_master::no,
|
||||
repair_master::no,
|
||||
repair_meta_id,
|
||||
reason,
|
||||
std::move(master_node_shard_config),
|
||||
|
||||
@@ -12,6 +12,9 @@
|
||||
#include "gms/inet_address.hh"
|
||||
#include "repair/repair.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
class row_level_repair_gossip_helper;
|
||||
|
||||
@@ -231,7 +234,17 @@ public:
|
||||
};
|
||||
|
||||
class repair_info;
|
||||
using repair_master = bool_class<class repair_master_tag>;
|
||||
class partition_key_and_mutation_fragments;
|
||||
using repair_rows_on_wire = std::list<partition_key_and_mutation_fragments>;
|
||||
class repair_row;
|
||||
class repair_hasher;
|
||||
class repair_writer;
|
||||
|
||||
future<> repair_cf_range_row_level(repair_info& ri,
|
||||
sstring cf_name, utils::UUID table_id, dht::token_range range,
|
||||
const std::vector<gms::inet_address>& all_peer_nodes);
|
||||
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
|
||||
schema_ptr s, uint64_t seed, repair_master is_master,
|
||||
reader_permit permit, repair_hasher hasher);
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer);
|
||||
|
||||
37
repair/sync_boundary.hh
Normal file
37
repair/sync_boundary.hh
Normal file
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright (C) 2018-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "position_in_partition.hh"
|
||||
|
||||
// Represent a position of a mutation_fragment read from a flat mutation
|
||||
// reader. Repair nodes negotiate a small range identified by two
|
||||
// repair_sync_boundary to work on in each round.
|
||||
struct repair_sync_boundary {
|
||||
dht::decorated_key pk;
|
||||
position_in_partition position;
|
||||
class tri_compare {
|
||||
dht::ring_position_comparator _pk_cmp;
|
||||
position_in_partition::tri_compare _position_cmp;
|
||||
public:
|
||||
tri_compare(const schema& s) : _pk_cmp(s), _position_cmp(s) { }
|
||||
std::strong_ordering operator()(const repair_sync_boundary& a, const repair_sync_boundary& b) const {
|
||||
auto ret = _pk_cmp(a.pk, b.pk);
|
||||
if (ret == 0) {
|
||||
ret = _position_cmp(a.position, b.position);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const repair_sync_boundary& x) {
|
||||
return os << "{ " << x.pk << "," << x.position << " }";
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
123
repair/writer.hh
Normal file
123
repair/writer.hh
Normal file
@@ -0,0 +1,123 @@
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include "schema_fwd.hh"
|
||||
#include <optional>
|
||||
#include "reader_permit.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include "repair/decorated_key_with_hash.hh"
|
||||
#include "readers/queue.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
namespace db {
|
||||
class system_distributed_keyspace;
|
||||
namespace view {
|
||||
class view_update_generator;
|
||||
}
|
||||
}
|
||||
|
||||
class mutation_fragment_queue {
|
||||
public:
|
||||
class impl {
|
||||
public:
|
||||
virtual future<> push(mutation_fragment mf) = 0;
|
||||
virtual void abort(std::exception_ptr ep) = 0;
|
||||
virtual void push_end_of_stream() = 0;
|
||||
virtual ~impl() {}
|
||||
};
|
||||
|
||||
private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
|
||||
public:
|
||||
mutation_fragment_queue(std::unique_ptr<impl> impl)
|
||||
: _impl(std::move(impl))
|
||||
{}
|
||||
|
||||
future<> push(mutation_fragment mf) {
|
||||
return _impl->push(std::move(mf));
|
||||
}
|
||||
|
||||
void abort(std::exception_ptr ep) {
|
||||
_impl->abort(ep);
|
||||
}
|
||||
|
||||
void push_end_of_stream() {
|
||||
_impl->push_end_of_stream();
|
||||
}
|
||||
};
|
||||
|
||||
class repair_writer : public enable_lw_shared_from_this<repair_writer> {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
// Current partition written to disk
|
||||
lw_shared_ptr<const decorated_key_with_hash> _current_dk_written_to_sstable;
|
||||
// Is current partition still open. A partition is opened when a
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
bool _partition_opened;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
bool _created_writer = false;
|
||||
public:
|
||||
class impl {
|
||||
public:
|
||||
virtual mutation_fragment_queue& queue() = 0;
|
||||
virtual future<> wait_for_writer_done() = 0;
|
||||
virtual void create_writer(lw_shared_ptr<repair_writer> writer) = 0;
|
||||
virtual ~impl() = default;
|
||||
};
|
||||
private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
mutation_fragment_queue* _mq;
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
std::unique_ptr<impl> impl)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _impl(std::move(impl))
|
||||
, _mq(&_impl->queue())
|
||||
{}
|
||||
|
||||
void create_writer() {
|
||||
_impl->create_writer(shared_from_this());
|
||||
_created_writer = true;
|
||||
}
|
||||
|
||||
future<> do_write(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf);
|
||||
|
||||
future<> wait_for_writer_done();
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
|
||||
schema_ptr schema() const noexcept {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
mutation_fragment_queue& queue() {
|
||||
return _impl->queue();
|
||||
}
|
||||
|
||||
private:
|
||||
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf);
|
||||
|
||||
|
||||
future<> write_partition_end();
|
||||
future<> write_end_of_stream();
|
||||
};
|
||||
|
||||
lw_shared_ptr<repair_writer> make_repair_writer(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
uint64_t estimated_partitions,
|
||||
streaming::stream_reason reason,
|
||||
sharded<replica::database>& db,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator);
|
||||
|
||||
143
test/boost/repair_test.cc
Normal file
143
test/boost/repair_test.cc
Normal file
@@ -0,0 +1,143 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "readers/from_fragments_v2.hh"
|
||||
#include "readers/upgrading_consumer.hh"
|
||||
#include "repair/hash.hh"
|
||||
#include "repair/row.hh"
|
||||
#include "repair/writer.hh"
|
||||
#include "repair/row_level.hh"
|
||||
#include "test/lib/mutation_source_test.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/reader_concurrency_semaphore.hh"
|
||||
#include <seastar/testing/test_case.hh>
|
||||
|
||||
// Helper mutation_fragment_queue that stores the received stream of
|
||||
// mutation_fragments in a passed in deque of mutation_fragment_v2.
|
||||
// This allows easy reader construction to verify what was sent to the queue
|
||||
class test_mutation_fragment_queue_impl : public mutation_fragment_queue::impl, enable_lw_shared_from_this<test_mutation_fragment_queue_impl> {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
std::deque<mutation_fragment_v2>& _fragments;
|
||||
std::unique_ptr<upgrading_consumer<std::function<void(mutation_fragment_v2&&)>>> _consumer;
|
||||
public:
|
||||
test_mutation_fragment_queue_impl(schema_ptr s, reader_permit permit, std::deque<mutation_fragment_v2>& fragments)
|
||||
: mutation_fragment_queue::impl()
|
||||
, _schema(std::move(s))
|
||||
, _permit(std::move(permit))
|
||||
, _fragments(fragments)
|
||||
{
|
||||
_consumer = std::make_unique<upgrading_consumer<std::function<void(mutation_fragment_v2&&)>>>
|
||||
(*_schema, _permit, [this](mutation_fragment_v2&& mf) mutable {
|
||||
_fragments.push_back(std::move(mf));
|
||||
});
|
||||
}
|
||||
|
||||
virtual future<> push(mutation_fragment mf) override {
|
||||
_consumer->consume(std::move(mf));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual void abort(std::exception_ptr ep) override {}
|
||||
|
||||
virtual void push_end_of_stream() override {}
|
||||
};
|
||||
|
||||
mutation_fragment_queue make_test_mutation_fragment_queue(schema_ptr s, reader_permit permit, std::deque<mutation_fragment_v2>& fragments) {
|
||||
return mutation_fragment_queue(std::make_unique<test_mutation_fragment_queue_impl>(std::move(s), std::move(permit), fragments));
|
||||
}
|
||||
|
||||
// repair_writer::impl abstracts away underlying writer that will receive
|
||||
// mutation fragments sent to the underlying queue. This implementation
|
||||
// receives the queue as its dependency and delegates all related work
|
||||
// to the queue.
|
||||
class test_repair_writer_impl : public repair_writer::impl {
|
||||
mutation_fragment_queue _queue;
|
||||
public:
|
||||
test_repair_writer_impl(mutation_fragment_queue queue)
|
||||
: _queue(std::move(queue))
|
||||
{}
|
||||
|
||||
virtual future<> wait_for_writer_done() override {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual mutation_fragment_queue& queue() override {
|
||||
return _queue;
|
||||
}
|
||||
|
||||
virtual void create_writer(lw_shared_ptr<repair_writer> writer) override {
|
||||
// Empty implementation. The queue received in constructor
|
||||
// should already contain a fully initialised consumer.
|
||||
}
|
||||
};
|
||||
|
||||
// Creates a helper repair_writer object that will fill in the passed in
|
||||
// deque of fragments as it receives repair_rows during flush
|
||||
lw_shared_ptr<repair_writer> make_test_repair_writer(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2>& fragments) {
|
||||
mutation_fragment_queue mq = make_test_mutation_fragment_queue(schema, permit, fragments);
|
||||
return make_lw_shared<repair_writer>(std::move(schema), std::move(permit), std::make_unique<test_repair_writer_impl>(std::move(mq)));
|
||||
}
|
||||
|
||||
repair_rows_on_wire make_random_repair_rows_on_wire(random_mutation_generator& gen, schema_ptr s, reader_permit permit, lw_shared_ptr<replica::memtable> m) {
|
||||
repair_rows_on_wire input;
|
||||
std::vector<mutation> muts = gen(100);
|
||||
|
||||
for (mutation& mut : muts) {
|
||||
partition_key pk = mut.key();
|
||||
auto m2 = make_lw_shared<replica::memtable>(s);
|
||||
m->apply(mut);
|
||||
m2->apply(mut);
|
||||
flat_mutation_reader reader = downgrade_to_v1(m2->make_flat_reader(s, permit));
|
||||
std::list<frozen_mutation_fragment> mfs;
|
||||
reader.consume_pausable([&input, s, &mfs](mutation_fragment mf) {
|
||||
if ((mf.is_partition_start() && !mf.as_partition_start().partition_tombstone()) || mf.is_end_of_partition()) {
|
||||
// Stream of mutations coming from the wire doesn't contain partition_end
|
||||
// fragments. partition_start can be sent only if it contains a tombstone.
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
mfs.push_back(freeze(*s, std::move(mf)));
|
||||
return stop_iteration::no;
|
||||
}).get();
|
||||
input.push_back(partition_key_and_mutation_fragments(pk, std::move(mfs)));
|
||||
}
|
||||
return input;
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(flush_repair_rows_on_wire_to_sstable) {
|
||||
// The basic premise of repairing is applying missing mutations from other nodes
|
||||
// to the current one and vice versa. The missing mutations are passed on the
|
||||
// wire in the form of repair_rows_on_wire objects.
|
||||
//
|
||||
// This test exercises the path of receiving rows on wire and flushing them
|
||||
// to disk. repair_rows_on_wire is optimised for wire transfer and not for
|
||||
// internal manipulation of the data and writing to disk, so they are converted
|
||||
// to a friendlier representation of std::list<repair_row>. Such list is then
|
||||
// flushed to disk.
|
||||
//
|
||||
// The test generates a random stream of mutations, converts them to repair_rows_on_wire,
|
||||
// converts them to std::list<repair_row> and verifies that if they were flushed
|
||||
// to disk, they would produce the original stream.
|
||||
return seastar::async([&] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
reader_permit permit = semaphore.make_permit();
|
||||
random_mutation_generator gen{random_mutation_generator::generate_counters::no};
|
||||
schema_ptr s = gen.schema();
|
||||
auto m = make_lw_shared<replica::memtable>(s);
|
||||
repair_rows_on_wire input = make_random_repair_rows_on_wire(gen, s, permit, m);
|
||||
std::deque<mutation_fragment_v2> fragments;
|
||||
lw_shared_ptr<repair_writer> writer = make_test_repair_writer(s, permit, fragments);
|
||||
uint64_t seed = tests::random::get_int<uint64_t>();
|
||||
std::list<repair_row> repair_rows = to_repair_rows_list(std::move(input), s, seed, repair_master::yes, permit, repair_hasher(seed, s)).get();
|
||||
flush_rows(s, repair_rows, writer);
|
||||
writer->wait_for_writer_done().get();
|
||||
compare_readers(*s, m->make_flat_reader(s, permit), make_flat_mutation_reader_from_fragments(s, permit, std::move(fragments)));
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user