LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_socket_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 80.3 % 467 375 92
Test Date: 2026-02-18 18:41:52 Functions: 92.5 % 40 37 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/socket_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
      22                 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
      23                 : 
      24                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      25                 : #include <boost/corosio/detail/make_err.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/except.hpp>
      28                 : #include <boost/capy/buffers.hpp>
      29                 : 
      30                 : #include <coroutine>
      31                 : #include <mutex>
      32                 : #include <unordered_map>
      33                 : #include <utility>
      34                 : 
      35                 : #include <errno.h>
      36                 : #include <netinet/in.h>
      37                 : #include <netinet/tcp.h>
      38                 : #include <sys/epoll.h>
      39                 : #include <sys/socket.h>
      40                 : #include <unistd.h>
      41                 : 
      42                 : /*
      43                 :     epoll Socket Implementation
      44                 :     ===========================
      45                 : 
      46                 :     Each I/O operation follows the same pattern:
      47                 :       1. Try the syscall immediately (non-blocking socket)
      48                 :       2. If it succeeds or fails with a real error, post to completion queue
      49                 :       3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
      50                 : 
      51                 :     This "try first" approach avoids unnecessary epoll round-trips for
      52                 :     operations that can complete immediately (common for small reads/writes
      53                 :     on fast local connections).
      54                 : 
      55                 :     One-Shot Registration
      56                 :     ---------------------
      57                 :     We use one-shot epoll registration: each operation registers, waits for
      58                 :     one event, then unregisters. This simplifies the state machine since we
      59                 :     don't need to track whether an fd is currently registered or handle
      60                 :     re-arming. The tradeoff is slightly more epoll_ctl calls, but the
      61                 :     simplicity is worth it.
      62                 : 
      63                 :     Cancellation
      64                 :     ------------
      65                 :     See op.hpp for the completion/cancellation race handling via the
      66                 :     `registered` atomic. cancel() must complete pending operations (post
      67                 :     them with cancelled flag) so coroutines waiting on them can resume.
      68                 :     close_socket() calls cancel() first to ensure this.
      69                 : 
      70                 :     Impl Lifetime with shared_ptr
      71                 :     -----------------------------
      72                 :     Socket impls use enable_shared_from_this. The service owns impls via
      73                 :     shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
      74                 :     removal. When a user calls close(), we call cancel() which posts pending
      75                 :     ops to the scheduler.
      76                 : 
      77                 :     CRITICAL: The posted ops must keep the impl alive until they complete.
      78                 :     Otherwise the scheduler would process a freed op (use-after-free). The
      79                 :     cancel() method captures shared_from_this() into op.impl_ptr before
      80                 :     posting. When the op completes, impl_ptr is cleared, allowing the impl
      81                 :     to be destroyed if no other references exist.
      82                 : 
      83                 :     Service Ownership
      84                 :     -----------------
      85                 :     epoll_socket_service owns all socket impls. destroy_impl() removes the
      86                 :     shared_ptr from the map, but the impl may survive if ops still hold
      87                 :     impl_ptr refs. shutdown() closes all sockets and clears the map; any
      88                 :     in-flight ops will complete and release their refs.
      89                 : */
      90                 : 
      91                 : namespace boost::corosio::detail {
      92                 : 
      93                 : /** State for epoll socket service. */
      94                 : class epoll_socket_state
      95                 : {
      96                 : public:
      97 HIT         205 :     explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
      98                 :     {
      99             205 :     }
     100                 : 
     101                 :     epoll_scheduler& sched_;
     102                 :     std::mutex mutex_;
     103                 :     intrusive_list<epoll_socket> socket_list_;
     104                 :     std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
     105                 :         socket_ptrs_;
     106                 : };
     107                 : 
     108                 : /** epoll socket service implementation.
     109                 : 
     110                 :     Inherits from socket_service to enable runtime polymorphism.
     111                 :     Uses key_type = socket_service for service lookup.
     112                 : */
     113                 : class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
     114                 : {
     115                 : public:
     116                 :     explicit epoll_socket_service(capy::execution_context& ctx);
     117                 :     ~epoll_socket_service() override;
     118                 : 
     119                 :     epoll_socket_service(epoll_socket_service const&)            = delete;
     120                 :     epoll_socket_service& operator=(epoll_socket_service const&) = delete;
     121                 : 
     122                 :     void shutdown() override;
     123                 : 
     124                 :     io_object::implementation* construct() override;
     125                 :     void destroy(io_object::implementation*) override;
     126                 :     void close(io_object::handle&) override;
     127                 :     std::error_code open_socket(tcp_socket::implementation& impl) override;
     128                 : 
     129          324184 :     epoll_scheduler& scheduler() const noexcept
     130                 :     {
     131          324184 :         return state_->sched_;
     132                 :     }
     133                 :     void post(epoll_op* op);
     134                 :     void work_started() noexcept;
     135                 :     void work_finished() noexcept;
     136                 : 
     137                 : private:
     138                 :     std::unique_ptr<epoll_socket_state> state_;
     139                 : };
     140                 : 
     141                 : //--------------------------------------------------------------------------
     142                 : //
     143                 : // Implementation
     144                 : //
     145                 : //--------------------------------------------------------------------------
     146                 : 
     147                 : // Register an op with the reactor, handling cached edge events.
     148                 : // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
     149                 : inline void
     150            4318 : epoll_socket::register_op(
     151                 :     epoll_op& op,
     152                 :     epoll_op*& desc_slot,
     153                 :     bool& ready_flag,
     154                 :     bool& cancel_flag) noexcept
     155                 : {
     156            4318 :     svc_.work_started();
     157                 : 
     158            4318 :     std::lock_guard lock(desc_state_.mutex);
     159            4318 :     bool io_done = false;
     160            4318 :     if (ready_flag)
     161                 :     {
     162             140 :         ready_flag = false;
     163             140 :         op.perform_io();
     164             140 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     165             140 :         if (!io_done)
     166             140 :             op.errn = 0;
     167                 :     }
     168                 : 
     169            4318 :     if (cancel_flag)
     170                 :     {
     171              93 :         cancel_flag = false;
     172              93 :         op.cancelled.store(true, std::memory_order_relaxed);
     173                 :     }
     174                 : 
     175            4318 :     if (io_done || op.cancelled.load(std::memory_order_acquire))
     176                 :     {
     177              93 :         svc_.post(&op);
     178              93 :         svc_.work_finished();
     179                 :     }
     180                 :     else
     181                 :     {
     182            4225 :         desc_slot = &op;
     183                 :     }
     184            4318 : }
     185                 : 
     186                 : inline void
     187             104 : epoll_op::canceller::operator()() const noexcept
     188                 : {
     189             104 :     op->cancel();
     190             104 : }
     191                 : 
     192                 : inline void
     193 MIS           0 : epoll_connect_op::cancel() noexcept
     194                 : {
     195               0 :     if (socket_impl_)
     196               0 :         socket_impl_->cancel_single_op(*this);
     197                 :     else
     198               0 :         request_cancel();
     199               0 : }
     200                 : 
     201                 : inline void
     202 HIT          98 : epoll_read_op::cancel() noexcept
     203                 : {
     204              98 :     if (socket_impl_)
     205              98 :         socket_impl_->cancel_single_op(*this);
     206                 :     else
     207 MIS           0 :         request_cancel();
     208 HIT          98 : }
     209                 : 
     210                 : inline void
     211 MIS           0 : epoll_write_op::cancel() noexcept
     212                 : {
     213               0 :     if (socket_impl_)
     214               0 :         socket_impl_->cancel_single_op(*this);
     215                 :     else
     216               0 :         request_cancel();
     217               0 : }
     218                 : 
     219                 : inline void
     220 HIT       50700 : epoll_op::operator()()
     221                 : {
     222           50700 :     stop_cb.reset();
     223                 : 
     224           50700 :     socket_impl_->svc_.scheduler().reset_inline_budget();
     225                 : 
     226           50700 :     if (cancelled.load(std::memory_order_acquire))
     227             203 :         *ec_out = capy::error::canceled;
     228           50497 :     else if (errn != 0)
     229 MIS           0 :         *ec_out = make_err(errn);
     230 HIT       50497 :     else if (is_read_operation() && bytes_transferred == 0)
     231 MIS           0 :         *ec_out = capy::error::eof;
     232                 :     else
     233 HIT       50497 :         *ec_out = {};
     234                 : 
     235           50700 :     *bytes_out = bytes_transferred;
     236                 : 
     237                 :     // Move to stack before resuming coroutine. The coroutine might close
     238                 :     // the socket, releasing the last wrapper ref. If impl_ptr were the
     239                 :     // last ref and we destroyed it while still in operator(), we'd have
     240                 :     // use-after-free. Moving to local ensures destruction happens at
     241                 :     // function exit, after all member accesses are complete.
     242           50700 :     capy::executor_ref saved_ex(ex);
     243           50700 :     std::coroutine_handle<> saved_h(h);
     244           50700 :     auto prevent_premature_destruction = std::move(impl_ptr);
     245           50700 :     dispatch_coro(saved_ex, saved_h).resume();
     246           50700 : }
     247                 : 
     248                 : inline void
     249            4119 : epoll_connect_op::operator()()
     250                 : {
     251            4119 :     stop_cb.reset();
     252                 : 
     253            4119 :     socket_impl_->svc_.scheduler().reset_inline_budget();
     254                 : 
     255            4119 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     256                 : 
     257                 :     // Cache endpoints on successful connect
     258            4119 :     if (success && socket_impl_)
     259                 :     {
     260                 :         // Query local endpoint via getsockname (may fail, but remote is always known)
     261            4118 :         endpoint local_ep;
     262            4118 :         sockaddr_in local_addr{};
     263            4118 :         socklen_t local_len = sizeof(local_addr);
     264            4118 :         if (::getsockname(
     265            4118 :                 fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     266            4118 :             local_ep = from_sockaddr_in(local_addr);
     267                 :         // Always cache remote endpoint; local may be default if getsockname failed
     268            4118 :         static_cast<epoll_socket*>(socket_impl_)
     269            4118 :             ->set_endpoints(local_ep, target_endpoint);
     270                 :     }
     271                 : 
     272            4119 :     if (cancelled.load(std::memory_order_acquire))
     273 MIS           0 :         *ec_out = capy::error::canceled;
     274 HIT        4119 :     else if (errn != 0)
     275               1 :         *ec_out = make_err(errn);
     276                 :     else
     277            4118 :         *ec_out = {};
     278                 : 
     279                 :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     280            4119 :     capy::executor_ref saved_ex(ex);
     281            4119 :     std::coroutine_handle<> saved_h(h);
     282            4119 :     auto prevent_premature_destruction = std::move(impl_ptr);
     283            4119 :     dispatch_coro(saved_ex, saved_h).resume();
     284            4119 : }
     285                 : 
     286           12411 : inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
     287           12411 :     : svc_(svc)
     288                 : {
     289           12411 : }
     290                 : 
     291           12411 : inline epoll_socket::~epoll_socket() = default;
     292                 : 
     293                 : inline std::coroutine_handle<>
     294            4119 : epoll_socket::connect(
     295                 :     std::coroutine_handle<> h,
     296                 :     capy::executor_ref ex,
     297                 :     endpoint ep,
     298                 :     std::stop_token token,
     299                 :     std::error_code* ec)
     300                 : {
     301            4119 :     auto& op = conn_;
     302                 : 
     303            4119 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     304                 :     int result =
     305            4119 :         ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
     306                 : 
     307            4119 :     if (result == 0)
     308                 :     {
     309 MIS           0 :         sockaddr_in local_addr{};
     310               0 :         socklen_t local_len = sizeof(local_addr);
     311               0 :         if (::getsockname(
     312               0 :                 fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     313               0 :             local_endpoint_ = detail::from_sockaddr_in(local_addr);
     314               0 :         remote_endpoint_ = ep;
     315                 :     }
     316                 : 
     317 HIT        4119 :     if (result == 0 || errno != EINPROGRESS)
     318                 :     {
     319 MIS           0 :         int err = (result < 0) ? errno : 0;
     320               0 :         if (svc_.scheduler().try_consume_inline_budget())
     321                 :         {
     322               0 :             *ec = err ? make_err(err) : std::error_code{};
     323               0 :             return dispatch_coro(ex, h);
     324                 :         }
     325               0 :         op.reset();
     326               0 :         op.h               = h;
     327               0 :         op.ex              = ex;
     328               0 :         op.ec_out          = ec;
     329               0 :         op.fd              = fd_;
     330               0 :         op.target_endpoint = ep;
     331               0 :         op.start(token, this);
     332               0 :         op.impl_ptr = shared_from_this();
     333               0 :         op.complete(err, 0);
     334               0 :         svc_.post(&op);
     335               0 :         return std::noop_coroutine();
     336                 :     }
     337                 : 
     338                 :     // EINPROGRESS — register with reactor
     339 HIT        4119 :     op.reset();
     340            4119 :     op.h               = h;
     341            4119 :     op.ex              = ex;
     342            4119 :     op.ec_out          = ec;
     343            4119 :     op.fd              = fd_;
     344            4119 :     op.target_endpoint = ep;
     345            4119 :     op.start(token, this);
     346            4119 :     op.impl_ptr = shared_from_this();
     347                 : 
     348            4119 :     register_op(
     349            4119 :         op, desc_state_.connect_op, desc_state_.write_ready,
     350            4119 :         desc_state_.connect_cancel_pending);
     351            4119 :     return std::noop_coroutine();
     352                 : }
     353                 : 
     354                 : inline std::coroutine_handle<>
     355          126634 : epoll_socket::read_some(
     356                 :     std::coroutine_handle<> h,
     357                 :     capy::executor_ref ex,
     358                 :     io_buffer_param param,
     359                 :     std::stop_token token,
     360                 :     std::error_code* ec,
     361                 :     std::size_t* bytes_out)
     362                 : {
     363          126634 :     auto& op = rd_;
     364          126634 :     op.reset();
     365                 : 
     366          126634 :     capy::mutable_buffer bufs[epoll_read_op::max_buffers];
     367          126634 :     op.iovec_count =
     368          126634 :         static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
     369                 : 
     370          126634 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     371                 :     {
     372               1 :         op.empty_buffer_read = true;
     373               1 :         op.h                 = h;
     374               1 :         op.ex                = ex;
     375               1 :         op.ec_out            = ec;
     376               1 :         op.bytes_out         = bytes_out;
     377               1 :         op.start(token, this);
     378               1 :         op.impl_ptr = shared_from_this();
     379               1 :         op.complete(0, 0);
     380               1 :         svc_.post(&op);
     381               1 :         return std::noop_coroutine();
     382                 :     }
     383                 : 
     384          253266 :     for (int i = 0; i < op.iovec_count; ++i)
     385                 :     {
     386          126633 :         op.iovecs[i].iov_base = bufs[i].data();
     387          126633 :         op.iovecs[i].iov_len  = bufs[i].size();
     388                 :     }
     389                 : 
     390                 :     // Speculative read
     391                 :     ssize_t n;
     392                 :     do
     393                 :     {
     394          126633 :         n = ::readv(fd_, op.iovecs, op.iovec_count);
     395                 :     }
     396          126633 :     while (n < 0 && errno == EINTR);
     397                 : 
     398          126633 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     399                 :     {
     400          126434 :         int err    = (n < 0) ? errno : 0;
     401          126434 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     402                 : 
     403          126434 :         if (svc_.scheduler().try_consume_inline_budget())
     404                 :         {
     405          101194 :             if (err)
     406 MIS           0 :                 *ec = make_err(err);
     407 HIT      101194 :             else if (n == 0)
     408               5 :                 *ec = capy::error::eof;
     409                 :             else
     410          101189 :                 *ec = {};
     411          101194 :             *bytes_out = bytes;
     412          101194 :             return dispatch_coro(ex, h);
     413                 :         }
     414           25240 :         op.h         = h;
     415           25240 :         op.ex        = ex;
     416           25240 :         op.ec_out    = ec;
     417           25240 :         op.bytes_out = bytes_out;
     418           25240 :         op.start(token, this);
     419           25240 :         op.impl_ptr = shared_from_this();
     420           25240 :         op.complete(err, bytes);
     421           25240 :         svc_.post(&op);
     422           25240 :         return std::noop_coroutine();
     423                 :     }
     424                 : 
     425                 :     // EAGAIN — register with reactor
     426             199 :     op.h         = h;
     427             199 :     op.ex        = ex;
     428             199 :     op.ec_out    = ec;
     429             199 :     op.bytes_out = bytes_out;
     430             199 :     op.fd        = fd_;
     431             199 :     op.start(token, this);
     432             199 :     op.impl_ptr = shared_from_this();
     433                 : 
     434             199 :     register_op(
     435             199 :         op, desc_state_.read_op, desc_state_.read_ready,
     436             199 :         desc_state_.read_cancel_pending);
     437             199 :     return std::noop_coroutine();
     438                 : }
     439                 : 
     440                 : inline std::coroutine_handle<>
     441          126436 : epoll_socket::write_some(
     442                 :     std::coroutine_handle<> h,
     443                 :     capy::executor_ref ex,
     444                 :     io_buffer_param param,
     445                 :     std::stop_token token,
     446                 :     std::error_code* ec,
     447                 :     std::size_t* bytes_out)
     448                 : {
     449          126436 :     auto& op = wr_;
     450          126436 :     op.reset();
     451                 : 
     452          126436 :     capy::mutable_buffer bufs[epoll_write_op::max_buffers];
     453          126436 :     op.iovec_count =
     454          126436 :         static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
     455                 : 
     456          126436 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     457                 :     {
     458               1 :         op.h         = h;
     459               1 :         op.ex        = ex;
     460               1 :         op.ec_out    = ec;
     461               1 :         op.bytes_out = bytes_out;
     462               1 :         op.start(token, this);
     463               1 :         op.impl_ptr = shared_from_this();
     464               1 :         op.complete(0, 0);
     465               1 :         svc_.post(&op);
     466               1 :         return std::noop_coroutine();
     467                 :     }
     468                 : 
     469          252870 :     for (int i = 0; i < op.iovec_count; ++i)
     470                 :     {
     471          126435 :         op.iovecs[i].iov_base = bufs[i].data();
     472          126435 :         op.iovecs[i].iov_len  = bufs[i].size();
     473                 :     }
     474                 : 
     475                 :     // Speculative write
     476          126435 :     msghdr msg{};
     477          126435 :     msg.msg_iov    = op.iovecs;
     478          126435 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     479                 : 
     480                 :     ssize_t n;
     481                 :     do
     482                 :     {
     483          126435 :         n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     484                 :     }
     485          126435 :     while (n < 0 && errno == EINTR);
     486                 : 
     487          126435 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     488                 :     {
     489          126435 :         int err    = (n < 0) ? errno : 0;
     490          126435 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     491                 : 
     492          126435 :         if (svc_.scheduler().try_consume_inline_budget())
     493                 :         {
     494          101176 :             *ec        = err ? make_err(err) : std::error_code{};
     495          101176 :             *bytes_out = bytes;
     496          101176 :             return dispatch_coro(ex, h);
     497                 :         }
     498           25259 :         op.h         = h;
     499           25259 :         op.ex        = ex;
     500           25259 :         op.ec_out    = ec;
     501           25259 :         op.bytes_out = bytes_out;
     502           25259 :         op.start(token, this);
     503           25259 :         op.impl_ptr = shared_from_this();
     504           25259 :         op.complete(err, bytes);
     505           25259 :         svc_.post(&op);
     506           25259 :         return std::noop_coroutine();
     507                 :     }
     508                 : 
     509                 :     // EAGAIN — register with reactor
     510 MIS           0 :     op.h         = h;
     511               0 :     op.ex        = ex;
     512               0 :     op.ec_out    = ec;
     513               0 :     op.bytes_out = bytes_out;
     514               0 :     op.fd        = fd_;
     515               0 :     op.start(token, this);
     516               0 :     op.impl_ptr = shared_from_this();
     517                 : 
     518               0 :     register_op(
     519               0 :         op, desc_state_.write_op, desc_state_.write_ready,
     520               0 :         desc_state_.write_cancel_pending);
     521               0 :     return std::noop_coroutine();
     522                 : }
     523                 : 
     524                 : inline std::error_code
     525 HIT           3 : epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
     526                 : {
     527                 :     int how;
     528               3 :     switch (what)
     529                 :     {
     530               1 :     case tcp_socket::shutdown_receive:
     531               1 :         how = SHUT_RD;
     532               1 :         break;
     533               1 :     case tcp_socket::shutdown_send:
     534               1 :         how = SHUT_WR;
     535               1 :         break;
     536               1 :     case tcp_socket::shutdown_both:
     537               1 :         how = SHUT_RDWR;
     538               1 :         break;
     539 MIS           0 :     default:
     540               0 :         return make_err(EINVAL);
     541                 :     }
     542 HIT           3 :     if (::shutdown(fd_, how) != 0)
     543 MIS           0 :         return make_err(errno);
     544 HIT           3 :     return {};
     545                 : }
     546                 : 
     547                 : inline std::error_code
     548               5 : epoll_socket::set_no_delay(bool value) noexcept
     549                 : {
     550               5 :     int flag = value ? 1 : 0;
     551               5 :     if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
     552 MIS           0 :         return make_err(errno);
     553 HIT           5 :     return {};
     554                 : }
     555                 : 
     556                 : inline bool
     557               5 : epoll_socket::no_delay(std::error_code& ec) const noexcept
     558                 : {
     559               5 :     int flag      = 0;
     560               5 :     socklen_t len = sizeof(flag);
     561               5 :     if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
     562                 :     {
     563 MIS           0 :         ec = make_err(errno);
     564               0 :         return false;
     565                 :     }
     566 HIT           5 :     ec = {};
     567               5 :     return flag != 0;
     568                 : }
     569                 : 
     570                 : inline std::error_code
     571               4 : epoll_socket::set_keep_alive(bool value) noexcept
     572                 : {
     573               4 :     int flag = value ? 1 : 0;
     574               4 :     if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
     575 MIS           0 :         return make_err(errno);
     576 HIT           4 :     return {};
     577                 : }
     578                 : 
     579                 : inline bool
     580               4 : epoll_socket::keep_alive(std::error_code& ec) const noexcept
     581                 : {
     582               4 :     int flag      = 0;
     583               4 :     socklen_t len = sizeof(flag);
     584               4 :     if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
     585                 :     {
     586 MIS           0 :         ec = make_err(errno);
     587               0 :         return false;
     588                 :     }
     589 HIT           4 :     ec = {};
     590               4 :     return flag != 0;
     591                 : }
     592                 : 
     593                 : inline std::error_code
     594               1 : epoll_socket::set_receive_buffer_size(int size) noexcept
     595                 : {
     596               1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
     597 MIS           0 :         return make_err(errno);
     598 HIT           1 :     return {};
     599                 : }
     600                 : 
     601                 : inline int
     602               3 : epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
     603                 : {
     604               3 :     int size      = 0;
     605               3 :     socklen_t len = sizeof(size);
     606               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
     607                 :     {
     608 MIS           0 :         ec = make_err(errno);
     609               0 :         return 0;
     610                 :     }
     611 HIT           3 :     ec = {};
     612               3 :     return size;
     613                 : }
     614                 : 
     615                 : inline std::error_code
     616               1 : epoll_socket::set_send_buffer_size(int size) noexcept
     617                 : {
     618               1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
     619 MIS           0 :         return make_err(errno);
     620 HIT           1 :     return {};
     621                 : }
     622                 : 
     623                 : inline int
     624               3 : epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
     625                 : {
     626               3 :     int size      = 0;
     627               3 :     socklen_t len = sizeof(size);
     628               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
     629                 :     {
     630 MIS           0 :         ec = make_err(errno);
     631               0 :         return 0;
     632                 :     }
     633 HIT           3 :     ec = {};
     634               3 :     return size;
     635                 : }
     636                 : 
     637                 : inline std::error_code
     638              10 : epoll_socket::set_linger(bool enabled, int timeout) noexcept
     639                 : {
     640              10 :     if (timeout < 0)
     641               1 :         return make_err(EINVAL);
     642                 :     struct ::linger lg;
     643               9 :     lg.l_onoff  = enabled ? 1 : 0;
     644               9 :     lg.l_linger = timeout;
     645               9 :     if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
     646 MIS           0 :         return make_err(errno);
     647 HIT           9 :     return {};
     648                 : }
     649                 : 
     650                 : inline tcp_socket::linger_options
     651               3 : epoll_socket::linger(std::error_code& ec) const noexcept
     652                 : {
     653               3 :     struct ::linger lg{};
     654               3 :     socklen_t len = sizeof(lg);
     655               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
     656                 :     {
     657 MIS           0 :         ec = make_err(errno);
     658               0 :         return {};
     659                 :     }
     660 HIT           3 :     ec = {};
     661               3 :     return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
     662                 : }
     663                 : 
     664                 : inline void
     665             183 : epoll_socket::cancel() noexcept
     666                 : {
     667             183 :     auto self = weak_from_this().lock();
     668             183 :     if (!self)
     669 MIS           0 :         return;
     670                 : 
     671 HIT         183 :     conn_.request_cancel();
     672             183 :     rd_.request_cancel();
     673             183 :     wr_.request_cancel();
     674                 : 
     675             183 :     epoll_op* conn_claimed = nullptr;
     676             183 :     epoll_op* rd_claimed   = nullptr;
     677             183 :     epoll_op* wr_claimed   = nullptr;
     678                 :     {
     679             183 :         std::lock_guard lock(desc_state_.mutex);
     680             183 :         if (desc_state_.connect_op == &conn_)
     681 MIS           0 :             conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
     682                 :         else
     683 HIT         183 :             desc_state_.connect_cancel_pending = true;
     684             183 :         if (desc_state_.read_op == &rd_)
     685               3 :             rd_claimed = std::exchange(desc_state_.read_op, nullptr);
     686                 :         else
     687             180 :             desc_state_.read_cancel_pending = true;
     688             183 :         if (desc_state_.write_op == &wr_)
     689 MIS           0 :             wr_claimed = std::exchange(desc_state_.write_op, nullptr);
     690                 :         else
     691 HIT         183 :             desc_state_.write_cancel_pending = true;
     692             183 :     }
     693                 : 
     694             183 :     if (conn_claimed)
     695                 :     {
     696 MIS           0 :         conn_.impl_ptr = self;
     697               0 :         svc_.post(&conn_);
     698               0 :         svc_.work_finished();
     699                 :     }
     700 HIT         183 :     if (rd_claimed)
     701                 :     {
     702               3 :         rd_.impl_ptr = self;
     703               3 :         svc_.post(&rd_);
     704               3 :         svc_.work_finished();
     705                 :     }
     706             183 :     if (wr_claimed)
     707                 :     {
     708 MIS           0 :         wr_.impl_ptr = self;
     709               0 :         svc_.post(&wr_);
     710               0 :         svc_.work_finished();
     711                 :     }
     712 HIT         183 : }
     713                 : 
     714                 : inline void
     715              98 : epoll_socket::cancel_single_op(epoll_op& op) noexcept
     716                 : {
     717              98 :     auto self = weak_from_this().lock();
     718              98 :     if (!self)
     719 MIS           0 :         return;
     720                 : 
     721 HIT          98 :     op.request_cancel();
     722                 : 
     723              98 :     epoll_op** desc_op_ptr = nullptr;
     724              98 :     if (&op == &conn_)
     725 MIS           0 :         desc_op_ptr = &desc_state_.connect_op;
     726 HIT          98 :     else if (&op == &rd_)
     727              98 :         desc_op_ptr = &desc_state_.read_op;
     728 MIS           0 :     else if (&op == &wr_)
     729               0 :         desc_op_ptr = &desc_state_.write_op;
     730                 : 
     731 HIT          98 :     if (desc_op_ptr)
     732                 :     {
     733              98 :         epoll_op* claimed = nullptr;
     734                 :         {
     735              98 :             std::lock_guard lock(desc_state_.mutex);
     736              98 :             if (*desc_op_ptr == &op)
     737              98 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     738 MIS           0 :             else if (&op == &conn_)
     739               0 :                 desc_state_.connect_cancel_pending = true;
     740               0 :             else if (&op == &rd_)
     741               0 :                 desc_state_.read_cancel_pending = true;
     742               0 :             else if (&op == &wr_)
     743               0 :                 desc_state_.write_cancel_pending = true;
     744 HIT          98 :         }
     745              98 :         if (claimed)
     746                 :         {
     747              98 :             op.impl_ptr = self;
     748              98 :             svc_.post(&op);
     749              98 :             svc_.work_finished();
     750                 :         }
     751                 :     }
     752              98 : }
     753                 : 
     754                 : inline void
     755           37200 : epoll_socket::close_socket() noexcept
     756                 : {
     757           37200 :     auto self = weak_from_this().lock();
     758           37200 :     if (self)
     759                 :     {
     760           37200 :         conn_.request_cancel();
     761           37200 :         rd_.request_cancel();
     762           37200 :         wr_.request_cancel();
     763                 : 
     764           37200 :         epoll_op* conn_claimed = nullptr;
     765           37200 :         epoll_op* rd_claimed   = nullptr;
     766           37200 :         epoll_op* wr_claimed   = nullptr;
     767                 :         {
     768           37200 :             std::lock_guard lock(desc_state_.mutex);
     769           37200 :             conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
     770           37200 :             rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
     771           37200 :             wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
     772           37200 :             desc_state_.read_ready             = false;
     773           37200 :             desc_state_.write_ready            = false;
     774           37200 :             desc_state_.read_cancel_pending    = false;
     775           37200 :             desc_state_.write_cancel_pending   = false;
     776           37200 :             desc_state_.connect_cancel_pending = false;
     777           37200 :         }
     778                 : 
     779           37200 :         if (conn_claimed)
     780                 :         {
     781 MIS           0 :             conn_.impl_ptr = self;
     782               0 :             svc_.post(&conn_);
     783               0 :             svc_.work_finished();
     784                 :         }
     785 HIT       37200 :         if (rd_claimed)
     786                 :         {
     787               1 :             rd_.impl_ptr = self;
     788               1 :             svc_.post(&rd_);
     789               1 :             svc_.work_finished();
     790                 :         }
     791           37200 :         if (wr_claimed)
     792                 :         {
     793 MIS           0 :             wr_.impl_ptr = self;
     794               0 :             svc_.post(&wr_);
     795               0 :             svc_.work_finished();
     796                 :         }
     797                 : 
     798 HIT       37200 :         if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     799              94 :             desc_state_.impl_ref_ = self;
     800                 :     }
     801                 : 
     802           37200 :     if (fd_ >= 0)
     803                 :     {
     804            8248 :         if (desc_state_.registered_events != 0)
     805            8248 :             svc_.scheduler().deregister_descriptor(fd_);
     806            8248 :         ::close(fd_);
     807            8248 :         fd_ = -1;
     808                 :     }
     809                 : 
     810           37200 :     desc_state_.fd                = -1;
     811           37200 :     desc_state_.registered_events = 0;
     812                 : 
     813           37200 :     local_endpoint_  = endpoint{};
     814           37200 :     remote_endpoint_ = endpoint{};
     815           37200 : }
     816                 : 
     817             205 : inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
     818             205 :     : state_(
     819                 :           std::make_unique<epoll_socket_state>(
     820             205 :               ctx.use_service<epoll_scheduler>()))
     821                 : {
     822             205 : }
     823                 : 
     824             410 : inline epoll_socket_service::~epoll_socket_service() {}
     825                 : 
     826                 : inline void
     827             205 : epoll_socket_service::shutdown()
     828                 : {
     829             205 :     std::lock_guard lock(state_->mutex_);
     830                 : 
     831             205 :     while (auto* impl = state_->socket_list_.pop_front())
     832 MIS           0 :         impl->close_socket();
     833                 : 
     834                 :     // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
     835                 :     // drains completed_ops_, calling destroy() on each queued op. If we
     836                 :     // released our shared_ptrs now, an epoll_op::destroy() could free the
     837                 :     // last ref to an impl whose embedded descriptor_state is still linked
     838                 :     // in the queue — use-after-free on the next pop(). Letting ~state_
     839                 :     // release the ptrs (during service destruction, after scheduler
     840                 :     // shutdown) keeps every impl alive until all ops have been drained.
     841 HIT         205 : }
     842                 : 
     843                 : inline io_object::implementation*
     844           12411 : epoll_socket_service::construct()
     845                 : {
     846           12411 :     auto impl = std::make_shared<epoll_socket>(*this);
     847           12411 :     auto* raw = impl.get();
     848                 : 
     849                 :     {
     850           12411 :         std::lock_guard lock(state_->mutex_);
     851           12411 :         state_->socket_list_.push_back(raw);
     852           12411 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     853           12411 :     }
     854                 : 
     855           12411 :     return raw;
     856           12411 : }
     857                 : 
     858                 : inline void
     859           12411 : epoll_socket_service::destroy(io_object::implementation* impl)
     860                 : {
     861           12411 :     auto* epoll_impl = static_cast<epoll_socket*>(impl);
     862           12411 :     epoll_impl->close_socket();
     863           12411 :     std::lock_guard lock(state_->mutex_);
     864           12411 :     state_->socket_list_.remove(epoll_impl);
     865           12411 :     state_->socket_ptrs_.erase(epoll_impl);
     866           12411 : }
     867                 : 
     868                 : inline std::error_code
     869            4130 : epoll_socket_service::open_socket(tcp_socket::implementation& impl)
     870                 : {
     871            4130 :     auto* epoll_impl = static_cast<epoll_socket*>(&impl);
     872            4130 :     epoll_impl->close_socket();
     873                 : 
     874            4130 :     int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
     875            4130 :     if (fd < 0)
     876 MIS           0 :         return make_err(errno);
     877                 : 
     878 HIT        4130 :     epoll_impl->fd_ = fd;
     879                 : 
     880                 :     // Register fd with epoll (edge-triggered mode)
     881            4130 :     epoll_impl->desc_state_.fd = fd;
     882                 :     {
     883            4130 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     884            4130 :         epoll_impl->desc_state_.read_op    = nullptr;
     885            4130 :         epoll_impl->desc_state_.write_op   = nullptr;
     886            4130 :         epoll_impl->desc_state_.connect_op = nullptr;
     887            4130 :     }
     888            4130 :     scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
     889                 : 
     890            4130 :     return {};
     891                 : }
     892                 : 
     893                 : inline void
     894           20659 : epoll_socket_service::close(io_object::handle& h)
     895                 : {
     896           20659 :     static_cast<epoll_socket*>(h.get())->close_socket();
     897           20659 : }
     898                 : 
     899                 : inline void
     900           50696 : epoll_socket_service::post(epoll_op* op)
     901                 : {
     902           50696 :     state_->sched_.post(op);
     903           50696 : }
     904                 : 
     905                 : inline void
     906            4318 : epoll_socket_service::work_started() noexcept
     907                 : {
     908            4318 :     state_->sched_.work_started();
     909            4318 : }
     910                 : 
     911                 : inline void
     912             195 : epoll_socket_service::work_finished() noexcept
     913                 : {
     914             195 :     state_->sched_.work_finished();
     915             195 : }
     916                 : 
     917                 : } // namespace boost::corosio::detail
     918                 : 
     919                 : #endif // BOOST_COROSIO_HAS_EPOLL
     920                 : 
     921                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
        

Generated by: LCOV version 2.3