diff --git a/configure.py b/configure.py index 55a5b73d36..ff6935860a 100755 --- a/configure.py +++ b/configure.py @@ -548,6 +548,7 @@ scylla_tests = set([ 'test/boost/locator_topology_test', 'test/boost/string_format_test', 'test/boost/tagged_integer_test', + 'test/boost/group0_cmd_merge_test', 'test/manual/ec2_snitch_test', 'test/manual/enormous_table_scan_test', 'test/manual/gce_snitch_test', diff --git a/raft/fsm.cc b/raft/fsm.cc index 9a8852f6ba..38ab996d16 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -316,6 +316,9 @@ future fsm::poll_output() { } co_await _sm_events.wait(); } + while (utils::get_local_injector().enter("fsm::poll_output/pause")) { + co_await seastar::sleep(std::chrono::milliseconds(100)); + } co_return get_output(); } diff --git a/service/migration_manager.cc b/service/migration_manager.cc index c3a6779db4..66597e4ee0 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -343,6 +343,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr } future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& canonical_mutations) { + canonical_mutation_merge_count++; mlogger.debug("Applying schema mutations from {}", src); auto& proxy = _storage_proxy; const auto& db = proxy.get_db().local(); diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 5778873e54..bde4192145 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -98,6 +98,8 @@ public: // Merge mutations received from src. // Keep mutations alive around whole async operation. future<> merge_schema_from(netw::msg_addr src, const std::vector& mutations); + // Incremented each time the function above is called. Needed by tests. + size_t canonical_mutation_merge_count = 0; // Deprecated. The canonical mutation should be used instead. future<> merge_schema_from(netw::msg_addr src, const std::vector& mutations); diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index 22024ee4a1..2c2042a54f 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -12,6 +12,7 @@ #include "dht/token.hh" #include "message/messaging_service.hh" #include "mutation/canonical_mutation.hh" +#include "seastar/core/on_internal_error.hh" #include "service/broadcast_tables/experimental/query_result.hh" #include "schema_mutations.hh" #include "frozen_schema.hh" @@ -35,7 +36,9 @@ #include "partition_slice_builder.hh" #include "timestamp.hh" #include "utils/overloaded_functor.hh" +#include #include +#include "db/config.hh" namespace service { @@ -59,6 +62,155 @@ static mutation convert_history_mutation(canonical_mutation m, const data_dictio future<> group0_state_machine::apply(std::vector command) { slogger.trace("apply() is called with {} commands", command.size()); + + struct merger { + std::vector cmd_to_merge; + std::optional merged_history_mutation; + utils::UUID last_group0_state_id; + group0_state_machine& sm; + size_t size = 0; + semaphore_units<> read_apply_mutex_holder; + const size_t max_command_size; + merger(group0_state_machine& sm_, utils::UUID id, semaphore_units<> mux) : last_group0_state_id(id) + , sm(sm_) + , read_apply_mutex_holder(std::move(mux)) + , max_command_size(sm._sp.data_dictionary().get_config().commitlog_segment_size_in_mb() * 1024 * 1024 / 2) {} + + size_t cmd_size(group0_command& cmd) { + if (holds_alternative(cmd.change)) { + return 0; + } + auto r = get_command_mutations(cmd) | boost::adaptors::transformed([] (const canonical_mutation& m) { return m.representation().size(); }); + return std::accumulate(std::begin(r), std::end(r), size_t(0)); + } + bool can_merge(group0_command& cmd, size_t s) { + if (!cmd_to_merge.empty()) { + // broadcast table commands or different type of commands cannot be merged + if (cmd_to_merge[0].change.index() != cmd.change.index() || holds_alternative(cmd.change)) { + return false; + } + } + + // Check that merged command will not be larger than half of commitlog segment. + // Merged command can be, in fact, much smaller but better to be safe than sorry. + // Skip the check for the first command. + if (size && size + s > max_command_size) { + return false; + } + + return true; + } + void add(group0_command&& cmd, size_t added_size) { + slogger.trace("add to merging set new_state_id: {}", cmd.new_state_id); + auto m = convert_history_mutation(std::move(cmd.history_append), sm._sp.data_dictionary()); + last_group0_state_id = cmd.new_state_id; + cmd_to_merge.push_back(std::move(cmd)); + size += added_size; + if (merged_history_mutation) { + merged_history_mutation->apply(std::move(m)); + } else { + merged_history_mutation = std::move(m); + } + } + + std::vector& get_command_mutations(group0_command& cmd) { + return std::visit(make_visitor( + [] (schema_change& chng) -> std::vector& { + return chng.mutations; + }, + [] (broadcast_table_query& query) -> std::vector& { + on_internal_error(slogger, "trying to merge broadcast table command"); + }, + [] (topology_change& chng) -> std::vector& { + return chng.mutations; + } + ), cmd.change); + } + + std::pair merge() { + auto& cmd = cmd_to_merge.back(); // use metadata from the last merged command + slogger.trace("merge new_state_id: {}", cmd.new_state_id); + using mutation_set_type = std::unordered_set; + std::unordered_map mutations; + + if (cmd_to_merge.size() > 1) { + // skip merging if there is only one command + for (auto&& c : cmd_to_merge) { + for (auto&& cm : get_command_mutations(c)) { + auto schema = sm._sp.data_dictionary().find_schema(cm.column_family_id()); + auto m = cm.to_mutation(schema); + auto& tbl_muts = mutations[cm.column_family_id()]; + auto it = tbl_muts.find(m); + if (it == tbl_muts.end()) { + tbl_muts.emplace(std::move(m)); + } else { + const_cast(*it).apply(std::move(m)); // Won't change key + } + } + } + + std::vector ms; + for (auto&& tables : mutations) { + for (auto&& partitions : tables.second) { + ms.push_back(canonical_mutation(partitions)); + } + } + + get_command_mutations(cmd) = std::move(ms); + } + auto res = std::make_pair(std::move(cmd), std::move(merged_history_mutation).value()); + cmd_to_merge.clear(); + merged_history_mutation.reset(); + return res; + } + + future<> apply(group0_command cmd, mutation history) { + // We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained. + // It is now important that we apply the change *before* we append the group0 state ID to the history table. + // + // If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because + // the state ID was not yet appended so the above check will pass. + + // TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command + // is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without + // access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire + // change is applied and the state ID is updated or none of this happens. + // E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts. + + co_await std::visit(make_visitor( + [&] (schema_change& chng) -> future<> { + return sm._mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations)); + }, + [&] (broadcast_table_query& query) -> future<> { + auto result = co_await service::broadcast_tables::execute_broadcast_table_query(sm._sp, query.query, cmd.new_state_id); + sm._client.set_query_result(cmd.new_state_id, std::move(result)); + }, + [&] (topology_change& chng) -> future<> { + return sm._ss.topology_transition(sm._sp, sm._cdc_gen_svc, cmd.creator_addr, std::move(chng.mutations)); + } + ), cmd.change); + + co_await sm._sp.mutate_locally({std::move(history)}, nullptr); + } + + future<> merge_and_apply() { + auto [c, h] = merge(); + return apply(std::move(c), std::move(h)); + } + + bool empty() const { + return cmd_to_merge.empty(); + } + + utils::UUID last_id() const { + return last_group0_state_id; + } + }; + + auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(); + + merger m(*this, co_await db::system_keyspace::get_last_group0_state_id(), std::move(read_apply_mutex_holder)); + for (auto&& c : command) { auto is = ser::as_input_stream(c); auto cmd = ser::deserialize(is, boost::type{}); @@ -67,11 +219,8 @@ future<> group0_state_machine::apply(std::vector command) { cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id); slogger.trace("cmd.history_append: {}", cmd.history_append); - auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(); - if (cmd.prev_state_id) { - auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id(); - if (*cmd.prev_state_id != last_group0_state_id) { + if (*cmd.prev_state_id != m.last_id()) { // This command used obsolete state. Make it a no-op. // BTW. on restart, all commands after last snapshot descriptor become no-ops even when they originally weren't no-ops. // This is because we don't restart from snapshot descriptor, but using current state of the tables so the last state ID @@ -79,39 +228,24 @@ future<> group0_state_machine::apply(std::vector command) { // Similar thing may happen when we pull group0 state in transfer_snapshot - we pull the latest state of remote tables, // not state at the snapshot descriptor. slogger.trace("cmd.prev_state_id ({}) different than last group 0 state ID in history table ({})", - cmd.prev_state_id, last_group0_state_id); + cmd.prev_state_id, m.last_id()); continue; } } else { slogger.trace("unconditional modification, cmd.new_state_id: {}", cmd.new_state_id); } - // We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained. - // It is now important that we apply the change *before* we append the group0 state ID to the history table. - // - // If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because - // the state ID was not yet appended so the above check will pass. - - // TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command - // is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without - // access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire - // change is applied and the state ID is updated or none of this happens. - // E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts. - - co_await std::visit(make_visitor( - [&] (schema_change& chng) -> future<> { - return _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations)); - }, - [&] (broadcast_table_query& query) -> future<> { - auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id); - _client.set_query_result(cmd.new_state_id, std::move(result)); - }, - [&] (topology_change& chng) -> future<> { - return _ss.topology_transition(_sp, _cdc_gen_svc, cmd.creator_addr, std::move(chng.mutations)); + auto size = m.cmd_size(cmd); + if (!m.can_merge(cmd, size)) { + co_await m.merge_and_apply(); } - ), cmd.change); - co_await _sp.mutate_locally({convert_history_mutation(std::move(cmd.history_append), _sp.data_dictionary())}, nullptr); + m.add(std::move(cmd), size); + } + + if (!m.empty()) { + // apply remainder + co_await m.merge_and_apply(); } } diff --git a/service/raft/raft_sys_table_storage.cc b/service/raft/raft_sys_table_storage.cc index abf4ae7e01..2c601cdf01 100644 --- a/service/raft/raft_sys_table_storage.cc +++ b/service/raft/raft_sys_table_storage.cc @@ -34,6 +34,7 @@ raft_sys_table_storage::raft_sys_table_storage(cql3::query_processor& qp, raft:: , _qp(qp) , _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit()) , _pending_op_fut(make_ready_future<>()) + , _max_mutation_size(_qp.db().get_config().commitlog_segment_size_in_mb() * 1024 * 1204 / 2) { static const auto store_cql = format("INSERT INTO system.{} (group_id, term, \"index\", data) VALUES (?, ?, ?, ?)", db::system_keyspace::RAFT); @@ -194,10 +195,7 @@ future<> raft_sys_table_storage::store_snapshot_descriptor(const raft::snapshot_ }); } -future<> raft_sys_table_storage::do_store_log_entries(const std::vector& entries) { - if (entries.empty()) { - co_return; - } +future raft_sys_table_storage::do_store_log_entries_one_batch(const std::vector& entries, size_t start_idx) { std::vector batch_stmts; // statement values that can be allocated at once (one contiguous allocation) std::vector> stmt_values; @@ -211,12 +209,19 @@ future<> raft_sys_table_storage::do_store_log_entries(const std::vectordata)); auto data_out_str = data_tmp_buf.get_ostream(); ser::serialize(data_out_str, eptr->data); + if (size && size + data_tmp_buf.size_bytes() > _max_mutation_size) { + break; + } + size += data_tmp_buf.size_bytes(); + batch_stmts.emplace_back(cql3::statements::batch_statement::single_statement(_store_entry_stmt, false)); // don't include serialized "data" here since it will require to linearize the stream std::vector single_stmt_values; @@ -261,6 +266,23 @@ future<> raft_sys_table_storage::do_store_log_entries(const std::vector raft_sys_table_storage::do_store_log_entries(const std::vector& entries) { + if (entries.empty()) { + co_return; + } + + size_t idx = 0; + do { + idx = co_await do_store_log_entries_one_batch(entries, idx); + } while (idx != 0); } future<> raft_sys_table_storage::store_log_entries(const std::vector& entries) { diff --git a/service/raft/raft_sys_table_storage.hh b/service/raft/raft_sys_table_storage.hh index 27573b5b6a..d82af8f153 100644 --- a/service/raft/raft_sys_table_storage.hh +++ b/service/raft/raft_sys_table_storage.hh @@ -51,6 +51,8 @@ class raft_sys_table_storage : public raft::persistence { // this helper. future<> _pending_op_fut; + const size_t _max_mutation_size; + public: explicit raft_sys_table_storage(cql3::query_processor& qp, raft::group_id gid, raft::server_id server_id); @@ -76,6 +78,7 @@ public: future<> bootstrap(raft::configuration initial_configuation); private: + future do_store_log_entries_one_batch(const std::vector& entries, size_t start_idx); future<> do_store_log_entries(const std::vector& entries); // Truncate all entries from the persisted log with indices <= idx // Called from the `store_snapshot` function. diff --git a/test/boost/group0_cmd_merge_test.cc b/test/boost/group0_cmd_merge_test.cc new file mode 100644 index 0000000000..fd0601d598 --- /dev/null +++ b/test/boost/group0_cmd_merge_test.cc @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "test/lib/scylla_test_case.hh" +#include "test/lib/cql_test_env.hh" + +#include "service/migration_manager.hh" +#include "service/raft/raft_group_registry.hh" +#include "service/raft/group0_state_machine.hh" +#include "idl/experimental/broadcast_tables_lang.dist.hh" +#include "idl/experimental/broadcast_tables_lang.dist.impl.hh" +#include "idl/group0_state_machine.dist.hh" +#include "idl/group0_state_machine.dist.impl.hh" +#include "utils/error_injection.hh" + +SEASTAR_TEST_CASE(test_group0_cmd_merge) { +#ifndef SCYLLA_ENABLE_ERROR_INJECTION + fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n"); + return make_ready_future<>(); +#else + cql_test_config cfg; + cfg.db_config->commitlog_segment_size_in_mb(1); + return do_with_cql_env_thread([] (cql_test_env& env) { + auto& group0 = env.get_raft_group_registry().local().group0(); + auto& mm = env.migration_manager().local(); + auto id = utils::UUID_gen::get_time_UUID(); + service::group0_command group0_cmd { + .history_append{db::system_keyspace::make_group0_history_state_id_mutation( + id, gc_clock::duration{0}, "test")}, + .new_state_id = id, + .creator_addr{utils::fb_utilities::get_broadcast_address()}, + .creator_id{group0.id()} + }; + std::vector cms; + size_t size = 0; + auto muts = mm.prepare_keyspace_drop_announcement("ks", api::new_timestamp()).get0(); + // Maximum mutation size is half of commitlog segment size wich we set + // to 1M. Make one command a little bit larger than third of the max size. + while (size < 200*1024) { + for (auto&& m : muts) { + cms.emplace_back(m); + size += cms.back().representation().size(); + } + } + group0_cmd.change = service::schema_change{std::move(cms)}; + raft::command cmd; + ser::serialize(cmd, group0_cmd); + auto merges = mm.canonical_mutation_merge_count; + utils::get_local_injector().enable("fsm::poll_output/pause"); + auto f = when_all( + group0.add_entry(cmd, raft::wait_type::applied, nullptr), + group0.add_entry(cmd, raft::wait_type::applied, nullptr), + group0.add_entry(cmd, raft::wait_type::applied, nullptr)); + // Sleep is needed for all the entreis added above to hit the log + seastar::sleep(std::chrono::milliseconds(100)).get(); + // After unpause all entreis added above will be committed and applied together + utils::get_local_injector().disable("fsm::poll_output/pause"); + f.get(); // Wait for apply to complete + // Thete should be two calls to migration manager since two out of + // three command should be merged. + BOOST_REQUIRE_EQUAL(mm.canonical_mutation_merge_count - merges, 2); + }, cfg); +#endif +}