commitlog_replayer: don't use query_processor
During normal writes, query processing happens before commitlog, so logically commitlog replaying the commitlog shouldn't need it. And in fact the dependency on query_processor can be eliminated, all it needs is the local node's database.
This commit is contained in:
@@ -53,7 +53,6 @@
|
||||
#include "database.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "log.hh"
|
||||
#include "converting_mutation_partition_applier.hh"
|
||||
#include "schema_registry.hh"
|
||||
@@ -78,7 +77,7 @@ class db::commitlog_replayer::impl {
|
||||
|
||||
friend class db::commitlog_replayer;
|
||||
public:
|
||||
impl(seastar::sharded<cql3::query_processor>& db);
|
||||
impl(seastar::sharded<database>& db);
|
||||
|
||||
future<> init();
|
||||
|
||||
@@ -131,20 +130,20 @@ public:
|
||||
return j != i->second.end() ? j->second : replay_position();
|
||||
}
|
||||
|
||||
seastar::sharded<cql3::query_processor>&
|
||||
_qp;
|
||||
seastar::sharded<database>&
|
||||
_db;
|
||||
shard_rpm_map
|
||||
_rpm;
|
||||
shard_rp_map
|
||||
_min_pos;
|
||||
};
|
||||
|
||||
db::commitlog_replayer::impl::impl(seastar::sharded<cql3::query_processor>& qp)
|
||||
: _qp(qp)
|
||||
db::commitlog_replayer::impl::impl(seastar::sharded<database>& db)
|
||||
: _db(db)
|
||||
{}
|
||||
|
||||
future<> db::commitlog_replayer::impl::init() {
|
||||
return _qp.map_reduce([this](shard_rpm_map map) {
|
||||
return _db.map_reduce([this](shard_rpm_map map) {
|
||||
for (auto& p1 : map) {
|
||||
for (auto& p2 : p1.second) {
|
||||
auto& pp = _rpm[p1.first][p2.first];
|
||||
@@ -156,9 +155,9 @@ future<> db::commitlog_replayer::impl::init() {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, [this](cql3::query_processor& qp) {
|
||||
return do_with(shard_rpm_map{}, [this, &qp](shard_rpm_map& map) {
|
||||
return parallel_for_each(qp.db().local().get_column_families(), [&map, &qp](auto& cfp) {
|
||||
}, [this](database& db) {
|
||||
return do_with(shard_rpm_map{}, [this, &db](shard_rpm_map& map) {
|
||||
return parallel_for_each(db.get_column_families(), [&map, &db](auto& cfp) {
|
||||
auto uuid = cfp.first;
|
||||
// We do this on each cpu, for each CF, which technically is a little wasteful, but the values are
|
||||
// cached, this is only startup, and it makes the code easier.
|
||||
@@ -188,7 +187,7 @@ future<> db::commitlog_replayer::impl::init() {
|
||||
// existing sstables-per-shard.
|
||||
// So, go through all CF:s and check, if a shard mapping does not
|
||||
// have data for it, assume we must set global pos to zero.
|
||||
for (auto&p : _qp.local().db().local().get_column_families()) {
|
||||
for (auto&p : _db.local().get_column_families()) {
|
||||
for (auto&p1 : _rpm) { // for each shard
|
||||
if (!p1.second.count(p.first)) {
|
||||
_min_pos[p1.first] = replay_position();
|
||||
@@ -223,7 +222,7 @@ db::commitlog_replayer::impl::recover(sstring file, const sstring& fname_prefix)
|
||||
}
|
||||
|
||||
auto s = make_lw_shared<stats>();
|
||||
auto& exts = _qp.local().db().local().extensions();
|
||||
auto& exts = _db.local().extensions();
|
||||
|
||||
return db::commitlog::read_log_file(file, fname_prefix, service::get_local_commitlog_priority(),
|
||||
std::bind(&impl::process, this, s.get(), std::placeholders::_1,
|
||||
@@ -274,8 +273,8 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto shard = _qp.local().db().local().shard_of(fm);
|
||||
return _qp.local().db().invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) -> future<> {
|
||||
auto shard = _db.local().shard_of(fm);
|
||||
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) -> future<> {
|
||||
auto& fm = cer.mutation();
|
||||
// TODO: might need better verification that the deserialized mutation
|
||||
// is schema compatible. My guess is that just applying the mutation
|
||||
@@ -323,8 +322,8 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
db::commitlog_replayer::commitlog_replayer(seastar::sharded<cql3::query_processor>& qp)
|
||||
: _impl(std::make_unique<impl>(qp))
|
||||
db::commitlog_replayer::commitlog_replayer(seastar::sharded<database>& db)
|
||||
: _impl(std::make_unique<impl>(db))
|
||||
{}
|
||||
|
||||
db::commitlog_replayer::commitlog_replayer(commitlog_replayer&& r) noexcept
|
||||
@@ -334,8 +333,8 @@ db::commitlog_replayer::commitlog_replayer(commitlog_replayer&& r) noexcept
|
||||
db::commitlog_replayer::~commitlog_replayer()
|
||||
{}
|
||||
|
||||
future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::sharded<cql3::query_processor>& qp) {
|
||||
return do_with(commitlog_replayer(qp), [](auto&& rp) {
|
||||
future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::sharded<database>& db) {
|
||||
return do_with(commitlog_replayer(db), [](auto&& rp) {
|
||||
auto f = rp._impl->init();
|
||||
return f.then([rp = std::move(rp)]() mutable {
|
||||
return make_ready_future<commitlog_replayer>(std::move(rp));
|
||||
|
||||
@@ -47,10 +47,6 @@
|
||||
|
||||
class database;
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
|
||||
class commitlog;
|
||||
@@ -60,13 +56,13 @@ public:
|
||||
commitlog_replayer(commitlog_replayer&&) noexcept;
|
||||
~commitlog_replayer();
|
||||
|
||||
static future<commitlog_replayer> create_replayer(seastar::sharded<cql3::query_processor>&);
|
||||
static future<commitlog_replayer> create_replayer(seastar::sharded<database>&);
|
||||
|
||||
future<> recover(std::vector<sstring> files, sstring fname_prefix);
|
||||
future<> recover(sstring file, sstring fname_prefix);
|
||||
|
||||
private:
|
||||
commitlog_replayer(seastar::sharded<cql3::query_processor>&);
|
||||
commitlog_replayer(seastar::sharded<database>&);
|
||||
|
||||
class impl;
|
||||
std::unique_ptr<impl> _impl;
|
||||
|
||||
2
main.cc
2
main.cc
@@ -688,7 +688,7 @@ int main(int ac, char** av) {
|
||||
auto paths = cl->get_segments_to_replay();
|
||||
if (!paths.empty()) {
|
||||
supervisor::notify("replaying commit log");
|
||||
auto rp = db::commitlog_replayer::create_replayer(qp).get0();
|
||||
auto rp = db::commitlog_replayer::create_replayer(db).get0();
|
||||
rp.recover(paths, db::commitlog::descriptor::FILENAME_PREFIX).get();
|
||||
supervisor::notify("replaying commit log - flushing memtables");
|
||||
db.invoke_on_all([] (database& db) {
|
||||
|
||||
Reference in New Issue
Block a user