Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions include/exec/at_coroutine_exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

// The original idea is taken from libunifex and adapted to stdexec.

#include <exception>

#include "../stdexec/execution.hpp"

#include "any_sender_of.hpp"

#include <exception>
#include <tuple>

namespace experimental::execution
{
namespace __at_coro_exit
Expand Down Expand Up @@ -79,8 +80,8 @@ namespace experimental::execution

template <receiver _Receiver>
requires sender_to<_Sender, __receiver<_Receiver>>
auto
connect(_Receiver __rcvr) && noexcept -> connect_result_t<_Sender, __receiver<_Receiver>>
auto connect(_Receiver __rcvr) && noexcept //
-> connect_result_t<_Sender, __receiver<_Receiver>>
{
return STDEXEC::connect(static_cast<_Sender&&>(__sender_),
__receiver<_Receiver>{static_cast<_Receiver&&>(__rcvr)});
Expand Down Expand Up @@ -137,6 +138,12 @@ namespace experimental::execution
: __coro_(std::exchange(__that.__coro_, {}))
{}

~__task()
{
if (__coro_)
__coro_.destroy();
}

[[nodiscard]]
static constexpr auto await_ready() noexcept -> bool
{
Expand All @@ -148,7 +155,7 @@ namespace experimental::execution
//! coroutine exit; i.e., the coroutine that is co_await-ing the result of calling
//! at_coroutine_exit.
template <__has_continuation _Promise>
auto await_suspend(__std::coroutine_handle<_Promise> __parent) noexcept -> bool
auto await_suspend(__std::coroutine_handle<_Promise> __parent) -> bool
{
// Set the cleanup task's scheduler to the parent coroutine's scheduler.
__coro_.promise().__scheduler_ = get_start_scheduler(get_env(__parent.promise()));
Expand All @@ -163,11 +170,13 @@ namespace experimental::execution

auto await_resume() noexcept -> std::tuple<_Ts&...>
{
// Release the cleanup coroutine. It is now responsible for destroying itself in
// its final suspend.
return std::exchange(__coro_, {}).promise().__args_;
}

private:
struct __final_awaitable
struct __final_awaiter
{
static constexpr auto await_ready() noexcept -> bool
{
Expand All @@ -183,7 +192,7 @@ namespace experimental::execution
return STDEXEC_CORO_DESTROY_AND_CONTINUE(__h, __coro);
}

void await_resume() const noexcept {}
static constexpr void await_resume() noexcept {}
};

struct __env
Expand Down Expand Up @@ -211,7 +220,7 @@ namespace experimental::execution
}

[[nodiscard]]
auto final_suspend() noexcept -> __final_awaitable
auto final_suspend() noexcept -> __final_awaiter
{
return {};
}
Expand Down
47 changes: 40 additions & 7 deletions include/exec/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "../stdexec/__detail/__meta.hpp"
#include "../stdexec/__detail/__optional.hpp"
#include "../stdexec/__detail/__variant.hpp"
#include "../stdexec/coroutine.hpp"
#include "../stdexec/execution.hpp"
#include "../stdexec/functional.hpp"

Expand All @@ -42,10 +43,42 @@ namespace experimental::execution

// The required set_value_t() scheduler-sender completion signature is added in
// any_receiver_ref::any_sender::any_scheduler.
using __any_scheduler_completions =
using __any_scheduler_completions_t =
completion_signatures<set_value_t(), set_error_t(std::exception_ptr), set_stopped_t()>;

using __any_scheduler = any_scheduler<any_sender<any_receiver<__any_scheduler_completions>>>;
using __any_scheduler_impl_t =
any_scheduler<any_sender<any_receiver<__any_scheduler_completions_t>>>;

// A scheduler concept that does not check for copyability since that creates a cycle
// in the type system.
template <class _Scheduler>
concept __semi_scheduler = requires(_Scheduler& __sched) {
typename _Scheduler::scheduler_concept;
requires __std::derived_from<typename _Scheduler::scheduler_concept, scheduler_tag>;
{ schedule(__sched) } -> sender;
};

struct __any_scheduler
{
using scheduler_concept = scheduler_t;

template <__not_same_as<__any_scheduler> _Scheduler>
requires __semi_scheduler<_Scheduler>
constexpr __any_scheduler(_Scheduler __sched) noexcept
: __impl_(std::forward<_Scheduler>(__sched))
{}

bool operator==(__any_scheduler const & __other) const noexcept = default;

[[nodiscard]]
auto schedule() const
{
return __impl_.schedule();
}

private:
__any_scheduler_impl_t __impl_;
};

static_assert(scheduler<__any_scheduler>);

Expand Down Expand Up @@ -358,13 +391,13 @@ namespace experimental::execution
{
// Resuming the continuation of the parent coroutine will cause it to continue
// executing on the new scheduler.
__parent_.resume();
STDEXEC::__coroutine_resume_nothrow(__parent_);
}

void set_error(std::exception_ptr __eptr) noexcept
{
__eptr_ = std::move(__eptr);
__parent_.resume();
STDEXEC::__coroutine_resume_nothrow(__parent_);
}

void set_stopped() noexcept
Expand All @@ -373,7 +406,7 @@ namespace experimental::execution
// a promise that can handle the stopped signal. The coroutine referred to by
// __continuation_ will never be resumed.
__std::coroutine_handle<> __unwind = __parent_.promise().unhandled_stopped();
__unwind.resume();
STDEXEC::__coroutine_resume_nothrow(__unwind);
}

[[nodiscard]]
Expand Down Expand Up @@ -505,7 +538,7 @@ namespace experimental::execution
using __scheduler_t =
__call_result_or_t<get_start_scheduler_t, STDEXEC::inline_scheduler, _Context>;

struct __final_awaitable
struct __final_awaiter
{
static constexpr auto await_ready() noexcept -> bool
{
Expand Down Expand Up @@ -535,7 +568,7 @@ namespace experimental::execution
return {};
}

constexpr auto final_suspend() noexcept -> __final_awaitable
constexpr auto final_suspend() noexcept -> __final_awaiter
{
return {};
}
Expand Down
Loading
Loading