Merge 'merge raft commands to group0 before applying them whenever possible' from Gleb

Since most group0 commands are just mutations it is easy to combine them
before passing them to a subsystem they destined to since it is more
efficient. The logic that handles those mutations in a subsystem will
run once for each batch of commands instead of for each individual
command. This is especially useful when a node catches up to a leader and
gets a lot of commands together.

The patch here does exactly that. It combines commands into a single
command if possible, but it preserves an order between commands, so each
time it encounters a command to a different subsystem it flushes already
combined batch and starts a new one. This extra safety assumes that
there are dependencies between subsystems managed by group0, so the order
matters. It may be not the case now, but we prefer to be on a safe side.

Broadcast table commands are not mutations, so they are never combined.

* 'raft-merge-cmds' of https://github.com/gleb-cloudius/scylla:
  test: add test for group0 raft command merging
  service: raft: respect max mutation size limit when persisting raft entries
  group0_state_machine: merge commands before applying them whenever possible
This commit is contained in:
Kamil Braun
2023-06-28 17:21:07 +02:00
8 changed files with 270 additions and 35 deletions

View File

@@ -548,6 +548,7 @@ scylla_tests = set([
'test/boost/locator_topology_test',
'test/boost/string_format_test',
'test/boost/tagged_integer_test',
'test/boost/group0_cmd_merge_test',
'test/manual/ec2_snitch_test',
'test/manual/enormous_table_scan_test',
'test/manual/gce_snitch_test',

View File

@@ -316,6 +316,9 @@ future<fsm_output> fsm::poll_output() {
}
co_await _sm_events.wait();
}
while (utils::get_local_injector().enter("fsm::poll_output/pause")) {
co_await seastar::sleep(std::chrono::milliseconds(100));
}
co_return get_output();
}

View File

@@ -343,6 +343,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
}
future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector<canonical_mutation>& canonical_mutations) {
canonical_mutation_merge_count++;
mlogger.debug("Applying schema mutations from {}", src);
auto& proxy = _storage_proxy;
const auto& db = proxy.get_db().local();

View File

@@ -98,6 +98,8 @@ public:
// Merge mutations received from src.
// Keep mutations alive around whole async operation.
future<> merge_schema_from(netw::msg_addr src, const std::vector<canonical_mutation>& mutations);
// Incremented each time the function above is called. Needed by tests.
size_t canonical_mutation_merge_count = 0;
// Deprecated. The canonical mutation should be used instead.
future<> merge_schema_from(netw::msg_addr src, const std::vector<frozen_mutation>& mutations);

View File

@@ -12,6 +12,7 @@
#include "dht/token.hh"
#include "message/messaging_service.hh"
#include "mutation/canonical_mutation.hh"
#include "seastar/core/on_internal_error.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "schema_mutations.hh"
#include "frozen_schema.hh"
@@ -35,7 +36,9 @@
#include "partition_slice_builder.hh"
#include "timestamp.hh"
#include "utils/overloaded_functor.hh"
#include <boost/range/algorithm/transform.hpp>
#include <optional>
#include "db/config.hh"
namespace service {
@@ -59,6 +62,155 @@ static mutation convert_history_mutation(canonical_mutation m, const data_dictio
future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
slogger.trace("apply() is called with {} commands", command.size());
struct merger {
std::vector<group0_command> cmd_to_merge;
std::optional<mutation> merged_history_mutation;
utils::UUID last_group0_state_id;
group0_state_machine& sm;
size_t size = 0;
semaphore_units<> read_apply_mutex_holder;
const size_t max_command_size;
merger(group0_state_machine& sm_, utils::UUID id, semaphore_units<> mux) : last_group0_state_id(id)
, sm(sm_)
, read_apply_mutex_holder(std::move(mux))
, max_command_size(sm._sp.data_dictionary().get_config().commitlog_segment_size_in_mb() * 1024 * 1024 / 2) {}
size_t cmd_size(group0_command& cmd) {
if (holds_alternative<broadcast_table_query>(cmd.change)) {
return 0;
}
auto r = get_command_mutations(cmd) | boost::adaptors::transformed([] (const canonical_mutation& m) { return m.representation().size(); });
return std::accumulate(std::begin(r), std::end(r), size_t(0));
}
bool can_merge(group0_command& cmd, size_t s) {
if (!cmd_to_merge.empty()) {
// broadcast table commands or different type of commands cannot be merged
if (cmd_to_merge[0].change.index() != cmd.change.index() || holds_alternative<broadcast_table_query>(cmd.change)) {
return false;
}
}
// Check that merged command will not be larger than half of commitlog segment.
// Merged command can be, in fact, much smaller but better to be safe than sorry.
// Skip the check for the first command.
if (size && size + s > max_command_size) {
return false;
}
return true;
}
void add(group0_command&& cmd, size_t added_size) {
slogger.trace("add to merging set new_state_id: {}", cmd.new_state_id);
auto m = convert_history_mutation(std::move(cmd.history_append), sm._sp.data_dictionary());
last_group0_state_id = cmd.new_state_id;
cmd_to_merge.push_back(std::move(cmd));
size += added_size;
if (merged_history_mutation) {
merged_history_mutation->apply(std::move(m));
} else {
merged_history_mutation = std::move(m);
}
}
std::vector<canonical_mutation>& get_command_mutations(group0_command& cmd) {
return std::visit(make_visitor(
[] (schema_change& chng) -> std::vector<canonical_mutation>& {
return chng.mutations;
},
[] (broadcast_table_query& query) -> std::vector<canonical_mutation>& {
on_internal_error(slogger, "trying to merge broadcast table command");
},
[] (topology_change& chng) -> std::vector<canonical_mutation>& {
return chng.mutations;
}
), cmd.change);
}
std::pair<group0_command, mutation> merge() {
auto& cmd = cmd_to_merge.back(); // use metadata from the last merged command
slogger.trace("merge new_state_id: {}", cmd.new_state_id);
using mutation_set_type = std::unordered_set<mutation, mutation_hash_by_key, mutation_equals_by_key>;
std::unordered_map<table_id, mutation_set_type> mutations;
if (cmd_to_merge.size() > 1) {
// skip merging if there is only one command
for (auto&& c : cmd_to_merge) {
for (auto&& cm : get_command_mutations(c)) {
auto schema = sm._sp.data_dictionary().find_schema(cm.column_family_id());
auto m = cm.to_mutation(schema);
auto& tbl_muts = mutations[cm.column_family_id()];
auto it = tbl_muts.find(m);
if (it == tbl_muts.end()) {
tbl_muts.emplace(std::move(m));
} else {
const_cast<mutation&>(*it).apply(std::move(m)); // Won't change key
}
}
}
std::vector<canonical_mutation> ms;
for (auto&& tables : mutations) {
for (auto&& partitions : tables.second) {
ms.push_back(canonical_mutation(partitions));
}
}
get_command_mutations(cmd) = std::move(ms);
}
auto res = std::make_pair(std::move(cmd), std::move(merged_history_mutation).value());
cmd_to_merge.clear();
merged_history_mutation.reset();
return res;
}
future<> apply(group0_command cmd, mutation history) {
// We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained.
// It is now important that we apply the change *before* we append the group0 state ID to the history table.
//
// If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because
// the state ID was not yet appended so the above check will pass.
// TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command
// is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without
// access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire
// change is applied and the state ID is updated or none of this happens.
// E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts.
co_await std::visit(make_visitor(
[&] (schema_change& chng) -> future<> {
return sm._mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
},
[&] (broadcast_table_query& query) -> future<> {
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(sm._sp, query.query, cmd.new_state_id);
sm._client.set_query_result(cmd.new_state_id, std::move(result));
},
[&] (topology_change& chng) -> future<> {
return sm._ss.topology_transition(sm._sp, sm._cdc_gen_svc, cmd.creator_addr, std::move(chng.mutations));
}
), cmd.change);
co_await sm._sp.mutate_locally({std::move(history)}, nullptr);
}
future<> merge_and_apply() {
auto [c, h] = merge();
return apply(std::move(c), std::move(h));
}
bool empty() const {
return cmd_to_merge.empty();
}
utils::UUID last_id() const {
return last_group0_state_id;
}
};
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
merger m(*this, co_await db::system_keyspace::get_last_group0_state_id(), std::move(read_apply_mutex_holder));
for (auto&& c : command) {
auto is = ser::as_input_stream(c);
auto cmd = ser::deserialize(is, boost::type<group0_command>{});
@@ -67,11 +219,8 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id);
slogger.trace("cmd.history_append: {}", cmd.history_append);
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
if (cmd.prev_state_id) {
auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
if (*cmd.prev_state_id != last_group0_state_id) {
if (*cmd.prev_state_id != m.last_id()) {
// This command used obsolete state. Make it a no-op.
// BTW. on restart, all commands after last snapshot descriptor become no-ops even when they originally weren't no-ops.
// This is because we don't restart from snapshot descriptor, but using current state of the tables so the last state ID
@@ -79,39 +228,24 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
// Similar thing may happen when we pull group0 state in transfer_snapshot - we pull the latest state of remote tables,
// not state at the snapshot descriptor.
slogger.trace("cmd.prev_state_id ({}) different than last group 0 state ID in history table ({})",
cmd.prev_state_id, last_group0_state_id);
cmd.prev_state_id, m.last_id());
continue;
}
} else {
slogger.trace("unconditional modification, cmd.new_state_id: {}", cmd.new_state_id);
}
// We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained.
// It is now important that we apply the change *before* we append the group0 state ID to the history table.
//
// If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because
// the state ID was not yet appended so the above check will pass.
// TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command
// is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without
// access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire
// change is applied and the state ID is updated or none of this happens.
// E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts.
co_await std::visit(make_visitor(
[&] (schema_change& chng) -> future<> {
return _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
},
[&] (broadcast_table_query& query) -> future<> {
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id);
_client.set_query_result(cmd.new_state_id, std::move(result));
},
[&] (topology_change& chng) -> future<> {
return _ss.topology_transition(_sp, _cdc_gen_svc, cmd.creator_addr, std::move(chng.mutations));
auto size = m.cmd_size(cmd);
if (!m.can_merge(cmd, size)) {
co_await m.merge_and_apply();
}
), cmd.change);
co_await _sp.mutate_locally({convert_history_mutation(std::move(cmd.history_append), _sp.data_dictionary())}, nullptr);
m.add(std::move(cmd), size);
}
if (!m.empty()) {
// apply remainder
co_await m.merge_and_apply();
}
}

View File

@@ -34,6 +34,7 @@ raft_sys_table_storage::raft_sys_table_storage(cql3::query_processor& qp, raft::
, _qp(qp)
, _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit())
, _pending_op_fut(make_ready_future<>())
, _max_mutation_size(_qp.db().get_config().commitlog_segment_size_in_mb() * 1024 * 1204 / 2)
{
static const auto store_cql = format("INSERT INTO system.{} (group_id, term, \"index\", data) VALUES (?, ?, ?, ?)",
db::system_keyspace::RAFT);
@@ -194,10 +195,7 @@ future<> raft_sys_table_storage::store_snapshot_descriptor(const raft::snapshot_
});
}
future<> raft_sys_table_storage::do_store_log_entries(const std::vector<raft::log_entry_ptr>& entries) {
if (entries.empty()) {
co_return;
}
future<size_t> raft_sys_table_storage::do_store_log_entries_one_batch(const std::vector<raft::log_entry_ptr>& entries, size_t start_idx) {
std::vector<cql3::statements::batch_statement::single_statement> batch_stmts;
// statement values that can be allocated at once (one contiguous allocation)
std::vector<std::vector<cql3::raw_value>> stmt_values;
@@ -211,12 +209,19 @@ future<> raft_sys_table_storage::do_store_log_entries(const std::vector<raft::lo
stmt_data_views.reserve(entries_size);
stmt_value_views.reserve(entries_size);
for (const raft::log_entry_ptr& eptr : entries) {
batch_stmts.emplace_back(cql3::statements::batch_statement::single_statement(_store_entry_stmt, false));
size_t size = 0;
size_t idx = start_idx;
for (; idx < entries_size; idx++) {
auto& eptr = entries[idx];
auto data_tmp_buf = fragmented_temporary_buffer::allocate_to_fit(ser::get_sizeof(eptr->data));
auto data_out_str = data_tmp_buf.get_ostream();
ser::serialize(data_out_str, eptr->data);
if (size && size + data_tmp_buf.size_bytes() > _max_mutation_size) {
break;
}
size += data_tmp_buf.size_bytes();
batch_stmts.emplace_back(cql3::statements::batch_statement::single_statement(_store_entry_stmt, false));
// don't include serialized "data" here since it will require to linearize the stream
std::vector<cql3::raw_value> single_stmt_values;
@@ -261,6 +266,23 @@ future<> raft_sys_table_storage::do_store_log_entries(const std::vector<raft::lo
_qp.get_cql_stats());
co_await batch.execute(_qp, _dummy_query_state, batch_options);
if (idx != entries_size) {
co_return idx;
}
co_return 0;
}
future<> raft_sys_table_storage::do_store_log_entries(const std::vector<raft::log_entry_ptr>& entries) {
if (entries.empty()) {
co_return;
}
size_t idx = 0;
do {
idx = co_await do_store_log_entries_one_batch(entries, idx);
} while (idx != 0);
}
future<> raft_sys_table_storage::store_log_entries(const std::vector<raft::log_entry_ptr>& entries) {

View File

@@ -51,6 +51,8 @@ class raft_sys_table_storage : public raft::persistence {
// this helper.
future<> _pending_op_fut;
const size_t _max_mutation_size;
public:
explicit raft_sys_table_storage(cql3::query_processor& qp, raft::group_id gid, raft::server_id server_id);
@@ -76,6 +78,7 @@ public:
future<> bootstrap(raft::configuration initial_configuation);
private:
future<size_t> do_store_log_entries_one_batch(const std::vector<raft::log_entry_ptr>& entries, size_t start_idx);
future<> do_store_log_entries(const std::vector<raft::log_entry_ptr>& entries);
// Truncate all entries from the persisted log with indices <= idx
// Called from the `store_snapshot` function.

View File

@@ -0,0 +1,69 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "test/lib/scylla_test_case.hh"
#include "test/lib/cql_test_env.hh"
#include "service/migration_manager.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/raft/group0_state_machine.hh"
#include "idl/experimental/broadcast_tables_lang.dist.hh"
#include "idl/experimental/broadcast_tables_lang.dist.impl.hh"
#include "idl/group0_state_machine.dist.hh"
#include "idl/group0_state_machine.dist.impl.hh"
#include "utils/error_injection.hh"
SEASTAR_TEST_CASE(test_group0_cmd_merge) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return make_ready_future<>();
#else
cql_test_config cfg;
cfg.db_config->commitlog_segment_size_in_mb(1);
return do_with_cql_env_thread([] (cql_test_env& env) {
auto& group0 = env.get_raft_group_registry().local().group0();
auto& mm = env.migration_manager().local();
auto id = utils::UUID_gen::get_time_UUID();
service::group0_command group0_cmd {
.history_append{db::system_keyspace::make_group0_history_state_id_mutation(
id, gc_clock::duration{0}, "test")},
.new_state_id = id,
.creator_addr{utils::fb_utilities::get_broadcast_address()},
.creator_id{group0.id()}
};
std::vector<canonical_mutation> cms;
size_t size = 0;
auto muts = mm.prepare_keyspace_drop_announcement("ks", api::new_timestamp()).get0();
// Maximum mutation size is half of commitlog segment size wich we set
// to 1M. Make one command a little bit larger than third of the max size.
while (size < 200*1024) {
for (auto&& m : muts) {
cms.emplace_back(m);
size += cms.back().representation().size();
}
}
group0_cmd.change = service::schema_change{std::move(cms)};
raft::command cmd;
ser::serialize(cmd, group0_cmd);
auto merges = mm.canonical_mutation_merge_count;
utils::get_local_injector().enable("fsm::poll_output/pause");
auto f = when_all(
group0.add_entry(cmd, raft::wait_type::applied, nullptr),
group0.add_entry(cmd, raft::wait_type::applied, nullptr),
group0.add_entry(cmd, raft::wait_type::applied, nullptr));
// Sleep is needed for all the entreis added above to hit the log
seastar::sleep(std::chrono::milliseconds(100)).get();
// After unpause all entreis added above will be committed and applied together
utils::get_local_injector().disable("fsm::poll_output/pause");
f.get(); // Wait for apply to complete
// Thete should be two calls to migration manager since two out of
// three command should be merged.
BOOST_REQUIRE_EQUAL(mm.canonical_mutation_merge_count - merges, 2);
}, cfg);
#endif
}