diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index cd9c64f397..69022f6ba4 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1814,7 +1814,7 @@ const db::commitlog::config& db::commitlog::active_config() const { // No commit_io_check needed in the log reader since the database will fail // on error at startup if required -future>> +future>> db::commitlog::read_log_file(const sstring& filename, const sstring& pfx, seastar::io_priority_class read_io_prio_class, commit_load_reader_func next, position_type off, const db::extensions* exts) { struct work { private: @@ -1828,7 +1828,7 @@ db::commitlog::read_log_file(const sstring& filename, const sstring& pfx, seasta public: file f; descriptor d; - stream s; + stream s; input_stream fin; input_stream r; uint64_t id = 0; @@ -2016,7 +2016,7 @@ db::commitlog::read_log_file(const sstring& filename, const sstring& pfx, seasta return make_ready_future<>(); } - return s.produce(std::move(buf), rp).handle_exception([this](auto ep) { + return s.produce({std::move(buf), rp}).handle_exception([this](auto ep) { return this->fail(); }); }); @@ -2069,7 +2069,7 @@ db::commitlog::read_log_file(const sstring& filename, const sstring& pfx, seasta w->s.set_exception(ep); }); - return std::make_unique>(std::move(ret)); + return std::make_unique>(std::move(ret)); }); } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 8af70ca6ac..93a6a04509 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -102,6 +102,11 @@ public: class segment; friend class rp_handle; + + struct buffer_and_replay_position { + fragmented_temporary_buffer buffer; + replay_position position; + }; private: ::shared_ptr _segment_manager; public: @@ -344,7 +349,7 @@ public: future> list_existing_segments() const; future> list_existing_segments(const sstring& dir) const; - typedef std::function(fragmented_temporary_buffer, replay_position)> commit_load_reader_func; + typedef std::function(buffer_and_replay_position)> commit_load_reader_func; class segment_error : public std::exception {}; @@ -380,7 +385,7 @@ public: } }; - static future>> read_log_file( + static future>> read_log_file( const sstring&, const sstring&, seastar::io_priority_class read_io_prio_class, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr); private: commitlog(config); diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index ded4cec0d1..1c225f2c89 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -111,7 +111,7 @@ public: return _column_mappings.stop(); } - future<> process(stats*, fragmented_temporary_buffer buf, replay_position rp) const; + future<> process(stats*, commitlog::buffer_and_replay_position buf_rp) const; future recover(sstring file, const sstring& fname_prefix) const; typedef std::unordered_map rp_map; @@ -226,8 +226,8 @@ db::commitlog_replayer::impl::recover(sstring file, const sstring& fname_prefix) auto& exts = _db.local().extensions(); return db::commitlog::read_log_file(file, fname_prefix, service::get_local_commitlog_priority(), - std::bind(&impl::process, this, s.get(), std::placeholders::_1, - std::placeholders::_2), p, &exts).then([](auto s) { + std::bind(&impl::process, this, s.get(), std::placeholders::_1), + p, &exts).then([](auto s) { auto f = s->done(); return f.finally([s = std::move(s)] {}); }).then_wrapped([s](future<> f) { @@ -242,7 +242,8 @@ db::commitlog_replayer::impl::recover(sstring file, const sstring& fname_prefix) }); } -future<> db::commitlog_replayer::impl::process(stats* s, fragmented_temporary_buffer buf, replay_position rp) const { +future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_replay_position buf_rp) const { + auto&& [buf, rp] = buf_rp; try { commitlog_entry_reader cer(buf); diff --git a/db/hints/manager.cc b/db/hints/manager.cc index ae4d7167f6..9e8e602619 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -742,7 +742,8 @@ bool manager::end_point_hints_manager::sender::send_one_file(const sstring& fnam lw_shared_ptr ctx_ptr = make_lw_shared(_last_schema_ver_to_column_mapping); try { - auto s = commitlog::read_log_file(fname, manager::FILENAME_PREFIX, service::get_local_streaming_read_priority(), [this, secs_since_file_mod, &fname, ctx_ptr] (fragmented_temporary_buffer buf, db::replay_position rp) mutable { + auto s = commitlog::read_log_file(fname, manager::FILENAME_PREFIX, service::get_local_streaming_read_priority(), [this, secs_since_file_mod, &fname, ctx_ptr] (commitlog::buffer_and_replay_position buf_rp) mutable { + auto&& [buf, rp] = buf_rp; // Check that we can still send the next hint. Don't try to send it if the destination host // is DOWN or if we have already failed to send some of the previous hints. if (!draining() && ctx_ptr->state.contains(send_state::segment_replay_failed)) { diff --git a/tests/commitlog_test.cc b/tests/commitlog_test.cc index 399f1a803a..23fcb6fbb4 100644 --- a/tests/commitlog_test.cc +++ b/tests/commitlog_test.cc @@ -306,7 +306,8 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) { SEASTAR_TEST_CASE(test_commitlog_reader){ static auto count_mutations_in_segment = [] (sstring path) -> future { auto count = make_lw_shared(0); - return db::commitlog::read_log_file(path, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [count](fragmented_temporary_buffer buf, db::replay_position rp) { + return db::commitlog::read_log_file(path, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [count](db::commitlog::buffer_and_replay_position buf_rp) { + auto&& [buf, rp] = buf_rp; auto linearization_buffer = bytes_ostream(); auto in = buf.get_istream(); auto str = to_sstring_view(in.read_bytes_view(buf.size_bytes(), linearization_buffer)); @@ -410,7 +411,8 @@ SEASTAR_TEST_CASE(test_commitlog_entry_corruption){ BOOST_REQUIRE(!segments.empty()); auto seg = segments[0]; return corrupt_segment(seg, rps->at(1).pos + 4, 0x451234ab).then([seg, rps, &log] { - return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [rps](fragmented_temporary_buffer buf, db::replay_position rp) { + return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [rps](db::commitlog::buffer_and_replay_position buf_rp) { + auto&& [buf, rp] = buf_rp; BOOST_CHECK_EQUAL(rp, rps->at(0)); return make_ready_future<>(); }).then([](auto s) { @@ -453,7 +455,7 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){ BOOST_REQUIRE(!segments.empty()); auto seg = segments[0]; return corrupt_segment(seg, rps->at(0).pos - 4, 0x451234ab).then([seg, rps, &log] { - return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [rps](fragmented_temporary_buffer buf, db::replay_position rp) { + return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [rps](db::commitlog::buffer_and_replay_position buf_rp) { BOOST_FAIL("Should not reach"); return make_ready_future<>(); }).then([](auto s) { @@ -495,7 +497,7 @@ SEASTAR_TEST_CASE(test_commitlog_reader_produce_exception){ auto segments = log.get_active_segment_names(); BOOST_REQUIRE(!segments.empty()); auto seg = segments[0]; - return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [](fragmented_temporary_buffer buf, db::replay_position rp) { + return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, service::get_local_commitlog_priority(), [](db::commitlog::buffer_and_replay_position buf_rp) { return make_exception_future(std::runtime_error("I am in a throwing mode")); }).then([](auto s) { return do_with(std::move(s), [](auto& s) {