LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 64.3 % 266 171 95
Test Date: 2026-02-18 18:41:52 Functions: 90.5 % 21 19 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_socket_service.hpp>
      23                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      24                 : 
      25                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/make_err.hpp>
      28                 : 
      29                 : #include <errno.h>
      30                 : #include <fcntl.h>
      31                 : #include <netinet/in.h>
      32                 : #include <sys/socket.h>
      33                 : #include <unistd.h>
      34                 : 
      35                 : #include <memory>
      36                 : #include <mutex>
      37                 : #include <unordered_map>
      38                 : 
      39                 : namespace boost::corosio::detail {
      40                 : 
      41                 : /** State for select acceptor service. */
      42                 : class select_acceptor_state
      43                 : {
      44                 : public:
      45 HIT         135 :     explicit select_acceptor_state(select_scheduler& sched) noexcept
      46             135 :         : sched_(sched)
      47                 :     {
      48             135 :     }
      49                 : 
      50                 :     select_scheduler& sched_;
      51                 :     std::mutex mutex_;
      52                 :     intrusive_list<select_acceptor> acceptor_list_;
      53                 :     std::unordered_map<select_acceptor*, std::shared_ptr<select_acceptor>>
      54                 :         acceptor_ptrs_;
      55                 : };
      56                 : 
      57                 : /** select acceptor service implementation.
      58                 : 
      59                 :     Inherits from acceptor_service to enable runtime polymorphism.
      60                 :     Uses key_type = acceptor_service for service lookup.
      61                 : */
      62                 : class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
      63                 : {
      64                 : public:
      65                 :     explicit select_acceptor_service(capy::execution_context& ctx);
      66                 :     ~select_acceptor_service() override;
      67                 : 
      68                 :     select_acceptor_service(select_acceptor_service const&)            = delete;
      69                 :     select_acceptor_service& operator=(select_acceptor_service const&) = delete;
      70                 : 
      71                 :     void shutdown() override;
      72                 : 
      73                 :     io_object::implementation* construct() override;
      74                 :     void destroy(io_object::implementation*) override;
      75                 :     void close(io_object::handle&) override;
      76                 :     std::error_code open_acceptor(
      77                 :         tcp_acceptor::implementation& impl, endpoint ep, int backlog) override;
      78                 : 
      79            3184 :     select_scheduler& scheduler() const noexcept
      80                 :     {
      81            3184 :         return state_->sched_;
      82                 :     }
      83                 :     void post(select_op* op);
      84                 :     void work_started() noexcept;
      85                 :     void work_finished() noexcept;
      86                 : 
      87                 :     /** Get the socket service for creating peer sockets during accept. */
      88                 :     select_socket_service* socket_service() const noexcept;
      89                 : 
      90                 : private:
      91                 :     capy::execution_context& ctx_;
      92                 :     std::unique_ptr<select_acceptor_state> state_;
      93                 : };
      94                 : 
      95                 : inline void
      96 MIS           0 : select_accept_op::cancel() noexcept
      97                 : {
      98               0 :     if (acceptor_impl_)
      99               0 :         acceptor_impl_->cancel_single_op(*this);
     100                 :     else
     101               0 :         request_cancel();
     102               0 : }
     103                 : 
     104                 : inline void
     105 HIT        3139 : select_accept_op::operator()()
     106                 : {
     107            3139 :     stop_cb.reset();
     108                 : 
     109            3139 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     110                 : 
     111            3139 :     if (ec_out)
     112                 :     {
     113            3139 :         if (cancelled.load(std::memory_order_acquire))
     114               3 :             *ec_out = capy::error::canceled;
     115            3136 :         else if (errn != 0)
     116 MIS           0 :             *ec_out = make_err(errn);
     117                 :         else
     118 HIT        3136 :             *ec_out = {};
     119                 :     }
     120                 : 
     121            3139 :     if (success && accepted_fd >= 0)
     122                 :     {
     123            3136 :         if (acceptor_impl_)
     124                 :         {
     125            3136 :             auto* socket_svc = static_cast<select_acceptor*>(acceptor_impl_)
     126            3136 :                                    ->service()
     127            3136 :                                    .socket_service();
     128            3136 :             if (socket_svc)
     129                 :             {
     130                 :                 auto& impl =
     131            3136 :                     static_cast<select_socket&>(*socket_svc->construct());
     132            3136 :                 impl.set_socket(accepted_fd);
     133                 : 
     134            3136 :                 sockaddr_in local_addr{};
     135            3136 :                 socklen_t local_len = sizeof(local_addr);
     136            3136 :                 sockaddr_in remote_addr{};
     137            3136 :                 socklen_t remote_len = sizeof(remote_addr);
     138                 : 
     139            3136 :                 endpoint local_ep, remote_ep;
     140            3136 :                 if (::getsockname(
     141                 :                         accepted_fd, reinterpret_cast<sockaddr*>(&local_addr),
     142            3136 :                         &local_len) == 0)
     143            3136 :                     local_ep = from_sockaddr_in(local_addr);
     144            3136 :                 if (::getpeername(
     145                 :                         accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr),
     146            3136 :                         &remote_len) == 0)
     147            3136 :                     remote_ep = from_sockaddr_in(remote_addr);
     148                 : 
     149            3136 :                 impl.set_endpoints(local_ep, remote_ep);
     150                 : 
     151            3136 :                 if (impl_out)
     152            3136 :                     *impl_out = &impl;
     153                 : 
     154            3136 :                 accepted_fd = -1;
     155                 :             }
     156                 :             else
     157                 :             {
     158 MIS           0 :                 if (ec_out && !*ec_out)
     159               0 :                     *ec_out = make_err(ENOENT);
     160               0 :                 ::close(accepted_fd);
     161               0 :                 accepted_fd = -1;
     162               0 :                 if (impl_out)
     163               0 :                     *impl_out = nullptr;
     164                 :             }
     165                 :         }
     166                 :         else
     167                 :         {
     168               0 :             ::close(accepted_fd);
     169               0 :             accepted_fd = -1;
     170               0 :             if (impl_out)
     171               0 :                 *impl_out = nullptr;
     172                 :         }
     173 HIT        3136 :     }
     174                 :     else
     175                 :     {
     176               3 :         if (accepted_fd >= 0)
     177                 :         {
     178 MIS           0 :             ::close(accepted_fd);
     179               0 :             accepted_fd = -1;
     180                 :         }
     181                 : 
     182 HIT           3 :         if (peer_impl)
     183                 :         {
     184                 :             auto* socket_svc_cleanup =
     185 MIS           0 :                 static_cast<select_acceptor*>(acceptor_impl_)
     186               0 :                     ->service()
     187               0 :                     .socket_service();
     188               0 :             if (socket_svc_cleanup)
     189               0 :                 socket_svc_cleanup->destroy(peer_impl);
     190               0 :             peer_impl = nullptr;
     191                 :         }
     192                 : 
     193 HIT           3 :         if (impl_out)
     194               3 :             *impl_out = nullptr;
     195                 :     }
     196                 : 
     197                 :     // Move to stack before destroying the frame
     198            3139 :     capy::executor_ref saved_ex(ex);
     199            3139 :     std::coroutine_handle<> saved_h(h);
     200            3139 :     impl_ptr.reset();
     201            3139 :     dispatch_coro(saved_ex, saved_h).resume();
     202            3139 : }
     203                 : 
     204              46 : inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
     205              46 :     : svc_(svc)
     206                 : {
     207              46 : }
     208                 : 
     209                 : inline std::coroutine_handle<>
     210            3139 : select_acceptor::accept(
     211                 :     std::coroutine_handle<> h,
     212                 :     capy::executor_ref ex,
     213                 :     std::stop_token token,
     214                 :     std::error_code* ec,
     215                 :     io_object::implementation** impl_out)
     216                 : {
     217            3139 :     auto& op = acc_;
     218            3139 :     op.reset();
     219            3139 :     op.h        = h;
     220            3139 :     op.ex       = ex;
     221            3139 :     op.ec_out   = ec;
     222            3139 :     op.impl_out = impl_out;
     223            3139 :     op.fd       = fd_;
     224            3139 :     op.start(token, this);
     225                 : 
     226            3139 :     sockaddr_in addr{};
     227            3139 :     socklen_t addrlen = sizeof(addr);
     228            3139 :     int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
     229                 : 
     230            3139 :     if (accepted >= 0)
     231                 :     {
     232                 :         // Reject fds that exceed select()'s FD_SETSIZE limit.
     233               2 :         if (accepted >= FD_SETSIZE)
     234                 :         {
     235 MIS           0 :             ::close(accepted);
     236               0 :             op.accepted_fd = -1;
     237               0 :             op.complete(EINVAL, 0);
     238               0 :             op.impl_ptr = shared_from_this();
     239               0 :             svc_.post(&op);
     240               0 :             return std::noop_coroutine();
     241                 :         }
     242                 : 
     243                 :         // Set non-blocking and close-on-exec flags.
     244 HIT           2 :         int flags = ::fcntl(accepted, F_GETFL, 0);
     245               2 :         if (flags == -1)
     246                 :         {
     247 MIS           0 :             int err = errno;
     248               0 :             ::close(accepted);
     249               0 :             op.accepted_fd = -1;
     250               0 :             op.complete(err, 0);
     251               0 :             op.impl_ptr = shared_from_this();
     252               0 :             svc_.post(&op);
     253               0 :             return std::noop_coroutine();
     254                 :         }
     255                 : 
     256 HIT           2 :         if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
     257                 :         {
     258 MIS           0 :             int err = errno;
     259               0 :             ::close(accepted);
     260               0 :             op.accepted_fd = -1;
     261               0 :             op.complete(err, 0);
     262               0 :             op.impl_ptr = shared_from_this();
     263               0 :             svc_.post(&op);
     264               0 :             return std::noop_coroutine();
     265                 :         }
     266                 : 
     267 HIT           2 :         if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
     268                 :         {
     269 MIS           0 :             int err = errno;
     270               0 :             ::close(accepted);
     271               0 :             op.accepted_fd = -1;
     272               0 :             op.complete(err, 0);
     273               0 :             op.impl_ptr = shared_from_this();
     274               0 :             svc_.post(&op);
     275               0 :             return std::noop_coroutine();
     276                 :         }
     277                 : 
     278 HIT           2 :         op.accepted_fd = accepted;
     279               2 :         op.complete(0, 0);
     280               2 :         op.impl_ptr = shared_from_this();
     281               2 :         svc_.post(&op);
     282               2 :         return std::noop_coroutine();
     283                 :     }
     284                 : 
     285            3137 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     286                 :     {
     287            3137 :         svc_.work_started();
     288            3137 :         op.impl_ptr = shared_from_this();
     289                 : 
     290                 :         // Set registering BEFORE register_fd to close the race window where
     291                 :         // reactor sees an event before we set registered.
     292            3137 :         op.registered.store(
     293                 :             select_registration_state::registering, std::memory_order_release);
     294            3137 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     295                 : 
     296                 :         // Transition to registered. If this fails, reactor or cancel already
     297                 :         // claimed the op (state is now unregistered), so we're done. However,
     298                 :         // we must still deregister the fd because cancel's deregister_fd may
     299                 :         // have run before our register_fd, leaving the fd orphaned.
     300            3137 :         auto expected = select_registration_state::registering;
     301            3137 :         if (!op.registered.compare_exchange_strong(
     302                 :                 expected, select_registration_state::registered,
     303                 :                 std::memory_order_acq_rel))
     304                 :         {
     305 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     306               0 :             return std::noop_coroutine();
     307                 :         }
     308                 : 
     309                 :         // If cancelled was set before we registered, handle it now.
     310 HIT        3137 :         if (op.cancelled.load(std::memory_order_acquire))
     311                 :         {
     312 MIS           0 :             auto prev = op.registered.exchange(
     313                 :                 select_registration_state::unregistered,
     314                 :                 std::memory_order_acq_rel);
     315               0 :             if (prev != select_registration_state::unregistered)
     316                 :             {
     317               0 :                 svc_.scheduler().deregister_fd(
     318                 :                     fd_, select_scheduler::event_read);
     319               0 :                 op.impl_ptr = shared_from_this();
     320               0 :                 svc_.post(&op);
     321               0 :                 svc_.work_finished();
     322                 :             }
     323                 :         }
     324 HIT        3137 :         return std::noop_coroutine();
     325                 :     }
     326                 : 
     327 MIS           0 :     op.complete(errno, 0);
     328               0 :     op.impl_ptr = shared_from_this();
     329               0 :     svc_.post(&op);
     330               0 :     return std::noop_coroutine();
     331                 : }
     332                 : 
     333                 : inline void
     334 HIT           1 : select_acceptor::cancel() noexcept
     335                 : {
     336               1 :     auto self = weak_from_this().lock();
     337               1 :     if (!self)
     338 MIS           0 :         return;
     339                 : 
     340 HIT           1 :     auto prev = acc_.registered.exchange(
     341                 :         select_registration_state::unregistered, std::memory_order_acq_rel);
     342               1 :     acc_.request_cancel();
     343                 : 
     344               1 :     if (prev != select_registration_state::unregistered)
     345                 :     {
     346               1 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     347               1 :         acc_.impl_ptr = self;
     348               1 :         svc_.post(&acc_);
     349               1 :         svc_.work_finished();
     350                 :     }
     351               1 : }
     352                 : 
     353                 : inline void
     354 MIS           0 : select_acceptor::cancel_single_op(select_op& op) noexcept
     355                 : {
     356               0 :     auto self = weak_from_this().lock();
     357               0 :     if (!self)
     358               0 :         return;
     359                 : 
     360               0 :     auto prev = op.registered.exchange(
     361                 :         select_registration_state::unregistered, std::memory_order_acq_rel);
     362               0 :     op.request_cancel();
     363                 : 
     364               0 :     if (prev != select_registration_state::unregistered)
     365                 :     {
     366               0 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     367                 : 
     368               0 :         op.impl_ptr = self;
     369               0 :         svc_.post(&op);
     370               0 :         svc_.work_finished();
     371                 :     }
     372               0 : }
     373                 : 
     374                 : inline void
     375 HIT         180 : select_acceptor::close_socket() noexcept
     376                 : {
     377             180 :     auto self = weak_from_this().lock();
     378             180 :     if (self)
     379                 :     {
     380             180 :         auto prev = acc_.registered.exchange(
     381                 :             select_registration_state::unregistered, std::memory_order_acq_rel);
     382             180 :         acc_.request_cancel();
     383                 : 
     384             180 :         if (prev != select_registration_state::unregistered)
     385                 :         {
     386               2 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     387               2 :             acc_.impl_ptr = self;
     388               2 :             svc_.post(&acc_);
     389               2 :             svc_.work_finished();
     390                 :         }
     391                 :     }
     392                 : 
     393             180 :     if (fd_ >= 0)
     394                 :     {
     395              44 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     396              44 :         ::close(fd_);
     397              44 :         fd_ = -1;
     398                 :     }
     399                 : 
     400             180 :     local_endpoint_ = endpoint{};
     401             180 : }
     402                 : 
     403             135 : inline select_acceptor_service::select_acceptor_service(
     404             135 :     capy::execution_context& ctx)
     405             135 :     : ctx_(ctx)
     406             135 :     , state_(
     407                 :           std::make_unique<select_acceptor_state>(
     408             135 :               ctx.use_service<select_scheduler>()))
     409                 : {
     410             135 : }
     411                 : 
     412             270 : inline select_acceptor_service::~select_acceptor_service() {}
     413                 : 
     414                 : inline void
     415             135 : select_acceptor_service::shutdown()
     416                 : {
     417             135 :     std::lock_guard lock(state_->mutex_);
     418                 : 
     419             135 :     while (auto* impl = state_->acceptor_list_.pop_front())
     420 MIS           0 :         impl->close_socket();
     421                 : 
     422                 :     // Don't clear acceptor_ptrs_ here — same rationale as
     423                 :     // select_socket_service::shutdown(). Let ~state_ release ptrs
     424                 :     // after scheduler shutdown has drained all queued ops.
     425 HIT         135 : }
     426                 : 
     427                 : inline io_object::implementation*
     428              46 : select_acceptor_service::construct()
     429                 : {
     430              46 :     auto impl = std::make_shared<select_acceptor>(*this);
     431              46 :     auto* raw = impl.get();
     432                 : 
     433              46 :     std::lock_guard lock(state_->mutex_);
     434              46 :     state_->acceptor_list_.push_back(raw);
     435              46 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     436                 : 
     437              46 :     return raw;
     438              46 : }
     439                 : 
     440                 : inline void
     441              46 : select_acceptor_service::destroy(io_object::implementation* impl)
     442                 : {
     443              46 :     auto* select_impl = static_cast<select_acceptor*>(impl);
     444              46 :     select_impl->close_socket();
     445              46 :     std::lock_guard lock(state_->mutex_);
     446              46 :     state_->acceptor_list_.remove(select_impl);
     447              46 :     state_->acceptor_ptrs_.erase(select_impl);
     448              46 : }
     449                 : 
     450                 : inline void
     451              90 : select_acceptor_service::close(io_object::handle& h)
     452                 : {
     453              90 :     static_cast<select_acceptor*>(h.get())->close_socket();
     454              90 : }
     455                 : 
     456                 : inline std::error_code
     457              44 : select_acceptor_service::open_acceptor(
     458                 :     tcp_acceptor::implementation& impl, endpoint ep, int backlog)
     459                 : {
     460              44 :     auto* select_impl = static_cast<select_acceptor*>(&impl);
     461              44 :     select_impl->close_socket();
     462                 : 
     463              44 :     int fd = ::socket(AF_INET, SOCK_STREAM, 0);
     464              44 :     if (fd < 0)
     465 MIS           0 :         return make_err(errno);
     466                 : 
     467                 :     // Set non-blocking and close-on-exec
     468 HIT          44 :     int flags = ::fcntl(fd, F_GETFL, 0);
     469              44 :     if (flags == -1)
     470                 :     {
     471 MIS           0 :         int errn = errno;
     472               0 :         ::close(fd);
     473               0 :         return make_err(errn);
     474                 :     }
     475 HIT          44 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     476                 :     {
     477 MIS           0 :         int errn = errno;
     478               0 :         ::close(fd);
     479               0 :         return make_err(errn);
     480                 :     }
     481 HIT          44 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     482                 :     {
     483 MIS           0 :         int errn = errno;
     484               0 :         ::close(fd);
     485               0 :         return make_err(errn);
     486                 :     }
     487                 : 
     488                 :     // Check fd is within select() limits
     489 HIT          44 :     if (fd >= FD_SETSIZE)
     490                 :     {
     491 MIS           0 :         ::close(fd);
     492               0 :         return make_err(EMFILE);
     493                 :     }
     494                 : 
     495 HIT          44 :     int reuse = 1;
     496              44 :     ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
     497                 : 
     498              44 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     499              44 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
     500                 :     {
     501 MIS           0 :         int errn = errno;
     502               0 :         ::close(fd);
     503               0 :         return make_err(errn);
     504                 :     }
     505                 : 
     506 HIT          44 :     if (::listen(fd, backlog) < 0)
     507                 :     {
     508 MIS           0 :         int errn = errno;
     509               0 :         ::close(fd);
     510               0 :         return make_err(errn);
     511                 :     }
     512                 : 
     513 HIT          44 :     select_impl->fd_ = fd;
     514                 : 
     515                 :     // Cache the local endpoint (queries OS for ephemeral port if port was 0)
     516              44 :     sockaddr_in local_addr{};
     517              44 :     socklen_t local_len = sizeof(local_addr);
     518              44 :     if (::getsockname(
     519              44 :             fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     520              44 :         select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
     521                 : 
     522              44 :     return {};
     523                 : }
     524                 : 
     525                 : inline void
     526               5 : select_acceptor_service::post(select_op* op)
     527                 : {
     528               5 :     state_->sched_.post(op);
     529               5 : }
     530                 : 
     531                 : inline void
     532            3137 : select_acceptor_service::work_started() noexcept
     533                 : {
     534            3137 :     state_->sched_.work_started();
     535            3137 : }
     536                 : 
     537                 : inline void
     538               3 : select_acceptor_service::work_finished() noexcept
     539                 : {
     540               3 :     state_->sched_.work_finished();
     541               3 : }
     542                 : 
     543                 : inline select_socket_service*
     544            3136 : select_acceptor_service::socket_service() const noexcept
     545                 : {
     546            3136 :     auto* svc = ctx_.find_service<detail::socket_service>();
     547            3136 :     return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
     548                 : }
     549                 : 
     550                 : } // namespace boost::corosio::detail
     551                 : 
     552                 : #endif // BOOST_COROSIO_HAS_SELECT
     553                 : 
     554                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3