diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index a6afe73ff..20e7905f9 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -18,6 +18,7 @@ ** xref:3.tutorials/3c.dns-lookup.adoc[DNS Lookup Tutorial] ** xref:3.tutorials/3d.tls-context.adoc[TLS Context Configuration] ** xref:3.tutorials/3e.hash-server.adoc[Hash Server] +** xref:3.tutorials/3f.reconnect.adoc[Reconnect with Backoff] * xref:4.guide/4.intro.adoc[Guide] ** xref:4.guide/4a.tcp-networking.adoc[TCP/IP Networking] ** xref:4.guide/4b.concurrent-programming.adoc[Concurrent Programming] diff --git a/doc/modules/ROOT/pages/3.tutorials/3f.reconnect.adoc b/doc/modules/ROOT/pages/3.tutorials/3f.reconnect.adoc new file mode 100644 index 000000000..5f592e732 --- /dev/null +++ b/doc/modules/ROOT/pages/3.tutorials/3f.reconnect.adoc @@ -0,0 +1,340 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += Reconnect with Exponential Backoff + +This tutorial builds a TCP client that connects to a server and automatically +reconnects with exponential backoff when the connection fails. You'll learn +how to combine timers with sockets for retry logic and how to use stop tokens +for graceful shutdown. + +NOTE: Code snippets assume: +[source,cpp] +---- +#include +#include +#include +#include +#include +#include +#include +#include + +namespace corosio = boost::corosio; +namespace capy = boost::capy; +---- + +== Overview + +Client applications often need to maintain a persistent connection to a server. +When the server is temporarily unavailable — during a restart, a network blip, +or a deployment — the client should retry rather than give up immediately. 
+Retrying too aggressively wastes resources and can overwhelm a recovering +server, so the delay between attempts should grow over time. + +Exponential backoff solves this: start with a short delay, double it on each +failure, and cap it at a maximum. This gives fast recovery when the outage is +brief and backs off gracefully when it isn't. + +This tutorial demonstrates: + +* Separating the backoff _policy_ (pure state) from the _mechanism_ (timer wait) +* Using `timer` for inter-attempt delays +* Graceful cancellation via stop tokens +* Why `io_context::stop()` alone is not sufficient for coroutine shutdown + +== The Backoff Policy + +The delay logic is pure computation — no I/O, no coroutines. A simple value +type tracks the current delay, doubles it on each call, and caps it at a +configured maximum: + +[source,cpp] +---- +struct exponential_backoff +{ + using duration = std::chrono::milliseconds; + +private: + duration initial_; + duration delay_; + duration max_; + +public: + exponential_backoff(duration initial, duration max) noexcept + : initial_(initial) + , delay_(initial) + , max_(max) + { + } + + /// Return the current delay and advance to the next. + duration next() noexcept + { + auto current = (std::min)(delay_, max_); + delay_ = (std::min)(delay_ * 2, max_); + return current; + } + + /// Restart the sequence from the initial delay. + void reset() noexcept + { + delay_ = initial_; + } +}; +---- + +With an initial delay of 500ms and a 30s cap, calling `next()` produces: +500, 1000, 2000, 4000, 8000, 16000, 30000, 30000, ... + +Keeping the policy separate from the timer means it can be reused in any +context — synchronous retries, tests, or logging — without pulling in +async machinery. 
+
+== Session Coroutine
+
+Once connected, the client reads data until the peer disconnects:
+
+[source,cpp]
+----
+capy::task<>
+do_session(corosio::tcp_socket& sock)
+{
+    char buf[4096];
+    for (;;)
+    {
+        auto [ec, n] =
+            co_await sock.read_some(capy::mutable_buffer(buf, sizeof buf));
+        if (ec)
+            break;
+        std::cout.write(buf, static_cast<std::streamsize>(n));
+        std::cout.flush();
+    }
+}
+----
+
+This is the same read loop you would find in any echo client. The interesting
+part is what happens after it returns — the caller reconnects.
+
+== Reconnection Loop
+
+The retry loop ties everything together. On each failed connection it asks the
+backoff policy for the next delay, waits on a timer, and tries again:
+
+[source,cpp]
+----
+capy::task<>
+connect_with_backoff(
+    corosio::io_context& ioc,
+    corosio::endpoint ep,
+    exponential_backoff backoff,
+    int max_attempts)
+{
+    corosio::tcp_socket sock(ioc);
+    corosio::timer delay(ioc);
+    int attempt = 0;
+
+    for (;;)
+    {
+        ++attempt;
+
+        auto [ec] = co_await sock.connect(ep);
+        if (!ec)
+        {
+            std::cout << "Connected on attempt " << attempt << std::endl;
+            co_await do_session(sock);
+
+            // Peer disconnected — restart the retry sequence
+            sock.close();
+            backoff.reset();
+            attempt = 0;
+            continue;
+        }
+
+        sock.close();
+
+        if (max_attempts > 0 && attempt >= max_attempts)
+            co_return;
+
+        auto wait_for = backoff.next();
+
+        delay.expires_after(wait_for);
+        auto [timer_ec] = co_await delay.wait();
+        if (timer_ec == capy::cond::canceled)
+            co_return;
+
+        // delay doubles automatically via backoff.next()
+    }
+}
+----
+
+There are two exit conditions:
+
+1. **Max attempts exhausted** — the coroutine gives up.
+2. **Timer cancelled** — someone signaled the stop token, requesting graceful
+   shutdown. The coroutine unwinds through normal control flow.
+
+After a successful connection and subsequent disconnect, `backoff.reset()`
+restarts the delay sequence from the initial value. 
+ +== Graceful Shutdown with Stop Tokens + +The key insight of this tutorial: **`io_context::stop()` does not cancel +pending operations.** It only stops the event loop. Suspended coroutines are +left in place and destroyed during `~io_context` without ever observing an +error. This is by design — `stop()` is a _pause_ that preserves state for +a potential `restart()`. + +For graceful shutdown where coroutines unwind through their own control flow, +use a stop token: + +[source,cpp] +---- +std::stop_source stop_src; + +capy::run_async(ioc.get_executor(), stop_src.get_token())( + connect_with_backoff(ioc, ep, backoff, 10)); + +// Later, from any thread: +stop_src.request_stop(); +---- + +When the stop source is signaled: + +1. The timer's `wait()` returns `cond::canceled`. +2. The coroutine checks the error and executes `co_return`. +3. Local variables (`sock`, `delay`) are destroyed through normal unwinding. +4. With no more outstanding work, `run()` returns. +5. `~io_context` finds an empty heap — nothing to clean up. + +Contrast with calling `stop()` directly: + +1. `run()` exits immediately. +2. The coroutine remains suspended — it never sees an error. +3. `~io_context` calls `h.destroy()` on the coroutine frame, bypassing its + error-handling logic. + +Both paths are safe (no leaks or crashes), but only the stop token path +executes the coroutine's own cleanup code. + +[cols="1,1,1"] +|=== +| Mechanism | Coroutine sees cancellation? 
| Use case
+
+| `stop_token`
+| Yes — operations return `cond::canceled`
+| Graceful shutdown
+
+| `stop()` + `restart()`
+| No — coroutines stay suspended
+| Pause and resume the event loop
+
+| `~io_context`
+| No — frames destroyed via `h.destroy()`
+| Final cleanup (after `stop()` or natural exit)
+|===
+
+== Main Function
+
+[source,cpp]
+----
+int main(int argc, char* argv[])
+{
+    if (argc != 3)
+    {
+        std::cerr << "Usage: reconnect <address> <port>\n";
+        return EXIT_FAILURE;
+    }
+
+    corosio::ipv4_address addr;
+    if (auto ec = corosio::parse_ipv4_address(argv[1], addr); ec)
+    {
+        std::cerr << "Invalid IP address: " << argv[1] << "\n";
+        return EXIT_FAILURE;
+    }
+
+    auto port = static_cast<std::uint16_t>(std::atoi(argv[2]));
+
+    corosio::io_context ioc;
+
+    using namespace std::chrono_literals;
+    exponential_backoff backoff(500ms, 30s);
+
+    std::stop_source stop_src;
+
+    capy::run_async(ioc.get_executor(), stop_src.get_token())(
+        connect_with_backoff(ioc, corosio::endpoint(addr, port), backoff, 10));
+
+    // Run the event loop on a background thread so main
+    // can signal cancellation after a timeout.
+    auto worker = std::jthread([&ioc] { ioc.run(); });
+
+    std::this_thread::sleep_for(5s);
+    stop_src.request_stop();
+}
+----
+
+The event loop runs on a background thread. After five seconds the main thread
+signals cancellation. The coroutine observes `cond::canceled`, unwinds, the
+work count reaches zero, and `run()` returns. The `jthread` destructor joins
+automatically. 
+ +== Testing + +Start an echo server on one terminal: + +[source,bash] +---- +$ ./echo_server 8080 10 +Echo server listening on port 8080 with 10 workers +---- + +Run the reconnect client on another: + +[source,bash] +---- +$ ./reconnect 127.0.0.1 8080 +Connected on attempt 1 +---- + +Stop the server and watch the client retry: + +[source,bash] +---- +Attempt 1 failed: Connection refused +Retrying in 500ms +Attempt 2 failed: Connection refused +Retrying in 1000ms +Attempt 3 failed: Connection refused +Retrying in 2000ms +---- + +Restart the server — the client reconnects on the next attempt. + +To test the no-server case, point the client at a port with nothing listening: + +[source,bash] +---- +$ ./reconnect 127.0.0.1 19999 +Attempt 1 failed: Connection refused +Retrying in 500ms +Attempt 2 failed: Connection refused +Retrying in 1000ms +... +Retry cancelled +---- + +After five seconds the stop token fires and the client exits cleanly. + +== Next Steps + +* xref:../4.guide/4h.timers.adoc[Timers Guide] — Timer operations in detail +* xref:../4.guide/4d.sockets.adoc[Sockets Guide] — Socket operations and error handling +* xref:../4.guide/4c.io-context.adoc[I/O Context Guide] — Event loop mechanics, `stop()`, and `restart()` +* xref:../4.guide/4m.error-handling.adoc[Error Handling] — Portable error conditions and `cond` diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 91132a134..995309ebe 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -11,6 +11,7 @@ add_subdirectory(client) add_subdirectory(echo-server) add_subdirectory(hash-server) add_subdirectory(nslookup) +add_subdirectory(reconnect) if(WolfSSL_FOUND) add_subdirectory(https-client) diff --git a/example/reconnect/CMakeLists.txt b/example/reconnect/CMakeLists.txt new file mode 100644 index 000000000..bda301d61 --- /dev/null +++ b/example/reconnect/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Steve Gerbino +# +# Distributed under the Boost Software License, Version 
1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/corosio +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(corosio_example_reconnect ${PFILES}) + +set_property(TARGET corosio_example_reconnect + PROPERTY FOLDER "examples") + +target_link_libraries(corosio_example_reconnect + Boost::corosio) diff --git a/example/reconnect/reconnect.cpp b/example/reconnect/reconnect.cpp new file mode 100644 index 000000000..6f830c7b5 --- /dev/null +++ b/example/reconnect/reconnect.cpp @@ -0,0 +1,227 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace corosio = boost::corosio; +namespace capy = boost::capy; + +/** Exponential backoff delay sequence. + + Produces a series of increasing delays starting from an + initial value and doubling on each call to @ref next, capped + at a configured maximum. Call @ref reset to restart the + sequence after a successful connection. + + @par Example + @code + exponential_backoff backoff(500ms, 30s); + timer.expires_after(backoff.next()); // 500ms + timer.expires_after(backoff.next()); // 1000ms + timer.expires_after(backoff.next()); // 2000ms + backoff.reset(); + timer.expires_after(backoff.next()); // 500ms + @endcode +*/ +struct exponential_backoff +{ + using duration = std::chrono::milliseconds; + +private: + duration initial_; + duration delay_; + duration max_; + +public: + /// Construct a backoff with the given initial and maximum delays. 
+    exponential_backoff(duration initial, duration max) noexcept
+        : initial_(initial)
+        , delay_(initial)
+        , max_(max)
+    {
+    }
+
+    /// Return the current delay and advance to the next.
+    duration next() noexcept
+    {
+        auto current = (std::min)(delay_, max_);
+        delay_ = (std::min)(delay_ * 2, max_);
+        return current;
+    }
+
+    /// Restart the sequence from the initial delay.
+    void reset() noexcept
+    {
+        delay_ = initial_;
+    }
+};
+
+/// Read from the socket until the peer disconnects.
+capy::task<>
+do_session(corosio::tcp_socket& sock)
+{
+    char buf[4096];
+    for (;;)
+    {
+        auto [ec, n] =
+            co_await sock.read_some(capy::mutable_buffer(buf, sizeof buf));
+        if (ec)
+            break;
+        std::cout.write(buf, static_cast<std::streamsize>(n));
+        std::cout.flush();
+    }
+}
+
+/** Connect to an endpoint with exponential backoff.
+
+    Attempts to connect to @p ep, doubling the delay between
+    each retry. If the connection cannot be established before
+    @p max_attempts are exhausted, the coroutine returns. On a
+    successful connection, the session runs until the peer
+    disconnects, then reconnection resumes from the initial
+    delay.
+
+    The backoff timer is sensitive to the coroutine's stop
+    token. Cancelling the token (or calling `io_context::stop()`)
+    will end the retry loop cleanly.
+
+    @param ioc The I/O context to use for socket and timer
+    operations.
+    @param ep The endpoint to connect to.
+    @param backoff The backoff policy to use between retries.
+    @param max_attempts Maximum connection attempts before
+    giving up. Zero means unlimited. 
+*/
+capy::task<>
+connect_with_backoff(
+    corosio::io_context& ioc,
+    corosio::endpoint ep,
+    exponential_backoff backoff,
+    int max_attempts)
+{
+    corosio::tcp_socket sock(ioc);
+    corosio::timer delay(ioc);
+    int attempt = 0;
+
+    for (;;)
+    {
+        ++attempt;
+
+        auto [ec] = co_await sock.connect(ep);
+        if (!ec)
+        {
+            std::cout << "Connected on attempt " << attempt << std::endl;
+            co_await do_session(sock);
+
+            // Peer disconnected — restart the retry sequence
+            sock.close();
+            backoff.reset();
+            attempt = 0;
+            std::cout << "Disconnected, reconnecting..." << std::endl;
+            continue;
+        }
+
+        sock.close();
+
+        std::cout << "Attempt " << attempt << " failed: " << ec.message()
+                  << std::endl;
+
+        if (max_attempts > 0 && attempt >= max_attempts)
+        {
+            std::cout << "Giving up after " << attempt << " attempts"
+                      << std::endl;
+            co_return;
+        }
+
+        auto wait_for = backoff.next();
+
+        std::cout << "Retrying in " << wait_for.count() << "ms" << std::endl;
+
+        delay.expires_after(wait_for);
+        auto [timer_ec] = co_await delay.wait();
+        if (timer_ec == capy::cond::canceled)
+        {
+            std::cout << "Retry cancelled" << std::endl;
+            co_return;
+        }
+    }
+}
+
+int
+main(int argc, char* argv[])
+{
+    if (argc != 3)
+    {
+        std::cerr << "Usage: reconnect <address> <port>\n"
+                     "Example:\n"
+                     "  reconnect 127.0.0.1 8080\n";
+        return EXIT_FAILURE;
+    }
+
+    // Parse IP address
+    corosio::ipv4_address addr;
+    if (auto ec = corosio::parse_ipv4_address(argv[1], addr); ec)
+    {
+        std::cerr << "Invalid IP address: " << argv[1] << "\n";
+        return EXIT_FAILURE;
+    }
+
+    // Parse port
+    int port_int = std::atoi(argv[2]);
+    if (port_int <= 0 || port_int > 65535)
+    {
+        std::cerr << "Invalid port: " << argv[2] << "\n";
+        return EXIT_FAILURE;
+    }
+    auto port = static_cast<std::uint16_t>(port_int);
+
+    // Create I/O context and run
+    corosio::io_context ioc;
+
+    using namespace std::chrono_literals;
+    exponential_backoff backoff(500ms, 30s);
+
+    // The stop_source lets us cancel the coroutine gracefully
+    // from any thread. 
When signaled, pending timer and connect
+    // operations return cond::canceled, the coroutine's own loop
+    // breaks, and it unwinds through normal control flow.
+    //
+    // Contrast with io_context::stop(), which yanks the event
+    // loop out from under suspended coroutines without giving
+    // them a chance to observe cancellation. stop() is safe
+    // (pending operations are cleaned up during destruction),
+    // but coroutine-internal cleanup logic is bypassed.
+    std::stop_source stop_src;
+
+    capy::run_async(ioc.get_executor(), stop_src.get_token())(
+        connect_with_backoff(ioc, corosio::endpoint(addr, port), backoff, 10));
+
+    // Run the event loop on a background thread so main
+    // can signal cancellation after a timeout.
+    std::thread worker([&ioc] { ioc.run(); });
+
+    std::this_thread::sleep_for(5s);
+    stop_src.request_stop();
+    worker.join();
+
+    return EXIT_SUCCESS;
+}
diff --git a/include/boost/corosio/detail/timer_service.hpp b/include/boost/corosio/detail/timer_service.hpp
index 36a9cdba1..f4db7d219 100644
--- a/include/boost/corosio/detail/timer_service.hpp
+++ b/include/boost/corosio/detail/timer_service.hpp
@@ -128,6 +128,7 @@ class BOOST_COROSIO_DECL timer_service final
     implementation* free_list_ = nullptr;
     waiter_node* waiter_free_list_ = nullptr;
     callback on_earliest_changed_;
+    bool shutting_down_ = false;
     // Avoids mutex in nearest_expiry() and empty()
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
         (std::numeric_limits<std::int64_t>::max)()};
@@ -363,16 +364,30 @@ inline void
 timer_service::shutdown()
 {
     timer_service_invalidate_cache();
-
-    // Cancel waiting timers still in the heap.
-    // Each waiter called work_started() in implementation::wait().
-    // On IOCP the scheduler shutdown loop exits when outstanding_work_
-    // reaches zero, so we must call work_finished() here to balance it.
-    // On other backends this is harmless (their drain loops exit when
-    // the queue is empty, not based on outstanding_work_). 
+    shutting_down_ = true;
+
+    // Snapshot impls and detach them from the heap so that
+    // coroutine-owned timer destructors (triggered by h.destroy()
+    // below) cannot re-enter remove_timer_impl() and mutate the
+    // vector during iteration.
+    std::vector<implementation*> impls;
+    impls.reserve(heap_.size());
     for (auto& entry : heap_)
     {
-        auto* impl = entry.timer_;
+        entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
+        impls.push_back(entry.timer_);
+    }
+    heap_.clear();
+    cached_nearest_ns_.store(
+        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
+
+    // Cancel waiting timers. Each waiter called work_started()
+    // in implementation::wait(). On IOCP the scheduler shutdown
+    // loop exits when outstanding_work_ reaches zero, so we must
+    // call work_finished() here to balance it. On other backends
+    // this is harmless.
+    for (auto* impl : impls)
+    {
         while (auto* w = impl->waiters_.pop_front())
         {
             w->stop_cb_.reset();
@@ -382,12 +397,8 @@
             h.destroy();
             delete w;
         }
-        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
         delete impl;
     }
-    heap_.clear();
-    cached_nearest_ns_.store(
-        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
 
     // Delete free-listed impls
     while (free_list_)
@@ -444,6 +455,13 @@ timer_service::destroy(io_object::implementation* p)
 inline void
 timer_service::destroy_impl(implementation& impl)
 {
+    // During shutdown the impl is owned by the shutdown loop.
+    // Re-entering here (from a coroutine-owned timer destructor
+    // triggered by h.destroy()) must not modify the heap or
+    // recycle the impl — shutdown deletes it directly. 
+    if (shutting_down_)
+        return;
+
     cancel_timer(impl);
 
     if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
diff --git a/test/unit/timer.cpp b/test/unit/timer.cpp
index cd6e06b26..fd805b00f 100644
--- a/test/unit/timer.cpp
+++ b/test/unit/timer.cpp
@@ -1038,6 +1038,58 @@ struct timer_test
         BOOST_TEST_PASS();
     }
 
+    void testShutdownWithTimerOwnedByCoroutine()
+    {
+        // Reproduces UB: timer_service::shutdown() iterates heap_,
+        // calls h.destroy() on a waiter. The coroutine frame owns
+        // a timer whose destructor re-enters destroy_impl() →
+        // cancel_timer() → remove_timer_impl(), modifying heap_
+        // during the range-for iteration.
+        //
+        // All timers must be owned by coroutines (not on the stack)
+        // so that their heap entries survive until shutdown().
+        int destroyed = 0;
+
+        {
+            io_context ioc(Backend);
+
+            auto owning_task = [](io_context& ctx,
+                                  int& counter) -> capy::task<> {
+                struct guard
+                {
+                    int& c_;
+                    ~guard()
+                    {
+                        ++c_;
+                    }
+                };
+                guard g{counter};
+                timer t(ctx);
+                t.expires_after(std::chrono::hours(1));
+                auto [ec] = co_await t.wait();
+                (void)ec;
+            };
+
+            auto stopper = [](io_context& ctx) -> capy::task<> {
+                ctx.stop();
+                co_return;
+            };
+
+            capy::run_async(ioc.get_executor())(owning_task(ioc, destroyed));
+            capy::run_async(ioc.get_executor())(owning_task(ioc, destroyed));
+            capy::run_async(ioc.get_executor())(owning_task(ioc, destroyed));
+            capy::run_async(ioc.get_executor())(stopper(ioc));
+
+            ioc.run();
+            // ~io_context → timer_service::shutdown() iterates heap_
+            // with 3 entries. Destroying the first coroutine handle
+            // triggers its timer destructor which removes an entry
+            // via remove_timer_impl(), corrupting the iteration.
+        }
+
+        BOOST_TEST_EQ(destroyed, 3);
+    }
+
     // Edge cases
 
     void testLongDuration()
@@ -1156,6 +1208,7 @@
         testShutdownDestroysTimerWaiters();
         testShutdownDrainsHeapWaiters();
         testAbruptStopWithPendingTimerOps();
+        testShutdownWithTimerOwnedByCoroutine();
 
         // Edge cases
         testLongDuration();