TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_socket_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 :
25 : #include <boost/corosio/detail/endpoint_convert.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/make_err.hpp>
28 :
29 : #include <errno.h>
30 : #include <fcntl.h>
31 : #include <netinet/in.h>
32 : #include <sys/socket.h>
33 : #include <unistd.h>
34 :
35 : #include <memory>
36 : #include <mutex>
37 : #include <unordered_map>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /** State for select acceptor service. */
42 : class select_acceptor_state
43 : {
44 : public:
45 HIT 135 : explicit select_acceptor_state(select_scheduler& sched) noexcept
46 135 : : sched_(sched)
47 : {
48 135 : }
49 :
50 : select_scheduler& sched_;
51 : std::mutex mutex_;
52 : intrusive_list<select_acceptor> acceptor_list_;
53 : std::unordered_map<select_acceptor*, std::shared_ptr<select_acceptor>>
54 : acceptor_ptrs_;
55 : };
56 :
57 : /** select acceptor service implementation.
58 :
59 : Inherits from acceptor_service to enable runtime polymorphism.
60 : Uses key_type = acceptor_service for service lookup.
61 : */
62 : class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
63 : {
64 : public:
65 : explicit select_acceptor_service(capy::execution_context& ctx);
66 : ~select_acceptor_service() override;
67 :
68 : select_acceptor_service(select_acceptor_service const&) = delete;
69 : select_acceptor_service& operator=(select_acceptor_service const&) = delete;
70 :
71 : void shutdown() override;
72 :
73 : io_object::implementation* construct() override;
74 : void destroy(io_object::implementation*) override;
75 : void close(io_object::handle&) override;
76 : std::error_code open_acceptor(
77 : tcp_acceptor::implementation& impl, endpoint ep, int backlog) override;
78 :
79 3184 : select_scheduler& scheduler() const noexcept
80 : {
81 3184 : return state_->sched_;
82 : }
83 : void post(select_op* op);
84 : void work_started() noexcept;
85 : void work_finished() noexcept;
86 :
87 : /** Get the socket service for creating peer sockets during accept. */
88 : select_socket_service* socket_service() const noexcept;
89 :
90 : private:
91 : capy::execution_context& ctx_;
92 : std::unique_ptr<select_acceptor_state> state_;
93 : };
94 :
95 : inline void
96 MIS 0 : select_accept_op::cancel() noexcept
97 : {
98 0 : if (acceptor_impl_)
99 0 : acceptor_impl_->cancel_single_op(*this);
100 : else
101 0 : request_cancel();
102 0 : }
103 :
104 : inline void
105 HIT 3139 : select_accept_op::operator()()
106 : {
107 3139 : stop_cb.reset();
108 :
109 3139 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
110 :
111 3139 : if (ec_out)
112 : {
113 3139 : if (cancelled.load(std::memory_order_acquire))
114 3 : *ec_out = capy::error::canceled;
115 3136 : else if (errn != 0)
116 MIS 0 : *ec_out = make_err(errn);
117 : else
118 HIT 3136 : *ec_out = {};
119 : }
120 :
121 3139 : if (success && accepted_fd >= 0)
122 : {
123 3136 : if (acceptor_impl_)
124 : {
125 3136 : auto* socket_svc = static_cast<select_acceptor*>(acceptor_impl_)
126 3136 : ->service()
127 3136 : .socket_service();
128 3136 : if (socket_svc)
129 : {
130 : auto& impl =
131 3136 : static_cast<select_socket&>(*socket_svc->construct());
132 3136 : impl.set_socket(accepted_fd);
133 :
134 3136 : sockaddr_in local_addr{};
135 3136 : socklen_t local_len = sizeof(local_addr);
136 3136 : sockaddr_in remote_addr{};
137 3136 : socklen_t remote_len = sizeof(remote_addr);
138 :
139 3136 : endpoint local_ep, remote_ep;
140 3136 : if (::getsockname(
141 : accepted_fd, reinterpret_cast<sockaddr*>(&local_addr),
142 3136 : &local_len) == 0)
143 3136 : local_ep = from_sockaddr_in(local_addr);
144 3136 : if (::getpeername(
145 : accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr),
146 3136 : &remote_len) == 0)
147 3136 : remote_ep = from_sockaddr_in(remote_addr);
148 :
149 3136 : impl.set_endpoints(local_ep, remote_ep);
150 :
151 3136 : if (impl_out)
152 3136 : *impl_out = &impl;
153 :
154 3136 : accepted_fd = -1;
155 : }
156 : else
157 : {
158 MIS 0 : if (ec_out && !*ec_out)
159 0 : *ec_out = make_err(ENOENT);
160 0 : ::close(accepted_fd);
161 0 : accepted_fd = -1;
162 0 : if (impl_out)
163 0 : *impl_out = nullptr;
164 : }
165 : }
166 : else
167 : {
168 0 : ::close(accepted_fd);
169 0 : accepted_fd = -1;
170 0 : if (impl_out)
171 0 : *impl_out = nullptr;
172 : }
173 HIT 3136 : }
174 : else
175 : {
176 3 : if (accepted_fd >= 0)
177 : {
178 MIS 0 : ::close(accepted_fd);
179 0 : accepted_fd = -1;
180 : }
181 :
182 HIT 3 : if (peer_impl)
183 : {
184 : auto* socket_svc_cleanup =
185 MIS 0 : static_cast<select_acceptor*>(acceptor_impl_)
186 0 : ->service()
187 0 : .socket_service();
188 0 : if (socket_svc_cleanup)
189 0 : socket_svc_cleanup->destroy(peer_impl);
190 0 : peer_impl = nullptr;
191 : }
192 :
193 HIT 3 : if (impl_out)
194 3 : *impl_out = nullptr;
195 : }
196 :
197 : // Move to stack before destroying the frame
198 3139 : capy::executor_ref saved_ex(ex);
199 3139 : std::coroutine_handle<> saved_h(h);
200 3139 : impl_ptr.reset();
201 3139 : dispatch_coro(saved_ex, saved_h).resume();
202 3139 : }
203 :
204 46 : inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
205 46 : : svc_(svc)
206 : {
207 46 : }
208 :
209 : inline std::coroutine_handle<>
210 3139 : select_acceptor::accept(
211 : std::coroutine_handle<> h,
212 : capy::executor_ref ex,
213 : std::stop_token token,
214 : std::error_code* ec,
215 : io_object::implementation** impl_out)
216 : {
217 3139 : auto& op = acc_;
218 3139 : op.reset();
219 3139 : op.h = h;
220 3139 : op.ex = ex;
221 3139 : op.ec_out = ec;
222 3139 : op.impl_out = impl_out;
223 3139 : op.fd = fd_;
224 3139 : op.start(token, this);
225 :
226 3139 : sockaddr_in addr{};
227 3139 : socklen_t addrlen = sizeof(addr);
228 3139 : int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
229 :
230 3139 : if (accepted >= 0)
231 : {
232 : // Reject fds that exceed select()'s FD_SETSIZE limit.
233 2 : if (accepted >= FD_SETSIZE)
234 : {
235 MIS 0 : ::close(accepted);
236 0 : op.accepted_fd = -1;
237 0 : op.complete(EINVAL, 0);
238 0 : op.impl_ptr = shared_from_this();
239 0 : svc_.post(&op);
240 0 : return std::noop_coroutine();
241 : }
242 :
243 : // Set non-blocking and close-on-exec flags.
244 HIT 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
245 2 : if (flags == -1)
246 : {
247 MIS 0 : int err = errno;
248 0 : ::close(accepted);
249 0 : op.accepted_fd = -1;
250 0 : op.complete(err, 0);
251 0 : op.impl_ptr = shared_from_this();
252 0 : svc_.post(&op);
253 0 : return std::noop_coroutine();
254 : }
255 :
256 HIT 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
257 : {
258 MIS 0 : int err = errno;
259 0 : ::close(accepted);
260 0 : op.accepted_fd = -1;
261 0 : op.complete(err, 0);
262 0 : op.impl_ptr = shared_from_this();
263 0 : svc_.post(&op);
264 0 : return std::noop_coroutine();
265 : }
266 :
267 HIT 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
268 : {
269 MIS 0 : int err = errno;
270 0 : ::close(accepted);
271 0 : op.accepted_fd = -1;
272 0 : op.complete(err, 0);
273 0 : op.impl_ptr = shared_from_this();
274 0 : svc_.post(&op);
275 0 : return std::noop_coroutine();
276 : }
277 :
278 HIT 2 : op.accepted_fd = accepted;
279 2 : op.complete(0, 0);
280 2 : op.impl_ptr = shared_from_this();
281 2 : svc_.post(&op);
282 2 : return std::noop_coroutine();
283 : }
284 :
285 3137 : if (errno == EAGAIN || errno == EWOULDBLOCK)
286 : {
287 3137 : svc_.work_started();
288 3137 : op.impl_ptr = shared_from_this();
289 :
290 : // Set registering BEFORE register_fd to close the race window where
291 : // reactor sees an event before we set registered.
292 3137 : op.registered.store(
293 : select_registration_state::registering, std::memory_order_release);
294 3137 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
295 :
296 : // Transition to registered. If this fails, reactor or cancel already
297 : // claimed the op (state is now unregistered), so we're done. However,
298 : // we must still deregister the fd because cancel's deregister_fd may
299 : // have run before our register_fd, leaving the fd orphaned.
300 3137 : auto expected = select_registration_state::registering;
301 3137 : if (!op.registered.compare_exchange_strong(
302 : expected, select_registration_state::registered,
303 : std::memory_order_acq_rel))
304 : {
305 MIS 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
306 0 : return std::noop_coroutine();
307 : }
308 :
309 : // If cancelled was set before we registered, handle it now.
310 HIT 3137 : if (op.cancelled.load(std::memory_order_acquire))
311 : {
312 MIS 0 : auto prev = op.registered.exchange(
313 : select_registration_state::unregistered,
314 : std::memory_order_acq_rel);
315 0 : if (prev != select_registration_state::unregistered)
316 : {
317 0 : svc_.scheduler().deregister_fd(
318 : fd_, select_scheduler::event_read);
319 0 : op.impl_ptr = shared_from_this();
320 0 : svc_.post(&op);
321 0 : svc_.work_finished();
322 : }
323 : }
324 HIT 3137 : return std::noop_coroutine();
325 : }
326 :
327 MIS 0 : op.complete(errno, 0);
328 0 : op.impl_ptr = shared_from_this();
329 0 : svc_.post(&op);
330 0 : return std::noop_coroutine();
331 : }
332 :
333 : inline void
334 HIT 1 : select_acceptor::cancel() noexcept
335 : {
336 1 : auto self = weak_from_this().lock();
337 1 : if (!self)
338 MIS 0 : return;
339 :
340 HIT 1 : auto prev = acc_.registered.exchange(
341 : select_registration_state::unregistered, std::memory_order_acq_rel);
342 1 : acc_.request_cancel();
343 :
344 1 : if (prev != select_registration_state::unregistered)
345 : {
346 1 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
347 1 : acc_.impl_ptr = self;
348 1 : svc_.post(&acc_);
349 1 : svc_.work_finished();
350 : }
351 1 : }
352 :
353 : inline void
354 MIS 0 : select_acceptor::cancel_single_op(select_op& op) noexcept
355 : {
356 0 : auto self = weak_from_this().lock();
357 0 : if (!self)
358 0 : return;
359 :
360 0 : auto prev = op.registered.exchange(
361 : select_registration_state::unregistered, std::memory_order_acq_rel);
362 0 : op.request_cancel();
363 :
364 0 : if (prev != select_registration_state::unregistered)
365 : {
366 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
367 :
368 0 : op.impl_ptr = self;
369 0 : svc_.post(&op);
370 0 : svc_.work_finished();
371 : }
372 0 : }
373 :
374 : inline void
375 HIT 180 : select_acceptor::close_socket() noexcept
376 : {
377 180 : auto self = weak_from_this().lock();
378 180 : if (self)
379 : {
380 180 : auto prev = acc_.registered.exchange(
381 : select_registration_state::unregistered, std::memory_order_acq_rel);
382 180 : acc_.request_cancel();
383 :
384 180 : if (prev != select_registration_state::unregistered)
385 : {
386 2 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
387 2 : acc_.impl_ptr = self;
388 2 : svc_.post(&acc_);
389 2 : svc_.work_finished();
390 : }
391 : }
392 :
393 180 : if (fd_ >= 0)
394 : {
395 44 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
396 44 : ::close(fd_);
397 44 : fd_ = -1;
398 : }
399 :
400 180 : local_endpoint_ = endpoint{};
401 180 : }
402 :
403 135 : inline select_acceptor_service::select_acceptor_service(
404 135 : capy::execution_context& ctx)
405 135 : : ctx_(ctx)
406 135 : , state_(
407 : std::make_unique<select_acceptor_state>(
408 135 : ctx.use_service<select_scheduler>()))
409 : {
410 135 : }
411 :
412 270 : inline select_acceptor_service::~select_acceptor_service() {}
413 :
414 : inline void
415 135 : select_acceptor_service::shutdown()
416 : {
417 135 : std::lock_guard lock(state_->mutex_);
418 :
419 135 : while (auto* impl = state_->acceptor_list_.pop_front())
420 MIS 0 : impl->close_socket();
421 :
422 : // Don't clear acceptor_ptrs_ here — same rationale as
423 : // select_socket_service::shutdown(). Let ~state_ release ptrs
424 : // after scheduler shutdown has drained all queued ops.
425 HIT 135 : }
426 :
427 : inline io_object::implementation*
428 46 : select_acceptor_service::construct()
429 : {
430 46 : auto impl = std::make_shared<select_acceptor>(*this);
431 46 : auto* raw = impl.get();
432 :
433 46 : std::lock_guard lock(state_->mutex_);
434 46 : state_->acceptor_list_.push_back(raw);
435 46 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
436 :
437 46 : return raw;
438 46 : }
439 :
440 : inline void
441 46 : select_acceptor_service::destroy(io_object::implementation* impl)
442 : {
443 46 : auto* select_impl = static_cast<select_acceptor*>(impl);
444 46 : select_impl->close_socket();
445 46 : std::lock_guard lock(state_->mutex_);
446 46 : state_->acceptor_list_.remove(select_impl);
447 46 : state_->acceptor_ptrs_.erase(select_impl);
448 46 : }
449 :
450 : inline void
451 90 : select_acceptor_service::close(io_object::handle& h)
452 : {
453 90 : static_cast<select_acceptor*>(h.get())->close_socket();
454 90 : }
455 :
456 : inline std::error_code
457 44 : select_acceptor_service::open_acceptor(
458 : tcp_acceptor::implementation& impl, endpoint ep, int backlog)
459 : {
460 44 : auto* select_impl = static_cast<select_acceptor*>(&impl);
461 44 : select_impl->close_socket();
462 :
463 44 : int fd = ::socket(AF_INET, SOCK_STREAM, 0);
464 44 : if (fd < 0)
465 MIS 0 : return make_err(errno);
466 :
467 : // Set non-blocking and close-on-exec
468 HIT 44 : int flags = ::fcntl(fd, F_GETFL, 0);
469 44 : if (flags == -1)
470 : {
471 MIS 0 : int errn = errno;
472 0 : ::close(fd);
473 0 : return make_err(errn);
474 : }
475 HIT 44 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
476 : {
477 MIS 0 : int errn = errno;
478 0 : ::close(fd);
479 0 : return make_err(errn);
480 : }
481 HIT 44 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
482 : {
483 MIS 0 : int errn = errno;
484 0 : ::close(fd);
485 0 : return make_err(errn);
486 : }
487 :
488 : // Check fd is within select() limits
489 HIT 44 : if (fd >= FD_SETSIZE)
490 : {
491 MIS 0 : ::close(fd);
492 0 : return make_err(EMFILE);
493 : }
494 :
495 HIT 44 : int reuse = 1;
496 44 : ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
497 :
498 44 : sockaddr_in addr = detail::to_sockaddr_in(ep);
499 44 : if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
500 : {
501 MIS 0 : int errn = errno;
502 0 : ::close(fd);
503 0 : return make_err(errn);
504 : }
505 :
506 HIT 44 : if (::listen(fd, backlog) < 0)
507 : {
508 MIS 0 : int errn = errno;
509 0 : ::close(fd);
510 0 : return make_err(errn);
511 : }
512 :
513 HIT 44 : select_impl->fd_ = fd;
514 :
515 : // Cache the local endpoint (queries OS for ephemeral port if port was 0)
516 44 : sockaddr_in local_addr{};
517 44 : socklen_t local_len = sizeof(local_addr);
518 44 : if (::getsockname(
519 44 : fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
520 44 : select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
521 :
522 44 : return {};
523 : }
524 :
525 : inline void
526 5 : select_acceptor_service::post(select_op* op)
527 : {
528 5 : state_->sched_.post(op);
529 5 : }
530 :
531 : inline void
532 3137 : select_acceptor_service::work_started() noexcept
533 : {
534 3137 : state_->sched_.work_started();
535 3137 : }
536 :
537 : inline void
538 3 : select_acceptor_service::work_finished() noexcept
539 : {
540 3 : state_->sched_.work_finished();
541 3 : }
542 :
543 : inline select_socket_service*
544 3136 : select_acceptor_service::socket_service() const noexcept
545 : {
546 3136 : auto* svc = ctx_.find_service<detail::socket_service>();
547 3136 : return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
548 : }
549 :
550 : } // namespace boost::corosio::detail
551 :
552 : #endif // BOOST_COROSIO_HAS_SELECT
553 :
554 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
|