
Submitting task to a specific thread inbox with tmc::post with tid might still lead to sub-optimal utilization #218

@fede-vaccaro

Description


Hello there!

While benchmarking my project with tmc, I noticed that after the threads belonging to an ex_cpu go to sleep, thread utilization becomes sub-optimal: out of 12 threads, for example, only 8 were used, leading to worse performance.

For example, if I iterate over the same job:

for(int i = 0; i < n_iters; i++)
{
   tmc::post_waitable(make_task()).wait();
   std::this_thread::sleep_for(std::chrono::seconds{1}); // Not necessary, but enough for the entire thread pool to go to sleep.
}

I noticed that after the first run, performance noticeably degraded. Increasing the number of spins before sleep helps, but I don't consider it a solution, as the issue looks related to the sleep/wake mechanism.

I then started investigating (or, I'll be honest, it was mostly Claude investigating in this direction) and came up with a broad analysis and a toy example that follows my tmc usage pattern, plus a small workaround for preventing this situation.

The toy example

// tmc_migration_toy.cc
//
// Minimal reproduction of the "worker drifts away from its initial thread"
// phenomenon in TMC, intended to be sent to the TMC author.
//
// SETUP:
//   - N worker tasks, each posted to a specific thread `tid` via
//     post_waitable(pool, task, 0, tid).
//   - Each worker uses a sliding-window pattern: tmc::semaphore + fork_group.
//     Each forked child does `co_await tmc::yield()` (which re-posts itself
//     to the executor with NO_HINT, so it can migrate), then sem.release().
//   - We record at every loop iteration which thread the worker is currently
//     running on, via tmc::current_thread_index(). With perfect affinity to
//     the originally-requested thread, the resulting matrix should be diagonal.
//
// Build (header-only, no hwloc):
//   g++ -std=c++23 -O2 -DTMC_IMPL -I /path/to/TooManyCooks/include tmc_migration_toy.cc -lpthread -o tmc_migration_toy
//
// (add -DTMC_USE_HWLOC -lhwloc if your TMC uses hwloc — required if HW topology
//  pinning is desired; not strictly necessary for this demo.)
//
// EXPECTED (if TMC preserved a task's original thread across suspend/resume):
//   Diagonal matrix — each worker runs every iteration on its original thread.
//
// ACTUAL:
//   Far-from-diagonal — workers drift to whichever thread happens to release
//   the semaphore. After enough iterations the distribution clusters around a
//   small number of "winner" threads while others sit idle.
//
// CAUSE:
//   TMC tasks have no concept of a "home thread." A task lives in an *executor*;
//   which thread within that executor runs it at any moment is the scheduler's
//   choice.
//
//   When a task suspends on co_await sem, the waiter saves
//       continuation_executor = this_thread::executor()        (waiter_list.ipp:28)
//   That's an executor pointer — there's no thread index field. When
//   sem.release() fires, it routes the resumed task back through
//       post_checked(continuation_executor, h, prio)           (waiter_list.ipp:45)
//   with default ThreadHint = NO_HINT. From a TMC thread, that falls into
//   ex_cpu::post's enqueue_ex_cpu branch (ex_cpu.ipp:469), which puts the work
//   on *the releaser's* local LIFO queue. The releaser then most likely picks
//   the work up itself (its own LIFO is the first thing try_run_some checks),
//   so the resumed waiter runs on the releaser's thread — not its original one.
//
//   post_waitable(pool, task, 0, tid) only pins the *initial* placement of the
//   wrapper task. Once the inner task suspends and resumes, the thread index is
//   gone — there is nowhere to store it across the suspension.
//
//   In a sliding-window I/O pipeline (one io_uring context per thread — the
//   natural pattern for high-throughput async I/O): the forked child issues I/O
//   on the worker's current thread, the CQE is reaped on that thread, child
//   resumes there, sem.release fires from there — and the worker migrates to
//   that thread. The migration is self-reinforcing: every subsequent fork goes
//   to the new thread, every I/O issues on its context, every CQE reaps there,
//   every release fires from there. Workers cluster on a small number of
//   threads while others sit idle, and total throughput drops accordingly.
//
//   ex_any::post(h, prio, ThreadHint) is the *only* TMC API that takes a thread
//   hint. No high-level awaitable (semaphore acquire/release, yield, resume_on,
//   spawn, fork_group::fork) exposes it. So user code that needs thread
//   affinity across suspend points must bypass the high-level awaitables.
//
// FIX:
//   Re-pin the worker to its original thread after every co_await sem, via a
//   tiny custom awaitable that uses the ThreadHint argument of ex_any::post
//   (which the existing TMC awaitables — semaphore release, yield, resume_on,
//   spawn, fork_group::fork — do not expose). With that re-pin in place, the
//   matrix is diagonal as expected.

#include "tmc/all_headers.hpp"

#include <atomic>
#include <coroutine>
#include <cstddef>
#include <future>
#include <iomanip>
#include <iostream>
#include <vector>

namespace
{
    constexpr size_t NUM_WORKERS = 8;
    constexpr size_t NUM_ITERS = 5000;
    constexpr size_t WINDOW = 64;

    struct stats
    {
        std::vector<std::vector<std::atomic_size_t>> samples;

        explicit stats(size_t n) : samples(n)
        {
            for(auto& v : samples)
                v = std::vector<std::atomic_size_t>(n);
        }
    };

    // Custom awaitable that re-pins the continuation to a specific thread,
    // by using ex_any::post's ThreadHint argument. Workaround for the
    // phenomenon this toy demonstrates.
    struct pin_to_thread
    {
        tmc::ex_any* executor;
        size_t thread_hint;

        bool await_ready() const noexcept { return false; }
        void await_suspend(std::coroutine_handle<> h) const noexcept
        {
            executor->post(std::move(h), 0, thread_hint); // NOLINT(performance-move-const-arg)
        }
        void await_resume() const noexcept {}
    };

    tmc::task<void> child(tmc::semaphore& sem)
    {
        // co_await yield re-posts this coroutine to the executor with NO_HINT
        // (see aw_yield.hpp::yield_impl), giving the scheduler an opportunity
        // to resume it on a different thread.
        co_await tmc::yield();
        sem.release();
    }

    tmc::task<void> worker_drifting(size_t worker_id, stats& s)
    {
        auto fg = tmc::fork_group();
        tmc::semaphore sem(WINDOW);

        for(size_t i = 0; i < NUM_ITERS; ++i)
        {
            co_await sem;
            s.samples[worker_id][tmc::current_thread_index()]
                .fetch_add(1, std::memory_order_relaxed);
            fg.fork(child(sem));
        }
        co_await std::move(fg);
    }

    tmc::task<void> worker_pinned(size_t worker_id, stats& s, tmc::ex_any* executor)
    {
        auto fg = tmc::fork_group();
        tmc::semaphore sem(WINDOW);

        for(size_t i = 0; i < NUM_ITERS; ++i)
        {
            co_await sem;
            co_await pin_to_thread{executor, worker_id};
            s.samples[worker_id][tmc::current_thread_index()]
                .fetch_add(1, std::memory_order_relaxed);
            fg.fork(child(sem));
        }
        co_await std::move(fg);
    }

    void print_matrix(const char* title, const stats& s)
    {
        std::cout << "\n=== " << title << " ===\n";
        std::cout << "rows = worker_id (= original thread requested via post_waitable)\n";
        std::cout << "cols = tmc::current_thread_index() observed at iteration\n\n";
        std::cout << "         ";
        for(size_t t = 0; t < NUM_WORKERS; ++t)
            std::cout << " th" << std::setw(2) << t << "  ";
        std::cout << "\n";
        for(size_t w = 0; w < NUM_WORKERS; ++w)
        {
            std::cout << "worker " << w << ": ";
            for(size_t t = 0; t < NUM_WORKERS; ++t)
                std::cout << std::setw(5) << s.samples[w][t].load() << "  ";
            std::cout << "\n";
        }
    }
} // namespace

int main()
{
    tmc::ex_cpu pool;
    pool.set_thread_count(NUM_WORKERS).init();
    tmc::ex_any* executor = pool.type_erased();

    // --- Run 1: drifting workers ---
    {
        stats s(NUM_WORKERS);
        std::vector<std::future<void>> futures;
        futures.reserve(NUM_WORKERS);
        for(size_t tid = 0; tid < NUM_WORKERS; ++tid)
            futures.push_back(tmc::post_waitable(pool, worker_drifting(tid, s), 0, tid));
        for(auto& f : futures)
            f.get();
        print_matrix("DRIFTING (no thread pin) — expect off-diagonal", s);
    }

    // --- Run 2: re-pinned workers ---
    {
        stats s(NUM_WORKERS);
        std::vector<std::future<void>> futures;
        futures.reserve(NUM_WORKERS);
        for(size_t tid = 0; tid < NUM_WORKERS; ++tid)
            futures.push_back(tmc::post_waitable(pool, worker_pinned(tid, s, executor), 0, tid));
        for(auto& f : futures)
            f.get();
        print_matrix("PINNED (re-pin every iteration) — expect diagonal", s);
    }

    pool.teardown();
    return 0;
}

Possible output

=== DRIFTING (no thread pin) — expect off-diagonal ===
rows = worker_id (= original thread requested via post_waitable)
cols = tmc::current_thread_index() observed at iteration

          th 0   th 1   th 2   th 3   th 4   th 5   th 6   th 7  
worker 0:     0      0      0   5000      0      0      0      0  
worker 1:     0      0      0      0      0    653   4347      0  
worker 2:     0      0      0      0      0   5000      0      0  
worker 3:     0    596   3657     40    288      0      0    419  
worker 4:     0      0      0      0      0    273    553   4174  
worker 5:   252   4012    415      0     20    101      0    200  
worker 6:  3961      0      0     10      0     31    746    252  
worker 7:   259    345     92     85   4213      0      0      6  

=== PINNED (re-pin every iteration) — expect diagonal ===
rows = worker_id (= original thread requested via post_waitable)
cols = tmc::current_thread_index() observed at iteration

          th 0   th 1   th 2   th 3   th 4   th 5   th 6   th 7  
worker 0:   110    716    152   1020    152    991    930    929  
worker 1:   146    818    118    972    158    957    935    896  
worker 2:   148    819    128   1089    133    891    890    902  
worker 3:   114    822    134   1015    126    926    957    906  
worker 4:   170    737    132   1049    137    895    980    900  
worker 5:   152    680    154   1140    143    984    912    835  
worker 6:   143    819    164   1146    121    890    893    824  
worker 7:   159    721    138   1052    115    961    977    877  

We can see that, despite hinting to a specific thread, the distribution is still skewed towards a subset of worker threads.

EDIT: removed an unhelpful wall of text
