Merge "Introduce row level repair" from Asias

"
=== How the the partition level repair works

- The repair master decides which ranges to work on.
- The repair master splits the ranges to sub ranges which contains around 100
partitions.
- The repair master computes the checksum of the 100 partitions and asks the
related peers to compute the checksum of the 100 partitions.
- If the checksum matches, the data in this sub range is synced.
- If the checksum mismatches, repair master fetches the data from all the peers
and sends back the merged data to peers.

=== Major problems with partition level repair

- A mismatch of a single row in any of the 100 partitions causes 100
partitions to be transferred. A single partition can be very large. Not to
mention the size of 100 partitions.

- Checksum (find the mismatch) and streaming (fix the mismatch) will read the
same data twice

=== Row level repair

Row level checksum and synchronization: detect row level mismatch and transfer
only the mismatch

=== How the row level repair works

- To solve the problem of reading data twice

Read the data only once for both checksum and synchronization between nodes.

We work on a small range which contains only a few mega bytes of rows,
We read all the rows within the small range into memory. Find the
mismatch and send the mismatch rows between peers.

We need to find a sync boundary among the nodes which contains only N bytes of
rows.

- To solve the problem of sending unnecessary data.

We need to find the mismatched rows between nodes and only send the delta.
The problem is called set reconciliation problem which is a common problem in
distributed systems.

For example:
Node1 has set1 = {row1, row2, row3}
Node2 has set2 = {      row2, row3}
Node3 has set3 = {row1, row2, row4}

To repair:
Node1 fetches nothing from Node2 (set2 - set1), fetches row4 (set3 - set1) from Node3.
Node1 sends row1 and row4 (set1 + set2 + set3 - set2) to Node2
Node1 sends row3 (set1 + set2 + set3 - set3) to Node3.

=== How to implement repair with set reconciliation

- Step A: Negotiate sync boundary

class repair_sync_boundary {
    dht::decorated_key pk;
    position_in_partition position
}

Reads rows from disk into row buffers until the size is larger than N
bytes. Return the repair_sync_boundary of the last mutation_fragment we
read from disk. The smallest repair_sync_boundary of all nodes is
set as the current_sync_boundary.

- Step B: Get missing rows from peer nodes so that repair master contains all the rows

Request combined hashes from all nodes between last_sync_boundary and
current_sync_boundary. If the combined hashes from all nodes are identical,
data is synced, goto Step A. If not, request the full hashes from peers.

At this point, the repair master knows exactly what rows are missing. Request the
missing rows from peer nodes.

Now, local node contains all the rows.

- Step C: Send missing rows to the peer nodes

Since local node also knows what peer nodes own, it sends the missing rows to
the peer nodes.

=== How the RPC API looks like

- repair_range_start()

Step A:
- request_sync_boundary()

Step B:
- request_combined_row_hashes()
- reqeust_full_row_hashes()
- request_row_diff()

Step C:
- send_row_diff()

- repair_range_stop()

=== Performance evaluation

We created a cluster of 3 Scylla nodes on AWS using i3.xlarge instance. We
created a keyspace with a replication factor of 3 and inserted 1 billion
rows to each of the 3 nodes. Each node has 241 GiB of data.
We tested 3 cases below.

1) 0% synced: one of the node has zero data. The other two nodes have 1 billion identical rows.

Time to repair:
   old = 87 min
   new = 70 min (rebuild took 50 minutes)
   improvement = 19.54%

2) 100% synced: all of the 3 nodes have 1 billion identical rows.
Time to repair:
   old = 43 min
   new = 24 min
   improvement = 44.18%

3) 99.9% synced: each node has 1 billion identical rows and 1 billion * 0.1% distinct rows.

Time to repair:
   old: 211 min
   new: 44 min
   improvement: 79.15%

Bytes sent on wire for repair:
   old: tx= 162 GiB,  rx = 90 GiB
   new: tx= 1.15 GiB, tx = 0.57 GiB
   improvement: tx = 99.29%, rx = 99.36%

It is worth noting that row level repair sends and receives exactly the
number of rows needed in theory.

In this test case, repair master needs to receives 2 million rows and
sends 4 million rows. Here are the details: Each node has 1 billion *
0.1% distinct rows, that is 1 million rows. So repair master receives 1
million rows from repair slave 1 and 1 million rows from repair slave 2.
Repair master sends 1 million rows from repair master and 1 million rows
received from repair slave 1 to repair slave 2. Repair master sends
sends 1 million rows from repair master and 1 million rows received from
repair slave 2 to repair slave 1.

In the result, we saw the rows on wire were as expected.

tx_row_nr  = 1000505 + 999619 + 1001257 + 998619 (4 shards, the numbers are for each shard) = 4'000'000
rx_row_nr  =  500233 + 500235 +  499559 + 499973 (4 shards, the numbers are for each shard) = 2'000'000

Fixes: #3033

Tests: dtests/repair_additional_test.py
"

* 'asias/row_level_repair_v7' of github.com:cloudius-systems/seastar-dev: (51 commits)
  repair: Enable row level repair
  repair: Add row_level_repair
  repair: Add docs for row level repair
  repair: Add repair_init_messaging_service_handler
  repair: Add repair_meta
  repair: Add repair_writer
  repair: Add repair_reader
  repair: Add repair_row
  repair: Add fragment_hasher
  repair: Add decorated_key_with_hash
  repair: Add get_random_seed
  repair: Add get_common_diff_detect_algorithm
  repair: Add shard_config
  repair: Add suportted_diff_detect_algorithms
  repair: Add repair_stats to repair_info
  repair: Introduce repair_stats
  flat_mutation_reader:  Add make_generating_reader
  storage_service: Introduce ROW_LEVEL_REPAIR feature
  messaging_service: Add RPC verbs for row level repair
  repair: Export the repair logger
  ...
This commit is contained in:
Avi Kivity
2018-12-25 13:13:00 +02:00
26 changed files with 2645 additions and 251 deletions

View File

@@ -663,6 +663,7 @@ scylla_core = (['database.cc',
'init.cc',
'lister.cc',
'repair/repair.cc',
'repair/row_level.cc',
'exceptions/exceptions.cc',
'auth/allow_all_authenticator.cc',
'auth/allow_all_authorizer.cc',

View File

@@ -21,7 +21,9 @@
#include "i_partitioner.hh"
#include <seastar/core/reactor.hh>
#include "murmur3_partitioner.hh"
#include "dht/murmur3_partitioner.hh"
#include "dht/random_partitioner.hh"
#include "dht/byte_ordered_partitioner.hh"
#include "utils/class_registrator.hh"
#include "types.hh"
#include "utils/murmur_hash.hh"
@@ -182,13 +184,17 @@ std::unique_ptr<i_partitioner> default_partitioner;
void set_global_partitioner(const sstring& class_name, unsigned ignore_msb)
{
default_partitioner = make_partitioner(class_name, smp::count, ignore_msb);
}
std::unique_ptr<dht::i_partitioner> make_partitioner(sstring partitioner_name, unsigned shard_count, unsigned sharding_ignore_msb_bits) {
try {
default_partitioner = create_object<i_partitioner, const unsigned&, const unsigned&>(class_name, smp::count, ignore_msb);
return create_object<i_partitioner, const unsigned&, const unsigned&>(partitioner_name, shard_count, sharding_ignore_msb_bits);
} catch (std::exception& e) {
auto supported_partitioners = ::join(", ", class_registry<i_partitioner>::classes() |
boost::adaptors::map_keys);
throw std::runtime_error(format("Partitioner {} is not supported, supported partitioners = {{ {} }} : {}",
class_name, supported_partitioners, e.what()));
partitioner_name, supported_partitioners, e.what()));
}
}

View File

@@ -178,6 +178,11 @@ public:
dht::token _token;
partition_key _key;
decorated_key(dht::token t, partition_key k)
: _token(std::move(t))
, _key(std::move(k)) {
}
struct less_comparator {
schema_ptr s;
less_comparator(schema_ptr s);
@@ -802,6 +807,8 @@ public:
stdx::optional<dht::token_range> next();
};
std::unique_ptr<dht::i_partitioner> make_partitioner(sstring name, unsigned shard_count, unsigned sharding_ignore_msb_bits);
} // dht
namespace std {

164
docs/row_level_repair.md Normal file
View File

@@ -0,0 +1,164 @@
# Row level repair
## How the the partition level repair works
- The repair master decides which ranges to work on.
- The repair master splits the ranges to sub ranges which contains around 100
partitions.
- The repair master computes the checksum of the 100 partitions and asks the
related peers to compute the checksum of the 100 partitions.
- If the checksum matches, the data in this sub range is synced.
- If the checksum mismatches, repair master fetches the data from all the peers
and sends back the merged data to peers.
## Major problems with partition level repair
- A mismatch of a single row in any of the 100 partitions causes 100
partitions to be transferred. A single partition can be very large. Not to
mention the size of 100 partitions.
- Checksum (find the mismatch) and streaming (fix the mismatch) will read the
same data twice
## Row level repair
Row level checksum and synchronization: detect row level mismatch and transfer
only the mismatch
## How the row level repair works
- To solve the problem of reading data twice
Read the data only once for both checksum and synchronization between nodes.
We work on a small range which contains only a few mega bytes of rows,
We read all the rows within the small range into memory. Find the
mismatch and send the mismatch rows between peers.
We need to find a sync boundary among the nodes which contains only N bytes of
rows.
- To solve the problem of sending unnecessary data.
We need to find the mismatched rows between nodes and only send the delta.
The problem is called set reconciliation problem which is a common problem in
distributed systems.
For example:
- Node1 has set1 = {row1, row2, row3}
- Node2 has set2 = { row2, row3}
- Node3 has set3 = {row1, row2, row4}
To repair:
- Node1 fetches nothing from Node2 (set2 - set1), fetches row4 (set3 - set1) from Node3.
- Node1 sends row1 and row4 (set1 + set2 + set3 - set2) to Node2.
- Node1 sends row3 (set1 + set2 + set3 - set3) to Node3.
## How to implement repair with set reconciliation
- Step A: Negotiate sync boundary
class repair_sync_boundary {
dht::decorated_key pk;
position_in_partition position
}
Reads rows from disk into row buffers until the size is larger than N
bytes. Return the repair_sync_boundary of the last mutation_fragment we
read from disk. The smallest repair_sync_boundary of all nodes is
set as the current_sync_boundary.
- Step B: Get missing rows from peer nodes so that repair master contains all the rows
Request combined hashes from all nodes between last_sync_boundary and
current_sync_boundary. If the combined hashes from all nodes are identical,
data is synced, goto Step A. If not, request the full hashes from peers.
At this point, the repair master knows exactly what rows are missing. Request the
missing rows from peer nodes.
Now, local node contains all the rows.
- Step C: Send missing rows to the peer nodes
Since local node also knows what peer nodes own, it sends the missing rows to
the peer nodes.
## How the RPC API looks like
Start:
- repair_range_start()
Step A:
- request_sync_boundary()
Step B:
- request_combined_row_hashes()
- request_full_row_hashes()
- request_row_diff()
Step C:
- send_row_diff()
Finish:
- repair_range_stop()
## Performance evaluation
We created a cluster of 3 Scylla nodes on AWS using i3.xlarge instance. We
created a keyspace with a replication factor of 3 and inserted 1 billion
rows to each of the 3 nodes. Each node has 241 GiB of data.
We tested 3 cases below.
### 1) 0% synced: one of the node has zero data. The other two nodes have 1 billion identical rows.
Time to repair:
* old = 87 min
* new = 70 min (rebuild took 50 minutes)
* improvement = 19.54%
### 2) 100% synced: all of the 3 nodes have 1 billion identical rows.
Time to repair:
* old = 43 min
* new = 24 min
* improvement = 44.18%
### 3) 99.9% synced: each node has 1 billion identical rows and 1 billion * 0.1% distinct rows.
Time to repair:
* old: 211 min
* new: 44 min
* improvement: 79.15%
Bytes sent on wire for repair:
* old: tx= 162 GiB, rx = 90 GiB
* new: tx= 1.15 GiB, tx = 0.57 GiB
* improvement: tx = 99.29%, rx = 99.36%
It is worth noting that row level repair sends and receives exactly the
number of rows needed in theory.
In this test case, repair master needs to receives 2 million rows and
sends 4 million rows. Here are the details: Each node has 1 billion *
0.1% distinct rows, that is 1 million rows. So repair master receives 1
million rows from repair follower 1 and 1 million rows from repair follower 2.
Repair master sends 1 million rows from repair master and 1 million rows
received from repair follower 1 to repair follower 2. Repair master sends
sends 1 million rows from repair master and 1 million rows received from
repair follower 2 to repair follower 1.
In the result, we saw the rows on wire were as expected.
tx_row_nr = 1000505 + 999619 + 1001257 + 998619 (4 shards, the numbers are for each shard) = 4'000'000
rx_row_nr = 500233 + 500235 + 499559 + 499973 (4 shards, the numbers are for each shard) = 2'000'000

View File

@@ -811,3 +811,41 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, std::deque<mutation_
};
return make_flat_mutation_reader<reader>(std::move(schema), std::move(fragments));
}
/*
* This reader takes a get_next_fragment generator that produces mutation_fragment_opt which is returned by
* generating_reader.
*
*/
class generating_reader final : public flat_mutation_reader::impl {
std::function<future<mutation_fragment_opt> ()> _get_next_fragment;
public:
generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt> ()> get_next_fragment)
: impl(std::move(s)), _get_next_fragment(std::move(get_next_fragment))
{ }
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return _get_next_fragment().then([this] (mutation_fragment_opt mopt) {
if (!mopt) {
_end_of_stream = true;
} else {
push_mutation_fragment(std::move(*mopt));
}
});
});
}
virtual void next_partition() override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
};
flat_mutation_reader make_generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt> ()> get_next_fragment) {
return make_flat_mutation_reader<generating_reader>(std::move(s), std::move(get_next_fragment));
}

View File

@@ -671,3 +671,6 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db:
});
});
}
flat_mutation_reader
make_generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt> ()> get_next_fragment);

View File

@@ -27,3 +27,48 @@ enum class repair_checksum : uint8_t {
class partition_checksum {
std::array<uint8_t, 32> digest();
};
class repair_hash {
uint64_t hash;
};
enum class bound_weight : int8_t {
before_all_prefixed = -1,
equal = 0,
after_all_prefixed = 1,
};
enum class partition_region : uint8_t {
partition_start,
static_row,
clustered,
partition_end,
};
class position_in_partition {
partition_region get_type();
bound_weight get_bound_weight();
std::optional<clustering_key_prefix> get_clustering_key_prefix();
};
struct partition_key_and_mutation_fragments {
partition_key get_key();
std::list<frozen_mutation_fragment> get_mutation_fragments();
};
class repair_sync_boundary {
dht::decorated_key pk;
position_in_partition position;
};
struct get_sync_boundary_response {
std::optional<repair_sync_boundary> boundary;
repair_hash row_buf_combined_csum;
uint64_t row_buf_size;
uint64_t new_rows_size;
uint64_t new_rows_nr;
};
enum class row_level_diff_detect_algorithm : uint8_t {
send_full_set,
};

View File

@@ -8,4 +8,10 @@ class token {
dht::token::kind _kind;
bytes _data;
};
class decorated_key {
dht::token token();
partition_key key();
};
}

View File

@@ -238,11 +238,7 @@ void reconnectable_snitch_helper::reconnect(gms::inet_address public_address, gm
//
netw::get_messaging_service().invoke_on_all([public_address, local_address] (auto& local_ms) {
local_ms.cache_preferred_ip(public_address, local_address);
netw::msg_addr id = {
.addr = public_address
};
local_ms.remove_rpc_client(id);
local_ms.remove_rpc_client(netw::msg_addr(public_address));
}).get();
logger().debug("Initiated reconnect to an Internal IP {} for the {}", local_address, public_address);

View File

@@ -49,6 +49,7 @@
#include "init.hh"
#include "release.hh"
#include "repair/repair.hh"
#include "repair/row_level.hh"
#include <cstdio>
#include <seastar/core/file.hh>
#include <sys/time.h>
@@ -759,6 +760,7 @@ int main(int ac, char** av) {
});
});
}).get();
repair_init_messaging_service_handler().get();
supervisor::notify("starting storage service", true);
auto& ss = service::get_local_storage_service();
ss.init_messaging_service_part().get();

View File

@@ -58,6 +58,7 @@
#include "idl/query.dist.hh"
#include "idl/cache_temperature.dist.hh"
#include "idl/view.dist.hh"
#include "idl/mutation.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
@@ -77,6 +78,7 @@
#include "idl/partition_checksum.dist.impl.hh"
#include "idl/query.dist.impl.hh"
#include "idl/cache_temperature.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/multi_algo_compressor_factory.hh>
#include "idl/view.dist.impl.hh"
@@ -448,6 +450,16 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::REPLICATION_FINISHED:
case messaging_verb::REPAIR_CHECKSUM_RANGE:
case messaging_verb::STREAM_MUTATION_FRAGMENTS:
case messaging_verb::REPAIR_ROW_LEVEL_START:
case messaging_verb::REPAIR_ROW_LEVEL_STOP:
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES:
case messaging_verb::REPAIR_GET_COMBINED_ROW_HASH:
case messaging_verb::REPAIR_GET_SYNC_BOUNDARY:
case messaging_verb::REPAIR_GET_ROW_DIFF:
case messaging_verb::REPAIR_PUT_ROW_DIFF:
case messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS:
case messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS:
case messaging_verb::REPAIR_GET_DIFF_ALGORITHMS:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
@@ -516,11 +528,7 @@ future<> messaging_service::init_local_preferred_ip_cache() {
// just read.
//
for (auto& p : _preferred_ip_cache) {
msg_addr id = {
.addr = p.first
};
this->remove_rpc_client(id);
this->remove_rpc_client(msg_addr(p.first));
}
});
}
@@ -1007,4 +1015,113 @@ future<partition_checksum> messaging_service::send_repair_checksum_range(
std::move(keyspace), std::move(cf), std::move(range), hash_version);
}
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
void messaging_service::register_repair_get_full_row_hashes(std::function<future<std::unordered_set<repair_hash>> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));
}
void messaging_service::unregister_repair_get_full_row_hashes() {
_rpc->unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES);
}
future<std::unordered_set<repair_hash>> messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id) {
return send_message<future<std::unordered_set<repair_hash>>>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id);
}
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
void messaging_service::register_repair_get_combined_row_hash(std::function<future<repair_hash> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(func));
}
void messaging_service::unregister_repair_get_combined_row_hash() {
_rpc->unregister_handler(messaging_verb::REPAIR_GET_COMBINED_ROW_HASH);
}
future<repair_hash> messaging_service::send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary) {
return send_message<future<repair_hash>>(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(id), repair_meta_id, std::move(common_sync_boundary));
}
void messaging_service::register_repair_get_sync_boundary(std::function<future<get_sync_boundary_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(func));
}
void messaging_service::unregister_repair_get_sync_boundary() {
_rpc->unregister_handler(messaging_verb::REPAIR_GET_SYNC_BOUNDARY);
}
future<get_sync_boundary_response> messaging_service::send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary) {
return send_message<future<get_sync_boundary_response>>(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(id), repair_meta_id, std::move(skipped_sync_boundary));
}
// Wrapper for REPAIR_GET_ROW_DIFF
void messaging_service::register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(func));
}
void messaging_service::unregister_repair_get_row_diff() {
_rpc->unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF);
}
future<repair_rows_on_wire> messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
return send_message<future<repair_rows_on_wire>>(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(id), repair_meta_id, std::move(set_diff), needs_all_rows);
}
// Wrapper for REPAIR_PUT_ROW_DIFF
void messaging_service::register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff)>&& func) {
register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(func));
}
void messaging_service::unregister_repair_put_row_diff() {
_rpc->unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF);
}
future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff) {
return send_message<void>(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(id), repair_meta_id, std::move(row_diff));
}
// Wrapper for REPAIR_ROW_LEVEL_START
void messaging_service::register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name)>&& func) {
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
}
void messaging_service::unregister_repair_row_level_start() {
_rpc->unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
}
future<> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name) {
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name));
}
// Wrapper for REPAIR_ROW_LEVEL_STOP
void messaging_service::register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func) {
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(func));
}
void messaging_service::unregister_repair_row_level_stop() {
_rpc->unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_STOP);
}
future<> messaging_service::send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range) {
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range));
}
// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS
void messaging_service::register_repair_get_estimated_partitions(std::function<future<uint64_t> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(func));
}
void messaging_service::unregister_repair_get_estimated_partitions() {
_rpc->unregister_handler(messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS);
}
future<uint64_t> messaging_service::send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id) {
return send_message<future<uint64_t>>(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id);
}
// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS
void messaging_service::register_repair_set_estimated_partitions(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions)>&& func) {
register_handler(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(func));
}
void messaging_service::unregister_repair_set_estimated_partitions() {
_rpc->unregister_handler(messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS);
}
future<> messaging_service::send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions) {
return send_message<void>(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, estimated_partitions);
}
// Wrapper for REPAIR_GET_DIFF_ALGORITHMS
void messaging_service::register_repair_get_diff_algorithms(std::function<future<std::vector<row_level_diff_detect_algorithm>> (const rpc::client_info& cinfo)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(func));
}
void messaging_service::unregister_repair_get_diff_algorithms() {
_rpc->unregister_handler(messaging_verb::REPAIR_GET_DIFF_ALGORITHMS);
}
future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_repair_get_diff_algorithms(msg_addr id) {
return send_message<future<std::vector<row_level_diff_detect_algorithm>>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id));
}
} // namespace net

View File

@@ -38,6 +38,9 @@
#include "streaming/stream_reason.hh"
#include "cache_temperature.hh"
#include <list>
#include <vector>
#include <optional>
#include <seastar/net/tls.hh>
// forward declarations
@@ -117,7 +120,17 @@ enum class messaging_verb : int32_t {
COUNTER_MUTATION = 23,
MUTATION_FAILED = 24,
STREAM_MUTATION_FRAGMENTS = 25,
LAST = 26,
REPAIR_ROW_LEVEL_START = 26,
REPAIR_ROW_LEVEL_STOP = 27,
REPAIR_GET_FULL_ROW_HASHES = 28,
REPAIR_GET_COMBINED_ROW_HASH = 29,
REPAIR_GET_SYNC_BOUNDARY = 30,
REPAIR_GET_ROW_DIFF = 31,
REPAIR_PUT_ROW_DIFF = 32,
REPAIR_GET_ESTIMATED_PARTITIONS= 33,
REPAIR_SET_ESTIMATED_PARTITIONS= 34,
REPAIR_GET_DIFF_ALGORITHMS = 35,
LAST = 36,
};
} // namespace netw
@@ -272,6 +285,56 @@ public:
void unregister_repair_checksum_range();
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version);
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
void register_repair_get_full_row_hashes(std::function<future<std::unordered_set<repair_hash>> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
void unregister_repair_get_full_row_hashes();
future<std::unordered_set<repair_hash>> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id);
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
void register_repair_get_combined_row_hash(std::function<future<repair_hash> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary)>&& func);
void unregister_repair_get_combined_row_hash();
future<repair_hash> send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary);
// Wrapper for REPAIR_GET_SYNC_BOUNDARY
void register_repair_get_sync_boundary(std::function<future<get_sync_boundary_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary)>&& func);
void unregister_repair_get_sync_boundary();
future<get_sync_boundary_response> send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary);
// Wrapper for REPAIR_GET_ROW_DIFF
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows)>&& func);
void unregister_repair_get_row_diff();
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows);
// Wrapper for REPAIR_PUT_ROW_DIFF
void register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff)>&& func);
void unregister_repair_put_row_diff();
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);
// Wrapper for REPAIR_ROW_LEVEL_START
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name)>&& func);
void unregister_repair_row_level_start();
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name);
// Wrapper for REPAIR_ROW_LEVEL_STOP
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);
void unregister_repair_row_level_stop();
future<> send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range);
// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS
void register_repair_get_estimated_partitions(std::function<future<uint64_t> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
void unregister_repair_get_estimated_partitions();
future<uint64_t> send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id);
// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS
void register_repair_set_estimated_partitions(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions)>&& func);
void unregister_repair_set_estimated_partitions();
future<> send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions);
// Wrapper for REPAIR_GET_DIFF_ALGORITHMS
void register_repair_get_diff_algorithms(std::function<future<std::vector<row_level_diff_detect_algorithm>> (const rpc::client_info& cinfo)>&& func);
void unregister_repair_get_diff_algorithms();
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
// Wrapper for GOSSIP_ECHO verb
void register_gossip_echo(std::function<future<> ()>&& func);
void unregister_gossip_echo();

View File

@@ -35,6 +35,8 @@ struct msg_addr {
struct hash {
size_t operator()(const msg_addr& id) const;
};
explicit msg_addr(gms::inet_address ip) : addr(ip), cpu_id(0) { }
msg_addr(gms::inet_address ip, uint32_t cpu) : addr(ip), cpu_id(cpu) { }
};
}

View File

@@ -65,7 +65,7 @@ std::ostream& operator<<(std::ostream& out, position_in_partition_view pos) {
} else {
out << "null";
}
return out << "," << pos._bound_weight << "}";
return out << "," << int8_t(pos._bound_weight) << "}";
}
std::ostream& operator<<(std::ostream& out, const position_in_partition& pos) {

View File

@@ -799,7 +799,7 @@ public:
return _row;
}
position_in_partition_view position() const {
return position_in_partition_view(partition_region::clustered, _flags._after_ck - _flags._before_ck, &_key);
return position_in_partition_view(partition_region::clustered, bound_weight(_flags._after_ck - _flags._before_ck), &_key);
}
is_continuous continuous() const { return is_continuous(_flags._continuous); }

View File

@@ -26,6 +26,7 @@
#include "clustering_bounds_comparator.hh"
#include "query-request.hh"
#include <optional>
#include <boost/icl/interval_set.hpp>
inline
@@ -53,20 +54,26 @@ lexicographical_relation relation_for_upper_bound(composite_view v) {
abort();
}
enum class bound_weight : int8_t {
before_all_prefixed = -1,
equal = 0,
after_all_prefixed = 1,
};
inline
int position_weight(bound_kind k) {
bound_weight position_weight(bound_kind k) {
switch(k) {
case bound_kind::excl_end:
case bound_kind::incl_start:
return -1;
return bound_weight::before_all_prefixed;
case bound_kind::incl_end:
case bound_kind::excl_start:
return 1;
return bound_weight::after_all_prefixed;
}
abort();
}
enum class partition_region {
enum class partition_region : uint8_t {
partition_start,
static_row,
clustered,
@@ -77,19 +84,19 @@ class position_in_partition_view {
friend class position_in_partition;
partition_region _type;
int _bound_weight = 0;
bound_weight _bound_weight = bound_weight::equal;
const clustering_key_prefix* _ck; // nullptr when _type != clustered
public:
position_in_partition_view(partition_region type, int bound_weight, const clustering_key_prefix* ck)
position_in_partition_view(partition_region type, bound_weight weight, const clustering_key_prefix* ck)
: _type(type)
, _bound_weight(bound_weight)
, _bound_weight(weight)
, _ck(ck)
{ }
bool is_before_key() const {
return _bound_weight < 0;
return _bound_weight == bound_weight::before_all_prefixed;
}
bool is_after_key() const {
return _bound_weight > 0;
return _bound_weight == bound_weight::after_all_prefixed;
}
private:
// Returns placement of this position_in_partition relative to *_ck,
@@ -99,10 +106,10 @@ private:
// includes just the prefix key or a range start which excludes just a prefix key.
// In both cases we should return lexicographical_relation::before_all_strictly_prefixed here.
// Refs #1446.
if (_bound_weight <= 0) {
return lexicographical_relation::before_all_prefixed;
} else {
if (_bound_weight == bound_weight::after_all_prefixed) {
return lexicographical_relation::after_all_prefixed;
} else {
return lexicographical_relation::before_all_prefixed;
}
}
public:
@@ -148,24 +155,24 @@ public:
}
static position_in_partition_view after_key(const clustering_key& ck) {
return {partition_region::clustered, 1, &ck};
return {partition_region::clustered, bound_weight::after_all_prefixed, &ck};
}
bool is_partition_start() const { return _type == partition_region::partition_start; }
bool is_partition_end() const { return _type == partition_region::partition_end; }
bool is_static_row() const { return _type == partition_region::static_row; }
bool is_clustering_row() const { return has_clustering_key() && !_bound_weight; }
bool is_clustering_row() const { return has_clustering_key() && _bound_weight == bound_weight::equal; }
bool has_clustering_key() const { return _type == partition_region::clustered; }
// Returns true if all fragments that can be seen for given schema have
// positions >= than this. partition_start is ignored.
bool is_before_all_fragments(const schema& s) const {
return _type == partition_region::partition_start || _type == partition_region::static_row
|| (_type == partition_region::clustered && !s.has_static_columns() && _bound_weight < 0 && key().is_empty(s));
|| (_type == partition_region::clustered && !s.has_static_columns() && _bound_weight == bound_weight::before_all_prefixed && key().is_empty(s));
}
bool is_after_all_clustered_rows(const schema& s) const {
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight > 0);
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight == bound_weight::after_all_prefixed);
}
// Valid when >= before_all_clustered_rows()
@@ -175,13 +182,13 @@ public:
// Can be called only when !is_static_row && !is_clustering_row().
bound_view as_start_bound_view() const {
assert(_bound_weight != 0);
return bound_view(*_ck, _bound_weight < 0 ? bound_kind::incl_start : bound_kind::excl_start);
assert(_bound_weight != bound_weight::equal);
return bound_view(*_ck, _bound_weight == bound_weight::before_all_prefixed ? bound_kind::incl_start : bound_kind::excl_start);
}
bound_view as_end_bound_view() const {
assert(_bound_weight != 0);
return bound_view(*_ck, _bound_weight < 0 ? bound_kind::excl_end : bound_kind::incl_end);
assert(_bound_weight != bound_weight::equal);
return bound_view(*_ck, _bound_weight == bound_weight::before_all_prefixed ? bound_kind::excl_end : bound_kind::incl_end);
}
friend std::ostream& operator<<(std::ostream&, position_in_partition_view);
@@ -190,8 +197,8 @@ public:
class position_in_partition {
partition_region _type;
int _bound_weight = 0;
stdx::optional<clustering_key_prefix> _ck;
bound_weight _bound_weight = bound_weight::equal;
std::optional<clustering_key_prefix> _ck;
public:
friend class clustering_interval_set;
struct partition_start_tag_t { };
@@ -203,7 +210,11 @@ public:
struct before_clustering_row_tag_t { };
struct range_tag_t { };
using range_tombstone_tag_t = range_tag_t;
partition_region get_type() const { return _type; }
bound_weight get_bound_weight() const { return _bound_weight; }
const std::optional<clustering_key_prefix>& get_clustering_key_prefix() const { return _ck; }
position_in_partition(partition_region type, bound_weight weight, std::optional<clustering_key_prefix> ck)
: _type(type), _bound_weight(weight), _ck(std::move(ck)) { }
explicit position_in_partition(partition_start_tag_t) : _type(partition_region::partition_start) { }
explicit position_in_partition(end_of_partition_tag_t) : _type(partition_region::partition_end) { }
explicit position_in_partition(static_row_tag_t) : _type(partition_region::static_row) { }
@@ -211,13 +222,13 @@ public:
: _type(partition_region::clustered), _ck(std::move(ck)) { }
position_in_partition(after_clustering_row_tag_t, clustering_key_prefix ck)
// FIXME: Use lexicographical_relation::before_strictly_prefixed here. Refs #1446
: _type(partition_region::clustered), _bound_weight(1), _ck(std::move(ck)) { }
: _type(partition_region::clustered), _bound_weight(bound_weight::after_all_prefixed), _ck(std::move(ck)) { }
position_in_partition(after_clustering_row_tag_t, position_in_partition_view pos)
: _type(partition_region::clustered)
, _bound_weight(pos._bound_weight ? pos._bound_weight : 1)
, _bound_weight(pos._bound_weight != bound_weight::equal ? pos._bound_weight : bound_weight::after_all_prefixed)
, _ck(*pos._ck) { }
position_in_partition(before_clustering_row_tag_t, clustering_key_prefix ck)
: _type(partition_region::clustered), _bound_weight(-1), _ck(std::move(ck)) { }
: _type(partition_region::clustered), _bound_weight(bound_weight::before_all_prefixed), _ck(std::move(ck)) { }
position_in_partition(range_tag_t, bound_view bv)
: _type(partition_region::clustered), _bound_weight(position_weight(bv.kind())), _ck(bv.prefix()) { }
position_in_partition(range_tag_t, bound_kind kind, clustering_key_prefix&& prefix)
@@ -274,16 +285,16 @@ public:
bool is_partition_start() const { return _type == partition_region::partition_start; }
bool is_partition_end() const { return _type == partition_region::partition_end; }
bool is_static_row() const { return _type == partition_region::static_row; }
bool is_clustering_row() const { return has_clustering_key() && !_bound_weight; }
bool is_clustering_row() const { return has_clustering_key() && _bound_weight == bound_weight::equal; }
bool has_clustering_key() const { return _type == partition_region::clustered; }
bool is_after_all_clustered_rows(const schema& s) const {
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight > 0);
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight == bound_weight::after_all_prefixed);
}
bool is_before_all_clustered_rows(const schema& s) const {
return _type < partition_region::clustered
|| (_type == partition_region::clustered && _ck->is_empty(s) && _bound_weight < 0);
|| (_type == partition_region::clustered && _ck->is_empty(s) && _bound_weight == bound_weight::before_all_prefixed);
}
template<typename Hasher>
@@ -404,7 +415,7 @@ public:
if (!a._ck) {
return 0;
}
return _cmp(*a._ck, a._bound_weight, *b._ck, b._bound_weight);
return _cmp(*a._ck, int8_t(a._bound_weight), *b._ck, int8_t(b._bound_weight));
}
public:
tri_compare(const schema& s) : _cmp(s) { }
@@ -485,7 +496,7 @@ inline
bool no_clustering_row_between(const schema& s, position_in_partition_view a, position_in_partition_view b) {
clustering_key_prefix::equality eq(s);
if (a._ck && b._ck) {
return eq(*a._ck, *b._ck) && (a._bound_weight >= 0 || b._bound_weight <= 0);
return eq(*a._ck, *b._ck) && (a._bound_weight != bound_weight::before_all_prefixed || b._bound_weight != bound_weight::after_all_prefixed);
} else {
return !a._ck && !b._ck;
}
@@ -588,8 +599,8 @@ public:
query::clustering_row_ranges result;
for (position_range r : *this) {
result.push_back(query::clustering_range::make(
{r.start().key(), r.start()._bound_weight <= 0},
{r.end().key(), r.end()._bound_weight > 0}));
{r.start().key(), r.start()._bound_weight != bound_weight::after_all_prefixed},
{r.end().key(), r.end()._bound_weight == bound_weight::after_all_prefixed}));
}
return result;
}

View File

@@ -20,6 +20,7 @@
*/
#include "repair.hh"
#include "repair/row_level.hh"
#include "range_split.hh"
#include "atomic_cell_hash.hh"
@@ -44,142 +45,7 @@
#include <seastar/core/gate.hh>
#include <seastar/util/defer.hh>
static logging::logger rlogger("repair");
class repair_info {
public:
seastar::sharded<database>& db;
sstring keyspace;
dht::token_range_vector ranges;
std::vector<sstring> cfs;
int id;
shard_id shard;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
size_t nr_failed_ranges = 0;
bool aborted = false;
// Map of peer -> <cf, ranges>
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_in;
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_out;
// FIXME: this "100" needs to be a parameter.
uint64_t target_partitions = 100;
// This affects how many ranges we put in a stream plan. The more the more
// memory we use to store the ranges in memory. However, it can reduce the
// total number of stream_plan we use for the repair.
size_t sub_ranges_to_stream = 10 * 1024;
size_t sp_index = 0;
size_t current_sub_ranges_nr_in = 0;
size_t current_sub_ranges_nr_out = 0;
int ranges_index = 0;
// Only allow one stream_plan in flight
semaphore sp_parallelism_semaphore{1};
lw_shared_ptr<streaming::stream_plan> _sp_in;
lw_shared_ptr<streaming::stream_plan> _sp_out;
public:
repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
const dht::token_range_vector& ranges_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_)
: db(db_)
, keyspace(keyspace_)
, ranges(ranges_)
, cfs(cfs_)
, id(id_)
, shard(engine().cpu_id())
, data_centers(data_centers_)
, hosts(hosts_) {
}
future<> do_streaming() {
size_t ranges_in = 0;
size_t ranges_out = 0;
_sp_in = make_lw_shared<streaming::stream_plan>(format("repair-in-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
_sp_out = make_lw_shared<streaming::stream_plan>(format("repair-out-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
for (auto& x : ranges_need_repair_in) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_in += stream_ranges.size();
_sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_in.clear();
current_sub_ranges_nr_in = 0;
for (auto& x : ranges_need_repair_out) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_out += stream_ranges.size();
_sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_out.clear();
current_sub_ranges_nr_out = 0;
if (ranges_in || ranges_out) {
rlogger.info("Start streaming for repair id={}, shard={}, index={}, ranges_in={}, ranges_out={}", id, shard, sp_index, ranges_in, ranges_out);
}
sp_index++;
return _sp_in->execute().discard_result().then([this, sp_in = _sp_in, sp_out = _sp_out] {
return _sp_out->execute().discard_result();
}).handle_exception([this] (auto ep) {
rlogger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
}).finally([this] {
_sp_in = {};
_sp_out = {};
});
}
void check_failed_ranges() {
if (nr_failed_ranges) {
rlogger.info("repair {} on shard {} failed - {} ranges failed", id, shard, nr_failed_ranges);
throw std::runtime_error(format("repair {:d} on shard {:d} failed to do checksum for {:d} sub ranges", id, shard, nr_failed_ranges));
} else {
rlogger.info("repair {} on shard {} completed successfully", id, shard);
}
}
future<> request_transfer_ranges(const sstring& cf,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out) {
rlogger.debug("Add cf {}, range {}, current_sub_ranges_nr_in {}, current_sub_ranges_nr_out {}", cf, range, current_sub_ranges_nr_in, current_sub_ranges_nr_out);
return seastar::with_semaphore(sp_parallelism_semaphore, 1, [this, cf, range, neighbors_in, neighbors_out] {
for (const auto& peer : neighbors_in) {
ranges_need_repair_in[peer][cf].emplace_back(range);
current_sub_ranges_nr_in++;
}
for (const auto& peer : neighbors_out) {
ranges_need_repair_out[peer][cf].emplace_back(range);
current_sub_ranges_nr_out++;
}
if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) {
return do_streaming();
}
return make_ready_future<>();
});
}
void abort() {
if (_sp_in) {
_sp_in->abort();
}
if (_sp_out) {
_sp_out->abort();
}
aborted = true;
}
void check_in_abort() {
if (aborted) {
throw std::runtime_error(format("repair id {:d} is aborted on shard {:d}", id, shard));
}
}
};
logging::logger rlogger("repair");
template <typename T1, typename T2>
inline
@@ -198,6 +64,14 @@ static std::ostream& operator<<(std::ostream& os, const std::unordered_map<T1, T
return os;
}
std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo) {
switch (algo) {
case row_level_diff_detect_algorithm::send_full_set:
return out << "send_full_set";
};
return out << "unknown";
}
static std::vector<sstring> list_column_families(const database& db, const sstring& keyspace) {
std::vector<sstring> ret;
for (auto &&e : db.get_column_families_mapping()) {
@@ -748,7 +622,7 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
constexpr int parallelism = 100;
static thread_local semaphore parallelism_semaphore(parallelism);
static future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
const sstring& cf, const dht::token_range& range) {
return db.map_reduce0(
[keyspace, cf, range] (auto& db) {
@@ -766,6 +640,118 @@ static future<uint64_t> estimate_partitions(seastar::sharded<database>& db, cons
);
}
repair_info::repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
const dht::token_range_vector& ranges_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_)
: db(db_)
, keyspace(keyspace_)
, ranges(ranges_)
, cfs(cfs_)
, id(id_)
, shard(engine().cpu_id())
, data_centers(data_centers_)
, hosts(hosts_)
, _row_level_repair(service::get_local_storage_service().cluster_supports_row_level_repair()) {
}
future<> repair_info::do_streaming() {
size_t ranges_in = 0;
size_t ranges_out = 0;
_sp_in = make_lw_shared<streaming::stream_plan>(format("repair-in-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
_sp_out = make_lw_shared<streaming::stream_plan>(format("repair-out-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
for (auto& x : ranges_need_repair_in) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_in += stream_ranges.size();
_sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_in.clear();
current_sub_ranges_nr_in = 0;
for (auto& x : ranges_need_repair_out) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_out += stream_ranges.size();
_sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_out.clear();
current_sub_ranges_nr_out = 0;
if (ranges_in || ranges_out) {
rlogger.info("Start streaming for repair id={}, shard={}, index={}, ranges_in={}, ranges_out={}", id, shard, sp_index, ranges_in, ranges_out);
}
sp_index++;
return _sp_in->execute().discard_result().then([this, sp_in = _sp_in, sp_out = _sp_out] {
return _sp_out->execute().discard_result();
}).handle_exception([this] (auto ep) {
rlogger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
}).finally([this] {
_sp_in = {};
_sp_out = {};
});
}
void repair_info::check_failed_ranges() {
rlogger.info("repair {} on shard {} stats: ranges_nr={}, sub_ranges_nr={}, {}",
id, shard, ranges.size(), _sub_ranges_nr, _stats.get_stats());
if (nr_failed_ranges) {
rlogger.info("repair {} on shard {} failed - {} ranges failed", id, shard, nr_failed_ranges);
throw std::runtime_error(format("repair {:d} on shard {:d} failed to do checksum for {:d} sub ranges", id, shard, nr_failed_ranges));
} else {
rlogger.info("repair {} on shard {} completed successfully", id, shard);
}
}
future<> repair_info::request_transfer_ranges(const sstring& cf,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out) {
rlogger.debug("Add cf {}, range {}, current_sub_ranges_nr_in {}, current_sub_ranges_nr_out {}", cf, range, current_sub_ranges_nr_in, current_sub_ranges_nr_out);
return seastar::with_semaphore(sp_parallelism_semaphore, 1, [this, cf, range, neighbors_in, neighbors_out] {
for (const auto& peer : neighbors_in) {
ranges_need_repair_in[peer][cf].emplace_back(range);
current_sub_ranges_nr_in++;
}
for (const auto& peer : neighbors_out) {
ranges_need_repair_out[peer][cf].emplace_back(range);
current_sub_ranges_nr_out++;
}
if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) {
return do_streaming();
}
return make_ready_future<>();
});
}
void repair_info::abort() {
if (_sp_in) {
_sp_in->abort();
}
if (_sp_out) {
_sp_out->abort();
}
aborted = true;
}
void repair_info::check_in_abort() {
if (aborted) {
throw std::runtime_error(format("repair id {:d} is aborted on shard {:d}", id, shard));
}
}
// Repair a single cf in a single local range.
// Comparable to RepairJob in Origin.
static future<> repair_cf_range(repair_info& ri,
@@ -976,7 +962,12 @@ static future<> repair_range(repair_info& ri, const dht::token_range& range) {
return do_with(get_neighbors(ri.db.local(), ri.keyspace, range, ri.data_centers, ri.hosts), [&ri, range, id] (const auto& neighbors) {
rlogger.debug("[repair #{}] new session: will sync {} on range {} for {}.{}", id, neighbors, range, ri.keyspace, ri.cfs);
return do_for_each(ri.cfs.begin(), ri.cfs.end(), [&ri, &neighbors, range] (auto&& cf) {
return repair_cf_range(ri, cf, range, neighbors);
ri._sub_ranges_nr++;
if (ri.row_level_repair()) {
return repair_cf_range_row_level(ri, cf, range, neighbors);
} else {
return repair_cf_range(ri, cf, range, neighbors);
}
});
});
}
@@ -1020,6 +1011,67 @@ static sstring get_local_dc() {
utils::fb_utilities::get_broadcast_address());
}
void repair_stats::add(const repair_stats& o) {
round_nr += o.round_nr;
round_nr_fast_path_already_synced += o.round_nr_fast_path_already_synced;
round_nr_fast_path_same_combined_hashes += o.round_nr_fast_path_same_combined_hashes;
round_nr_slow_path += o.round_nr_slow_path;
rpc_call_nr += o.rpc_call_nr;
tx_hashes_nr += o.tx_hashes_nr;
rx_hashes_nr += o.rx_hashes_nr;
tx_row_nr += o.tx_row_nr;
rx_row_nr += o.rx_row_nr;
tx_row_bytes += o.tx_row_bytes;
rx_row_bytes += o.rx_row_bytes;
auto add_map = [] (auto& target, auto& src) {
for (const auto& [k, v] : src) {
target[k] += v;
}
};
add_map(row_from_disk_bytes, o.row_from_disk_bytes);
add_map(row_from_disk_nr, o.row_from_disk_nr);
add_map(tx_row_nr_peer, o.tx_row_nr_peer);
add_map(rx_row_nr_peer, o.rx_row_nr_peer);
}
sstring repair_stats::get_stats() {
std::map<gms::inet_address, float> row_from_disk_bytes_per_sec;
std::map<gms::inet_address, float> row_from_disk_rows_per_sec;
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(lowres_clock::now() - start_time).count();
for (auto& x : row_from_disk_bytes) {
if (std::fabs(duration) > FLT_EPSILON) {
row_from_disk_bytes_per_sec[x.first] = x.second / duration / 1024 / 1024;
} else {
row_from_disk_bytes_per_sec[x.first] = 0;
}
}
for (auto& x : row_from_disk_nr) {
if (std::fabs(duration) > FLT_EPSILON) {
row_from_disk_rows_per_sec[x.first] = x.second / duration;
} else {
row_from_disk_rows_per_sec[x.first] = 0;
}
}
return format("round_nr={}, round_nr_fast_path_already_synced={}, round_nr_fast_path_same_combined_hashes={}, round_nr_slow_path={}, rpc_call_nr={}, tx_hashes_nr={}, rx_hashes_nr={}, duration={} seconds, tx_row_nr={}, rx_row_nr={}, tx_row_bytes={}, rx_row_bytes={}, row_from_disk_bytes={}, row_from_disk_nr={}, row_from_disk_bytes_per_sec={} MiB/s, row_from_disk_rows_per_sec={} Rows/s, tx_row_nr_peer={}, rx_row_nr_peer={}",
round_nr,
round_nr_fast_path_already_synced,
round_nr_fast_path_same_combined_hashes,
round_nr_slow_path,
rpc_call_nr,
tx_hashes_nr,
rx_hashes_nr,
duration,
tx_row_nr,
rx_row_nr,
tx_row_bytes,
rx_row_bytes,
row_from_disk_bytes,
row_from_disk_nr,
row_from_disk_bytes_per_sec,
row_from_disk_rows_per_sec,
tx_row_nr_peer,
rx_row_nr_peer);
}
struct repair_options {
// If primary_range is true, we should perform repair only on this node's
@@ -1194,38 +1246,58 @@ private:
}
};
static thread_local semaphore ranges_parallelism_semaphore(16);
static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
if (ri->row_level_repair()) {
// repair all the ranges in limited parallelism
return parallel_for_each(ri->ranges, [ri] (auto&& range) {
return with_semaphore(ranges_parallelism_semaphore, 1, [ri, &range] {
ri->check_in_abort();
ri->ranges_index++;
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->cfs, range);
return repair_range(*ri, range);
});
});
} else {
// repair all the ranges in sequence
return do_for_each(ri->ranges, [ri] (auto&& range) {
ri->check_in_abort();
ri->ranges_index++;
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->cfs, range);
return do_with(dht::selective_token_range_sharder(range, ri->shard), [ri] (auto& sharder) {
return repeat([ri, &sharder] () {
check_in_shutdown();
ri->check_in_abort();
auto range_shard = sharder.next();
if (range_shard) {
return repair_range(*ri, *range_shard).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).then([ri] {
// Do streaming for the remaining ranges we do not stream in
// repair_cf_range
ri->check_in_abort();
return ri->do_streaming();
});
}
}
// repair_ranges repairs a list of token ranges, each assumed to be a token
// range for which this node holds a replica, and, importantly, each range
// is assumed to be a indivisible in the sense that all the tokens in has the
// same nodes as replicas.
static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
repair_tracker.add_repair_info(ri->id, ri);
// repair all the ranges in sequence
return do_for_each(ri->ranges, [ri] (auto&& range) {
ri->check_in_abort();
ri->ranges_index++;
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->cfs, range);
return do_with(dht::selective_token_range_sharder(range, ri->shard), [ri] (auto& sharder) {
return repeat([ri, &sharder] () {
check_in_shutdown();
ri->check_in_abort();
auto range_shard = sharder.next();
if (range_shard) {
return repair_range(*ri, *range_shard).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).then([ri] {
// Do streaming for the remaining ranges we do not stream in
// repair_cf_range
ri->check_in_abort();
return ri->do_streaming();
}).then([ri] {
return do_repair_ranges(ri).then([ri] {
ri->check_failed_ranges();
repair_tracker.remove_repair_info(ri->id);
return make_ready_future<>();

View File

@@ -31,7 +31,7 @@
#include "database_fwd.hh"
#include "flat_mutation_reader.hh"
#include "utils/UUID.hh"
#include "streaming/stream_plan.hh"
class repair_exception : public std::exception {
private:
@@ -116,6 +116,191 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::dht::token_range& range, repair_checksum rt);
class repair_stats {
public:
uint64_t round_nr = 0;
uint64_t round_nr_fast_path_already_synced = 0;
uint64_t round_nr_fast_path_same_combined_hashes= 0;
uint64_t round_nr_slow_path = 0;
uint64_t rpc_call_nr = 0;
uint64_t tx_hashes_nr = 0;
uint64_t rx_hashes_nr = 0;
uint64_t tx_row_nr = 0;
uint64_t rx_row_nr = 0;
uint64_t tx_row_bytes = 0;
uint64_t rx_row_bytes = 0;
std::map<gms::inet_address, uint64_t> row_from_disk_bytes;
std::map<gms::inet_address, uint64_t> row_from_disk_nr;
std::map<gms::inet_address, uint64_t> tx_row_nr_peer;
std::map<gms::inet_address, uint64_t> rx_row_nr_peer;
lowres_clock::time_point start_time = lowres_clock::now();
public:
void add(const repair_stats& o);
sstring get_stats();
};
class repair_info {
public:
seastar::sharded<database>& db;
sstring keyspace;
dht::token_range_vector ranges;
std::vector<sstring> cfs;
int id;
shard_id shard;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
size_t nr_failed_ranges = 0;
bool aborted = false;
// Map of peer -> <cf, ranges>
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_in;
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_out;
// FIXME: this "100" needs to be a parameter.
uint64_t target_partitions = 100;
// This affects how many ranges we put in a stream plan. The more the more
// memory we use to store the ranges in memory. However, it can reduce the
// total number of stream_plan we use for the repair.
size_t sub_ranges_to_stream = 10 * 1024;
size_t sp_index = 0;
size_t current_sub_ranges_nr_in = 0;
size_t current_sub_ranges_nr_out = 0;
int ranges_index = 0;
// Only allow one stream_plan in flight
semaphore sp_parallelism_semaphore{1};
lw_shared_ptr<streaming::stream_plan> _sp_in;
lw_shared_ptr<streaming::stream_plan> _sp_out;
repair_stats _stats;
bool _row_level_repair;
uint64_t _sub_ranges_nr = 0;
public:
repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
const dht::token_range_vector& ranges_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_);
future<> do_streaming();
void check_failed_ranges();
future<> request_transfer_ranges(const sstring& cf,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out);
void abort();
void check_in_abort();
void update_statistics(const repair_stats& stats) {
_stats.add(stats);
}
bool row_level_repair() {
return _row_level_repair;
}
};
future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
const sstring& cf, const dht::token_range& range);
// Represent a position of a mutation_fragment read from a flat mutation
// reader. Repair nodes negotiate a small range identified by two
// repair_sync_boundary to work on in each round.
struct repair_sync_boundary {
dht::decorated_key pk;
position_in_partition position;
class tri_compare {
dht::ring_position_comparator _pk_cmp;
position_in_partition::tri_compare _position_cmp;
public:
tri_compare(const schema& s) : _pk_cmp(s), _position_cmp(s) { }
int operator()(const repair_sync_boundary& a, const repair_sync_boundary& b) const {
int ret = _pk_cmp(a.pk, b.pk);
if (ret == 0) {
ret = _position_cmp(a.position, b.position);
}
return ret;
}
};
friend std::ostream& operator<<(std::ostream& os, const repair_sync_boundary& x) {
return os << "{ " << x.pk << "," << x.position << " }";
}
};
// Hash of a repair row
class repair_hash {
public:
uint64_t hash = 0;
repair_hash() = default;
explicit repair_hash(uint64_t h) : hash(h) {
}
void clear() {
hash = 0;
}
void add(const repair_hash& other) {
hash ^= other.hash;
}
bool operator==(const repair_hash& x) const {
return x.hash == hash;
}
bool operator!=(const repair_hash& x) const {
return x.hash != hash;
}
bool operator<(const repair_hash& x) const {
return x.hash < hash;
}
friend std::ostream& operator<<(std::ostream& os, const repair_hash& x) {
return os << x.hash;
}
};
// Return value of the REPAIR_GET_SYNC_BOUNDARY RPC verb
struct get_sync_boundary_response {
std::optional<repair_sync_boundary> boundary;
repair_hash row_buf_combined_csum;
// The current size of the row buf
uint64_t row_buf_size;
// The number of bytes this verb read from disk
uint64_t new_rows_size;
// The number of rows this verb read from disk
uint64_t new_rows_nr;
};
struct node_repair_meta_id {
gms::inet_address ip;
uint32_t repair_meta_id;
bool operator==(const node_repair_meta_id& x) const {
return x.ip == ip && x.repair_meta_id == repair_meta_id;
}
};
// Represent a partition_key and frozen_mutation_fragments within the partition_key.
class partition_key_and_mutation_fragments {
partition_key _key;
std::list<frozen_mutation_fragment> _mfs;
public:
partition_key_and_mutation_fragments(partition_key key, std::list<frozen_mutation_fragment> mfs)
: _key(std::move(key))
, _mfs(std::move(mfs)) {
}
const partition_key& get_key() const { return _key; }
const std::list<frozen_mutation_fragment>& get_mutation_fragments() const { return _mfs; }
partition_key& get_key() { return _key; }
std::list<frozen_mutation_fragment>& get_mutation_fragments() { return _mfs; }
void push_mutation_fragment(frozen_mutation_fragment mf) { _mfs.push_back(std::move(mf)); }
};
using repair_rows_on_wire = std::list<partition_key_and_mutation_fragments>;
enum class row_level_diff_detect_algorithm : uint8_t {
send_full_set,
};
std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo);
namespace std {
template<>
struct hash<partition_checksum> {
@@ -125,4 +310,15 @@ struct hash<partition_checksum> {
return h;
}
};
template<>
struct hash<repair_hash> {
size_t operator()(repair_hash h) const { return h.hash; }
};
template<>
struct hash<node_repair_meta_id> {
size_t operator()(node_repair_meta_id id) const { return utils::tuple_hash()(id.ip, id.repair_meta_id); }
};
}

1558
repair/row_level.cc Normal file

File diff suppressed because it is too large Load Diff

33
repair/row_level.hh Normal file
View File

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <vector>
#include "gms/inet_address.hh"
future<> repair_init_messaging_service_handler();
class repair_info;
future<> repair_cf_range_row_level(repair_info& ri,
sstring cf_name, dht::token_range range,
const std::vector<gms::inet_address>& all_peer_nodes);

View File

@@ -21,10 +21,13 @@
#pragma once
#include <vector>
#include <unordered_set>
#include <list>
#include <array>
#include <seastar/core/sstring.hh>
#include <unordered_map>
#include <experimental/optional>
#include <optional>
#include "enum_set.hh"
#include "utils/managed_bytes.hh"
#include "bytes_ostream.hh"

View File

@@ -81,6 +81,31 @@ static inline void serialize_array(Output& out, const Container& v) {
template<typename Container>
struct container_traits;
template<typename T>
struct container_traits<std::unordered_set<T>> {
struct back_emplacer {
std::unordered_set<T>& c;
back_emplacer(std::unordered_set<T>& c_) : c(c_) {}
void operator()(T&& v) {
c.emplace(std::move(v));
}
};
};
template<typename T>
struct container_traits<std::list<T>> {
struct back_emplacer {
std::list<T>& c;
back_emplacer(std::list<T>& c_) : c(c_) {}
void operator()(T&& v) {
c.emplace_back(std::move(v));
}
};
void resize(std::list<T>& c, size_t size) {
c.resize(size);
}
};
template<typename T>
struct container_traits<std::vector<T>> {
struct back_emplacer {
@@ -207,6 +232,49 @@ struct vector_serializer {
}
template<typename T>
struct serializer<std::list<T>> {
template<typename Input>
static std::list<T> read(Input& in) {
auto sz = deserialize(in, boost::type<uint32_t>());
std::list<T> v;
deserialize_array_helper<false, T>::doit(in, v, sz);
return v;
}
template<typename Output>
static void write(Output& out, const std::list<T>& v) {
safe_serialize_as_uint32(out, v.size());
serialize_array_helper<false, T>::doit(out, v);
}
template<typename Input>
static void skip(Input& in) {
auto sz = deserialize(in, boost::type<uint32_t>());
skip_array<T>(in, sz);
}
};
template<typename T>
struct serializer<std::unordered_set<T>> {
template<typename Input>
static std::unordered_set<T> read(Input& in) {
auto sz = deserialize(in, boost::type<uint32_t>());
std::unordered_set<T> v;
v.reserve(sz);
deserialize_array_helper<false, T>::doit(in, v, sz);
return v;
}
template<typename Output>
static void write(Output& out, const std::unordered_set<T>& v) {
safe_serialize_as_uint32(out, v.size());
serialize_array_helper<false, T>::doit(out, v);
}
template<typename Input>
static void skip(Input& in) {
auto sz = deserialize(in, boost::type<uint32_t>());
skip_array<T>(in, sz);
}
};
template<typename T>
struct serializer<std::vector<T>>
: idl::serializers::internal::vector_serializer<std::vector<T>>
@@ -472,6 +540,33 @@ void serialize_fragmented(Output& out, FragmentedBuffer&& v) {
serializer<bytes>::write_fragmented(out, std::forward<FragmentedBuffer>(v));
}
template<typename T>
struct serializer<std::optional<T>> {
template<typename Input>
static std::optional<T> read(Input& in) {
std::optional<T> v;
auto b = deserialize(in, boost::type<bool>());
if (b) {
v = deserialize(in, boost::type<T>());
}
return v;
}
template<typename Output>
static void write(Output& out, const std::optional<T>& v) {
serialize(out, bool(v));
if (v) {
serialize(out, v.value());
}
}
template<typename Input>
static void skip(Input& in) {
auto present = deserialize(in, boost::type<bool>());
if (present) {
serializer<T>::skip(in);
}
}
};
template<typename T>
struct serializer<std::experimental::optional<T>> {
template<typename Input>

View File

@@ -103,6 +103,7 @@ static const sstring ROLES_FEATURE = "ROLES";
static const sstring LA_SSTABLE_FEATURE = "LA_SSTABLE_FORMAT";
static const sstring STREAM_WITH_RPC_STREAM = "STREAM_WITH_RPC_STREAM";
static const sstring MC_SSTABLE_FEATURE = "MC_SSTABLE_FORMAT";
static const sstring ROW_LEVEL_REPAIR = "ROW_LEVEL_REPAIR";
distributed<storage_service> _the_storage_service;
@@ -146,6 +147,7 @@ storage_service::storage_service(distributed<database>& db, sharded<auth::servic
, _la_sstable_feature(_feature_service, LA_SSTABLE_FEATURE)
, _stream_with_rpc_stream_feature(_feature_service, STREAM_WITH_RPC_STREAM)
, _mc_sstable_feature(_feature_service, MC_SSTABLE_FEATURE)
, _row_level_repair_feature(_feature_service, ROW_LEVEL_REPAIR)
, _replicate_action([this] { return do_replicate_to_all_cores(); })
, _update_pending_ranges_action([this] { return do_update_pending_ranges(); })
, _sys_dist_ks(sys_dist_ks) {
@@ -172,6 +174,7 @@ void storage_service::enable_all_features() {
_la_sstable_feature.enable();
_stream_with_rpc_stream_feature.enable();
_mc_sstable_feature.enable();
_row_level_repair_feature.enable();
}
enum class node_external_status {
@@ -247,7 +250,8 @@ sstring storage_service::get_config_supported_features() {
LA_SSTABLE_FEATURE,
STREAM_WITH_RPC_STREAM,
MATERIALIZED_VIEWS_FEATURE,
INDEXES_FEATURE
INDEXES_FEATURE,
ROW_LEVEL_REPAIR
};
auto& config = service::get_local_storage_service()._db.local().get_config();
if (config.enable_sstables_mc_format()) {

View File

@@ -295,6 +295,7 @@ private:
gms::feature _la_sstable_feature;
gms::feature _stream_with_rpc_stream_feature;
gms::feature _mc_sstable_feature;
gms::feature _row_level_repair_feature;
public:
void enable_all_features();
@@ -2281,6 +2282,10 @@ public:
bool cluster_supports_mc_sstable() const {
return bool(_mc_sstable_feature);
}
bool cluster_supports_row_level_repair() const {
return bool(_row_level_repair_feature);
}
private:
future<> set_cql_ready(bool ready);
private:

View File

@@ -68,39 +68,6 @@ namespace streaming {
logging::logger sslog("stream_session");
/*
* This reader takes a get_next_fragment generator that produces mutation_fragment_opt which is returned by
* generating_reader.
*
*/
class generating_reader final : public flat_mutation_reader::impl {
std::function<future<mutation_fragment_opt> ()> _get_next_fragment;
public:
generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt> ()> get_next_fragment)
: impl(std::move(s)), _get_next_fragment(std::move(get_next_fragment))
{ }
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return _get_next_fragment().then([this] (mutation_fragment_opt mopt) {
if (!mopt) {
_end_of_stream = true;
} else {
push_mutation_fragment(std::move(*mopt));
}
});
});
}
virtual void next_partition() override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
};
static auto get_stream_result_future(utils::UUID plan_id) {
auto& sm = get_local_stream_manager();
auto f = sm.get_sending_stream(plan_id);
@@ -235,7 +202,7 @@ void stream_session::init_messaging_service_handler() {
});
};
distribute_reader_and_consume_on_shards(s, dht::global_partitioner(),
make_flat_mutation_reader<generating_reader>(s, std::move(get_next_mutation_fragment)),
make_generating_reader(s, std::move(get_next_mutation_fragment)),
[cf_id, plan_id, estimated_partitions, reason] (flat_mutation_reader reader) {
auto& cf = service::get_local_storage_service().db().local().find_column_family(cf_id);

View File

@@ -32,8 +32,8 @@ class xx_hasher {
XXH64_state_t _state;
public:
xx_hasher() noexcept {
XXH64_reset(&_state, 0);
explicit xx_hasher(uint64_t seed = 0) noexcept {
XXH64_reset(&_state, seed);
}
void update(const char* ptr, size_t length) {