Creating a tiny C++ task library
Tasks are a popular concurrent programming mechanism. They allow decomposing a computational problem into a set of isolated sub-problems, i.e. tasks. This decomposition exposes the underlying parallelism of the problem, meaning that at least some tasks can be executed in parallel, without any explicit synchronization.
This post covers the design and implementation of a very simple yet useful task library.
Tasks are used both for parallelism (CPU-intensive tasks, think parallel sorting) and concurrency (latency-reducing tasks, think responsive UI).
The beauty of tasks is their composability. One task might compute a value, while others depend on that value. Generally speaking, tasks make up a directed acyclic graph, or DAG.
A task library consists of two parts: an API to specify the task DAG, and a scheduler that actually executes that DAG.
I will start by describing the desired features of the task library and then explain their implementation.
Creating tasks
There are several essential operations that pop up in almost any task-based program.
Creating a new task is an obvious first step. So we need a task type to work with.
template <class T>
class Task;
A task is parameterized by the type of the value it computes. A task computing a factorial might have the type Task<int>, and a task requesting a file path from the user might have the type Task<std::string>.
It is often possible for a task to outlive the scope where it has been constructed. This means that pointers and dynamic allocation are used in most cases. One way to simplify the memory management would be to implement Task in a way similar to the pimpl idiom, making its state shared among (cheap) copies. Such a feature might be useful and desirable, but it complicates the implementation (a rough sketch of the idea follows below). For now, I simply define a helper pointer alias to save some typing.
template <class T>
using SPTask = std::shared_ptr<Task<T>>;
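For reference, the shared-state alternative mentioned above might look roughly like this. A hypothetical sketch, not something the rest of this post implements:
template <class T>
struct TaskState; // would hold the future, the body, the continuations, etc.

template <class T>
class TaskHandle {
    std::shared_ptr<TaskState<T>> _state; // copies are cheap and share one state
public:
    // result(), wait(), etc. would all delegate to *_state
};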
To create a new task instance, the make_task function is used.
template <class F, class... Ts>
auto make_task(F function, Ts... tasks); // returns SPTask<T>, T deduced from function's return type
Using a free function instead of a constructor makes it possible to create an instance of a class derived from Task<T>. This gives more freedom to the implementation.
make_task takes as arguments a function that does the computation and a list of predecessor tasks (dependencies), whose results will be passed on to function.
For example:
auto getInt = [=] { return 1; };
auto task = make_task(getInt);
This is the simplest use of make_task. getInt doesn't take any parameters, and as such the resulting task does not depend on any other task.
Here is a more complex example:
auto get3 = make_task([=] { return 3; });
auto get5 = make_task([=] { return 5; });
auto sum = make_task([=] (int i, int j) { return i + j; }, get3, get5);
auto result = sum->result(); // result == 8
Now there is a dependency. The sum task will only start its execution once both get3 and get5 complete. But get3 and get5 do not depend on each other and can execute in parallel.
The last line in the snippet blocks until all the tasks complete and gets the result from sum.
One important case is creating a task inside a task. This might be conditional on the result of a previously completed task.
auto task = make_task([] { return make_task([] { return 1; }); });
What type should task have? SPTask<SPTask<int>> doesn't look very useful. It would be quite a chore to compose it with other tasks. For now let's assume that make_task is smart enough to collapse SPTask<SPTask<int>> into SPTask<int> automatically:
SPTask<int> task = make_task([=] { return make_task([=] { return 1; }); });
int result = task->result(); // result == 1
Another useful feature is the ability to specify dependencies at runtime in addition to compile time. For example:
std::vector<SPTask<int>> tasks; // already created
SPTask<std::vector<int>> combined = when_all(tasks);
auto sum = make_task([=] (std::vector<int> ints) {
return std::accumulate(begin(ints), end(ints), 0);
}, combined);
Note the conversion from std::vector<SPTask<int>> to SPTask<std::vector<int>>. This operation is very common and I feel that providing it is essential even for this simple task library.
make_task and when_all make up our complete task-creating API.
Implementing the scheduler
The scheduler does not need to know about task results and their types. It is also oblivious to the dependencies among tasks. This allows for a trivial thread pool implementation. A simpler task interface than Task<T> is sufficient.
struct ITask {
    virtual void operator()() = 0;
    virtual void wait() = 0;
    virtual ~ITask() = default;
};
The scheduler has a queue to which new tasks are continuously pushed. There is also a predetermined number of threads that extract tasks from the queue and execute them one by one.
The task DAG is assumed to be fed into the queue in topological order (dependencies first). How this can be achieved is the most interesting part of the library and I will discuss it in detail later.
class Scheduler {
static std::unique_ptr<Scheduler> _instance;
Queue<std::shared_ptr<ITask>> _queue;
std::vector<std::thread> _threads;
Scheduler();
public:
static Scheduler* instance();
static void init();
static void shutdown();
void schedule(std::shared_ptr<ITask> task);
};
In addition to _queue and _threads, there is a static _instance. init and shutdown need to be called at the startup and shutdown of the application, respectively.
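In a typical program the two calls bracket all task work. A minimal usage sketch, assuming the make_task API declared earlier:
int main() {
    Scheduler::init();                        // start the thread pool
    auto task = make_task([] { return 42; }); // schedule some work
    int value = task->result();               // blocks until the task completes
    Scheduler::shutdown();                    // signal and join the worker threads
    return value == 42 ? 0 : 1;
}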
The constructor initializes the thread pool:
Scheduler::Scheduler() {
auto body = [&] {
for (;;) {
std::shared_ptr<ITask> task;
_queue.pop(task);
if (!task)
return;
(*task)();
}
};
_threads.resize(std::thread::hardware_concurrency());
for (auto& th : _threads) {
th = std::thread(body);
}
}
Each thread continues to execute tasks until it receives a nullptr.
The single instance is created by init:
void Scheduler::init() {
_instance.reset(new Scheduler());
}
And the threads are finally joined by shutdown:
void Scheduler::shutdown() {
auto i = _instance.get();
for (auto j = 0u; j < i->_threads.size(); ++j) {
i->_queue.push({});
}
for (auto& th : i->_threads) {
th.join();
}
}
schedule simply pushes the provided task onto the queue:
void Scheduler::schedule(std::shared_ptr<ITask> task) {
_queue.push(task);
}
The Queue type is a simple (and inefficient) thread-safe wrapper around std::queue:
template <class T>
class Queue {
std::queue<T> _queue;
std::mutex _m;
std::condition_variable _cv;
public:
void push(T value) {
auto lock = std::unique_lock(_m);
_queue.push(value);
_cv.notify_one();
}
void pop(T& value) {
auto lock = std::unique_lock(_m);
_cv.wait(lock, [this] { return !_queue.empty(); });
value = _queue.front();
_queue.pop();
}
};
This is the scheduler in its entirety.
One question remains. If the Scheduler doesn't know anything about task dependencies and blindly schedules tasks on the thread pool, then who does the actual blocking? The answer is: the tasks. Tasks themselves are responsible for synchronization.
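In code, "the tasks do the blocking" means that a task body calls get() on its dependencies' futures before doing any work of its own. A minimal sketch of the idea using std::shared_future directly (the real mechanism appears later in TaskImpl):
#include <future>

// The scheduler knows nothing about dependencies; the dependent computation
// itself blocks on the producers' shared_futures.
int dependentBody(std::shared_future<int> a, std::shared_future<int> b) {
    return a.get() + b.get(); // each get() blocks until its producer has run
}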
Scheduling strategies
If we had a separate thread for each task, then we would not need to worry about scheduling at all. No matter how many tasks are waiting for their dependencies, they could be certain that sooner or later those dependencies would complete.
Unfortunately, allocating a separate thread for every task can easily become prohibitively expensive, especially on the platforms where creating new threads is costly. Reusing existing threads is a better idea.
As discussed previously, a thread pool implementation requires the task DAG to be processed in topological order. Otherwise deadlocks might occur. To guarantee the topological order, tasks need to be scheduled with care.
Next I will discuss the simple immediate scheduling strategy and show why it fails to guarantee the right scheduling order. Then I will explore a more complex strategy that only schedules tasks when all their dependencies have completed.
Scheduling strategy 1. Immediate scheduling.
In this strategy tasks go through a series of states:
- creation and immediate scheduling (pushing onto the thread pool queue) by make_task
- blocking while waiting for dependencies and then executing (both inside operator())
- being in a completed state, holding the result until the last shared pointer has been destroyed
This implementation is very simple, but it creates two serious problems.
First, the waiting for dependencies is done on the thread pool, meaning that while waiting, no useful work can be done by the occupied thread. The thread executing such a task simply yields to the OS.
Second, and more important, is a potential for deadlocks when tasks are created inside other tasks. To understand how this is possible consider the following code:
auto getIntTask1 = make_task([] { return 0; });
auto getIntTask2 = make_task([] { return 3; });
auto task = make_task([](int x, int y) {
std::vector<std::shared_ptr<Task<int>>> tasks;
for (int i = x; i < y; ++i) {
tasks.push_back(make_task([=]{ return i * 2; }));
}
return when_all(tasks);
}, getIntTask1, getIntTask2);
auto sumTask = make_task([](auto vec) {
return std::accumulate(begin(vec), end(vec), 0);
}, task);
assert(sumTask->result() == 6);
task waits for getIntTask1 and getIntTask2 to complete, then, based on the result, creates three new tasks and waits for their completion, producing a vector of doubled values. sumTask waits for task to complete and sums up those values.
If there are at least two threads in the thread pool, then everything is fine. But if there is only one thread, a deadlock is possible. Furthermore, it is subject to timing.
Look at the Queue (a) below. Arrows show the dependencies among tasks. The dashed rectangles are tasks created dynamically by task.
Both getIntTask1 and getIntTask2 are scheduled immediately. Then task is scheduled. So far so good, task has not yet created new tasks, including the new task created inside when_all. sumTask gets scheduled next, after task, but before the when_all task.
This leads to a situation in which getIntTask1, getIntTask2 and task complete, and then sumTask's operator() is called by the scheduler and sumTask blocks, waiting for tasks 1, 2, 3 and when_all. But none of them are ever going to be executed, because sumTask now occupies the only thread in the thread pool. A deadlock.
If, on the other hand, task completes before sumTask has been scheduled, then all is well. sumTask would be placed after when_all and no deadlock occurs.
This situation is particularly insidious given that the number of threads in the thread pool might not be known upfront.
Sometimes it is possible to carefully rewrite the code to guarantee the correct schedule ordering:
auto getIntTask1 = make_task([] { return 0; });
auto getIntTask2 = make_task([] { return 3; });
auto task = make_task([] (int x, int y) {
std::vector<SPTask<int>> tasks;
for (int i = x; i < y; ++i) {
tasks.push_back(make_task([=]{ return i * 2; }));
}
return make_task([] (auto vec) {
return std::accumulate(begin(vec), end(vec), 0);
}, when_all(tasks));
}, getIntTask1, getIntTask2);
assert(task->result() == 6);
This code will always create the ordering shown in Queue (b) above.
There are ways to automatically resolve such conflicts. For example, in addition to the three states scheduled, executing and completed, a new unscheduled state can be introduced. A task with dependencies would not be placed into the queue immediately; instead, each of its dependencies would carry a pointer to it, and the last dependency to complete would trigger the scheduling of the unscheduled task. Such a design is safer, but requires additional synchronization and bookkeeping.
Any serious task library should offer a solution to this problem.
Scheduling strategy 2. Continuation-style scheduling.
In this strategy a task is scheduled after the last of its dependencies has been scheduled or executed. If a task does not have any dependencies, it is scheduled immediately upon creation. This solves the problem discussed in the previous section. It makes code independent of the thread pool size and removes the possibility of deadlocks. The downside of this strategy is added complexity.
Any task with dependencies becomes a continuation. Once a continuation is created, it needs to be attached to all its dependencies.
Consider the following code:
auto task1 = make_task([] {
// long computation
int a = 1;
return make_task([a] { // task2
// long computation
int b = a + 1;
return make_task([b] { // task3
return b + 1;
});
});
});
auto task4 = make_task([] (int i) {
return i * i;
}, task1);
// task4->result() == 9
Visually the task DAG looks like this
(Dashed outlines mean that the tasks are created by other tasks.)
There are two essential properties that we want to maintain:
- task4 must await task3 and not task1.
- task4 should only be scheduled after task3.
The first property is guaranteed simply by blocking until the dependency results are available. This property does not depend on scheduling.
The second property is harder to achieve.
In the example above it is assumed that task1 (without considering the tasks it creates) does a long computation before it finally creates and returns task2. This means that task4 is almost certainly created before task2 and task3. At this point task1 is either waiting in the scheduler queue (scheduled) or is being executed (running). In both cases we do not want to schedule task4 immediately. We want to wait until task3 has been scheduled.
After having created task4, we attach the new task to task1 and do not schedule it. Once task1 completes, it still does not schedule task4; instead, it reattaches task4 to task2. When task2 completes, it passes task4 further down to task3. And only when task3 completes is task4 finally scheduled (because task3 does not return any new tasks).
This is one possible sequence of events. It is also possible that by the time task4 is created, the whole chain task1 -> task2 -> task3 has already completed, which means that task4 must be scheduled immediately upon creation. Another possibility is that task4 is created after task1 has completed, but task2 has not. In this case task4 must be attached to task2. In general, the task to which a continuation gets attached is decided at runtime and is subject to timing.
To implement this strategy, a new class is created, an instance of which is held by all tasks:
class ContinuationHolder {
std::mutex _m;
std::vector<std::function<void()>> _actions;
bool _scheduled = false;
ContinuationHolder* _transferred = nullptr;
public:
void attach(std::function<void()> action);
void schedule();
void transfer(ContinuationHolder* dest);
};
Instead of attaching actual ITasks, a std::function is used to reduce coupling.
ContinuationHolder has three states that are related to the state of the associated task:
- Accepting new continuations (the task is scheduled or running). New actions in this state are added to a list.
- Scheduled (the task has completed and has not returned a new task). New actions are executed immediately.
- Transferred (the task has completed and returned a new task, to which it reattached its continuations). New actions are delegated to the next instance of ContinuationHolder.
The implementation is straightforward. schedule changes the state and executes all previously attached actions:
void ContinuationHolder::schedule() {
{
auto lock = std::lock_guard(_m);
assert(!_scheduled);
_scheduled = true;
}
for (auto& action : _actions) {
action();
}
}
transfer works similarly:
void ContinuationHolder::transfer(ContinuationHolder* dest) {
{
auto lock = std::lock_guard(_m);
assert(!_scheduled);
_transferred = dest;
}
for (auto& action : _actions) {
dest->attach(action);
}
}
And attach dispatches action based on the state:
void ContinuationHolder::attach(std::function<void()> action) {
auto lock = std::lock_guard(_m);
if (_transferred) {
_transferred->attach(action);
} else if (_scheduled) {
action();
} else {
_actions.push_back(action);
}
}
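To see the three states in action, here is a small usage sketch (assuming the class above and <cstdio> for std::puts):
void demo() {
    ContinuationHolder producer;
    // State 1: accepting. The action is stored for later.
    producer.attach([] { std::puts("runs when schedule() is called"); });
    // State 2: scheduled. The stored action runs now; new ones run immediately.
    producer.schedule();
    producer.attach([] { std::puts("runs immediately"); });
    // State 3: transferred. transfer(&next) would instead forward the stored
    // actions to a next holder and delegate later attach() calls to it.
}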
Now we have a suitable scheduling strategy and the tools to finally implement Task<T>.
Implementing Task<T>
The implementation will be split between Task<T> and TaskImpl<F, Ps...>. Task<T> is the user-facing part. It is abstract (it doesn't implement operator()) and never created directly.
template <class T>
class Task : public ITask, public std::enable_shared_from_this<Task<T>> {
protected:
std::shared_future<T> _future;
std::atomic<int> _unscheduledDeps = 0;
ContinuationHolder _ch;
template <class Ff, class... Psf>
friend class TaskImpl;
public:
using R = T;
Task(int unscheduledDeps) : _unscheduledDeps(unscheduledDeps) { }
R result() {
return _future.get();
}
void wait() override {
_future.get();
}
void addOnSchedule(std::function<void()> action) {
_ch.attach(action);
}
ContinuationHolder* ch() {
return &_ch;
}
void signalDependency() {
if (!--_unscheduledDeps) {
Scheduler::instance()->schedule(this->shared_from_this());
}
}
};
A shared future is used because several tasks might await the same dependency task.
_unscheduledDeps is the number of currently unscheduled dependencies. This number is decremented each time a dependency completes. Once the number reaches zero, the task schedules itself on the thread pool.
The friend declaration is necessary to allow Task<T> to access the _future member of another Task<Z>, where T and Z are different types. Say, Task<int> should be able to access Task<float>::_future.
Task<T> also exposes the type of its value, R.
template <class F, class... Ps>
class TaskImpl : public Task<std::invoke_result_t<F, typename Ps::element_type::R...>> {
public:
using R = std::invoke_result_t<F, typename Ps::element_type::R...>;
private:
std::promise<R> _promise;
F _body;
std::tuple<Ps...> _deps;
public:
TaskImpl(int depCount, F body, Ps... ps)
: Task<R>(depCount), _body(body), _deps(ps...) {
Task<R>::_future = _promise.get_future();
}
void operator()() override {
auto i = [&](Ps... ps) {
_promise.set_value(_body(ps->_future.get()...));
};
std::apply(i, _deps);
if constexpr (TaskTraits<R>::isTask) {
auto& task = Task<R>::_future.get();
Task<R>::_ch.transfer(task->ch());
} else {
Task<R>::_ch.schedule();
}
}
};
Note that the type T in Task<T> is not specified explicitly in TaskImpl<F, Ps...>. Instead it is the result type of the callable F.
The dependencies Ps... are assumed to be shared pointers: std::shared_ptr<Task<T>>. The type T is then computed as
std::invoke_result_t<F, typename Ps::element_type::R...>
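For instance, a quick compile-time check of this computation (assuming the definitions above and <type_traits>):
// A body taking (int, float) and returning double, with dependencies of
// matching types: the computed result type is double.
using F  = double (*)(int, float);
using P1 = SPTask<int>;   // P1::element_type::R == int
using P2 = SPTask<float>; // P2::element_type::R == float
static_assert(std::is_same_v<
    std::invoke_result_t<F, P1::element_type::R, P2::element_type::R>,
    double>);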
_promise and _body should be self-explanatory.
_deps is a heterogeneous container of the task dependencies. Given that the number of dependencies and their types are known at compile time, std::tuple<> is the obvious choice.
The constructor simply initializes all the variables.
The operator() is the heart of TaskImpl<>. It first gets the results of all dependencies, blocking if they are not yet ready. It then passes those results to _body and stores the returned value inside this task's _promise. Then, depending on the return type of F, it either transfers the attached continuations to the result task or schedules them directly.
TaskTraits<T> is a simple trait template:
template <class T>
struct TaskTraits {
static constexpr bool isTask = false;
};
template <class T>
struct TaskTraits<SPTask<T>> {
static constexpr bool isTask = true;
};
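So, for example:
static_assert(!TaskTraits<int>::isTask);
static_assert(TaskTraits<SPTask<int>>::isTask);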
Implementing make_task
make_task is a utility function that creates an instance of TaskImpl<>, schedules it, and collapses values of type SPTask<SPTask<T>> into SPTask<T>. The last part is done using TaskJoiner:
template <class T>
struct TaskJoiner {
T operator()(T t) {
return t;
}
};
template <class T>
struct TaskJoiner<SPTask<SPTask<T>>> {
auto operator()(auto task) {
return make_task([=] (auto x) { return x->result(); }, task);
}
};
TaskJoiner<> uses partial template specialization to pattern-match SPTask<SPTask<T>>. Notably, in the general case the T type is SPTask<>, while in the specialized case it is not a task at all, but a concrete return type like int or float.
template <class F, class... T>
auto make_task(F f, T... x) {
const auto depCount = sizeof...(T);
auto task = std::make_shared<TaskImpl<F, T...>>(depCount, f, x...);
if constexpr (depCount == 0) {
Scheduler::instance()->schedule(task);
} else {
(..., x->addOnSchedule([=] {
task->signalDependency();
}));
}
using TaskType = SPTask<typename TaskImpl<F, T...>::R>;
return TaskJoiner<TaskType>()(task);
}
Now make_task needs some unpacking. First, it always creates a new task, passing to it the number of dependencies, which in this case is known at compile time.
If there are no dependencies, the task can be scheduled immediately. Otherwise the newly created task is attached to each dependency in the parameter pack using a fold expression: for make_task(f, a, b) the fold expands to a->addOnSchedule(...) followed by b->addOnSchedule(...).
The last step applies TaskJoiner<T> to the result type.
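As a quick sanity check of the collapsing, both of the following end up with the same type:
// The nested case is collapsed by TaskJoiner, so both are SPTask<int>.
SPTask<int> plain  = make_task([] { return 1; });
SPTask<int> nested = make_task([] { return make_task([] { return 1; }); });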
Implementing when_all
when_all is very similar to make_task, except that the dependencies are only known at runtime, and the resulting task is always of type std::vector<T> and as such does not need TaskJoiner<T>.
template <class R>
auto when_all(std::vector<SPTask<R>> deps) {
assert(!deps.empty());
auto collect = [=] {
std::vector<R> vec;
for (auto dep : deps) {
vec.push_back(dep->result());
}
return vec;
};
auto task = std::make_shared<TaskImpl<decltype(collect)>>(
deps.size(), collect);
for (auto dep : deps) {
dep->addOnSchedule([=] {
task->signalDependency();
});
}
return static_cast<SPTask<std::vector<R>>>(task);
}
Now the complete task-creating API is implemented.
The lack of exception handling.
So far I have completely ignored exceptions. Any exception inside the thread pool will just terminate the program. One way to handle exceptions is to catch exceptions thrown inside task bodies and put them into the std::promise. Then, instead of passing unpacked dependency results as arguments, the std::futures could be passed directly. The dependent tasks' bodies would then access the passed std::futures and trigger the previously captured exception, allowing them to either handle it or pass it further down the task DAG.
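A sketch of the capturing half of that idea, written as a hypothetical change to the body-invoking lambda inside TaskImpl<>::operator() (an assumption, not part of the library as written):
auto i = [&](Ps... ps) {
    try {
        _promise.set_value(_body(ps->_future.get()...));
    } catch (...) {
        // Store the exception; result() and get() calls on this task's
        // future will rethrow it in the consuming task.
        _promise.set_exception(std::current_exception());
    }
};
Since ps->_future.get() rethrows a dependency's stored exception, an unhandled exception would automatically flow down the task DAG.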
For the sake of simplicity I omitted exception handling.
Bonus: It looks suspiciously monadic!
Indeed, make_task combines the function of monadic return and bind:
bind :: M T -> (T -> M R) -> M R
Which is the same as
std::function<SPTask<R>(T)> f;
SPTask<T> t;
SPTask<R> r = make_task(f, t);
thanks to the collapsing rule Task<Task<T>> => Task<T>, which is itself the monadic join:
join :: M M T -> M T
Of course, there is also the monadic return:
return :: T -> M T
The function of which is achieved by:
SPTask<T> task = make_task([=] { return T{}; });
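And here is a concrete bind using the library above (a sketch; std::to_string requires <string>):
// t computes an int; the body maps it to a new task computing a string.
// The collapsing rule turns SPTask<SPTask<std::string>> into SPTask<std::string>.
SPTask<int> t = make_task([] { return 5; });
SPTask<std::string> s = make_task(
    [] (int i) { return make_task([=] { return std::to_string(i); }); }, t);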
I have not checked the monad laws, so I am not sure Task<T> is a true monad, but in any case, it feels like one.
An example: implementing quicksort using tasks
Quicksort is an easy algorithm to parallelize. The sequential version looks like this:
template <class Iter>
Iter qsPartition(Iter begin, Iter end) {
auto i = begin, p = end - 1;
for (auto j = begin; j < p; ++j) {
if (*j <= *p) {
std::iter_swap(i, j);
++i;
}
}
std::iter_swap(p, i);
return i;
}
template <class Iter>
void quickSortSeq(Iter begin, Iter end) {
if (begin >= end)
return;
auto p = qsPartition(begin, end);
quickSortSeq(begin, p);
quickSortSeq(p + 1, end);
}
Once the partition step is done, the elements to the left and right of the pivot element can be sorted independently. Here is a parallel version:
template <class Iter>
SPTask<int> quickSort(Iter begin, Iter end) {
if (begin >= end)
return make_task([] { return 0; });
auto partition = make_task([=] { return qsPartition(begin, end); });
auto recurse = make_task([=] (auto p) {
return when_all(std::vector{
quickSort(begin, p),
quickSort(p + 1, end)});
}, partition);
return make_task([] (auto) { return 0; }, recurse);
}
The partition step is done in a separate task. Once it completes, two new quickSort tasks are created. When both of them complete, the whole initial invocation of quickSort completes.
If when_all had supported ITask in addition to Task<T> and also accepted variadic packs (and not only std::vectors), we could improve the code, making it more natural:
template <class Iter>
std::shared_ptr<ITask> quickSort(Iter begin, Iter end) {
if (begin >= end)
return make_task([] { return 0; });
auto partition = make_task([=] { return qsPartition(begin, end); });
return make_task([] (auto p) {
return when_all(
quickSort(begin, p),
quickSort(p + 1, end));
}, partition);
}
As is, this parallel version is unlikely to be faster than the sequential one, due to the overhead created by tasks. A simple way to overcome this problem is to fall back to the sequential version when the array size is small, say 1000 elements or less.
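A sketch of that hybrid, reusing quickSortSeq from above (the cutoff of 1000 is a guess and would need tuning):
template <class Iter>
SPTask<int> quickSortHybrid(Iter begin, Iter end) {
    // Below the cutoff, task overhead dominates: sort sequentially in one task.
    if (end - begin <= 1000)
        return make_task([=] { quickSortSeq(begin, end); return 0; });
    auto partition = make_task([=] { return qsPartition(begin, end); });
    auto recurse = make_task([=] (auto p) {
        return when_all(std::vector{
            quickSortHybrid(begin, p),
            quickSortHybrid(p + 1, end)});
    }, partition);
    return make_task([] (auto) { return 0; }, recurse);
}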
Conclusion
In under 300 lines of code, we got ourselves a surprisingly capable task library. There are, of course, many improvements to be made. The queue might benefit from some performance optimization, the API might use some polish, and more features such as when_any might be provided, as well as the ability to query the task state: unscheduled, scheduled, running, completed. Making ITask a first-class citizen would allow working more easily with tasks that do not return any value. Exception handling and propagation is also an obvious next step.
Still, the library is written completely in standard C++ and is pretty expressive, thanks to the use of some basic template metaprogramming.
The complete code with a few test cases can be found on GitHub.