From 9a6e6b4ae12d6b597a1278cd68f777e8a3932063 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Sun, 28 Jun 2015 19:01:36 +0300 Subject: [PATCH] locator: added network_topology_strategy class Signed-off-by: Vlad Zolotarov New in v2: - Use new location of a sequenced_set. - Indentation. - Use all caps in a FIXME word. --- configure.py | 1 + locator/network_topology_strategy.cc | 212 +++++++++++++++++++++++++++ locator/network_topology_strategy.hh | 99 +++++++++++++ 3 files changed, 312 insertions(+) create mode 100644 locator/network_topology_strategy.cc create mode 100644 locator/network_topology_strategy.hh diff --git a/configure.py b/configure.py index 2c4df86ecc..8aaecfa7a4 100755 --- a/configure.py +++ b/configure.py @@ -474,6 +474,7 @@ urchin_core = (['database.cc', 'locator/abstract_replication_strategy.cc', 'locator/simple_strategy.cc', 'locator/local_strategy.cc', + 'locator/network_topology_strategy.cc', 'locator/token_metadata.cc', 'locator/locator.cc', 'locator/snitch_base.cc', diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc new file mode 100644 index 0000000000..130efa781e --- /dev/null +++ b/locator/network_topology_strategy.cc @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "locator/network_topology_strategy.hh" +#include "utils/sequenced_set.hh" +#include + +namespace locator { + + +network_topology_strategy::network_topology_strategy( + const sstring& keyspace_name, + token_metadata& token_metadata, + snitch_ptr& snitch, + const std::map& config_options) : + abstract_replication_strategy(keyspace_name, + token_metadata, + snitch, + config_options) { + for (auto& config_pair : config_options) { + auto& key = config_pair.first; + auto& val = config_pair.second; + + // + // FIXME!!! + // The first option we get at the moment is a class name. Skip it! + // + if (boost::iequals(key, "class")) { + continue; + } + + if (boost::iequals(key, "replication_factor")) { + throw exceptions::configuration_exception( + "replication_factor is an option for SimpleStrategy, not " + "NetworkTopologyStrategy"); + } + + _dc_rep_factor.emplace(key, std::stol(val)); + _datacenteres.push_back(key); + } + + _rep_factor = 0; + + for (auto& one_dc_rep_factor : _dc_rep_factor) { + _rep_factor += one_dc_rep_factor.second; + } + + debug("Configured datacenter replicas are:"); + for (auto& p : _dc_rep_factor) { + debug("{}: {}", p.first, p.second); + } +} + +std::vector +network_topology_strategy::calculate_natural_endpoints( + const token& search_token) { + // + // We want to preserve insertion order so that the first added endpoint + // becomes primary. + // + utils::sequenced_set replicas; + + // replicas we have found in each DC + std::unordered_map> dc_replicas; + // tracks the racks we have already placed replicas in + std::unordered_map> seen_racks; + // + // tracks the endpoints that we skipped over while looking for unique racks + // when we relax the rack uniqueness we can append this to the current + // result so we don't have to wind back the iterator + // + std::unordered_map> + skipped_dc_endpoints; + + // + // Populate the temporary data structures. + // + replicas.reserve(get_replication_factor()); + for (auto& dc_rep_factor_pair : _dc_rep_factor) { + auto& dc_name = dc_rep_factor_pair.first; + + dc_replicas[dc_name].reserve(dc_rep_factor_pair.second); + seen_racks[dc_name] = {}; + skipped_dc_endpoints[dc_name] = {}; + } + + topology& tp = _token_metadata.get_topology(); + + // + // all endpoints in each DC, so we can check when we have exhausted all + // the members of a DC + // + std::unordered_map>& + all_endpoints = tp.get_datacenter_endpoints(); + // + // all racks in a DC so we can check when we have exhausted all racks in a + // DC + // + std::unordered_map>>& + racks = tp.get_datacenter_racks(); + + // not aware of any cluster members + assert(!all_endpoints.empty() && !racks.empty()); + + for (auto& next : _token_metadata.ring_range(search_token)) { + + if (has_sufficient_replicas(dc_replicas, all_endpoints)) { + break; + } + + inet_address ep = *_token_metadata.get_endpoint(next); + sstring dc = _snitch->get_datacenter(ep); + + auto& seen_racks_dc_set = seen_racks[dc]; + auto& racks_dc_map = racks[dc]; + auto& skipped_dc_endpoints_set = skipped_dc_endpoints[dc]; + auto& dc_replicas_dc_set = dc_replicas[dc]; + + // have we already found all replicas for this dc? + if (_dc_rep_factor.find(dc) == _dc_rep_factor.end() || + has_sufficient_replicas(dc, dc_replicas, all_endpoints)) { + continue; + } + + // + // can we skip checking the rack? - namely, we've seen all racks in this + // DC already and may add this endpoint right away. + // + if (seen_racks_dc_set.size() == racks_dc_map.size()) { + dc_replicas_dc_set.insert(ep); + replicas.push_back(ep); + } else { + sstring rack = _snitch->get_rack(ep); + // is this a new rack? - we prefer to replicate on different racks + if (seen_racks_dc_set.find(rack) != seen_racks_dc_set.end()) { + skipped_dc_endpoints_set.push_back(ep); + } else { // this IS a new rack + dc_replicas_dc_set.insert(ep); + replicas.push_back(ep); + seen_racks_dc_set.insert(rack); + // + // if we've run out of distinct racks, add the hosts we skipped + // past already (up to RF) + // + if (seen_racks_dc_set.size() == racks_dc_map.size()) + { + auto skipped_it = skipped_dc_endpoints_set.begin(); + while (skipped_it != skipped_dc_endpoints_set.end() && + !has_sufficient_replicas(dc, dc_replicas, all_endpoints)) { + inet_address skipped = *skipped_it++; + dc_replicas_dc_set.insert(skipped); + replicas.push_back(skipped); + } + } + } + } + } + + return std::move(replicas.get_vector()); +} + +inline bool network_topology_strategy::has_sufficient_replicas( + const sstring& dc, + std::unordered_map>& dc_replicas, + std::unordered_map>& all_endpoints) { + + return dc_replicas[dc].size() >= + std::min(all_endpoints[dc].size(), get_replication_factor(dc)); +} + +inline bool network_topology_strategy::has_sufficient_replicas( + std::unordered_map>& dc_replicas, + std::unordered_map>& all_endpoints) { + + for (auto& dc : get_datacenters()) { + if (!has_sufficient_replicas(dc, dc_replicas, all_endpoints)) { + return false; + } + } + + return true; +} + +using registry = class_registrator&>; +static registry registrator("org.apache.cassandra.locator.NetworkTopologyStrategy"); +static registry registrator_short_name("NetworkTopologyStrategy"); +} diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh new file mode 100644 index 0000000000..8a9d9419c0 --- /dev/null +++ b/locator/network_topology_strategy.hh @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#pragma once + +#include "locator/abstract_replication_strategy.hh" +#include "exceptions/exceptions.hh" + +namespace locator { +class network_topology_strategy : public abstract_replication_strategy { +public: + network_topology_strategy( + const sstring& keyspace_name, + token_metadata& token_metadata, + snitch_ptr& snitch, + const std::map& config_options); + + virtual size_t get_replication_factor() const override { + return _rep_factor; + } + + size_t get_replication_factor(const sstring& dc) const { + auto dc_factor = _dc_rep_factor.find(dc); + return (dc_factor == _dc_rep_factor.end()) ? 0 : dc_factor->second; + } + +protected: + /** + * calculate endpoints in one pass through the tokens by tracking our + * progress in each DC, rack etc. + */ + virtual std::vector calculate_natural_endpoints( + const token& search_token) override; + +private: + bool has_sufficient_replicas( + const sstring& dc, + std::unordered_map>& dc_replicas, + std::unordered_map>& all_endpoints); + + bool has_sufficient_replicas( + std::unordered_map>& dc_replicas, + std::unordered_map>& all_endpoints); + + const std::vector& get_datacenters() const { + return _datacenteres; + } + + void validate_options() { + for (auto& c : _config_options) + { + if (c.first == sstring("replication_factor")) + throw exceptions::configuration_exception( + "replication_factor is an option for simple_strategy, not " + "network_topology_strategy"); + + validate_replication_factor(c.second); + } + } + + // ???? + #if 0 + public Collection recognized_options() + { + // We explicitely allow all options + return null; + } + #endif + +private: + // map: data centers -> replication factor + std::unordered_map _dc_rep_factor; + + std::vector _datacenteres; + size_t _rep_factor; +}; +} // namespace locator