diff --git a/db/view/view.cc b/db/view/view.cc index fdc5a7804f..3a4d36bc30 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -46,6 +46,9 @@ #include "cql3/statements/select_statement.hh" #include "cql3/util.hh" #include "db/view/view.hh" +#include "gms/inet_address.hh" +#include "locator/network_topology_strategy.hh" +#include "service/storage_service.hh" namespace db { @@ -609,6 +612,68 @@ future> generate_view_updates( return f.finally([builder = std::move(builder)] { }); } +// Calculate the node ("natural endpoint") to which this node should send +// a view update. +// +// A materialized view table is in the same keyspace as its base table, +// and in particular both have the same replication factor. Therefore it +// is possible, for a particular base partition and related view partition +// to "pair" between the base replicas and view replicas holding those +// partitions. The first (in ring order) base replica is paired with the +// first view replica, the second with the second, and so on. The purpose +// of this function is to find, assuming that this node is one of the base +// replicas for a given partition, the paired view replica. +// +// If the keyspace's replication strategy is a NetworkTopologyStrategy, +// we pair only nodes in the same datacenter. +// If one of the base replicas also happens to be a view replica, it is +// paired with itself (with the other nodes paired by order in the list +// after taking this node out). +// +// If the assumption that the given base token belongs to this replica +// does not hold, we return an empty optional. +static stdx::optional +get_view_natural_endpoint(const sstring& keyspace_name, + const dht::token& base_token, const dht::token& view_token) { + auto &db = service::get_local_storage_service().db().local(); + auto& rs = db.find_keyspace(keyspace_name).get_replication_strategy(); + auto my_address = utils::fb_utilities::get_broadcast_address(); + auto my_datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(my_address); + bool network_topology = dynamic_cast(&rs); + std::vector base_endpoints, view_endpoints; + for (auto&& base_endpoint : rs.get_natural_endpoints(base_token)) { + if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(base_endpoint) == my_datacenter) { + base_endpoints.push_back(base_endpoint); + } + } + + for (auto&& view_endpoint : rs.get_natural_endpoints(view_token)) { + // If this base replica is also one of the view replicas, we use + // ourselves as the view replica. + if (view_endpoint == my_address) { + return view_endpoint; + } + // We have to remove any endpoint which is shared between the base + // and the view, as it will select itself and throw off the counts + // otherwise. + auto it = std::find(base_endpoints.begin(), base_endpoints.end(), + view_endpoint); + if (it != base_endpoints.end()) { + base_endpoints.erase(it); + } else if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(view_endpoint) == my_datacenter) { + view_endpoints.push_back(view_endpoint); + } + } + + assert(base_endpoints.size() == view_endpoints.size()); + auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), my_address); + if (base_it == base_endpoints.end()) { + // This node is not a base replica of this key, so we return empty + return {}; + } + return view_endpoints[base_it - base_endpoints.begin()]; +} + } // namespace view } // namespace db