DevFast - Meet Parallel Producers/Consumers

Exploiting producer-consumer pattern to achieve intra-process data parallelism.

<< Previous Article (Rendezvous with JSON)

Introduction

We have a long way to go, so, to quickly set the stage, consider the following pseudo code we often encounter:

   List<T> data = CreateData(...) 
   //where T is some known datatype, 
   //     CreateData is some function which returns a collection of instances of T
   
   ProcessList(data)
   //where ProcessList performs required processing on the generated data

In the above example, we declare "CreateData" as our Data-Producer (or simply Producer) and "ProcessList" as our Data-Consumer (or simply Consumer). With that, consider the following description of our goal:

Our Goal:

Given two (2) pieces of code, one that generates data (thus called producer) and another that performs data-processing (thus called consumer), and assuming that each data instance can be processed independently; we are interested in designing a generic mechanism that leverages data-parallelism, enables concurrent data-processing, and at the same time hides the associated thread-synchronization intricacies and offers a simplified API.

Now that the goal is announced, we need to decide on an approach, and for that we look into the literature and take inspiration from the long-known Producer-Consumer concept (one can find information on its generic case in this Wikipedia article). We would like to adapt this concept to distribute the workload (received from producers) among consumers while letting all the entities (all producers/consumers) run CONCURRENTLY. During our discussion, we will go through, step by step, a possible implementation of it, i.e. to have our producer(s) running independently of, yet in harmony with, our consumer(s). We will see how to create such a concert (here onward called Pipeline) while satisfying the following design requirements:

The original producer-consumer problem considered only the CONCORDANT case (the third (3rd) point among the above characteristics). Nevertheless, thanks to GoF's famous Adapter design pattern (see GoF design patterns), we would like to extend the idea, while stretching the original philosophy of the design pattern, to create a data-oriented adapter (here onwards simply called adapter unless specified otherwise) that builds such a pipeline among discordant producers/consumers (given that we can identify a suitable adapter). The point of interest in doing so is to maintain separation of concerns and, thus, to achieve a simplified pipeline with detached data-transformation logic.

Why ".Pipe"? To understand it, for a moment, imagine the data-flow and think of a simplistic view of data originating at the producer and absorbed by the consumer. Think of it as if there are two persons (P and C), and P is handing over whatever he has got to C. With this, I think of UNIX: when we want to create such a chain of actions in a UNIX terminal (passing data between commands), it lets us do exactly that, thanks to the famous "|" (a.k.a. pipe) syntax (see some examples here); and so I thought of this name.

The Why?

When we talk about why we need such an implementation, we need to consider several factors:

Now, talking about our pipeline, let's consider a task at hand (perhaps a trivial one):

 

Assume we need to parse a file, let's say a CSV for simplicity's sake, that contains a considerable number of records (i.e. rows). Further assume that we need to store these records in some database, without any additional computation on the data.

Here we observe two (2) distinct and uncorrelated sub-tasks: reading the file (producer) & saving data in the database (consumer). Now, consider two (2) classic (non-concurrent) implementation approaches:

  1. Read full file -> make objects' list -> push the list to db (let's call it Approach1)
  2. Read single line -> push the object to db -> repeat until end of file (let's call it Approach2)

Most importantly, both approaches ignore the fact that both file and database operations are I/O operations, and that, even on a single-core processor, concurrency can be achieved through thread-interleaving thanks to non-blocking I/O. It is also possible to design yet another, balanced approach where, instead of pushing a single record to the db, we push fixed-size (chunk) lists to the database. However, as we describe our pipeline approach next (below), one can see that it remains less attractive in terms of performance.
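For illustration, the two classic approaches can be sketched as below. ReadAllLines, ParseLine and SaveToDb are hypothetical stand-ins for the real file and database operations (here backed by in-memory data), not part of any library discussed in this article:

```csharp
using System.Collections.Generic;
using System.Linq;

public class Record { public string Raw; }

public static class ClassicApproaches
{
    // Hypothetical stand-in for reading lines from a file
    public static IEnumerable<string> ReadAllLines() =>
        Enumerable.Range(1, 5).Select(i => "record-" + i);

    // Hypothetical stand-in for parsing a CSV line into an object
    public static Record ParseLine(string line) => new Record { Raw = line };

    // Hypothetical stand-in for a db transaction; "db" is an in-memory list
    public static void SaveToDb(List<Record> records, List<Record> db) => db.AddRange(records);

    // Approach1: read full file -> make objects' list -> push the list to db
    public static List<Record> Approach1()
    {
        var db = new List<Record>();
        var all = ReadAllLines().Select(ParseLine).ToList();
        SaveToDb(all, db);
        return db;
    }

    // Approach2: read single line -> push the object to db -> repeat
    public static List<Record> Approach2()
    {
        var db = new List<Record>();
        foreach (var line in ReadAllLines())
            SaveToDb(new List<Record> { ParseLine(line) }, db);
        return db;
    }
}
```

Both variants are strictly sequential: the consumer side never runs while the producer side is working, which is exactly the limitation discussed above.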

Assuming that we have our .Pipe implementation available, we can design a producer method (reading the file) and a consumer method (making the db transaction), and simply write the above code as: producer.Pipe(consumers) (let's call it Approach3).

With such an approach we make the following significant observations:

  1. As a benefit of concurrency between producer and consumer: we are able to consume data (in this case, push it to the db) while the producer hasn't finished its work (in this case, reading the file)
  2. As a benefit of concurrency among consumers: we are able to reduce the end-to-end latency (in this case, by a factor of 1/c, where c is the count of consumers)
  3. Thus, theoretically speaking, we can add a total of n (where n ~ total_records / chunk_size) consumers to our pipeline to obtain a minimal latency of ~ file_handling_time + chunk_db_transaction_time

In general, as the total number of records increases, we would notice (left-most is lowest and right-most is highest; lower is better):

Thus, perhaps, it might be safe to say that our concurrent pipeline approach is a balanced one. Hence, the why.

However, before we discuss the implementation, we need to consider/make the following limitations/assumptions:

About Implementation

The idea of producer-consumer is actually language neutral and can be developed in several programming languages. However, to achieve our goal, we opt to implement it in C# (.Net Framework 4.6.1) while leveraging several TPL features (especially async-await) and the language's inherent capability to create extension methods. If you are interested in consuming this implementation, then based on your choice of language you may achieve different usage forms.

During our discussion, we have provided a lot of comments along with the C#.Net code snippets and added some amusing images (showing conversations among entities). Even if you feel uncomfortable with .Net syntax, do NOT be worried; you will be able to get the essentials while reading this article.

There are no Dinosaurs!

During my college days, every time I took the operating system book in my hand, I asked myself: why the dinosaurs? (unfortunately, I cannot find the original cover, but this picture should do for the moment) And I used to cajole myself that the book is not as terrifying as Steven Spielberg's Jurassic Park. I still wonder, sometimes, whether it was to symbolize the operating system as something as gigantic/fascinating/stupefying as a dinosaur, or just to overwhelm a sophomore. Nonetheless, it is NEITHER the right time NOR the subject of our discussion; thus, whatever the case, during this discussion there are no dinosaurs and we will try our best to keep things simple.

Creating Interfaces

To begin with, let's have a look at the following simple picture to understand a few of our design choices and, more importantly, what we are actually trying to build:

simple-shared-buffer

So, based on the above picture, we want:

  1. to standardize the way a producer procures the buffer and adds items to it, in ISOLATION, i.e. unaware of the presence of other producers or consumers.
  2. to standardize the way a consumer retrieves those populated items from the buffer and performs the required processing, in ISOLATION, i.e. unaware of the presence of other consumers or producers.
  3. to have a buffer that can handle those concurrent operations.

In order to design our solution, we would like to focus on the buffer, as it is going to be the central piece; its implementation is going to be impacted by the producer-side requirements as well as the consumer-side ones, and we should not forget that we need to diffuse all the features into our design. Thus, in order NOT to complicate the discussion by explaining everything in a single silo, we sub-divide the discussion into several smaller pieces as follows:

1. Our Producer and Buffer

Our solution is producer agnostic, i.e. we do not know how exactly the producer would produce an item (i.e. the actual producer implementation). Thus, we can only define a generic signature for it, and our producer can be defined as simply as the following delegate:

//NOTE: Some explanations are provided as comments

public delegate Task ProduceAsync<TP>(IProducerBuffer<TP> buffer, CancellationToken token);

//accepts buffer and cancellation token as inputs and returns a Task
//   where TP is the datatype of item produced by producer
//   and IProducerBuffer is an interface to our Buffer implementation
//we add CancellationToken as an input parameter in order to support 
//       interruptible pipeline feature
//In this way, by simply supplying CancellationToken.None to the pipeline 
//       we can create uninterruptible pipeline.

Based on the above delegate signature, we can create the following interface for our producer:

//IDisposable to avail Dispose method to perform resource clean-up
public interface IProducer<TP> : IDisposable
{
    //to perform some pre-processing initialization
    Task InitAsync();

    //actual data generating method
    Task ProduceAsync(IProducerBuffer<TP> buffer, CancellationToken token);
}

Now an actual producer implementation can simply inherit the IProducer<TP> interface. Though we have designed how to provide buffer access to our producer, we do not yet know how to populate the buffer. Thus comes our first requirement on the buffer side, i.e. to have some method for population. Let's look at it:

//NOTE: Some explanations are provided as comments

public interface IProducerBuffer<T>
{
    //adds an item to the buffer
    //it blocks, if buffer is full, until the item can be added
    void Add(T item, CancellationToken token);

    //adds an item to the buffer with the given millisecond timeout
    //if the item was added within the timeout period, returns true, else false
    bool TryAdd(T item, int millisecTimeout, CancellationToken token);

    //we add this second method to support our lossy pipeline feature
    //     millisecTimeout=0 means immediately add or discard
    //based on boolean outcome, the actual producer implementation can 
    //  decide the fate of produced yet discarded item

    //we also add CancellationToken to support cancellation based on
    //transient method token (we will see an example when we talk about
    //                        detached pipeline)
}

Thus, till now, we have the producer and its buffer interfaces, and so the means to add produced items to the buffer. Now, let's look at the consumer-side requirements next.
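To make the producer side concrete, a hypothetical implementation might look as follows. The interfaces are repeated from above so the sketch compiles standalone; LineProducer and the trivial ListBuffer stub are illustrative names of my own, not part of the library:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Interfaces as defined in the article (repeated so the sketch compiles standalone)
public interface IProducerBuffer<T>
{
    void Add(T item, CancellationToken token);
    bool TryAdd(T item, int millisecTimeout, CancellationToken token);
}

public interface IProducer<TP> : IDisposable
{
    Task InitAsync();
    Task ProduceAsync(IProducerBuffer<TP> buffer, CancellationToken token);
}

// A hypothetical producer; the in-memory array stands in for real file reading
public sealed class LineProducer : IProducer<string>
{
    private readonly string[] _lines;
    public LineProducer(string[] lines) { _lines = lines; }

    public Task InitAsync() => Task.CompletedTask;   // e.g. open the file here

    public async Task ProduceAsync(IProducerBuffer<string> buffer, CancellationToken token)
    {
        foreach (var line in _lines)
        {
            buffer.Add(line, token);                 // blocks when the buffer is full
            await Task.Yield();                      // keep the producer cooperative
        }
    }

    public void Dispose() { /* e.g. close the file here */ }
}

// A trivial, non-thread-safe buffer stub, only to exercise the producer in isolation
public sealed class ListBuffer<T> : IProducerBuffer<T>
{
    public readonly List<T> Items = new List<T>();
    public void Add(T item, CancellationToken token) => Items.Add(item);
    public bool TryAdd(T item, int millisecTimeout, CancellationToken token)
    {
        Items.Add(item);
        return true;
    }
}
```

Note that the producer only sees IProducerBuffer<string>; it neither knows nor cares who drains the buffer, which is exactly the isolation requirement stated earlier.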

2. Our Consumer and Buffer

Similar to the producer, our solution is also consumer agnostic (i.e. unaware of the actual consumer implementation); thus, in a similar way we can define the following consumer interface:

//IDisposable to avail Dispose method to perform resource clean-up
public interface IConsumer<TC> : IDisposable
{
    //to perform some pre-processing initialization
    Task InitAsync();

    //actual data consuming method
    Task ConsumeAsync(TC item, CancellationToken token);
}

While deciding on the consumer interface, especially the signature of ConsumeAsync, we had the choice to pass the buffer as a method parameter, as we did for the producer. However, we noticed that such a design:

  1. burdened the consumer implementation with boilerplate code
  2. required a delicate implementation to loop over the buffered items
  3. added further complexity for our discordant pipeline feature (to be discussed later)

Thus, finally, we decided to hide such complexity within the API and obtained a callable consumer. This way, the concrete consumer implementation can focus on the business logic.
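As an illustration of how light a concrete consumer can then be, consider the sketch below. The interface is repeated so it compiles standalone; DbConsumer and its in-memory Saved list are illustrative stand-ins for real database writes:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Interface as defined in the article (repeated so the sketch compiles standalone)
public interface IConsumer<TC> : IDisposable
{
    Task InitAsync();
    Task ConsumeAsync(TC item, CancellationToken token);
}

// A hypothetical consumer; the in-memory list stands in for a real database
public sealed class DbConsumer : IConsumer<string>
{
    public readonly List<string> Saved = new List<string>();

    public Task InitAsync() => Task.CompletedTask;   // e.g. open the db connection here

    public Task ConsumeAsync(string item, CancellationToken token)
    {
        Saved.Add(item);                             // stands in for an INSERT statement
        return Task.CompletedTask;
    }

    public void Dispose() { /* e.g. close the db connection here */ }
}
```

There is no buffer, no loop, and no synchronization in sight: the consumer receives one item per call and only implements the business logic.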

Though no apparent requirement is visible on the consumer side, we can infer from point (2) above that we need to loop over the items in order to drain the buffer. Thus, we need:

  1. a method to pop-out the item
  2. a boolean indicator to verify that all items are processed

So, we create our ConsumerBuffer interface:

public interface IConsumerBuffer<T>
{
    //true when all items are drained (producers are done producing too!)
    bool Finished { get; }

    //to retrieve an item
    //returns true when an item was available within the given millisecond timeout
    bool TryGet(int millisecTimeout, CancellationToken token, out T data);
    //we add the millisecond timeout to support a special case of our discordant pipeline feature
    //     anyway, we can always pass millisecTimeout=Timeout.Infinite, i.e. wait infinitely
}
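To see how Finished and TryGet cooperate, here is a sketch of the drainage loop the pipeline might run internally. MiniBuffer is an illustrative BlockingCollection-backed stub of my own, not the article's buffer implementation:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Interface as defined above (repeated so the sketch compiles standalone)
public interface IConsumerBuffer<T>
{
    bool Finished { get; }
    bool TryGet(int millisecTimeout, CancellationToken token, out T data);
}

// Illustrative stub backed by BlockingCollection<T>
public sealed class MiniBuffer<T> : IConsumerBuffer<T>
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    public void Add(T item) => _items.Add(item);
    public void CloseForAdding() => _items.CompleteAdding();
    public bool Finished => _items.IsCompleted;
    public bool TryGet(int millisecTimeout, CancellationToken token, out T data) =>
        _items.TryTake(out data, millisecTimeout, token);
}

public static class DrainLoop
{
    // With an INFINITE timeout, TryGet returns false only once producers
    // are done AND the buffer is empty, so this loop drains everything.
    public static List<T> DrainAll<T>(IConsumerBuffer<T> buffer, CancellationToken token)
    {
        var drained = new List<T>();
        while (buffer.TryGet(Timeout.Infinite, token, out var item))
            drained.Add(item);
        return drained;
    }
}
```

The loop never polls: it either receives an item or learns that production is over, which is the behaviour the interface is designed to guarantee.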

3. Keeping Both Shards

Until this point, we have been trying to fulfill all the requirements, and the following item list quickly covers those points:

Now, the only remaining point is Concordance. In fact, in the way we have defined our interfaces above, we have intentionally kept TP as the producer type parameter and TC as the consumer type parameter. Although such different symbols (type placeholders) hardly matter in generics, the intent is to impose the idea that we will inject an IDENTICAL <data-type> for both TP and TC during concordant pipeline construction and different <data-types> for a discordant pipeline. Furthermore, for a rapid understanding of such a conflict, we offer the following illustration:

dicordant-shared-buffer

Now, we see that:

Thus, the above design will not work in the case of a discordant pipeline. With this idea in mind, we keep both shards of our *Buffer interface for the moment. Even from the point of view of "abstraction", we would be wise NOT to expose the TryGet method to the producer, whose only interest is to fill the buffer.

4. Plugging Adapter

To fulfill our last requirement, we need to review our Image 2 shown above, as having different datatypes will create a conflict. But before we talk about how to overcome this limitation using an adapter, let's visualize what the adapter must do logically, based on the picture below:

ppc-adapter

Thus, if we consider the provided adapter as a black box, we expect that by passing an object of type <TI>, it will output an object of type <TO>. As per our requirements, then, if we pass produced items of type <TP> and convert those into the consumer's type <TC>, our pipeline should work.

IMPORTANT: In order to remain generic, for the concordant case, when TC = TP = T (thus, TI = TO), we prepare a default IDENTITY adapter that does NOTHING, i.e. it returns the same item that we provided as input without making any change to it. The following C# .Net code snippet roughly represents this idea:

public static TI IdentityAdapter<TI>(TI input)
{
    return input;
}

In order to plug in such an adapter, we have the following choices:

  1. Inject adapter between Producer and Buffer: We design buffer with type <TC> (shown in image 4)

    after-producer
     
  2. Inject adapter between two buffers: We introduce a second buffer and inject adapter between those, while first buffer has type <TP> and second has type <TC> (shown in image 5)

    two-buffers
     
  3. Inject adapter between buffer and Consumer: We design buffer with type <TP> (shown in image 6)

    after-consumer

Among the given implementation choices, we opt for the third (3rd) option, i.e. injecting the adapter between buffer and consumer, because:

By injecting the adapter between buffer and consumer, we not only maintain a single buffer (thus, a single drainage loop), but also make a transparent call to the adapter at just the right time (before feeding data to the consumer). And, by doing so, we hide all these intricate implementation details behind our .Pipe call and offer complete separation of concerns among producer, consumer, and adapter, so that all these three (3) pieces of code can evolve independently.

5. Vicious cycle of Agnosticism

Up until now, we kept our design both producer and consumer agnostic; however, in order to keep the complexity out of our discussion, we assumed a naive approach to the Adapter. As shown above, we provide an object instance of some given type to our adapter and receive back an object instance of a well-defined type. However, as we are close to finalizing our interface design, we would like to get rid of this give-n-take assumption about our adapter. In fact, we desire to finalize the design as Adapter agnostic too! And this is the only way to be sure that we have provided full liberty to the end-user to achieve the desired end result from such a pipeline without hacking/patching business logic. The end-user can then focus on the actual logic and associated data-model without worrying about the mundane technical plumbing between producer, consumer, and adapter.

To achieve such an Adapter-agnostic design, we propose the following interface:

//NOTE: Some explanations are provided as comments

public interface IDataAdapter<TP, TC>
{
    //accepts the buffer and cancellation token and outs a consumable object
    //   returns true as long as the buffer is NOT empty, else false
    bool TryGet(IConsumerBuffer<TP> buffer, CancellationToken token, out TC consumable);

    //we notice that we provide buffer containing produced object instances
    //   with IConsumerBuffer interface, thus exposing TryGet method!
    //Actual adapter implementation can then recover produced item (or several items)
    //    to construct an item of type TC
}
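As a sketch of a discordant case, the adapter below converts produced strings into consumable integers. ParsingAdapter is an illustrative name of my own, and the BlockingCollection-backed StubBuffer exists only to exercise it; the interfaces are repeated so the sketch compiles standalone:

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Interfaces as defined in the article (repeated so the sketch compiles standalone)
public interface IConsumerBuffer<T>
{
    bool Finished { get; }
    bool TryGet(int millisecTimeout, CancellationToken token, out T data);
}

public interface IDataAdapter<TP, TC>
{
    bool TryGet(IConsumerBuffer<TP> buffer, CancellationToken token, out TC consumable);
}

// Hypothetical discordant adapter: TP = string, TC = int
public sealed class ParsingAdapter : IDataAdapter<string, int>
{
    public bool TryGet(IConsumerBuffer<string> buffer, CancellationToken token, out int consumable)
    {
        // keep draining until we find a parsable item or the buffer is finished
        while (buffer.TryGet(Timeout.Infinite, token, out var raw))
        {
            if (int.TryParse(raw, out consumable)) return true;
        }
        consumable = 0;
        return false;   // producers are done AND buffer is empty
    }
}

// Stub buffer backed by BlockingCollection<T>, only to exercise the adapter
public sealed class StubBuffer<T> : IConsumerBuffer<T>
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    public void Add(T item) => _items.Add(item);
    public void CloseForAdding() => _items.CompleteAdding();
    public bool Finished => _items.IsCompleted;
    public bool TryGet(int millisecTimeout, CancellationToken token, out T data) =>
        _items.TryTake(out data, millisecTimeout, token);
}
```

Notice that the adapter is free to pull zero, one, or several produced items per call, which is precisely the liberty the interface is meant to grant.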

Now that we have defined all three (3) key parts of the pipeline, given any task and assuming that our pipeline can be implemented, we can reach an optimal solution by thinking in terms of these sub-components, as shown below:

thinker

Following itemized list summarizes the above idea:

Before Getting Crazy with Code!

Until this point, we have tried to use a lot of drawings to convey our ideas; but, unfortunately, now we are obliged to introduce the code, and thus below you will see some long code snippets. Do NOT be worried, we will add some amusing drawings to illustrate the same ideas in a pseudo manner; nonetheless, you should memorize the pyramidal mind-map given below, which is closely related to our concrete implementation:

pyramidical-mind-map

Implementing Interfaces

As we know, our solution is producer/consumer/adapter agnostic; thus, their respective concrete implementations are not our concern; once we have exposed our interfaces, the end-user will inherit those to use in the pipeline. However, it would be nice to implement some default Adapters in order to cover some mundane use cases. Thus, in this section we will propose the following implementations:

  1. Adapters:
  2. Buffer
  3. Attached pipeline
  4. Detached pipeline

1.a Identity Adapter

identity-adapter-mind-map

To start simple, we choose to implement the identity adapter first; as we remember from above, it should just return the produced item as-is. We achieve this as follows:

//NOTE: Some explanations are provided as comments

//generic adapter satisfying TP = TC = T, buffer type:<T>
public class IdentityAdapter<T> : IDataAdapter<T, T>
{
    public bool TryGet(IConsumerBuffer<T> buffer, CancellationToken token, out T consumable)
    {
        //we just transfer the call on the buffer and return boolean
        //   status and also the object as is.

        return buffer.TryGet(Timeout.Infinite, token, out consumable);

        //NOTE: we pass INFINITE timeout on buffer, thus:
        //if all buffered items are processed AND producers are done...
        //   buffer will return false. Thus satisfying adapter boolean status.
        //else buffer will return True and out an instance of the produced object
        //   this again fulfils adapter behaviour.
    }
}

1.b Awaitable List Adapter

A word before implementation

Sometimes we encounter a case where consuming a single item leads to a suboptimal solution, and processing items in groups (chunks) is technically cost-effective. A few such examples are:

In order to handle such use cases, we have decided to implement an awaitable list adapter, so that the end-user is relieved and can use it out of the box. The idea is to recover a List<TC> on each TryGet call on the adapter, as shown below in image 7.

NOTE: From now onwards, we will use the words "chunk" and "list" interchangeably, i.e., unless and until specified otherwise, a list would mean a subset (a part) of the whole data.

awaitable-list-adapter

As soon as we think about lists, the following design-related options come to mind related to the TryGet method:

For the first (1st) option, it might NOT be possible to generate identically sized lists (consider: if we have a total of 103 items and we fix the list size to 10, then the last list will contain ONLY 3 items instead of 10); yet we choose to implement it, based on the belief that consumer logic is indifferent to the size of the chunk (and it should be!), and the whole idea behind consuming chunks (instead of single instances) is to reduce the associated technical latency.
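The arithmetic above (103 items in chunks of 10 leaving a final chunk of 3) can be checked with a minimal, purely illustrative chunker; this is not the adapter itself, just the size bookkeeping it has to respect:

```csharp
using System.Collections.Generic;

public static class Chunker
{
    // Splits a sequence into fixed-size chunks; the last chunk may be smaller.
    public static List<List<T>> Chunk<T>(IEnumerable<T> items, int size)
    {
        var chunks = new List<List<T>>();
        var current = new List<T>(size);
        foreach (var item in items)
        {
            current.Add(item);
            if (current.Count == size)
            {
                chunks.Add(current);
                current = new List<T>(size);
            }
        }
        if (current.Count > 0) chunks.Add(current);   // trailing partial chunk
        return chunks;
    }
}
```

For 103 items and size 10 this yields 10 full chunks plus one chunk of 3, which is why the consumer logic must not assume a fixed chunk length.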

The second (2nd) option is a generalized case of the first, so we will implement it, but with some assumptions. We will underline these assumptions when we describe our implementation details.

We choose to opt out of the third (3rd) option because it again questions the usefulness of spawning multiple consumers. Let's rethink: if we are able to supply unbounded lists to consumers, then perhaps we could supply all available items to a single consumer alone, irrespective of whether that consumer has the capacity to handle such a list or not; then why spawn other consumers concurrently? Thus, we observe that our design would be going astray (based on our pre-decided goal).

NOTE: Perhaps due to our myopic vision, we dropped implementing the third (3rd) option. Still, do not forget that our pipeline is Adapter agnostic; thus, the end-user can always construct their own version of the Adapter and plug it in.

What's the BIG idea; ain't it simply List Adapter?

The short answer is: No, it isn't!

If you have followed us till now, perhaps you might have got the impression that this adapter is all about creating a list; then why do we call it an "Awaitable" list adapter? Isn't it as simple as spawning a loop to produce a list? If you have got similar thoughts, then we assure you that it's more than that, for the simple fact that the items we want to iterate over might not be promptly available. To elaborate further, let's consider the arguments listed below:

One thing is certain: if we want to reduce latency (as a part of our goal), we need some kind of notification when items arrive in the buffer while our thread is asleep. A similar suggestion can also be found in the original producer-consumer problem. Now, of course, we do not want to build such a mechanism inside the adapter, else it would defeat the whole purpose (imagine: every time the end-user/we write an adapter, we would need to write a separate notification mechanism). Nonetheless, if we look into the literature on producer-consumer, we already know that the producer is capable of providing such a signal (at the time of adding an item to the buffer). Thus, considering both perspectives, for the moment we assume that the buffer is capable of such notification.

Based on the above discussion, we have the following insights on buffer behaviour (we will use these during the buffer's TryGet implementation):

For the moment, we can safely assume that if we pass an INFINITE timeout to the buffer.TryGet method, then the buffer will return us an item as soon as one gets added. This resolves one of our concerns, but we still need to work on both fixed-size and variable-size list preparation.

Constraints/Assumptions

While implementing the Awaitable List Adapter, we keep the following important points in mind:

  1. We can ALWAYS wait on the buffer with an INFINITE timeout. If it has elements, it should promptly return one; else it should return one as soon as an item becomes available.
  2. No end-user is interested in consuming empty lists, i.e. lists without any items. Thus, we only need to supply a list when it has AT LEAST one (1) element in it.
  3. End-user decides the size of the list as he is aware of system capabilities and his requirements.
  4. When end-user runs the pipeline to have FIXED size lists (as shown in Image 8):
  5. When end-user runs the pipeline to have Fixed duration lists (or variable sized) (as shown in Image 9):

NOTE: We discuss some possible use-cases of these adapters separately (below) in the article.

Implementation

list-adapter-mind-map

Based on our constraints/assumptions, we have two (2) parameters to deal with: 1) list size and 2) timeout period. And we already know that if timeout=Infinite, then we output fixed-size lists, else variable-sized lists. Let's look at the code then:

//NOTE: Some explanations are provided as comments

//generic adapter satisfying TP = T and TC = List<T>, buffer type:<T>
public class AwaitableListAdapter<T> : IDataAdapter<T, List<T>>
{
	private readonly int _millisecTimeout;
	private readonly int _maxListSize;

    //we ask user for timeout value and list size
    //if millisecTimeout = Timeout.Infinite
    //   we will construct fixed size chunks with size = maxListSize
    //otherwise,
    //   we will construct variable sized chunks with max. size = maxListSize
	public AwaitableListAdapter(int maxListSize, int millisecTimeout)
	{
		_millisecTimeout = millisecTimeout;
		_maxListSize = maxListSize;
	}

	public bool TryGet(IConsumerBuffer<T> buffer, CancellationToken token, 
        out List<T> consumable)
	{
		consumable = default(List<T>);
		
        //NOTE: From our discussion, no user wants to consume 0-length list
        //      so we wait with INFINITE timeout, i.e.
        //      if item is already in buffer, we promptly receive it
        //      else we wait until an item is available... so we are good!

        if (!buffer.TryGet(Timeout.Infinite, token, out var value)) return false;

        //init list WITH first item
		consumable = new List<T>(_maxListSize) {value};

        //we choose what kind of list is required based on timeout.		
        return _millisecTimeout == Timeout.Infinite
			? TryFillFixedSize(buffer, token, consumable)
			: TryFillFixedDurationChunk(buffer, token, consumable);
	}

	private bool TryFillFixedSize(IConsumerBuffer<T> buffer, CancellationToken token,
		List<T> consumable)
	{
        //We loop until we fill the list

		while (consumable.Count < _maxListSize)
		{
            //we always wait for INFINITE time to be sure of having an item
            //     except when
            // no items are left and production is over!

			if (buffer.TryGet(Timeout.Infinite, token, out var value))
			{
				consumable.Add(value);
			}
            else return true;
		} 
        return true;

        //our list already has at least 1 item, so we return TRUE!
	}

	private bool TryFillFixedDurationChunk(IConsumerBuffer<T> buffer, CancellationToken token,
		List<T> consumable)
	{
		var timeRemains = _millisecTimeout;
		var sw = Stopwatch.StartNew();

        //using stopwatch we can measure elapsed time

		while (consumable.Count < _maxListSize)
		{
            //and we keep looping as long as
            //     1. the chunk is not full
            //     2. we receive an item within the remaining time

			if (buffer.TryGet(timeRemains, token, out var value))
			{
				consumable.Add(value);

				if (timeRemains != 0)
				{
                    //IMPORTANT:
                    //we put a lower limit of zero because:
                    //   1. of course, we can't wait a -ve amount of time
                    //   2. but we want to keep looping even after the given
                    //      timeout is over, as long as we can still recover
                    //      items from the buffer;
                    //      indeed, with timeout=0, we either promptly receive
                    //      an item or the buffer returns FALSE.
                    //      this way we are always able to provide a larger
                    //      chunk when possible
                    //      hence this IF does NOT have an ELSE with break/return
                    //      but the OUTER IF does!

					timeRemains = (int) Math.Max(0, _millisecTimeout - sw.ElapsedMilliseconds);
				}
			}
			else return true;
		}
		return true;

        //our list already has at least 1 item, so we return TRUE!
	}
}

2. Implementing Buffer

At this point, we already have the buffer interface and the behaviour-based implementation requirements. Using the code snippet below, we fulfil these requirements:

//NOTE: Some explanations are provided as comments

//we implement both interface
public class PpcBuffer<T> : IProducerBuffer<T>, IConsumerBuffer<T>
{
    private readonly CancellationToken _token;
    private BlockingCollection<T> _collection;

    public PpcBuffer(int bufferSize, CancellationToken token)
    {
        //we say 0 represents unbounded buffer
        _collection = bufferSize.Equals(0) ? new BlockingCollection<T>()
                : new BlockingCollection<T>(bufferSize);
        _token = token;
    }

    //IProducerBuffer<T> IMPLEMENTATION >>>>>>>>>

    public void Add(T item, CancellationToken token)
    {
        //Add should wait even if buffer is FULL, so we
        //simply call TryAdd with INFINITE timeout

        TryAdd(item, Timeout.Infinite, token);
    }

    public bool TryAdd(T item, int millisecTimeout, CancellationToken token)
    {
        //either the blocking collection will add it within the timeout
        //   or return false... so our requirement is satisfied
        //when the timeout is INFINITE, this method will either
        //   finish with the item being added, or end in an exception
        //         1. when either of the cancellation tokens is canceled
        //         2. when the buffer is closed
        //   so again we satisfy our requirements.

        using (var mergeToken = CancellationTokenSource.CreateLinkedTokenSource(token, _token))
        {
           return _collection.TryAdd(item, millisecTimeout, mergeToken.Token);
        } 
    }

    //IConsumerBuffer<T> IMPLEMENTATION >>>>>>>>>

    public bool TryGet(int millisecTimeout, CancellationToken token, out T data)
    {
        //we do not create merge token, as user should be able to
        //extract queued items once pipeline is closed for addition.
        return _collection.TryTake(out data, millisecTimeout, token);
    }

    //true only when both: closed for adding AND empty... so we are good.
    public bool Finished => _collection.IsCompleted;

    //we implement CloseForAdding method to support implementation of 
    //  detached mode >>>>>>>

    public void CloseForAdding()
    {
        _collection.CompleteAdding();
    }
}
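Since PpcBuffer leans entirely on BlockingCollection<T>, the standalone snippet below demonstrates the exact framework semantics the implementation relies on: bounded capacity, a timed TryAdd that discards when full (the lossy case), CompleteAdding as the backing of CloseForAdding, and IsCompleted as the backing of Finished:

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class BufferSemantics
{
    // Returns { fullAddRejected, drainedTwo, finished }; all should be true.
    public static bool[] Demo()
    {
        var col = new BlockingCollection<int>(boundedCapacity: 2);

        col.TryAdd(1, 0, CancellationToken.None);                   // added
        col.TryAdd(2, 0, CancellationToken.None);                   // added
        var rejected = !col.TryAdd(3, 0, CancellationToken.None);   // buffer full: a 0-timeout
                                                                    // add is discarded (lossy)

        col.CompleteAdding();                                       // what CloseForAdding does

        var drained = 0;
        int item;
        while (col.TryTake(out item, Timeout.Infinite, CancellationToken.None))
            drained++;                                              // returns false once the
                                                                    // collection is completed
                                                                    // AND empty

        return new[] { rejected, drained == 2, col.IsCompleted };
    }
}
```

Note in particular that a TryTake with an INFINITE timeout does not hang forever on a completed collection: it returns false as soon as the collection is closed for adding and drained, which is what makes the infinite-timeout drainage loop of the pipeline safe.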

With such an implementation, we are able to cover all the requirements discussed above. Now all that remains is the plumbing of these individual artifacts, which we do separately for the attached and detached pipelines below.

3. Attached Pipeline

As we have discussed, attached pipeline mode has following characteristics:

Raw Implementation

As our interest is to create the form producers.Pipe(consumers), we first need to devise a raw implementation, as fabricating the final form will then just be a matter of creating an extension method. We will create this method separately. Our raw implementation revolves around the following idea:

  1. Run all producers independently as async methods
  2. Run all consumers independently as async methods
  3. Feed Adapter-transformed items to consumers
  4. Observe producers as they complete production
  5. Signal the buffer once all producers are done
  6. Dispose producers as they finish their work
  7. Let consumers finish consuming all remaining items
  8. Dispose consumers

NOTE: We have used one of our home-made extension methods to spawn and await tasks (for both producers/consumers):

attached-pipeline-mind-map
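For completeness, the home-made WhenAll extension mentioned in the note above could look roughly like the following. This is a minimal sketch based on the call sites in this article (a Func<int, CancellationToken, Task> invoked for indices 0..count-1); the actual DevFast implementation may differ:

```csharp
internal static class TaskExt
{
    //spawns "count" tasks by invoking "func" with indices 0..count-1
    //and returns a task that completes once ALL of them complete
    public static Task WhenAll(this Func<int, CancellationToken, Task> func,
        int count, CancellationToken token)
    {
        var tasks = new Task[count];
        for (var i = 0; i < count; i++)
        {
            var idx = i; //capture loop variable for the closure
            tasks[idx] = Task.Run(() => func(idx, token), CancellationToken.None);
        }
        return Task.WhenAll(tasks);
    }
}
```

Note that Task.WhenAll aggregates faults from all spawned tasks, which is exactly what the pipeline needs in order to propagate producer/consumer errors to the awaiting caller.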

The following code implements all of the above listed steps:

//NOTE: Some explanations are provided as comments

//We hide the implementation inside INTERNAL class to
//expose it through extension method
internal static class PipeImpl<TP, TC>
{
    public static Task Execute(CancellationToken token, 
        int bufferSize, 
        IDataAdapter<TP, TC> adapter,
        IReadOnlyList<IProducer<TP>> producers, 
        IReadOnlyList<IConsumer<TC>> consumers)
    {
        //instead of using await, we decided to create a new Task
        //so that caller func can await as per its convenience

        return Task.Run(async () =>
        {
            using (var localCts = new CancellationTokenSource())
            {
                using (var combinedCts = CancellationTokenSource
                    .CreateLinkedTokenSource(token, localCts.Token))
                {
                    //creating buffer as per required size
                    using (var ppcBuffer = new PpcBuffer<TP>(bufferSize, 
                                                      combinedCts.Token))
                    {
                        //spawn consumers
                        var rc = RunConsumers(consumers, ppcBuffer, adapter, 
                                              combinedCts.Token, localCts);
                        //spawn producers
                        var rp = RunProducers(producers, ppcBuffer, 
                                              combinedCts.Token, localCts);

                        //wait until all consumers and producers finish
                        await Task.WhenAll(rc, rp).ConfigureAwait(false);
                    }
                }
            }
        });
    }
    
    internal static Task RunConsumers(IReadOnlyList<IConsumer<TC>> consumers,
        IConsumerBuffer<TP> feed, IDataAdapter<TP, TC> adapter,
        CancellationToken token, CancellationTokenSource tokenSrc)
    {
        //the following lines spawn all consumers (RunConsumer method)
        //in the list, each as a separate task

        return new Func<int, CancellationToken, Task>(async (i, t) =>
                await RunConsumer(consumers[i], feed, adapter, t, tokenSrc)
                                .ConfigureAwait(false))
            .WhenAll(consumers.Count, token);
        //our home-made WhenAll waits on all created tasks
        // (i.e. waits on all consumers to finish)
    }

    private static async Task RunConsumer(IConsumer<TC> parallelConsumer,
        IConsumerBuffer<TP> feed, IDataAdapter<TP, TC> adapter,
        CancellationToken token, CancellationTokenSource tokenSrc)
    {
        try
        {
            //this would dispose the consumer once we have nothing left
            //to consume
            using (parallelConsumer)
            {
                //init consumers
                await parallelConsumer.InitAsync().ConfigureAwait(false);
                token.ThrowIfCancellationRequested();
          
                //we loop as long as the adapter is able to create
                //   a consumable instance
                while (adapter.TryGet(feed, token, out var consumable))
                {
                    //we feed the item to consumer and wait before
                    // supplying another item.          
                    await parallelConsumer.ConsumeAsync(consumable, token)
                                          .ConfigureAwait(false);
                }
            }
        }
        catch
        {
            //in case a consumer ends up in error
            // we cancel the token so that producers can intercept it
            if (!token.IsCancellationRequested) tokenSrc.Cancel();
            throw;
        }
    }

    private static Task RunProducers(IReadOnlyList<IProducer<TP>> producers,
        PpcBuffer<TP> buffer, CancellationToken token,
        CancellationTokenSource tokenSrc)
    {
        return Task.Run(async () =>
        {
            try
            {
                //the following lines spawn all producers (RunProducer method)
                //in the list, each as a separate task

                await new Func<int, CancellationToken, Task>(async (i, t) =>
                        await RunProducer(producers[i], buffer, t, tokenSrc)
                                        .ConfigureAwait(false))
                    .WhenAll(producers.Count, token).ConfigureAwait(false);
                //our home-made WhenAll waits on all created tasks
                // (i.e. waits on all producers to finish)
            }
            finally
            {
                //>>>>> IMPORTANT: No matter whether producers finish normally
                //                 or end up in error,
                //                 we close the buffer
                buffer.CloseForAdding();
            }
        });
    }

    private static async Task RunProducer(IProducer<TP> parallelProducer,
        IProducerBuffer<TP> feed, CancellationToken token,
        CancellationTokenSource tokenSrc)
    {
        try
        {
            //this would dispose the producer once we have nothing left
            //to produce
            using (parallelProducer)
            {
                //initialize producer
                await parallelProducer.InitAsync().ConfigureAwait(false);
                token.ThrowIfCancellationRequested();
                
                //we provide our buffer to the producer;
                //it is the producer's responsibility to populate it
                //    and to return once there is nothing left
                //    to produce.
                await parallelProducer.ProduceAsync(feed, token)
                                      .ConfigureAwait(false);
            }
        }
        catch
        {
            //in case a producer ends up in error
            // we cancel the token so that consumers can intercept it
            if (!token.IsCancellationRequested) tokenSrc.Cancel();
            throw;
        }
    }
}

Achieving .Pipe usage form (or Syntactic Sugar)

We now have all the ingredients to cook our extension methods, and we propose the following four (4) such methods to achieve the different pipelines discussed above:

  1. Concordant Pipeline: Producer type matches consumer type (i.e. <TP> = <TC>). Normally, the end-user would need to inject an IDENTITY adapter, but we can do it implicitly inside our method as shown below:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<T>> consumers,
                                    CancellationToken token = default(CancellationToken),
                                    int bufferSize = 256)
    {
        return PipeImpl<T, T>.Execute(token, bufferSize, 
                                      new IdentityAdapter<T>(), 
                                      producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //await producers.Pipe(consumers);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
  2. Discordant Pipeline with FIXED SIZE chunk: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks fixed-size chunks. Normally, one would need to inject an Awaitable List adapter, but we can do it implicitly inside our method as shown below:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<List<T>>> consumers,
                                    int listSize,
                                    CancellationToken token = default(CancellationToken),
                                    int bufferSize = 256)
    {
        //timeout is INFINITE, we will get FIXED-size chunks
        return PipeImpl<T, List<T>>.Execute(token, bufferSize, 
                                  new AwaitableListAdapter<T>(listSize, Timeout.Infinite),
                                  producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM };
    //await producers.Pipe(consumers, some_positive_int_for_chunk_size);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
  3. Discordant Pipeline with FIXED DURATION chunk: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks variable-size chunks created over a fixed duration. Normally, one would need to inject an Awaitable List adapter, but we can do it implicitly inside our method as shown below:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<List<T>>> consumers,
                                    int listMaxSize,
                                    int millisecondTimeout,
                                    CancellationToken token = default(CancellationToken),
                                    int bufferSize = 256)
    {
        //timeout and MAX list size passed to adapter to avail chunks
        return PipeImpl<T, List<T>>.Execute(token, bufferSize, 
                                  new AwaitableListAdapter<T>(listMaxSize, millisecondTimeout),
                                  producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM }; 
    //await producers.Pipe(consumers, some_positive_int_for_max_chunk_size,
    //                                some_positive_int_for_timeout);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
  4. Generic Pipeline: Producer type is <TP>, consumer type is <TC>, and an IDataAdapter<TP, TC> implementation is available to the end-user.
    //IMPLEMENTATION
    public static Task Pipe<TP, TC>(this IReadOnlyList<IProducer<TP>> producers,
                                         IReadOnlyList<IConsumer<TC>> consumers,
                                         IDataAdapter<TP, TC> adapter,
                                         CancellationToken token = default(CancellationToken),
                                         int bufferSize = 256)
    {
        //the end-user supplied adapter is passed as-is
        return PipeImpl<TP, TC>.Execute(token, bufferSize, 
                                        adapter,
                                        producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<TP>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<TC>[]{ consumer1, ..., consumerM };
    //IDataAdapter<TP, TC> adapter = ...end-user-adapter-creation-call...
    //await producers.Pipe(consumers, adapter);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
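To make the attached form concrete, here is a minimal end-to-end sketch using the concordant .Pipe overload. RangeProducer, PrintConsumer, and Demo are hypothetical types written here only for illustration; the IProducer<T>/IConsumer<T> member signatures (InitAsync, ProduceAsync, ConsumeAsync, Dispose) are assumed from the code shown earlier in this article:

```csharp
//hypothetical producer: pushes integers 0..99 into the buffer
internal sealed class RangeProducer : IProducer<int>
{
    public Task InitAsync() => Task.CompletedTask;

    public async Task ProduceAsync(IProducerBuffer<int> feed, CancellationToken token)
    {
        for (var i = 0; i < 100; i++)
        {
            feed.Add(i, token); //blocks if the buffer is full (lossless)
            await Task.Yield();
        }
    }

    public void Dispose() { }
}

//hypothetical consumer: prints every received item
internal sealed class PrintConsumer : IConsumer<int>
{
    public Task InitAsync() => Task.CompletedTask;

    public Task ConsumeAsync(int item, CancellationToken token)
    {
        Console.WriteLine(item);
        return Task.CompletedTask;
    }

    public void Dispose() { }
}

internal static class Demo
{
    //wiring: 2 producers feed 3 consumers through the concordant pipeline
    public static Task RunAsync()
    {
        var producers = new IProducer<int>[] { new RangeProducer(), new RangeProducer() };
        var consumers = new IConsumer<int>[]
            { new PrintConsumer(), new PrintConsumer(), new PrintConsumer() };
        return producers.Pipe(consumers);
    }
}
```

Note that the 200 produced integers are distributed among the three consumers in arrival order; no consumer sees a deterministic subset, which is exactly the data-parallel behavior we set out to achieve.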
    

4. Detached Pipeline

Detached pipeline differs a bit, as we do not have producer instances available at pipeline construction time (i.e. while calling PipeImpl<TP,TC>.Execute) as we do in attached mode. In the absence of these producers, we have no mechanism to populate our buffer. Also, as discussed initially, producers for such a pipeline may appear sporadically. Thus, unfortunately, we will NOT be able to achieve our desired producers.Pipe(consumers) usage form; however, we attempt to achieve a similarly simplified usage form based on the following information:

Raw Implementation

As we do NOT have any single point in the code to await on, we need to fabricate a way to keep our pipeline alive for its whole duration, so that all produced items (whether from ephemeral, sporadically appearing producers or from long-running producers) can be added to it. For all pragmatic purposes, we measure this duration as: "The time duration starting from the moment such a pipeline is constructed until the moment the call to the CloseForAdding method is made."

With these assumptions made and intentions declared, we proceed with the detached pipeline interface as follows:

//we adopt this interface as it nearly mimics
//all operations of the RunProducer method of the
//PipeImpl<TP, TC> static class we used for attached mode
//i.e.
//    Add and TryAdd methods
//    and the Dispose method
//    we have nothing to Init.
public interface IPipeline<T> : IProducerBuffer<T>, IDisposable
{
}

detached-pipeline-mind-map

Now, with IPipeline, we will be able to mimic all the producer related operations as we did before (in the RunProducers method of the PipeImpl<TP, TC> static class). Let's look at the implementation:

//NOTE: Some explanations are provided as comments

internal sealed class PipelineImpl<TP,TC> : IPipeline<TP>
{
    private readonly CancellationTokenSource _mergedCts;
    private readonly PpcBuffer<TP> _feed;
    private readonly Task _consumerTask;
    private CancellationTokenSource _localCts;

    public PipelineImpl(IReadOnlyList<IConsumer<TC>> consumers, 
                    IDataAdapter<TP, TC> adapter, 
                    CancellationToken token, 
                    int bufferSize)
    {
        _localCts = new CancellationTokenSource();
        _mergedCts = CancellationTokenSource.CreateLinkedTokenSource(token, 
                                                           _localCts.Token);
        _feed = new PpcBuffer<TP>(bufferSize, _mergedCts.Token);

        //in order to spawn and await our consumers
        //    we simply call the existing implementation
        //        from the PipeImpl class
        _consumerTask = PipeImpl<TP, TC>.RunConsumers(consumers, _feed, 
                                                adapter, token, _localCts);
    }

    public void Add(TP item, CancellationToken token)
    {
        TryAdd(item, Timeout.Infinite, token);
    }

    public bool TryAdd(TP item, int millisecTimeout, CancellationToken token)
    {
        //passing the item to buffer
        return _feed.TryAdd(item, millisecTimeout, token);
    }

    public void Dispose()
    {
        if (_localCts == null) return;
        try
        {
            using (_localCts)
            {
                using (_mergedCts)
                {
                    using (_feed)
                    {
                        //FIRST, we cancel our local token
                        _localCts.Cancel();
                        
                        //SECOND, we close the feed for addition
                        _feed.CloseForAdding();
                        
                        //Then, we wait for remaining items to be
                        //      consumed
                        _consumerTask.Wait();
                    }
                }
            }
        }
        finally
        {
            _localCts = null;
        }
    }
}

Instance Management

Contrary to the attached pipeline, where we had a single place in code to await on the whole pipeline workflow, in detached mode we do NOT have such a luxury. Thus, end-users need to keep the instance of IPipeline<TP> somewhere after construction and explicitly call Dispose on it. This, of course, requires some attention; however, before rejecting this implementation, we need to meditate over the following thoughts:

Achieving .Pipeline usage form (or Syntactic Sugar)

Above, we have already created the .Pipe extension methods. In a similar manner, we can obtain the following .Pipeline methods:

  1. Concordant Pipeline: Producer type matches consumer type (i.e. <TP> = <TC>). Normally, the end-user would need to inject an IDENTITY adapter, but we can do it implicitly inside our method as shown below:
    //Implementation
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<T>> consumers,
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        return new PipelineImpl<T, T>(consumers, 
                                  new IdentityAdapter<T>(),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline();
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
    
  2. Discordant Pipeline with FIXED SIZE chunk: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks fixed-size chunks. Normally, one would need to inject an Awaitable List adapter, but we can do it implicitly inside our method as shown below (IMPORTANT: as producers are sporadic, one might want to avoid this adapter altogether, since unnecessary consumer-side delays will be observed if no producer appears for a long time... in detached mode, the FIXED DURATION chunk is preferred):
    //Implementation
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<List<T>>> consumers,
                                   int listSize,
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        //timeout is INFINITE, we will get FIXED-size chunks
        return new PipelineImpl<T, List<T>>(consumers, 
                                  new AwaitableListAdapter<T>(listSize, Timeout.Infinite),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline(some_positive_list_size);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
    
  3. Discordant Pipeline with FIXED DURATION chunk: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks variable-size chunks created over a fixed duration. Normally, one would need to inject an Awaitable List adapter, but we can do it implicitly inside our method as shown below:
    //Implementation
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<List<T>>> consumers,
                                   int listMaxSize,
                                   int millisecondTimeout, 
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        //timeout and MAX list size passed to adapter to avail chunks
        return new PipelineImpl<T, List<T>>(consumers, 
                                  new AwaitableListAdapter<T>(listMaxSize, millisecondTimeout),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline(some_positive_list_size,
    //                                                      some_positive_timeout);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
    
  4. Generic Pipeline: Producer type is <TP>, consumer type is <TC>, and an IDataAdapter<TP, TC> implementation is available to the end-user.
    //Implementation
    public static IPipeline<TP> Pipeline<TP, TC>(this IReadOnlyList<IConsumer<TC>> consumers,
                                   IDataAdapter<TP, TC> adapter,
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        return new PipelineImpl<TP, TC>(consumers, 
                                  adapter,
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<TC>[]{ consumer1, ..., consumerM };
    //IDataAdapter<TP, TC> adapter = ...end-user-adapter-creation-call...
    //var save_this_instance_somewhere = consumers.Pipeline(adapter);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
    
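Putting the detached pieces together, a typical lifecycle could look like the sketch below. LogConsumer and MessageProcessor are hypothetical types introduced only for illustration; the IPipeline<T> members (Add, TryAdd, Dispose) are the ones defined earlier in this article:

```csharp
//hypothetical consumer: writes every received item to the console
internal sealed class LogConsumer : IConsumer<string>
{
    public Task InitAsync() => Task.CompletedTask;

    public Task ConsumeAsync(string item, CancellationToken token)
    {
        Console.WriteLine(item);
        return Task.CompletedTask;
    }

    public void Dispose() { }
}

internal sealed class MessageProcessor : IDisposable
{
    private readonly IPipeline<string> _pipeline;

    public MessageProcessor()
    {
        //build the pipeline once, at application start-up,
        //and keep the instance for the whole app lifetime
        var consumers = new IConsumer<string>[] { new LogConsumer(), new LogConsumer() };
        _pipeline = consumers.Pipeline();
    }

    //called sporadically (API method, event handler, etc.);
    //each caller effectively acts as an ephemeral producer
    public void OnMessageReceived(string message, CancellationToken token)
    {
        //lossless: blocks until buffer space is available
        _pipeline.Add(message, token);
        //lossy alternative with a 100 ms timeout:
        //if (!_pipeline.TryAdd(message, 100, token)) { /* item dropped */ }
    }

    //called at application shutdown: closes the feed and
    //waits for the remaining buffered items to be consumed
    public void Dispose() => _pipeline.Dispose();
}
```

The key design point is that MessageProcessor owns the pipeline instance, so the Dispose responsibility discussed under Instance Management has a single, obvious home.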

Commentary

Feature Implementation

So far, we have implemented all the initially set requirements. Before we close this discussion, we would like to recap our features:

Feature            Implementation
Buffer Size        Through method parameter (bufferSize); 0 is unbounded
Losslessness       Using millisecond timeout during Add/TryAdd; Timeout.Infinite represents no-loss
Interruptibility   Using CancellationToken
Concordance        Use of adapters
Attachability      Attached mode implementation using .Pipe extension methods;
                   detached mode implementation using .Pipeline extension methods

We have also noticed:

Original Work and Nuget Package

In our original work (Source Code Link, NuGet Package Link), we have further elaborated our implementation as explained below:

Usage

IMPORTANT: We suggest you use v1.4.0 or higher, as it contains some breaking changes compared to previous versions. This library also contains some other interesting extension methods, which we might cover in future articles on CodeProject itself. However, if you are interested in the usage of those methods, you can find information here.

History

This is the v1 of the present idea.
