tablets: Implement cleanup step

This change adds a stub for tablet cleanup on the replica side and wires
it into the tablet migration process.

The handling on replica side is incomplete because it doesn't remove
the actual data yet. It only flushes the memtables, so that all data
is in sstables and none requires a memtable flush.

This patch is necessary to make decommission work. Otherwise, a
memtable flush would happen when the decommissioned node is put in the
drained state (as in nodetool drain) and it would fail on missing host
id mapping (node is no longer in topology), which is examined by the
tablet sharder when producing sstable sharding metadata. Leading to
abort due to failed memtable flush.
This commit is contained in:
Tomasz Grabiec
2023-08-28 01:30:08 +02:00
parent 5cf035878d
commit d5539e080d
10 changed files with 87 additions and 11 deletions

View File

@@ -52,4 +52,5 @@ struct raft_topology_pull_params {};
verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb [[cancellable]] raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot;
verb [[cancellable]] tablet_stream_data (locator::global_tablet_id);
verb [[cancellable]] tablet_cleanup (locator::global_tablet_id);
}

View File

@@ -45,6 +45,8 @@ write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage
return write_replica_set_selector::next;
case tablet_transition_stage::cleanup:
return write_replica_set_selector::next;
case tablet_transition_stage::end_migration:
return write_replica_set_selector::next;
}
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
}
@@ -64,6 +66,8 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage)
return read_replica_set_selector::next;
case tablet_transition_stage::cleanup:
return read_replica_set_selector::next;
case tablet_transition_stage::end_migration:
return read_replica_set_selector::next;
}
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
}
@@ -232,6 +236,7 @@ static const std::unordered_map<tablet_transition_stage, sstring> tablet_transit
{tablet_transition_stage::streaming, "streaming"},
{tablet_transition_stage::use_new, "use_new"},
{tablet_transition_stage::cleanup, "cleanup"},
{tablet_transition_stage::end_migration, "end_migration"},
};
static const std::unordered_map<sstring, tablet_transition_stage> tablet_transition_stage_from_name = std::invoke([] {

View File

@@ -141,6 +141,7 @@ enum class tablet_transition_stage {
write_both_read_new,
use_new,
cleanup,
end_migration,
};
sstring tablet_transition_stage_to_string(tablet_transition_stage);

View File

@@ -597,6 +597,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::NODE_OPS_CMD:
case messaging_verb::HINT_MUTATION:
case messaging_verb::TABLET_STREAM_DATA:
case messaging_verb::TABLET_CLEANUP:
return 1;
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:

View File

@@ -183,7 +183,8 @@ enum class messaging_verb : int32_t {
RAFT_TOPOLOGY_CMD = 64,
RAFT_PULL_TOPOLOGY_SNAPSHOT = 65,
TABLET_STREAM_DATA = 66,
LAST = 67,
TABLET_CLEANUP = 67,
LAST = 68,
};
} // namespace netw

View File

@@ -62,6 +62,7 @@
#include "sstables/generation_type.hh"
#include "db/rate_limiter.hh"
#include "db/operation_type.hh"
#include "locator/tablets.hh"
#include "utils/serialized_action.hh"
#include "compaction/compaction_fwd.hh"
@@ -792,6 +793,7 @@ public:
const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
void update_effective_replication_map(locator::effective_replication_map_ptr);
[[gnu::always_inline]] bool uses_tablets() const;
future<> cleanup_tablet(locator::tablet_id);
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
future<const_row_ptr> find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
shard_id shard_of(const mutation& m) const {

View File

@@ -2964,4 +2964,9 @@ bool table::requires_cleanup(const sstables::sstable_set& set) const {
}));
}
future<> table::cleanup_tablet(locator::tablet_id) {
co_await flush();
// FIXME: Remove sstables
}
} // namespace replica

View File

@@ -1399,6 +1399,7 @@ class topology_coordinator {
// Next migration of the same tablet is guaranteed to use a different instance.
struct tablet_migration_state {
background_action_holder streaming;
background_action_holder cleanup;
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
};
@@ -1508,11 +1509,15 @@ class topology_coordinator {
.build());
};
auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) {
if (advance_in_background(gid, tablet_state.barriers[stage], "barrier", [&] {
auto do_barrier = [&] {
return advance_in_background(gid, tablet_state.barriers[trinfo.stage], "barrier", [&] {
needs_barrier = true;
return barrier.get_shared_future();
})) {
});
};
auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) {
if (do_barrier()) {
transition_to(stage);
}
};
@@ -1544,13 +1549,26 @@ class topology_coordinator {
transition_to_with_barrier(locator::tablet_transition_stage::cleanup);
break;
case locator::tablet_transition_stage::cleanup:
// FIXME: Actually perform the cleanup. Block on integration with compaction groups.
_tablets.erase(gid);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.build());
if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] {
locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst);
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
netw::msg_addr(id2ip(dst.host)), _as, gid);
})) {
transition_to(locator::tablet_transition_stage::end_migration);
}
break;
case locator::tablet_transition_stage::end_migration:
// Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs.
// See do_tablet_operation() doc.
if (do_barrier()) {
_tablets.erase(gid);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.build());
}
break;
}
});
@@ -6208,6 +6226,42 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
});
}
future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
return do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) {
shard_id shard;
{
auto tm = guard.get_token_metadata();
auto& tmap = guard.get_tablet_map();
auto *trinfo = tmap.get_tablet_transition_info(tablet.tablet);
// Check if the request is still valid.
// If there is mismatch, it means this cleanup was canceled and the coordinator moved on.
if (!trinfo) {
throw std::runtime_error(format("No transition info for tablet {}", tablet));
}
if (trinfo->stage != locator::tablet_transition_stage::cleanup) {
throw std::runtime_error(format("Tablet {} stage is not at cleanup", tablet));
}
auto& tinfo = tmap.get_tablet_info(tablet.tablet);
locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
if (leaving_replica.host != tm->get_my_id()) {
throw std::runtime_error(format("Tablet {} has leaving replica different than this one", tablet));
}
auto shard_opt = tmap.get_shard(tablet.tablet, tm->get_my_id());
if (!shard_opt) {
on_internal_error(slogger, format("Tablet {} has no shard on this node", tablet));
}
shard = *shard_opt;
}
return _db.invoke_on(shard, [tablet] (replica::database& db) {
auto& table = db.find_column_family(tablet.table);
return table.cleanup_tablet(tablet.tablet);
});
});
}
void storage_service::init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
@@ -6276,6 +6330,9 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
return ss.stream_tablet(tablet);
});
});
ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [this] (locator::global_tablet_id tablet) {
return cleanup_tablet(tablet);
});
}
future<> storage_service::uninit_messaging_service() {

View File

@@ -159,6 +159,7 @@ private:
sstring op_name,
std::function<future<>(locator::tablet_metadata_guard&)> op);
future<> stream_tablet(locator::global_tablet_id);
future<> cleanup_tablet(locator::global_tablet_id);
inet_address host2ip(locator::host_id);
public:
storage_service(abort_source& as, distributed<replica::database>& db,

View File

@@ -265,6 +265,8 @@ private:
return false;
case tablet_transition_stage::cleanup:
return false;
case tablet_transition_stage::end_migration:
return false;
}
on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast<int>(trinfo->stage)));
}