TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_socket.hpp>
22 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23 :
24 : #include <boost/corosio/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/detail/dispatch_coro.hpp>
26 : #include <boost/corosio/detail/make_err.hpp>
27 :
28 : #include <boost/corosio/detail/except.hpp>
29 :
30 : #include <boost/capy/buffers.hpp>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <netinet/in.h>
35 : #include <netinet/tcp.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : #include <memory>
40 : #include <mutex>
41 : #include <unordered_map>
42 :
43 : /*
44 : select Socket Implementation
45 : ============================
46 :
47 : This mirrors the epoll_sockets design for behavioral consistency.
48 : Each I/O operation follows the same pattern:
49 : 1. Try the syscall immediately (non-blocking socket)
50 : 2. If it succeeds or fails with a real error, post to completion queue
51 : 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52 :
53 : Cancellation
54 : ------------
55 : See op.hpp for the completion/cancellation race handling via the
56 : `registered` atomic. cancel() must complete pending operations (post
57 : them with cancelled flag) so coroutines waiting on them can resume.
58 : close_socket() calls cancel() first to ensure this.
59 :
60 : Impl Lifetime with shared_ptr
61 : -----------------------------
62 : Socket impls use enable_shared_from_this. The service owns impls via
63 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 : removal. When a user calls close(), we call cancel() which posts pending
65 : ops to the scheduler.
66 :
67 : CRITICAL: The posted ops must keep the impl alive until they complete.
68 : Otherwise the scheduler would process a freed op (use-after-free). The
69 : cancel() method captures shared_from_this() into op.impl_ptr before
70 : posting. When the op completes, impl_ptr is cleared, allowing the impl
71 : to be destroyed if no other references exist.
72 :
73 : Service Ownership
74 : -----------------
75 : select_socket_service owns all socket impls. destroy() removes the
76 : shared_ptr from the map, but the impl may survive if ops still hold
77 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 : in-flight ops will complete and release their refs.
79 : */
80 :
81 : namespace boost::corosio::detail {
82 :
/** State for select socket service.

    Bundles the scheduler reference with the socket bookkeeping that
    select_socket_service guards with mutex_. Held by the service via
    unique_ptr so it can outlive shutdown() while queued ops drain.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Reactor shared by every socket created through this service.
    select_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_.
    std::mutex mutex_;
    // Intrusive list of live impls; shutdown() drains it to close all sockets.
    intrusive_list<select_socket> socket_list_;
    // Owning map keyed by raw pointer for O(1) lookup and removal in destroy().
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98 :
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Close every socket; queued ops drain during scheduler shutdown.
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close the impl and drop the service's owning reference to it.
    void destroy(io_object::implementation*) override;
    /// Close the underlying fd without destroying the impl.
    void close(io_object::handle&) override;
    /// Open a non-blocking AF_INET stream socket into impl.
    std::error_code open_socket(tcp_socket::implementation& impl) override;

    /// The reactor this service registers file descriptors with.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed op on the scheduler's completion queue.
    void post(select_op* op);
    /// Forward outstanding-work accounting to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};

// Backward compatibility alias
using select_sockets = select_socket_service;
134 :
// Stop-token callback: forwards to the op's virtual cancel(), which routes
// through the owning socket impl when one is attached (see overrides below).
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
140 :
141 : inline void
142 MIS 0 : select_connect_op::cancel() noexcept
143 : {
144 0 : if (socket_impl_)
145 0 : socket_impl_->cancel_single_op(*this);
146 : else
147 0 : request_cancel();
148 0 : }
149 :
150 : inline void
151 HIT 97 : select_read_op::cancel() noexcept
152 : {
153 97 : if (socket_impl_)
154 97 : socket_impl_->cancel_single_op(*this);
155 : else
156 MIS 0 : request_cancel();
157 HIT 97 : }
158 :
159 : inline void
160 MIS 0 : select_write_op::cancel() noexcept
161 : {
162 0 : if (socket_impl_)
163 0 : socket_impl_->cancel_single_op(*this);
164 : else
165 0 : request_cancel();
166 0 : }
167 :
/** Completion handler for an async connect.

    Runs on the scheduler thread after the op is dequeued. Publishes the
    outcome to the caller's out-parameters, caches the connection's
    endpoints on success, releases the impl reference, and resumes the
    waiting coroutine via its executor. Cancellation takes precedence
    over any errno when reporting the error code.
*/
inline void
select_connect_op::operator()()
{
    // Detach the stop-token callback first so no cancel can race us now.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        // Cancellation wins over errno when both are set.
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    // (resuming the coroutine may destroy this op's storage).
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
209 :
210 9428 : inline select_socket::select_socket(select_socket_service& svc) noexcept
211 9428 : : svc_(svc)
212 : {
213 9428 : }
214 :
/** Start an asynchronous connect to `ep`.

    Issues ::connect() immediately on the non-blocking fd. Synchronous
    success or a hard error is posted straight to the scheduler queue;
    EINPROGRESS registers the fd for writability with the reactor.

    @param h     Coroutine to resume on completion.
    @param ex    Executor used to dispatch the resumption.
    @param ep    Remote endpoint to connect to.
    @param token Stop token wired to the op's canceller.
    @param ec    Receives the completion error (may be null).
    @return Always std::noop_coroutine(); completion is posted to the
            scheduler queue and never resumed inline.
*/
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // Keep the impl alive until the queued op runs (see file header).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate hard failure: report errno through the completion path.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
304 :
/** Start an asynchronous scatter read.

    Tries ::readv() immediately; results (including EOF and empty-buffer
    reads) are posted to the scheduler queue. EAGAIN/EWOULDBLOCK registers
    the fd for readability with the reactor instead.

    @param h         Coroutine to resume on completion.
    @param ex        Executor used to dispatch the resumption.
    @param param     Buffer sequence to fill (up to max_buffers entries).
    @param token     Stop token wired to the op's canceller.
    @param ec        Receives the completion error (may be null).
    @param bytes_out Receives the number of bytes read (may be null).
    @return Always std::noop_coroutine(); never resumes inline.
*/
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Zero-length read: complete immediately with 0 bytes, flagged so the
    // completion handler can distinguish it from EOF.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // n == 0 is EOF: posted as a successful zero-byte completion.
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard read error: readv returned -1, errno identifies the failure.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
407 :
/** Start an asynchronous gather write.

    Tries ::sendmsg() immediately (MSG_NOSIGNAL suppresses SIGPIPE);
    results are posted to the scheduler queue. EAGAIN/EWOULDBLOCK
    registers the fd for writability with the reactor instead.

    @param h         Coroutine to resume on completion.
    @param ex        Executor used to dispatch the resumption.
    @param param     Buffer sequence to send (up to max_buffers entries).
    @param token     Stop token wired to the op's canceller.
    @param ec        Receives the completion error (may be null).
    @param bytes_out Receives the number of bytes written (may be null).
    @return Always std::noop_coroutine(); never resumes inline.
*/
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length write: complete immediately with 0 bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // sendmsg may return 0 (or -1 with errno unset); fall back to EIO so
    // a failure is never reported as success.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
505 :
506 : inline std::error_code
507 3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
508 : {
509 : int how;
510 3 : switch (what)
511 : {
512 1 : case tcp_socket::shutdown_receive:
513 1 : how = SHUT_RD;
514 1 : break;
515 1 : case tcp_socket::shutdown_send:
516 1 : how = SHUT_WR;
517 1 : break;
518 1 : case tcp_socket::shutdown_both:
519 1 : how = SHUT_RDWR;
520 1 : break;
521 MIS 0 : default:
522 0 : return make_err(EINVAL);
523 : }
524 HIT 3 : if (::shutdown(fd_, how) != 0)
525 MIS 0 : return make_err(errno);
526 HIT 3 : return {};
527 : }
528 :
529 : inline std::error_code
530 5 : select_socket::set_no_delay(bool value) noexcept
531 : {
532 5 : int flag = value ? 1 : 0;
533 5 : if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
534 MIS 0 : return make_err(errno);
535 HIT 5 : return {};
536 : }
537 :
538 : inline bool
539 5 : select_socket::no_delay(std::error_code& ec) const noexcept
540 : {
541 5 : int flag = 0;
542 5 : socklen_t len = sizeof(flag);
543 5 : if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
544 : {
545 MIS 0 : ec = make_err(errno);
546 0 : return false;
547 : }
548 HIT 5 : ec = {};
549 5 : return flag != 0;
550 : }
551 :
552 : inline std::error_code
553 4 : select_socket::set_keep_alive(bool value) noexcept
554 : {
555 4 : int flag = value ? 1 : 0;
556 4 : if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
557 MIS 0 : return make_err(errno);
558 HIT 4 : return {};
559 : }
560 :
561 : inline bool
562 4 : select_socket::keep_alive(std::error_code& ec) const noexcept
563 : {
564 4 : int flag = 0;
565 4 : socklen_t len = sizeof(flag);
566 4 : if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
567 : {
568 MIS 0 : ec = make_err(errno);
569 0 : return false;
570 : }
571 HIT 4 : ec = {};
572 4 : return flag != 0;
573 : }
574 :
575 : inline std::error_code
576 1 : select_socket::set_receive_buffer_size(int size) noexcept
577 : {
578 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
579 MIS 0 : return make_err(errno);
580 HIT 1 : return {};
581 : }
582 :
583 : inline int
584 3 : select_socket::receive_buffer_size(std::error_code& ec) const noexcept
585 : {
586 3 : int size = 0;
587 3 : socklen_t len = sizeof(size);
588 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
589 : {
590 MIS 0 : ec = make_err(errno);
591 0 : return 0;
592 : }
593 HIT 3 : ec = {};
594 3 : return size;
595 : }
596 :
597 : inline std::error_code
598 1 : select_socket::set_send_buffer_size(int size) noexcept
599 : {
600 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
601 MIS 0 : return make_err(errno);
602 HIT 1 : return {};
603 : }
604 :
605 : inline int
606 3 : select_socket::send_buffer_size(std::error_code& ec) const noexcept
607 : {
608 3 : int size = 0;
609 3 : socklen_t len = sizeof(size);
610 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
611 : {
612 MIS 0 : ec = make_err(errno);
613 0 : return 0;
614 : }
615 HIT 3 : ec = {};
616 3 : return size;
617 : }
618 :
619 : inline std::error_code
620 6 : select_socket::set_linger(bool enabled, int timeout) noexcept
621 : {
622 6 : if (timeout < 0)
623 1 : return make_err(EINVAL);
624 : struct ::linger lg;
625 5 : lg.l_onoff = enabled ? 1 : 0;
626 5 : lg.l_linger = timeout;
627 5 : if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
628 MIS 0 : return make_err(errno);
629 HIT 5 : return {};
630 : }
631 :
632 : inline tcp_socket::linger_options
633 3 : select_socket::linger(std::error_code& ec) const noexcept
634 : {
635 3 : struct ::linger lg{};
636 3 : socklen_t len = sizeof(lg);
637 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
638 : {
639 MIS 0 : ec = make_err(errno);
640 0 : return {};
641 : }
642 HIT 3 : ec = {};
643 3 : return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
644 : }
645 :
/** Cancel all pending operations on this socket.

    For each of the three op slots: atomically claim the op by swapping
    its registration state to unregistered, mark it cancelled, and — if
    we won the claim against the reactor — deregister the fd and post
    the op (holding a self reference so the impl survives until the
    queued op runs).
*/
inline void
select_socket::cancel() noexcept
{
    // If the impl is already being destroyed there is nothing to keep alive.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // The exchange decides who completes the op: us or the reactor.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            // Keep the impl alive until the posted op is processed.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
670 :
/** Cancel one specific pending operation (stop-token path).

    Same claim protocol as cancel(), but scoped to a single op: swap the
    registration state, mark cancelled, and if we won the claim against
    the reactor, deregister the matching event and post the op with a
    self reference to keep the impl alive.
*/
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // If the impl is already being destroyed there is nothing to keep alive.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted op is processed.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
699 :
/** Cancel all pending ops, close the fd, and reset cached endpoints.

    Safe to call repeatedly and during destruction: if the impl's
    shared state is already gone the cancellation pass is skipped, but
    the fd is still closed and deregistered from the reactor.
*/
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        // Same claim protocol as cancel(): swap state, mark cancelled, and
        // post the op with a self reference if we beat the reactor to it.
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Remove any remaining reactor interest before closing the fd.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
736 :
737 135 : inline select_socket_service::select_socket_service(
738 135 : capy::execution_context& ctx)
739 135 : : state_(
740 : std::make_unique<select_socket_state>(
741 135 : ctx.use_service<select_scheduler>()))
742 : {
743 135 : }
744 :
745 270 : inline select_socket_service::~select_socket_service() {}
746 :
/** Close every socket owned by the service.

    Drains the intrusive list under the lock, closing (and thereby
    cancelling) each impl. Ownership is deliberately NOT released here —
    see the note below.
*/
inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
760 :
761 : inline io_object::implementation*
762 9428 : select_socket_service::construct()
763 : {
764 9428 : auto impl = std::make_shared<select_socket>(*this);
765 9428 : auto* raw = impl.get();
766 :
767 : {
768 9428 : std::lock_guard lock(state_->mutex_);
769 9428 : state_->socket_list_.push_back(raw);
770 9428 : state_->socket_ptrs_.emplace(raw, std::move(impl));
771 9428 : }
772 :
773 9428 : return raw;
774 9428 : }
775 :
776 : inline void
777 9428 : select_socket_service::destroy(io_object::implementation* impl)
778 : {
779 9428 : auto* select_impl = static_cast<select_socket*>(impl);
780 9428 : select_impl->close_socket();
781 9428 : std::lock_guard lock(state_->mutex_);
782 9428 : state_->socket_list_.remove(select_impl);
783 9428 : state_->socket_ptrs_.erase(select_impl);
784 9428 : }
785 :
786 : inline std::error_code
787 3148 : select_socket_service::open_socket(tcp_socket::implementation& impl)
788 : {
789 3148 : auto* select_impl = static_cast<select_socket*>(&impl);
790 3148 : select_impl->close_socket();
791 :
792 3148 : int fd = ::socket(AF_INET, SOCK_STREAM, 0);
793 3148 : if (fd < 0)
794 MIS 0 : return make_err(errno);
795 :
796 : // Set non-blocking and close-on-exec
797 HIT 3148 : int flags = ::fcntl(fd, F_GETFL, 0);
798 3148 : if (flags == -1)
799 : {
800 MIS 0 : int errn = errno;
801 0 : ::close(fd);
802 0 : return make_err(errn);
803 : }
804 HIT 3148 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
805 : {
806 MIS 0 : int errn = errno;
807 0 : ::close(fd);
808 0 : return make_err(errn);
809 : }
810 HIT 3148 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
811 : {
812 MIS 0 : int errn = errno;
813 0 : ::close(fd);
814 0 : return make_err(errn);
815 : }
816 :
817 : // Check fd is within select() limits
818 HIT 3148 : if (fd >= FD_SETSIZE)
819 : {
820 MIS 0 : ::close(fd);
821 0 : return make_err(EMFILE); // Too many open files
822 : }
823 :
824 HIT 3148 : select_impl->fd_ = fd;
825 3148 : return {};
826 : }
827 :
828 : inline void
829 15712 : select_socket_service::close(io_object::handle& h)
830 : {
831 15712 : static_cast<select_socket*>(h.get())->close_socket();
832 15712 : }
833 :
// Enqueue a completed op on the scheduler's completion queue.
inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}
839 :
// Tell the scheduler an async operation is outstanding (keeps it running).
inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
845 :
// Balance a prior work_started() once the operation has been claimed.
inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
851 :
852 : } // namespace boost::corosio::detail
853 :
854 : #endif // BOOST_COROSIO_HAS_SELECT
855 :
856 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
|