db: Fix commitlog replay to not drop cell mutations with older schema

column_mapping is not safe to access across shards, because data_type
is not safe to access. One of the manifestation of this is that
abstract_type::is_value_compatible_with() always fails if the two
types belong to different shards.

During replay, column_mapping lives on the replaying shard, and is
used by converting_mutation_partition_applier against the schema on
the target shard. Since types in the mapping will be considered
incompatible with types in the schema, all cells will be dropped.

Fix by using column_mapping in a safe way, by copying it to the target
shard if necessary. Each shard maintains its own cache of column
mappings.

Fixes #1924.
Message-Id: <1481310463-13868-1-git-send-email-tgrabiec@scylladb.com>
This commit is contained in:
Tomasz Grabiec
2016-12-09 20:07:43 +01:00
committed by Pekka Enberg
parent 32d55bbb4c
commit 059a1a4f22
4 changed files with 48 additions and 5 deletions

View File

@@ -61,8 +61,14 @@
static logging::logger logger("commitlog_replayer");
struct column_mappings {
std::unordered_map<table_schema_version, column_mapping> map;
future<> stop() { return make_ready_future<>(); }
};
class db::commitlog_replayer::impl {
std::unordered_map<table_schema_version, column_mapping> _column_mappings;
seastar::sharded<column_mappings> _column_mappings;
friend class db::commitlog_replayer;
public:
impl(seastar::sharded<cql3::query_processor>& db);
@@ -220,14 +226,16 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
commitlog_entry_reader cer(buf);
auto& fm = cer.mutation();
auto cm_it = _column_mappings.find(fm.schema_version());
if (cm_it == _column_mappings.end()) {
auto& local_cm = _column_mappings.local().map;
auto cm_it = local_cm.find(fm.schema_version());
if (cm_it == local_cm.end()) {
if (!cer.get_column_mapping()) {
throw std::runtime_error(sprint("unknown schema version {}", fm.schema_version()));
}
logger.debug("new schema version {} in entry {}", fm.schema_version(), rp);
cm_it = _column_mappings.emplace(fm.schema_version(), *cer.get_column_mapping()).first;
cm_it = local_cm.emplace(fm.schema_version(), *cer.get_column_mapping()).first;
}
const column_mapping& src_cm = cm_it->second;
auto shard_id = rp.shard_id();
if (rp < _min_pos[shard_id]) {
@@ -246,7 +254,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
}
auto shard = _qp.local().db().local().shard_of(fm);
return _qp.local().db().invoke_on(shard, [this, cer = std::move(cer), cm_it, rp, shard, s] (database& db) -> future<> {
return _qp.local().db().invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) -> future<> {
auto& fm = cer.mutation();
// TODO: might need better verification that the deserialized mutation
// is schema compatible. My guess is that just applying the mutation
@@ -263,6 +271,11 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
// their "replay_position" attribute will be empty, which is
// lower than anything the new session will produce.
if (cf.schema()->version() != fm.schema_version()) {
auto& local_cm = _column_mappings.local().map;
auto cm_it = local_cm.find(fm.schema_version());
if (cm_it == local_cm.end()) {
cm_it = local_cm.emplace(fm.schema_version(), src_cm).first;
}
const column_mapping& cm = cm_it->second;
mutation m(fm.decorated_key(*cf.schema()), cf.schema());
converting_mutation_partition_applier v(cm, *cf.schema(), m.partition());
@@ -310,6 +323,7 @@ future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::
}
future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
return _impl->_column_mappings.start().then([this, files = std::move(files)] {
logger.info("Replaying {}", join(", ", files));
return map_reduce(files, [this](auto f) {
logger.debug("Replaying {}", f);
@@ -341,7 +355,10 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
, totals.invalid_mutations
, totals.skipped_mutations
);
}).finally([this] {
return _impl->_column_mappings.stop();
});
});
}
future<> db::commitlog_replayer::recover(sstring f) {