commitlog: Make allocate_when_possible a template

And call it by-value with the polymorphic writers. This
eliminates outer coroutine frame and ensures we use only one
for fast-case allocation.
This commit is contained in:
Calle Wilund
2021-07-14 13:42:24 +00:00
parent 69ead0e658
commit 4990ba2769

View File

@@ -300,8 +300,9 @@ public:
_request_controller.signal(size);
}
future<>
allocate_when_possible(entry_writer& writer, db::timeout_clock::time_point timeout);
template<typename T, typename R = typename T::result_type>
requires std::derived_from<T, db::commitlog::entry_writer> && std::same_as<R, decltype(std::declval<T>().result())>
future<R> allocate_when_possible(T writer, db::timeout_clock::time_point timeout);
struct stats {
uint64_t cycle_count = 0;
@@ -1132,8 +1133,9 @@ public:
}
};
future<>
db::commitlog::segment_manager::allocate_when_possible(entry_writer& writer, db::timeout_clock::time_point timeout) {
template<typename T, typename R>
requires std::derived_from<T, db::commitlog::entry_writer> && std::same_as<R, decltype(std::declval<T>().result())>
future<R> db::commitlog::segment_manager::allocate_when_possible(T writer, db::timeout_clock::time_point timeout) {
auto size = writer.size();
// If this is already too big now, we should throw early. It's also a correctness issue, since
// if we are too big at this moment we'll never reach allocate() to actually throw at that
@@ -1159,7 +1161,7 @@ db::commitlog::segment_manager::allocate_when_possible(entry_writer& writer, db:
switch (s->allocate(writer, permit, timeout)) {
case write_result::ok:
co_return;
co_return writer.result();
case write_result::must_sync:
s = co_await with_timeout(timeout, s->sync());
continue;
@@ -1168,7 +1170,7 @@ db::commitlog::segment_manager::allocate_when_possible(entry_writer& writer, db:
continue;
case write_result::ok_need_batch_sync:
s = co_await s->batch_cycle(timeout);
co_return;
co_return writer.result();
}
}
}
@@ -2067,10 +2069,14 @@ future<db::rp_handle> db::commitlog::add(const cf_id_type& id,
void result(size_t, rp_handle h) override {
res = std::move(h);
}
using result_type = db::rp_handle;
result_type result() {
return std::move(res);
}
};
serializer_func_entry_writer writer(id, size, std::move(func), sync);
co_await _segment_manager->allocate_when_possible(writer, timeout);
co_return std::move(writer.res);
return _segment_manager->allocate_when_possible(serializer_func_entry_writer(id, size, std::move(func), sync), timeout);
}
future<db::rp_handle> db::commitlog::add_entry(const cf_id_type& id, const commitlog_entry_writer& cew, timeout_clock::time_point timeout)
@@ -2106,10 +2112,14 @@ future<db::rp_handle> db::commitlog::add_entry(const cf_id_type& id, const commi
void result(size_t, rp_handle h) override {
res = std::move(h);
}
using result_type = db::rp_handle;
result_type result() {
return std::move(res);
}
};
cl_entry_writer writer(cew);
co_await _segment_manager->allocate_when_possible(writer, timeout);
co_return std::move(writer.res);
return _segment_manager->allocate_when_possible(cl_entry_writer(cew), timeout);
}
future<std::vector<db::rp_handle>>
@@ -2162,12 +2172,16 @@ db::commitlog::add_entries(std::vector<commitlog_entry_writer> entry_writers, db
assert(i == res.size());
res.emplace_back(std::move(h));
}
using result_type = std::vector<db::rp_handle>;
result_type result() {
return std::move(res);
}
};
force_sync sync(std::any_of(entry_writers.begin(), entry_writers.end(), [](auto& w) { return bool(w.sync()); }));
cl_entries_writer writer(sync, std::move(entry_writers));
co_await _segment_manager->allocate_when_possible(writer, timeout);
co_return std::move(writer.res);
return _segment_manager->allocate_when_possible(cl_entries_writer(sync, std::move(entry_writers)), timeout);
}
db::commitlog::commitlog(config cfg)