locator: add ec2_snitch
This snitch will read the EC2 availability zone and set the DC and RACK as follows: If availability zone is "us-east-1d", then DC="us-east" and RACK="1d". If cassandra-rackdc.properties contains "dc_suffix" field then DC will be appended with its value. For instance if dc_suffix=_1_cassandra, then in the example above DC=us-east_1_cassandra Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
This commit is contained in:
@@ -355,6 +355,7 @@ urchin_core = (['database.cc',
|
||||
'locator/rack_inferring_snitch.cc',
|
||||
'locator/gossiping_property_file_snitch.cc',
|
||||
'locator/production_snitch_base.cc',
|
||||
'locator/ec2_snitch.cc',
|
||||
'message/messaging_service.cc',
|
||||
'service/migration_task.cc',
|
||||
'service/storage_service.cc',
|
||||
|
||||
120
locator/ec2_snitch.cc
Normal file
120
locator/ec2_snitch.cc
Normal file
@@ -0,0 +1,120 @@
|
||||
#include "locator/ec2_snitch.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
ec2_snitch::ec2_snitch(const sstring& fname, unsigned io_cpuid) : production_snitch_base(fname) {
|
||||
if (engine().cpu_id() == io_cpuid) {
|
||||
io_cpu_id() = io_cpuid;
|
||||
}
|
||||
}
|
||||
|
||||
future<> ec2_snitch::start() {
|
||||
using namespace boost::algorithm;
|
||||
|
||||
_state = snitch_state::initializing;
|
||||
|
||||
if (engine().cpu_id() == io_cpu_id()) {
|
||||
return aws_api_call(ZONE_QUERY_SERVER_ADDR, ZONE_NAME_QUERY_REQ).then([this](sstring az){
|
||||
assert(az.size());
|
||||
|
||||
std::vector<std::string> splits;
|
||||
|
||||
// Split "us-east-1a" or "asia-1a" into "us-east"/"1a" and "asia"/"1a".
|
||||
split(splits, az, is_any_of("-"));
|
||||
assert(splits.size() > 1);
|
||||
|
||||
_my_rack = splits[splits.size() - 1];
|
||||
|
||||
// hack for CASSANDRA-4026
|
||||
_my_dc = az.substr(0, az.size() - 1);
|
||||
if (_my_dc[_my_dc.size() - 1] == '1') {
|
||||
_my_dc = az.substr(0, az.size() - 3);
|
||||
}
|
||||
|
||||
return read_property_file().then([this] (sstring datacenter_suffix) {
|
||||
_my_dc += datacenter_suffix;
|
||||
logger().info("EC2Snitch using region: {}, zone: {}.", _my_dc, _my_rack);
|
||||
|
||||
return _my_distributed->invoke_on_all(
|
||||
[this] (snitch_ptr& local_s) {
|
||||
|
||||
// Distribute the new values on all CPUs but the current one
|
||||
if (engine().cpu_id() != io_cpu_id()) {
|
||||
local_s->set_my_dc(_my_dc);
|
||||
local_s->set_my_rack(_my_rack);
|
||||
}
|
||||
}).then([this] {
|
||||
set_snitch_ready();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
set_snitch_ready();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<sstring> ec2_snitch::aws_api_call(sstring addr, sstring cmd) {
|
||||
return engine().net().connect(make_ipv4_address(ipv4_addr{addr}))
|
||||
.then([this, addr, cmd] (connected_socket fd) {
|
||||
_sd = std::move(fd);
|
||||
_in = std::move(_sd.input());
|
||||
_out = std::move(_sd.output());
|
||||
_zone_req = sstring("GET ") + cmd +
|
||||
sstring(" HTTP/1.1\r\nHost: ") +addr +
|
||||
sstring("\r\n\r\n");
|
||||
|
||||
return _out.write(_zone_req.c_str()).then([this] {
|
||||
return _out.flush();
|
||||
});
|
||||
}).then([this] {
|
||||
_parser.init();
|
||||
return _in.consume(_parser).then([this] {
|
||||
if (_parser.eof()) {
|
||||
return make_exception_future<sstring>("Bad HTTP response");
|
||||
}
|
||||
|
||||
// Read HTTP response header first
|
||||
auto _rsp = _parser.get_parsed_response();
|
||||
auto it = _rsp->_headers.find("Content-Length");
|
||||
if (it == _rsp->_headers.end()) {
|
||||
return make_exception_future<sstring>("Error: HTTP response does not contain: Content-Length\n");
|
||||
}
|
||||
|
||||
auto content_len = std::stoi(it->second);
|
||||
|
||||
// Read HTTP response body
|
||||
return _in.read_exactly(content_len).then([this] (temporary_buffer<char> buf) {
|
||||
sstring res(buf.get(), buf.size());
|
||||
|
||||
return make_ready_future<sstring>(std::move(res));
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<sstring> ec2_snitch::read_property_file() {
|
||||
return load_property_file().then([this] {
|
||||
sstring dc_suffix;
|
||||
|
||||
if (_prop_values.count(dc_suffix_property_key)) {
|
||||
dc_suffix = _prop_values[dc_suffix_property_key];
|
||||
}
|
||||
|
||||
return dc_suffix;
|
||||
});
|
||||
}
|
||||
|
||||
using registry_2_params = class_registrator<i_endpoint_snitch, ec2_snitch, const sstring&, unsigned>;
|
||||
static registry_2_params registrator2("org.apache.cassandra.locator.EC2Snitch");
|
||||
static registry_2_params registrator2_short_name("EC2Snitch");
|
||||
|
||||
|
||||
using registry_1_param = class_registrator<i_endpoint_snitch, ec2_snitch, const sstring&>;
|
||||
static registry_1_param registrator1("org.apache.cassandra.locator.EC2Snitch");
|
||||
static registry_1_param registrator1_short_name("EC2Snitch");
|
||||
|
||||
using registry_default = class_registrator<i_endpoint_snitch, ec2_snitch>;
|
||||
static registry_default registrator_default("org.apache.cassandra.locator.EC2Snitch");
|
||||
static registry_default registrator_default_short_name("EC2Snitch");
|
||||
} // namespace locator
|
||||
47
locator/ec2_snitch.hh
Normal file
47
locator/ec2_snitch.hh
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
* Modified by Cloudius Systems.
|
||||
* Copyright 2015 Cloudius Systems.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "locator/production_snitch_base.hh"
|
||||
#include "http/http_response_parser.hh"
|
||||
|
||||
namespace locator {
|
||||
class ec2_snitch : public production_snitch_base {
|
||||
public:
|
||||
static constexpr const char* ZONE_NAME_QUERY_REQ = "/latest/meta-data/placement/availability-zone";
|
||||
static constexpr const char* ZONE_QUERY_SERVER_ADDR = "169.254.169.254:80";
|
||||
|
||||
ec2_snitch(const sstring& fname = "", unsigned io_cpu_id = 0);
|
||||
virtual future<> start() override;
|
||||
virtual sstring get_name() const override {
|
||||
return "org.apache.cassandra.locator.EC2Snitch";
|
||||
}
|
||||
protected:
|
||||
future<sstring> aws_api_call(sstring addr, const sstring cmd);
|
||||
future<sstring> read_property_file();
|
||||
private:
|
||||
connected_socket _sd;
|
||||
input_stream<char> _in;
|
||||
output_stream<char> _out;
|
||||
http_response_parser _parser;
|
||||
sstring _zone_req;
|
||||
};
|
||||
} // namespace locator
|
||||
Reference in New Issue
Block a user