include/boost/corosio/native/detail/epoll/epoll_op.hpp

83.2% Lines (84/101) 85.0% Functions (17/20)
include/boost/corosio/native/detail/epoll/epoll_op.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/scheduler_op.hpp>
28 #include <boost/corosio/detail/endpoint_convert.hpp>
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 Persistent Registration
54 -----------------------
55 File descriptors are registered with epoll once (via descriptor_state) and
56 stay registered until closed. The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
// Forward declarations. Within this header these types are referred to
// through pointers ahead of (or without) their full definitions.
struct epoll_op;
class epoll_acceptor;
class epoll_scheduler;
class epoll_socket;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
111 struct descriptor_state final : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120 // Caches edge events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Deferred cancellation: set by cancel() when the target op is not
125 // parked (e.g. completing inline via speculative I/O). Checked when
126 // the next op parks; if set, the op is immediately self-cancelled.
127 // This matches IOCP semantics where CancelIoEx always succeeds.
128 bool read_cancel_pending = false;
129 bool write_cancel_pending = false;
130 bool connect_cancel_pending = false;
131
132 // Set during registration only (no mutex needed)
133 std::uint32_t registered_events = 0;
134 int fd = -1;
135
136 // For deferred I/O - set by reactor, read by scheduler
137 std::atomic<std::uint32_t> ready_events_{0};
138 std::atomic<bool> is_enqueued_{false};
139 epoll_scheduler const* scheduler_ = nullptr;
140
141 // Prevents impl destruction while this descriptor_state is queued.
142 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
143 std::shared_ptr<void> impl_ref_;
144
145 /// Add ready events atomically.
146 42347 void add_ready_events(std::uint32_t ev) noexcept
147 {
148 42347 ready_events_.fetch_or(ev, std::memory_order_relaxed);
149 42347 }
150
151 /// Perform deferred I/O and queue completions.
152 void operator()() override;
153
154 /// Destroy without invoking.
155 /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
156 /// the self-referential cycle set by close_socket().
157 29 void destroy() override
158 {
159 29 impl_ref_.reset();
160 29 }
161 };
162
163 struct epoll_op : scheduler_op
164 {
165 struct canceller
166 {
167 epoll_op* op;
168 void operator()() const noexcept;
169 };
170
171 std::coroutine_handle<> h;
172 capy::executor_ref ex;
173 std::error_code* ec_out = nullptr;
174 std::size_t* bytes_out = nullptr;
175
176 int fd = -1;
177 int errn = 0;
178 std::size_t bytes_transferred = 0;
179
180 std::atomic<bool> cancelled{false};
181 std::optional<std::stop_callback<canceller>> stop_cb;
182
183 // Prevents use-after-free when socket is closed with pending ops.
184 // See "Impl Lifetime Management" in file header.
185 std::shared_ptr<void> impl_ptr;
186
187 // For stop_token cancellation - pointer to owning socket/acceptor impl.
188 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
189 epoll_socket* socket_impl_ = nullptr;
190 epoll_acceptor* acceptor_impl_ = nullptr;
191
192 37300 epoll_op() = default;
193
194 261316 void reset() noexcept
195 {
196 261316 fd = -1;
197 261316 errn = 0;
198 261316 bytes_transferred = 0;
199 261316 cancelled.store(false, std::memory_order_relaxed);
200 261316 impl_ptr.reset();
201 261316 socket_impl_ = nullptr;
202 261316 acceptor_impl_ = nullptr;
203 261316 }
204
205 // Defined in sockets.cpp where epoll_socket is complete
206 void operator()() override;
207
208 25256 virtual bool is_read_operation() const noexcept
209 {
210 25256 return false;
211 }
212 virtual void cancel() noexcept = 0;
213
214 void destroy() override
215 {
216 stop_cb.reset();
217 impl_ptr.reset();
218 }
219
220 112518 void request_cancel() noexcept
221 {
222 112518 cancelled.store(true, std::memory_order_release);
223 112518 }
224
225 // NOLINTNEXTLINE(performance-unnecessary-value-param)
226 54819 void start(std::stop_token token, epoll_socket* impl)
227 {
228 54819 cancelled.store(false, std::memory_order_release);
229 54819 stop_cb.reset();
230 54819 socket_impl_ = impl;
231 54819 acceptor_impl_ = nullptr;
232
233 54819 if (token.stop_possible())
234 99 stop_cb.emplace(token, canceller{this});
235 54819 }
236
237 // NOLINTNEXTLINE(performance-unnecessary-value-param)
238 4127 void start(std::stop_token token, epoll_acceptor* impl)
239 {
240 4127 cancelled.store(false, std::memory_order_release);
241 4127 stop_cb.reset();
242 4127 socket_impl_ = nullptr;
243 4127 acceptor_impl_ = impl;
244
245 4127 if (token.stop_possible())
246 9 stop_cb.emplace(token, canceller{this});
247 4127 }
248
249 58882 void complete(int err, std::size_t bytes) noexcept
250 {
251 58882 errn = err;
252 58882 bytes_transferred = bytes;
253 58882 }
254
255 virtual void perform_io() noexcept {}
256 };
257
258 struct epoll_connect_op final : epoll_op
259 {
260 endpoint target_endpoint;
261
262 4119 void reset() noexcept
263 {
264 4119 epoll_op::reset();
265 4119 target_endpoint = endpoint{};
266 4119 }
267
268 4119 void perform_io() noexcept override
269 {
270 // connect() completion status is retrieved via SO_ERROR, not return value
271 4119 int err = 0;
272 4119 socklen_t len = sizeof(err);
273 4119 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
274 err = errno;
275 4119 complete(err, 0);
276 4119 }
277
278 // Defined in sockets.cpp where epoll_socket is complete
279 void operator()() override;
280 void cancel() noexcept override;
281 };
282
283 struct epoll_read_op final : epoll_op
284 {
285 static constexpr std::size_t max_buffers = 16;
286 iovec iovecs[max_buffers];
287 int iovec_count = 0;
288 bool empty_buffer_read = false;
289
290 25241 bool is_read_operation() const noexcept override
291 {
292 25241 return !empty_buffer_read;
293 }
294
295 126634 void reset() noexcept
296 {
297 126634 epoll_op::reset();
298 126634 iovec_count = 0;
299 126634 empty_buffer_read = false;
300 126634 }
301
302 144 void perform_io() noexcept override
303 {
304 ssize_t n;
305 do
306 {
307 144 n = ::readv(fd, iovecs, iovec_count);
308 }
309 144 while (n < 0 && errno == EINTR);
310
311 144 if (n >= 0)
312 4 complete(0, static_cast<std::size_t>(n));
313 else
314 140 complete(errno, 0);
315 144 }
316
317 void cancel() noexcept override;
318 };
319
320 struct epoll_write_op final : epoll_op
321 {
322 static constexpr std::size_t max_buffers = 16;
323 iovec iovecs[max_buffers];
324 int iovec_count = 0;
325
326 126436 void reset() noexcept
327 {
328 126436 epoll_op::reset();
329 126436 iovec_count = 0;
330 126436 }
331
332 void perform_io() noexcept override
333 {
334 msghdr msg{};
335 msg.msg_iov = iovecs;
336 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
337
338 ssize_t n;
339 do
340 {
341 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
342 }
343 while (n < 0 && errno == EINTR);
344
345 if (n >= 0)
346 complete(0, static_cast<std::size_t>(n));
347 else
348 complete(errno, 0);
349 }
350
351 void cancel() noexcept override;
352 };
353
354 struct epoll_accept_op final : epoll_op
355 {
356 int accepted_fd = -1;
357 io_object::implementation** impl_out = nullptr;
358 sockaddr_in peer_addr{};
359
360 4127 void reset() noexcept
361 {
362 4127 epoll_op::reset();
363 4127 accepted_fd = -1;
364 4127 impl_out = nullptr;
365 4127 peer_addr = {};
366 4127 }
367
368 4116 void perform_io() noexcept override
369 {
370 4116 socklen_t addrlen = sizeof(peer_addr);
371 int new_fd;
372 do
373 {
374 8232 new_fd = ::accept4(
375 4116 fd, reinterpret_cast<sockaddr*>(&peer_addr), &addrlen,
376 SOCK_NONBLOCK | SOCK_CLOEXEC);
377 }
378 4116 while (new_fd < 0 && errno == EINTR);
379
380 4116 if (new_fd >= 0)
381 {
382 4116 accepted_fd = new_fd;
383 4116 complete(0, 0);
384 }
385 else
386 {
387 complete(errno, 0);
388 }
389 4116 }
390
391 // Defined in acceptors.cpp where epoll_acceptor is complete
392 void operator()() override;
393 void cancel() noexcept override;
394 };
395
396 } // namespace boost::corosio::detail
397
398 #endif // BOOST_COROSIO_HAS_EPOLL
399
400 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
401