Pipeline¶
A pipeline is a feature offered by StreamPU
allowing to split a
sequence into multiple stages. Each stage is executed
on one or more threads in parallel. The pipeline takes care of the
synchronizations between stages. This is achieved through an implementation
of a producer/consumer algorithm.
A pipeline is a C++ object of the spu::runtime::Pipeline
class. The
following sections try to give an overview of the most important attributes and
methods to facilitate the code understanding.
Main Attributes¶
The original sequence from which the pipeline was created. Vector of the different stages in the pipeline. Each stage is a sequence.std::vector<std::pair<std::tuple<runtime::Socket*, size_t, size_t, size_t,size_t>,
std::tuple<runtime::Socket*, size_t, size_t, size_t>>> sck_orphan_binds;
input
, output
, priority
) of the created adaptors,
priority
is used to order the tuples.
Main Methods¶
This is the public method that runs the pipeline in loop. Other variants exist where it is possible to give a stop condition function.void init(const std::vector<runtime::Task*> &firsts,
const std::vector<runtime::Task*> &lasts,
const std::vector<std::tuple<std::vector<runtime::Task*>, std::vector<runtime::Task*>, std::vector<runtime::Task*>>> &sep_stages = {},
const std::vector<size_t> &n_threads = {},
const std::vector<size_t> &synchro_buffer_sizes = {},
const std::vector<bool> &synchro_active_waiting = {},
const std::vector<bool> &thread_pinning = {},
const std::vector<std::vector<size_t>> &puids = {});
This method creates the pipeline given:
- The first and last tasks of the original sequence (
firsts
andlasts
). - The first and last tasks of each stage (
sep_stages
). - The number of threads to allocate to each stage (
n_threads
). - The number of buffers between stages (
synchro_buffer_sizes
). - The type of waiting for the adaptor tasks (
synchro_active_waiting
).
Note
StreamPU doesn't support consecutive multi-threaded stages yet.
void create_adaptors(const std::vector<size_t> &synchro_buffer_sizes = {},
const std::vector<bool> &synchro_active_waiting = {});
pull
& push
tasks)
that are added between each stage to transmit data from the stage \(S\) to the
stage \(S+1\).
Adaptor module tasks pull
& push
need to be bound to each task in the two
consecutive stages, the target sockets to bind are stored in the vector
sck_orphan_binds.
Adaptor¶
spu::module::Adaptor
is a special module automatically inserted between
stages when creating a pipeline and serve as "bridges" between them, they are
bound to first and last tasks of the consecutive stages. The purpose of adaptors
is to synchronize data exchange between each stage using pre-allocated buffer
pools. In other words, this is an implementation of the producer-consumer
algorithm. There are 4 tasks performed by adaptors:
push_1
: when the \(S\) stage is executed on one thread and the \((S+1)\) stage is executed on multiple threads. The function gets an empty buffer and fills it with the data produced in the stage \(S\). The buffers are filled using a round-robin algorithm.pull_n
: when the \(S\) stage is executed on multiple threads and the \((S-1)\) stage is on one thread. It is the task executed just after thepush_1
, it takes a filled buffer from the inter-stage pool and forwards the data. There is apull_n
task for every thread of the stage.push_n
: when the \(S\) stage is executed on multiple threads and the \((S+1)\) stage is on one thread. The task takes an empty buffer from the pool and fills it with the data produced by the thread. There is apush_n
task for each thread of the stage.pull_1
: when the \(S\) stage is executed on one thread and the \((S-1)\) stage is on multiple threads, it's the task executed just after thepush_n
. It takes filled buffers from the pool using the same round-robin algorithm aspush_1
and forward the data.
Main Attributes¶
The inter-stage buffer pool size. Pointers to each buffer of the inter-stage pool.std::shared_ptr<std::vector<std::atomic<uint64_t>>> first;
std::shared_ptr<std::vector<std::atomic<uint64_t>>> last;
first
is used to get the filled
buffers, and last
for the empty ones.
Main Methods¶
These are the methods used to synchronize the buffer pool between the pipeline stages. When getting a buffer, the thread may sleep if there is no empty buffer available. When a new empty buffer will be available, the sleeping thread will be woken up.
Get a pointer to the first empty buffer in the pool (at indexlast
).
Get a pointer to the first filled buffer in the pool (at index first
).
Get a pointer to the first empty buffer in the pool, and replace this buffer
with a new one pointed by swap_buffer
parameter.
Get a pointer to the first filled buffer in the pool, and replace this buffer
with a new one pointed by swap_buffer
parameter.
The puller can wake up a push task if this one is waiting for an empty buffer.
The pusher can wake up a pull task if this one is waiting for an empty buffer.