tablets: Add per-tablet session id field to tablet metadata
range_streamer will pick it up when creating topology_guard. It's materialized in memory only for migrating tablets in tablet_transition_info.
This commit is contained in:
@@ -71,10 +71,14 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage)
|
|||||||
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
|
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
|
||||||
}
|
}
|
||||||
|
|
||||||
tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica)
|
tablet_transition_info::tablet_transition_info(tablet_transition_stage stage,
|
||||||
|
tablet_replica_set next,
|
||||||
|
tablet_replica pending_replica,
|
||||||
|
service::session_id session_id)
|
||||||
: stage(stage)
|
: stage(stage)
|
||||||
, next(std::move(next))
|
, next(std::move(next))
|
||||||
, pending_replica(std::move(pending_replica))
|
, pending_replica(std::move(pending_replica))
|
||||||
|
, session_id(session_id)
|
||||||
, writes(get_selector_for_writes(stage))
|
, writes(get_selector_for_writes(stage))
|
||||||
, reads(get_selector_for_reads(stage))
|
, reads(get_selector_for_reads(stage))
|
||||||
{ }
|
{ }
|
||||||
@@ -280,6 +284,9 @@ std::ostream& operator<<(std::ostream& out, const tablet_map& r) {
|
|||||||
out << format("\n [{}]: last_token={}, replicas={}", tid, r.get_last_token(tid), tablet.replicas);
|
out << format("\n [{}]: last_token={}, replicas={}", tid, r.get_last_token(tid), tablet.replicas);
|
||||||
if (auto tr = r.get_tablet_transition_info(tid)) {
|
if (auto tr = r.get_tablet_transition_info(tid)) {
|
||||||
out << format(", stage={}, new_replicas={}, pending={}", tr->stage, tr->next, tr->pending_replica);
|
out << format(", stage={}, new_replicas={}, pending={}", tr->stage, tr->next, tr->pending_replica);
|
||||||
|
if (tr->session_id) {
|
||||||
|
out << format(", session={}", tr->session_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
first = false;
|
first = false;
|
||||||
tid = *r.next_tablet(tid);
|
tid = *r.next_tablet(tid);
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
#include "dht/token.hh"
|
#include "dht/token.hh"
|
||||||
#include "utils/small_vector.hh"
|
#include "utils/small_vector.hh"
|
||||||
#include "locator/host_id.hh"
|
#include "locator/host_id.hh"
|
||||||
|
#include "service/session.hh"
|
||||||
#include "dht/i_partitioner_fwd.hh"
|
#include "dht/i_partitioner_fwd.hh"
|
||||||
#include "schema/schema_fwd.hh"
|
#include "schema/schema_fwd.hh"
|
||||||
#include "utils/chunked_vector.hh"
|
#include "utils/chunked_vector.hh"
|
||||||
@@ -171,10 +172,12 @@ struct tablet_transition_info {
|
|||||||
tablet_transition_stage stage;
|
tablet_transition_stage stage;
|
||||||
tablet_replica_set next;
|
tablet_replica_set next;
|
||||||
tablet_replica pending_replica; // Optimization (next - tablet_info::replicas)
|
tablet_replica pending_replica; // Optimization (next - tablet_info::replicas)
|
||||||
|
service::session_id session_id;
|
||||||
write_replica_set_selector writes;
|
write_replica_set_selector writes;
|
||||||
read_replica_set_selector reads;
|
read_replica_set_selector reads;
|
||||||
|
|
||||||
tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica);
|
tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica,
|
||||||
|
service::session_id session_id = {});
|
||||||
|
|
||||||
bool operator==(const tablet_transition_info&) const = default;
|
bool operator==(const tablet_transition_info&) const = default;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
#include "mutation/mutation.hh"
|
#include "mutation/mutation.hh"
|
||||||
#include "db/system_keyspace.hh"
|
#include "db/system_keyspace.hh"
|
||||||
#include "replica/tablets.hh"
|
#include "replica/tablets.hh"
|
||||||
|
#include "service/session.hh"
|
||||||
|
|
||||||
namespace replica {
|
namespace replica {
|
||||||
|
|
||||||
@@ -35,6 +36,8 @@ public:
|
|||||||
tablet_mutation_builder& set_new_replicas(dht::token last_token, locator::tablet_replica_set replicas);
|
tablet_mutation_builder& set_new_replicas(dht::token last_token, locator::tablet_replica_set replicas);
|
||||||
tablet_mutation_builder& set_replicas(dht::token last_token, locator::tablet_replica_set replicas);
|
tablet_mutation_builder& set_replicas(dht::token last_token, locator::tablet_replica_set replicas);
|
||||||
tablet_mutation_builder& set_stage(dht::token last_token, locator::tablet_transition_stage stage);
|
tablet_mutation_builder& set_stage(dht::token last_token, locator::tablet_transition_stage stage);
|
||||||
|
tablet_mutation_builder& set_session(dht::token last_token, service::session_id);
|
||||||
|
tablet_mutation_builder& del_session(dht::token last_token);
|
||||||
tablet_mutation_builder& del_transition(dht::token last_token);
|
tablet_mutation_builder& del_transition(dht::token last_token);
|
||||||
|
|
||||||
mutation build() {
|
mutation build() {
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ schema_ptr make_tablets_schema() {
|
|||||||
.with_column("replicas", replica_set_type)
|
.with_column("replicas", replica_set_type)
|
||||||
.with_column("new_replicas", replica_set_type)
|
.with_column("new_replicas", replica_set_type)
|
||||||
.with_column("stage", utf8_type)
|
.with_column("stage", utf8_type)
|
||||||
|
.with_column("session", uuid_type)
|
||||||
.with_version(db::system_keyspace::generate_schema_version(id))
|
.with_version(db::system_keyspace::generate_schema_version(id))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
@@ -82,6 +83,9 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke
|
|||||||
if (auto tr_info = tablets.get_tablet_transition_info(tid)) {
|
if (auto tr_info = tablets.get_tablet_transition_info(tid)) {
|
||||||
m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts);
|
m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts);
|
||||||
m.set_clustered_cell(ck, "new_replicas", make_list_value(replica_set_type, replicas_to_data_value(tr_info->next)), ts);
|
m.set_clustered_cell(ck, "new_replicas", make_list_value(replica_set_type, replicas_to_data_value(tr_info->next)), ts);
|
||||||
|
if (tr_info->session_id) {
|
||||||
|
m.set_clustered_cell(ck, "session", data_value(tr_info->session_id.uuid()), ts);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tid = *tablets.next_tablet(tid);
|
tid = *tablets.next_tablet(tid);
|
||||||
co_await coroutine::maybe_yield();
|
co_await coroutine::maybe_yield();
|
||||||
@@ -107,6 +111,19 @@ tablet_mutation_builder::set_stage(dht::token last_token, locator::tablet_transi
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tablet_mutation_builder&
|
||||||
|
tablet_mutation_builder::set_session(dht::token last_token, service::session_id session_id) {
|
||||||
|
_m.set_clustered_cell(get_ck(last_token), "session", data_value(session_id.uuid()), _ts);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
tablet_mutation_builder&
|
||||||
|
tablet_mutation_builder::del_session(dht::token last_token) {
|
||||||
|
auto session_col = _s->get_column_definition("session");
|
||||||
|
_m.set_clustered_cell(get_ck(last_token), *session_col, atomic_cell::make_dead(_ts, gc_clock::now()));
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
tablet_mutation_builder&
|
tablet_mutation_builder&
|
||||||
tablet_mutation_builder::del_transition(dht::token last_token) {
|
tablet_mutation_builder::del_transition(dht::token last_token) {
|
||||||
auto ck = get_ck(last_token);
|
auto ck = get_ck(last_token);
|
||||||
@@ -114,6 +131,8 @@ tablet_mutation_builder::del_transition(dht::token last_token) {
|
|||||||
_m.set_clustered_cell(ck, *stage_col, atomic_cell::make_dead(_ts, gc_clock::now()));
|
_m.set_clustered_cell(ck, *stage_col, atomic_cell::make_dead(_ts, gc_clock::now()));
|
||||||
auto new_replicas_col = _s->get_column_definition("new_replicas");
|
auto new_replicas_col = _s->get_column_definition("new_replicas");
|
||||||
_m.set_clustered_cell(ck, *new_replicas_col, atomic_cell::make_dead(_ts, gc_clock::now()));
|
_m.set_clustered_cell(ck, *new_replicas_col, atomic_cell::make_dead(_ts, gc_clock::now()));
|
||||||
|
auto session_col = _s->get_column_definition("session");
|
||||||
|
_m.set_clustered_cell(ck, *session_col, atomic_cell::make_dead(_ts, gc_clock::now()));
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -198,8 +217,12 @@ future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
|
|||||||
throw std::runtime_error(format("Too many pending replicas for table {} tablet {}: {}",
|
throw std::runtime_error(format("Too many pending replicas for table {} tablet {}: {}",
|
||||||
table, current->tid, pending));
|
table, current->tid, pending));
|
||||||
}
|
}
|
||||||
|
service::session_id session_id;
|
||||||
|
if (row.has("session")) {
|
||||||
|
session_id = service::session_id(row.get_as<utils::UUID>("session"));
|
||||||
|
}
|
||||||
current->map.set_tablet_transition_info(current->tid, tablet_transition_info{stage,
|
current->map.set_tablet_transition_info(current->tid, tablet_transition_info{stage,
|
||||||
std::move(new_tablet_replicas), *pending.begin()});
|
std::move(new_tablet_replicas), *pending.begin(), session_id});
|
||||||
}
|
}
|
||||||
|
|
||||||
current->map.set_tablet(current->tid, tablet_info{std::move(tablet_replicas)});
|
current->map.set_tablet(current->tid, tablet_info{std::move(tablet_replicas)});
|
||||||
|
|||||||
@@ -153,7 +153,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
|
|||||||
tablet_replica {h1, 4},
|
tablet_replica {h1, 4},
|
||||||
tablet_replica {h2, 2},
|
tablet_replica {h2, 2},
|
||||||
},
|
},
|
||||||
tablet_replica {h1, 4}
|
tablet_replica {h1, 4},
|
||||||
|
session_id(utils::UUID_gen::get_time_UUID())
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user