Files
scylla/db/view/view_update_generator.hh
Nadav Har'El 4505a86f46 tablets, mv: fix base-view pairing to consider base replication map
In the view update code, the function get_view_natural_endpoint()
determines which view replica this base replica should send an update
to. It currently gets the *view* table's replication map (i.e., the map
from view tokens to lists of replicas holding the token), but assumes
that this is also the *base* table's replication map.

This assumption was true with vnodes, but is no longer true with
tablets - the base table's replication map can be completely different
from the view table's. By looking at the wrong mapping,
get_view_natural_endpoint() can believe that this node isn't really
a base-replica and drop the view update. Alternatively, it can think
it is a base replica - but use the wrong base-view pairing and create
base-view inconsistencies.

This patch solves this bug - get_view_natural_endpoint() now gets two
separate replication maps - the base's and the view's. The callers
need to remember what the base table was (in some cases they didn't
care at the point of the call), and pass it to the function call.

This patch also includes a simple test that reproduces the bug, and
confirms it is fixed: The test has a 6-node cluster using tablets
and a base table with RF=1, and writes one row to it. Before this
patch, the code usually gets confused, thinking the base replica
isn't a replica and loses the view update. With this patch, the
view update works.

Fixes #16227.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#16228
2023-12-04 16:38:54 +02:00

100 lines
2.9 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "sstables/shared_sstable.hh"
#include "db/timeout_clock.hh"
#include "utils/chunked_vector.hh"
#include "schema/schema_fwd.hh"
#include <seastar/core/sharded.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/semaphore.hh>
using namespace seastar;
struct frozen_mutation_and_schema;
namespace dht {
class token;
}
namespace tracing {
class trace_state_ptr;
}
namespace replica {
class database;
class table;
struct cf_stats;
}
namespace service {
class storage_proxy;
struct allow_hints_tag;
using allow_hints = bool_class<allow_hints_tag>;
}
namespace db::view {
class stats;
struct wait_for_all_updates_tag {};
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
class view_update_generator : public async_sharded_service<view_update_generator> {
public:
static constexpr size_t registration_queue_size = 100;
private:
replica::database& _db;
sharded<service::storage_proxy>& _proxy;
seastar::abort_source _as;
future<> _started = make_ready_future<>();
seastar::condition_variable _pending_sstables;
named_semaphore _registration_sem{registration_queue_size, named_semaphore_exception_factory{"view update generator"}};
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
metrics::metric_groups _metrics;
class progress_tracker;
std::unique_ptr<progress_tracker> _progress_tracker;
optimized_optional<abort_source::subscription> _early_abort_subscription;
void do_abort() noexcept;
public:
view_update_generator(replica::database& db, sharded<service::storage_proxy>& proxy, abort_source& as);
~view_update_generator();
future<> start();
future<> stop();
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
replica::database& get_db() noexcept { return _db; }
future<> mutate_MV(
schema_ptr base,
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
replica::cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all);
ssize_t available_register_units() const { return _registration_sem.available_units(); }
size_t queued_batches_count() const { return _sstables_with_tables.size(); }
private:
bool should_throttle() const;
void setup_metrics();
void discover_staging_sstables();
};
}