§10.8. Streams and pipelines

In computing, streams and pipelines refer to sequential chains of data processing stages.

Problem

Often data needs to go through multiple stages of processing.

For example, when you upload a video to YouTube, it might go through the following processing steps: [1]

  1. Check the video is a valid file

  2. Split the video into smaller ‘chunks’ for processing

  3. Compress chunks for streaming to mobile devices

  4. Detect unlicensed music use

  5. Detect unlicensed video use

  6. Detect inappropriate content

  7. Get video statistics (e.g., length, resolution, quality)

  8. Generate thumbnails

Similarly, when you publish a post to Facebook, it may pass through a sequence of validation steps: [2]

  1. Detect if it contains abusive speech (e.g., hate speech)

  2. Check any embedded links for viruses or malware

  3. Detect faces in attached images

  4. Match faces to known friends

  5. Resize and compress images for mobile users

It is easy to imagine that these chains will keep growing in complexity. YouTube and Facebook have thousands of developers. Perhaps one developer will add a feature to detect whether the video or post is child-friendly. Another might want to warn users if they’ve accidentally included passwords or other confidential information. Yet another developer might want to add color adjustments for white-balance so that images or video captured indoors do not look yellow. Finally, another developer might want to gather logging or statistical data.

Some problems can arise in managing all of these stages:

  • Developers must carefully trace through code to see how the processing stages are connected.

  • When adding new steps, it can be difficult to discover what steps already exist and where they are invoked.

  • If something fails in a later stage, it may be necessary to reverse or redo earlier stages.

  • If the steps are invoked one after the other, the entire series may take a long time to complete.

Solution

The idea behind pipelines or streams is to treat these stages like a configurable assembly line. Each stage performs a small amount of work. A pipeline architecture connects the stages and manages the flow of data from one stage to the next.

The architectural goal in a pipeline architecture is to break up processing into simple, small, self-contained units of work.

For example, ordinary video-processing code might have one complex function that both splits a video file into chunks and encodes those chunks, all at once. In stream processing, there are two simple stages: one function to split the video file into chunks, and another to encode each chunk.
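In JavaScript, the two stages might look like the following sketch, where splitIntoChunks and encodeChunk are hypothetical helper functions (not part of any real library):

const splitStage =
    video => {
        // Stage 1: break the raw file into smaller chunks
        video.chunks = splitIntoChunks(video.file);
    };

const encodeStage =
    video => {
        // Stage 2: encode each chunk independently
        video.encodedChunks = video.chunks.map(encodeChunk);
    };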

Implementation

While the team shopping list does not yet need complex pipelines, creating a new item triggers two actions. Consider the following excerpt from the constructor in src/server/item.js:

// Notify the team about the new item
notificationService.notifyNewItem(this.description);

// Generate an icon
this.icon = iconService.generateIcon(this.description);

A basic pipeline architecture uses a consistent structure for each stage. In the example below, each stage is a function that takes an item:

const notifyNewItemStage =
    item => {
        notificationService.notifyNewItem(item.description);
    };

const generateIconStage =
    item => {
        item.icon = iconService.generateIcon(item.description);
    };

Then, we can define a helper function to build a pipeline:

function createPipeline(stages) {
    // Return a function that will apply each stage, in order, to the item
    return item => {
        for (const stage of stages) {
            stage(item);
        }
    };
}

The pipeline can then be configured and used as follows:

// Build a pipeline from each stage
const newItemPipeline = createPipeline(
    [
        notifyNewItemStage,
        generateIconStage
    ]
);


class Item {
    constructor(description, quantity) {
        ...

        // Start the pipeline on this object
        newItemPipeline(this);
    }

    ...

}

When this application becomes more complex, adding stages is as simple as adding lines to the newItemPipeline definition. [3]
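For example, a hypothetical detectConfidentialStage (an assumed name, written with the same one-argument signature as the other stages) could be added with a single new line:

const newItemPipeline = createPipeline(
    [
        notifyNewItemStage,
        generateIconStage,
        detectConfidentialStage
    ]
);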

Reactive programming and RxJS

RxJS is a popular reactive programming framework for pipeline and stream programming, and it is a core part of Angular.

In RxJS, Observables are objects that generate notifications. In the previous section, I noted that RxJS Observables can be used in a publish/subscribe architecture.

RxJS Observables can also implement pipelines. The output of an Observable can be modified using operators, and these operators may be chained together in a pipeline to create more complex behaviors. [4]
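For example, the following sketch (assuming RxJS version 6 or later is installed) builds a small pipeline of two operators over an Observable of numbers:

const { from } = require('rxjs');
const { filter, map } = require('rxjs/operators');

// Emit 1..5, keep only the even values, then scale each by ten
from([1, 2, 3, 4, 5])
    .pipe(
        filter(n => n % 2 === 0),
        map(n => n * 10)
    )
    .subscribe(n => console.log(n)); // logs 20, then 40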

In Angular templates, Pipes and the pipe (|) symbol combine formatting operators into a pipeline for sophisticated output formatting. Angular provides ready-to-use pipes (uppercase, lowercase, date, currency and json), and the @Pipe decorator makes it possible to define custom stages in a pipeline.
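For example, the following template expression (a sketch assuming an item object with an addedDate property) chains the built-in date and uppercase pipes:

{{ item.addedDate | date:'shortDate' | uppercase }}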

I/O Streams

Stream processing is a common approach to handling input and output (I/O). Files, keyboard input and network connections are all streams of bytes or characters. Many programming languages provide a way of building more complex streams by chaining simple streams.

For example, a pipeline to send data over a network might consist of five stages (a Node.js sketch follows the list): [5]

  1. Convert Unicode text into UTF-8 bytes

  2. Compress the bytes

  3. Encrypt the bytes

  4. Buffer the bytes (so data is queued-up while waiting for the network)

  5. Send the bytes over the network connection
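The following Node.js sketch wires up such a pipeline. The host, port and throwaway key are assumptions for illustration; a real receiver would need the same key and IV to decrypt:

const net = require('net');
const zlib = require('zlib');
const crypto = require('crypto');

// Assumed connection details and a throwaway key/IV for illustration
const socket = net.connect(9000, 'example.com');               // stage 5
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

const compress = zlib.createGzip();                            // stage 2
const encrypt = crypto.createCipheriv('aes-256-ctr', key, iv); // stage 3

// Chain the stages; Node.js buffers data between stages (stage 4)
compress.pipe(encrypt).pipe(socket);

// Stage 1: the 'utf8' encoding converts Unicode text to UTF-8 bytes
compress.write('Hello, streaming world!', 'utf8');
compress.end();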

Node.js provides I/O streams through the built-in stream module. Consider, as a starting point, the following code to write to a file:

const fs = require('fs');

// Open a writable stream and write a single message to the file
const results = fs.createWriteStream('results.txt');

results.write('Hello, world!',
    () => results.end()
);

Suppose the file should be compressed. With streams, this requires only a small change to create a short pipeline:

const fs = require('fs');
const zlib = require('zlib');

// The final destination: the raw file on disk
const raw = fs.createWriteStream('results.txt.gz');

// A compression stage that gzips whatever is written to it
const results = zlib.createGzip();

// Chain the Zlib compressed stream into the raw file
results.pipe(raw);

results.write('Hello, compressed world!',
    () => results.end()
);

In this example, results.pipe(raw) establishes a single-step pipeline between the written text and the output file. The invocation of results.write(…) is unchanged: Node.js streams manage all the details of compressing the data and forwarding the compressed bytes to the underlying file.
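For completeness, reading the compressed file back is just as simple; the following sketch chains a gunzip stream after the file read stream and prints the decompressed text to standard output:

const fs = require('fs');
const zlib = require('zlib');

// Chain: file bytes -> gunzip -> standard output
fs.createReadStream('results.txt.gz')
    .pipe(zlib.createGunzip())
    .pipe(process.stdout);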


1. These specific steps are speculation on my part. YouTube’s video ingest pipeline is proprietary.
2. These specific steps are speculation on my part. The pipeline or processing that Facebook uses on a post is proprietary (and may not even be implemented as a pipeline).
3. This pipeline could even be created automatically by configuration files or by scanning a directory for JavaScript files.
4. See the Angular documentation for an example of using a pipeline to chain together operators that remove whitespace, wait for pauses in input, send requests and cancel pending requests. The result of the pipeline is a sophisticated auto-completion feature, even though each stage in the pipeline is easy to understand.
5. In this example, the pipeline architecture is a form of layering. Each layer provides the same interface: it receives a stream of bytes or characters from the layer above and sends a stream of bytes or characters to the layer below.