Tasks are a popular concurrent programming mechanism. It allows decomposing a computational problem into a set of isolated sub-problems, i.e. tasks. This decomposition exposes the underlying parallelism of the problem. Meaning that at least some tasks can be executed in parallel, and without any explicit synchronization.

This post covers the design and implementation of a very simple yet useful task library.

Tasks are used both for parallelism (CPU-intensive tasks, think parallel sorting) and concurrency (latency-reducing tasks, think responsive UI).

The beauty of tasks is their composability. One task might compute a value, while others depend on that value. Generally speaking, tasks make up a directed acyclic graph, or DAG.

A task library must consist of two parts. An API to specify the task DAG, and a scheduler that actually executes that DAG.

I will start by describing the desired features of the task library and then explain their implementation.

Creating tasks

There are several essential operations that pop up in almost any task-based program.

Creating a new task is an obvious first step. So we need a task type to work with.

template <class T>
class Task;

A task is parameterized by the type of the value it computes. A task computing a factorial might have the type Task<int>, and a task requesting a file path from the user–Task<std::string>.

It is often possible for a task to outlive the scope where it has been constructed. This means that pointers and dynamic allocation are used in most cases. One way to simplify the memory management would be implementing Task in a way similar to the pimpl idiom, making its state shared among (cheap) copies. Such a feature might be useful and desirable, but it complicates the implementation. For now, I simply define a helper pointer alias to save some typing.

template <class T>
using SPTask = std::shared_ptr<Task<T>>;

To create a new task instance, the make_task function is used.

template <class F, class... Ts>
SPTask<T> make_task(F function, Ts... tasks);

Using a free function instead of constructor makes it possible to create an instance of a class derived from Task<T>. This gives more freedom to the implementation.

make_task takes as an argument a function that does the computation, and a list of predecessor tasks–dependencies, which results will be passed on to function.

For example:

auto getInt = [=] { return 1; };
auto task = make_task(getInt);

This is the simplest use of make_task. getInt doesn’t take any parameters, and as such the resulting task does not depend on any other task.

Here is a more complex example:

auto get3 = make_task([=] { return 3; });
auto get5 = make_task([=] { return 5; });
auto sum = make_task([=] (int i, int j) { return i + j; }, get3, get5);
auto result = sum->result(); // result == 8

Now there is a dependency. The sum task will only start its execution once both get3 and get5 complete. But get3 and get5 do not depend on each other and can execute in parallel.

The last line in the snippet blocks until all the tasks complete and gets the result from sum.

One important case is creating a task inside a task. This might be conditional on the result of a previously completed task.

auto task = make_task([] { return make_task([] { return 1; }; });

What type should task have? SPTask<SPTask<int>> doesn’t look very useful. It would be quite a chore to compose it with other tasks. For now let’s assume that make_task is smart enough to collapse SPTask<SPTask<int>> into SPTask<int> automatically:

SPTask<int> task = make_task([=] { return make_task([=] { return 1; }; });
int result = task->result(); // result == 1

Another useful feature is the ability to specify dependencies at runtime in addition to compile time. For example:

std::vector<SPTask<int>> tasks; // already created
SPTask<std::vector<int>> combined = when_all(tasks);
auto sum = make_task([=] (std::vector<int> ints) {
    return std::accumulate(begin(ints), end(ints), 0);
}, combined);

Note the conversion from std::vector<SPTask<int>> to SPTask<std::vector<int>>. This operation is very common and I feel that providing it is essential even for this simple task library.

make_task and when_all is our complete task-creating API.

Implementing the scheduler

The scheduler does not need to know about task results and their types. It is also oblivious to the dependencies among tasks. This allows for a trivial thread pool implementation. A simpler task interface than Task<T> is sufficient.

struct ITask {
    virtual void operator()() = 0;
    virtual ~ITask() = default;
};

The scheduler has a queue to which new tasks are continuously pushed. Also, there is a predetermined number of threads, that extract tasks from the queue and execute them one by one.

Task Scheduler

The task DAG is assumed to be fed into the queue in topological order (dependencies first). How this can be achieved is the most interesting part of the library and I will discuss it in detail later.

class Scheduler {
    static std::unique_ptr<Scheduler> _instance;
    Queue<std::shared_ptr<ITask>> _queue;
    std::vector<std::thread> _threads;

    Scheduler();

public:
    static Scheduler* instance();
    static void init();
    static void shutdown();
    void schedule(std::shared_ptr<ITask> task);
};

In addition to _queue and _threads, there is a static _instance. init and shutdown need to be called at the startup and shutdown of the application, respectively.

The constructor initializes a thread pool

Scheduler::Scheduler() {
    auto body = [&] {
        for (;;) {
            std::shared_ptr<ITask> task;
            _queue.pop(task);
            if (!task)
                return;
            (*task)();
        }
    };

    _threads.resize(std::thread::hardware_concurrency());
    for (auto& th : _threads) {
        th = std::thread(body);
    }
}

Each thread continues to execute tasks until it receives a nullptr.

The single instance is created by init

void Scheduler::init() {
    _instance.reset(new Scheduler());
}

And the threads are finally joined by shutdown

void Scheduler::shutdown() {
    auto i = _instance.get();

    for (auto j = 0u; j < i->_threads.size(); ++j) {
        i->_queue.push({});
    }

    for (auto& th : i->_threads) {
        th.join();
    }
}

schedule simply pushes the provided task onto the queue

void Scheduler::schedule(std::shared_ptr<ITask> task) {
    _queue.push(task);
}

The Queue type is a simple (and inefficient) thread-safe wrapper of std::queue

template <class T>
class Queue {
    std::queue<T> _queue;
    std::mutex _m;
    std::condition_variable _cv;

public:
    void push(T value) {
        auto lock = std::unique_lock(_m);
        _queue.push(value);
        _cv.notify_one();
    }

    void pop(T& value) {
        auto lock = std::unique_lock(_m);
        _cv.wait(lock, [this] { return !_queue.empty(); });
        value = _queue.front();
        _queue.pop();
    }
};

This is the scheduler in its entirety.

One question remains. If Scheduler doesn’t know anything about task dependencies and blindly schedules tasks on the thread pool, then who does the actual blocking? The answer is: tasks. Tasks themselves are responsible for synchronization.

Scheduling strategies

If we had a separate thread for each task, then we would not need to worry about scheduling at all. No matter how many tasks are waiting for their dependencies, they could be certain that sooner or later those dependencies would complete.

Unfortunately, allocating a separate thread for every task can easily become prohibitively expensive, especially on the platforms where creating new threads is costly. Reusing existing threads is a better idea.

As discussed previously, a thread pool implementation requires the task DAG to be processed in topological order. Otherwise deadlocks might occur. To guarantee the topological order, tasks need to be scheduled with care.

Next I will discuss the simple immediate scheduling strategy and show why it fails to guarantee the right scheduling order. Then I will explore a more complex strategy, that only schedules tasks when all their dependencies have completed.

Scheduling strategy 1. Immediate scheduling.

In this strategy tasks go through a series of states:

  • creation and immediate scheduling (pushing onto the thread pool queue) by make_task
  • blocking while waiting for dependencies and then executing (both inside operator())
  • being in a completed state, holding the result until the last shared pointer has been destroyed

This implementation is very simple, but it creates two serious problems.

First, the waiting for the dependencies is done on the thread pool, meaning that while waiting, no useful work can be done by the occupied thread. The thread executing such a task simply yields to the OS.

Second, and more important, is a potential for deadlocks when tasks are created inside other tasks. To understand how this is possible consider the following code:

auto getIntTask1 = make_task([] { return 0; });
auto getIntTask2 = make_task([] { return 3; });
auto task = make_task([](int x, int y) {
    std::vector<std::shared_ptr<Task<int>>> tasks;
    for (int i = x; i < y; ++i) {
        tasks.push_back(make_task([=]{ return i * 2; }));
    }
    return when_all(tasks);
}, getIntTask1, getIntTask2);

auto sumTask = make_task([](auto vec) {
    return std::accumulate(begin(vec), end(vec), 0);
}, task);

assert(sumTask->result() == 6);

task waits for getIntTask1 and getIntTask2 to complete, then, based on the result, creates three new tasks and waits for their completion, producing a vector of squares.

sumTask waits for task to complete and sums up the resulting squares.

If there are at least two threads in the thread pool, then everything is fine. But if there is only one thread, a deadlock is possible. Furthermore, it is subject to timing.

Look at the Queue (a) below. Arrows show the dependencies among tasks. The dashed rectangles are tasks created dynamically by task.

Dynamic Task Scheduling

Both getIntTask1 and getIntTask2 are scheduled immediately. Then task is scheduled. So far so good, task has not yet created new tasks, including the new task created inside when_all. sumTask gets scheduled next, after task, but before the when_all task.

This leads to a situation in that getIntTask1, getIntTask2 and task complete, and then sumTask’s operator() is called by the scheduler and sumTask blocks, waiting for tasks 1, 2, 3 and when_all. But none of them are ever going to be executed, because task now occupies the only thread in the thread pool. A deadlock.

If, on the other hand, task completes before sumTask has been scheduled, then all is well. sumTask would be placed after when_all and no deadlock occurs.

This situation is particularly insidious given that the number of threads in the thread pool might not be known upfront.

Sometimes it is possible to carefully rewrite the code to guarantee the correct schedule ordering

auto getIntTask1 = make_task([] { return 0; });
auto getIntTask2 = make_task([] { return 3; });
auto task = make_task([] (int x, int y) {
    std::vector<SPTask<int>> tasks;
    for (int i = x; i < y; ++i) {
        tasks.push_back(make_task([=]{ return i * 2; }));
    }
    return make_task([] (auto vec) {
        return std::accumulate(begin(vec), end(vec), 0);
    }, when_all(tasks));
}, getIntTask1, getIntTask2);

assert(task->result() == 6);

This code will always create the ordering shown in Queue (b) above.

There are ways to automatically resolve such conflicts. For example, in addition to the three states scheduled, executing and completed, a new unscheduled state can be introduced. A task with dependencies would not be placed into the queue immediately, but instead each of its dependencies would carry a pointer to it. And the last dependency to complete would trigger the scheduling of the unscheduled task. Such design is safer, but requires additional synchronization and bookkeeping.

Any serious task library should offer a solution to this problem.

Scheduling strategy 2. Continuation-style scheduling.

In this strategy a task is scheduled after the last of its dependencies has been scheduled or executed. If a task does not have any dependencies, it is scheduled immediately upon creation. This solves the problem discussed in the previous section. It makes code independent of the thread pool size and removes the possibility of deadlocks. The downside of this strategy is added complexity.

Any task with dependencies becomes a continuation. Once a continuation is created, it needs to be attached to all its dependencies.

Consider the following code

auto task1 = make_task([] {
    // long computation
    int a = 1;
    return make_task([a] {     // task2
        // long computation
        int b = a + 1;
        return make_task([b] { // task3
            return b + 1;
        });
    });
});

auto task4 = make_task([] (int i) {
    return i * i;
}, task1);

// task4->result() == 9

Visually the task DAG looks like this

Dynamic Tasks

(Dashed outlines mean that the tasks are created by other tasks.)

There are two essential properties that we want to maintain:

  1. task4 must await task3 and not task1.
  2. task4 should only be scheduled after task3.

The first property is guaranteed simply by blocking until the dependency results are available. This property does not depend on scheduling.

The second property is harder to achieve.

In the example above it is assumed that task1 (without considering the tasks it creates) does a long computation before it finally creates and returns task2. This means that task4 is almost certainly created before task2 and task3. At this point task1 is either waiting in the scheduler queue (scheduled) or is being executed (running). In both cases we do not want to schedule task4 immediately. We want to wait until task3 has been scheduled.

After having created task4, we attach the new task to task1 and do not schedule it. Once task1 completes, it still does not schedule task4, instead, it reattaches task4 to task2. When task2 completes, it passes task4 further down to task3. And only when task3 completes, it finally schedules task4 (because task3 does not return any new tasks).

This is one possible sequence of events. It is also possible that by the time task4 is created, the whole chain task1 -> task2 -> task3 has already completed, which means that task4 must be scheduled immediately upon creation. Another possibility is that task4 is created after task1 has completed, but task2 has not. In this case task4 must be attached to task2. In general, it means that to which task a dependency gets attached is decided at runtime and is subject to timing.

To implement this strategy, a new class is created, an instance of which is held by all tasks

class ContinuationHolder {
    std::mutex _m;
    std::vector<std::function<void()>> _actions;
    bool _scheduled = false;
    ContinuationHolder* _transferred = nullptr;

public:
    void attach(std::function<void()> action);
    void schedule();
    void transfer(ContinuationHolder* dest);
};

Instead of attaching actual ITasks, a std::function is used to reduce coupling.

ContinuationHolder has three states, that are related to the state of the associated task:

  1. Accepting new continuations (the task is scheduled or running). New actions in this state are being added to a list.
  2. Scheduled (the task has completed and not returned a new task). New actions are executed immediately.
  3. Transferred (the task has completed and returned a new task, to which it reattached its continuations). New actions are delegated to the next instance of ContinuationHolder.

The implementation is straightforward. schedule changes the state and executes all previously attached actions:

void ContinuationHolder::schedule() {
    {
        auto lock = std::lock_guard(_m);
        assert(!_scheduled);
        _scheduled = true;
    }
    for (auto& action : _actions) {
        action();
    }
}

transfer works similarly

void ContinuationHolder::transfer(ContinuationHolder* dest) {
    {
        auto lock = std::lock_guard(_m);
        assert(!_scheduled);
        _transferred = dest;
    }
    for (auto& action : _actions) {
        dest->attach(action);
    }
}

And attach dispatches action based on the state:

void ContinuationHolder::attach(std::function<void()> action) {
    auto lock = std::lock_guard(_m);
    if (_transferred) {
        _transferred->attach(action);
    } else if (_scheduled) {
        action();
    } else {
        _actions.push_back(action);
    }
}

Now we have a suitable scheduling strategy and the tools to finally implement Task<T>.

Implementing Task<T>

The implementation will be split between Task<T> and TaskImpl<F, Ps...>. Task<T> is the user-facing part. It is abstract (it doesn’t implement operator()) and never created directly.

template <class T>
class Task : public ITask, public std::enable_shared_from_this<Task<T>> {
protected:
    std::shared_future<T> _future;
    std::atomic<int> _unscheduledDeps = 0;
    ContinuationHolder _ch;

    template <class Ff, class... Psf>
    friend class TaskImpl;

public:
    using R = T;

    Task(int unscheduledDeps) : _unscheduledDeps(unscheduledDeps) { }

    R result() {
        return _future.get();
    }

    void wait() override {
        _future.get();
    }

    void addOnSchedule(std::function<void()> action) {
        _ch.attach(action);
    }

    ContinuationHolder* ch() {
        return &_ch;
    }

    void signalDependency() {
        if (!--_unscheduledDeps) {
            Scheduler::instance()->schedule(this->shared_from_this());
        }
    }
};

A shared future is used because several tasks might await the same dependency task.

_unscheduledDeps is the number of currently unscheduled dependencies. This number is decremented each time a dependency completes. Once the number reaches zero, ContinuationHolder schedules all attached continuations.

The friend declaration is necessary to allow Task<T> access the _future member of another Task<Z>, where T and Z are different types. Say, Task<int> should be able to access Task<float>::_future.

Task<T> also exposes the type of its value, R.

template <class F, class... Ps>
class TaskImpl : public Task<std::invoke_result_t<F, typename Ps::element_type::R...>> {
public:
    using R = std::invoke_result_t<F, typename Ps::element_type::R...>;

private:
    std::promise<R> _promise;
    F _body;
    std::tuple<Ps...> _deps;

public:
    TaskImpl(int depCount, F body, Ps... ps)
        : Task<R>(depCount), _body(body), _deps(ps...) {
        Task<R>::_future = _promise.get_future();
    }

    void operator()() override {
        auto i = [&](Ps... ps) {
            _promise.set_value(_body(ps->_future.get()...));
        };
        std::apply(i, _deps);
        if constexpr (TaskTraits<R>::isTask) {
            auto& task = Task<R>::_future.get();
            Task<R>::_ch.transfer(task->ch());
        } else {
            Task<R>::_ch.schedule();
        }
    }
};

Note that the type T in Task<T> is not specified explicitly in TaskImpl<F, Ps...>. Instead it is the result type of callable F.

The dependencies Ps... are assumed to be shared pointers: std::shared_ptr<Task<T>>. The type T is then computed as

std::invoke_result_t<F, typename Ps::element_type::R...>

_promise and _body should be self-explanatory.

_deps is a heterogeneous container of the task dependencies. Given that the number of dependencies and their types are known at compile time, std::tuple<> is the obvious choice.

The constructor simply initializes all the variables.

The operator() is the heart of TaskImpl<>. It first gets the results of all dependencies, blocking if they are not yet ready. It then passes the results of all dependencies to _body, and stores the returned value inside this task’s _promise. Then, depending on the return type of F, either transfers the attached continuations to the result task or schedules them directly.

TaskTraits<T> is a simple trait template:

template <class T>
struct TaskTraits {
    static constexpr bool isTask = false;
};

template <class T>
struct TaskTraits<SPTask<T>> {
    static constexpr bool isTask = true;
};

Implementing make_task

make_task is a utility function that creates an instance of TaskImpl<>, schedules it, and collapses values of type SPTask<SPTask<T>> into SPTask<T>. The last part is done using TaskJoiner:

template <class T>
struct TaskJoiner {
    T operator()(T t) {
        return t;
    }
};

template <class T>
struct TaskJoiner<SPTask<SPTask<T>>> {
    auto operator()(auto task) {
        return make_task([=] (auto x) { return x->result(); }, task);
    }
};

TaskJoiner<> uses partial template specialization to pattern-match SPTask<SPTask<T>>. Notably, in the general case the T type is SPTask<>, while in the specialized case it is not a task at all, but a concrete return type like int or float.

template <class F, class... T>
auto make_task(F f, T... x) {
    const auto depCount = sizeof...(T);
    auto task = std::make_shared<TaskImpl<F, T...>>(depCount, f, x...);
    if constexpr (depCount == 0) {
        Scheduler::instance()->schedule(task);
    } else {
        (..., x->addOnSchedule([=] {
            task->signalDependency();
        }));
    }
    using TaskType = SPTask<typename TaskImpl<F, T...>::R>;
    return TaskJoiner<TaskType>()(task);
}

Now make_task needs some unpacking. First, it always creates a new task, passing to it the number of dependencies. The number of dependencies in this case is known at compile time.

If there are no dependencies, it means the task can be scheduled immediately. Otherwise the newly created task is attached to each dependency in the parameter pack using a folding expression.

The last step applies TaskJoiner<T> to the result type.

Implementing when_all

when_all is very similar to make_task, except that the dependencies are only known at runtime, and the resulting task is always of type std::vector<T> and as such does not need TaskJoiner<T>.

template <class R>
auto when_all(std::vector<SPTask<R>> deps) {
    assert(!deps.empty());
    auto collect = [=] {
        std::vector<R> vec;
        for (auto dep : deps) {
            vec.push_back(dep->result());
        }
        return vec;
    };
    auto task = std::make_shared<TaskImpl<decltype(collect)>>(
        deps.size(), collect);
    for (auto dep : deps) {
        dep->addOnSchedule([=] {
            task->signalDependency();
        });
    }
    return static_cast<SPTask<std::vector<R>>>(task);
}

Now the complete task-creating API is implemented.

The lack of exception handling.

So far I completely ignored exceptions. Any exception inside the thread pool will just terminate the program. One way to handle exceptions is to catch exceptions thrown inside task bodies and put them into std::promise. Then instead of passing unpacked dependency results as arguments, the std::futures could be passed directly. The dependant tasks’ bodies would then access the passed std::futures and trigger the previously captured exception, allowing them to either handle it, or pass it further down the task DAG.

For the sake of simplicity I omitted exception handling.

Bonus: It looks suspiciously monadic!

Indeed, make_task combines the function of monadic return and bind

bind :: M T -> (T -> M R) -> M R

Which is the same as

std::function<SPTask<R>(T)> f;
SPTask<T> t;
SPTask<R> = make_task(f, t);

thanks to the collapsing rule Task<Task<T>> => Task<T>, which is itself the monadic join

join :: M M T -> M T

Of course, there is also the monadic return

return :: T -> M T

The function of which is achieved by

SPTask<T> task = make_task([=] { return T{}; });

I have not checked the monad laws, so I am not sure Task<T> is a true monad, but in any case, it feels like one.

An example: implementing quicksort using tasks

Quicksort is an easy algorithm to parallelize. The sequential version looks like this:

template <class Iter>
Iter qsPartition(Iter begin, Iter end) {
    auto i = begin, p = end - 1;
    for (auto j = begin; j < p; ++j) {
        if (*j <= *p) {
            std::iter_swap(i, j);
            ++i;
        }
    }
    std::iter_swap(p, i);
    return i;
}

template <class Iter>
void quickSortSeq(Iter begin, Iter end) {
    if (begin >= end)
        return;
    auto p = qsPartition(begin, end);
    quickSortSeq(begin, p),
    quickSortSeq(p + 1, end);
}

Once the partition step is done, the elements to the left and right of the pivot element can be sorted independently. Here is a parallel version:

template <class Iter>
SPTask<int> quickSort(Iter begin, Iter end) {
    if (begin >= end)
        return make_task([] { return 0; });
    auto partition = make_task([=] { return qsPartition(begin, end); });
    auto recurse = make_task([=] (auto p) {
        return when_all(std::vector{
            quickSort(begin, p),
            quickSort(p + 1, end)});
    }, partition);
    return make_task([] (auto) { return 0; }, recurse);
}

The partition step is done in a separate task. Once it completes, two new quickSort task are created. When both of them complete, the whole initial invocation of quickSort completes.

If when_all had supported ITask in addition to Task<T> and also accepted variadic packs (and not only std::vectors), we could improve the code, making it more natural:

template <class Iter>
std::shared_ptr<ITask> quickSort(Iter begin, Iter end) {
    if (begin >= end)
        return make_task([] { return 0; });
    auto partition = make_task([=] { return qsPartition(begin, end); });
    return make_task([] (auto p) {
        return when_all(
            quickSort(begin, p),
            quickSort(p + 1, end));
    }, partition);
}

As is, this parallel version is unlikely to be faster than the sequential one, due to the overhead created by tasks. A simple way to overcome this problem is to fallback to the sequential version when the array size is small, say 1000 elements or less.

Conclusion

In under 300 lines of code, we got ourselves a surprisingly capable task library. There are, of course, many improvements to be made. The queue might benefit from some performance optimization, the API might use some polish, more features such as when_any might be provided, as well as the ability to query the task state: unscheduled, scheduled, running, completed. Making ITask a first-class citizen would allow working more easily with tasks that do not return any value. Exception handling and propagation is also an obvious next step.

Still, the library is done completely using standard C++ and is pretty expressive thanks to the use of some basic template metaprogramming.

The complete code with a few test cases can be found on github.