§10.7. Publish/subscribe architecture

A publish/subscribe architecture is a mechanism that allows message sending to be separated from message receiving.

Problem

In the team shopping list, when an item changes its state, additional actions must be executed (e.g., sending emails, generating expenses).

This relationship between state change and action introduces new dependencies to item.js. The logic in item.js needs to trigger all of the relevant actions. Consider the following excerpt from item.js:

constructor(...) {
    ...
    notificationService.notifyNewItem(this.description);
    ...
    this.icon = iconService.generateIcon(this.description);
}

purchase() {
    ...
    expenseService.createExpense(this.description, this.quantity);
    ...
}

delete() {
    ...
    notificationService.notifyDeletedItem(this.description);
    ...
}

The problem with this code is that a single file contains many dependencies. It will become increasingly difficult to extend or adapt the system:

  • Enabling or disabling actions will require changes in the state handling logic

  • New actions (e.g., adding logging, adding SMS notifications) require careful checking of the state handling logic

  • There is no easy way to queue actions to be run later (e.g., queuing up emails to be sent later, rather than waiting for an email to be sent on every state change)

Solution

Publish/subscribe architectures replace direct function invocation with indirect message publishing. Two modules no longer need to know of each other to communicate. Instead, the sender and receiver connect to the same topic: the sender publishes messages and the receiver subscribes. The architecture is responsible for routing messages from one to the other. The benefit is that replacing either the sender or receiver does not require a change to the other.

Publish/subscribe architecture

In practice, moving to a publish/subscribe architecture involves thinking about a problem in terms of events that occur, rather than specific sequences of function calls. Ideally, a good design for messages and topics has future extensions in mind.

For example, currently the Item class is written so that it calls functions in other modules:

  1. notificationService.notifyNewItem(…​)

  2. iconService.generateIcon(…​)

  3. expenseService.createExpense(…​)

  4. notificationService.notifyDeletedItem(…​)

The function invocations are translated into reusable topics. For example, topics based on the item life-cycle would include itemCreated, itemPurchased and itemDeleted.

The topics provide an anchor for future extensions. At first, only the expenseService would subscribe to the itemPurchased topic. However, as the system grows in complexity, new services could subscribe to the same topic. For example, when an item is purchased, it could simultaneously trigger an email to a manager, an update in an inventory tracking system, and the allocation of an asset tracking number. No change to the Item class is required to add these additional services as new subscribers.

Implementation

The publish/subscribe architecture has many implementations. For example, pubsub-js provides a simple framework for implementing publish/subscribe on any JavaScript platform. Node.js also has an events module that connects listeners (i.e., subscribers) to named events (i.e., topics) generated by event emitters (i.e., publishers).
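As a rough illustration of the Node.js events module, the sketch below connects two listeners to a named event. The event name itemPurchased follows the topics suggested earlier; the listener bodies and the log array are hypothetical stand-ins for real services.

```javascript
const { EventEmitter } = require('events');

// One emitter plays the role of the message router.
const itemEvents = new EventEmitter();
const log = [];

// Listeners (subscribers) register for a named event (topic).
itemEvents.on('itemPurchased', item => {
    log.push(`expense: ${item.description} x${item.quantity}`);
});
itemEvents.on('itemPurchased', item => {
    log.push(`email manager: ${item.description}`);
});

// The publisher emits the event without knowing who is listening.
// emit() calls the listeners synchronously, in registration order,
// and returns true if the event had at least one listener.
const delivered = itemEvents.emit('itemPurchased', { description: 'Coffee', quantity: 2 });

// No listener is registered for this event, so emit() returns false.
const ignored = itemEvents.emit('itemDeleted', { description: 'Coffee' });

console.log(delivered, ignored, log);
```

Adding the second listener required no change to the code that emits the event, which is the decoupling described above.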

Using established libraries provides a range of performance and reliability benefits. However, the fundamental ideas of publish/subscribe architectures are easy to implement.

For example, we could create a simple topics.js module as follows:

class Topic {
    constructor() {
        this.subscribers = [];
    }

    publish(message) {
        for (let callback of this.subscribers) {
            callback(message);
        }
    }

    subscribe(callback) {
        this.subscribers.push(callback);
    }
}

module.exports = {
    itemCreated: new Topic(),
    itemPurchased: new Topic(),
    itemDeleted: new Topic()
}

Then, the Item class can publish directly to topics:

constructor(...) {
    ...
    topics.itemCreated.publish(this);
    ...
}

purchase() {
    ...
    topics.itemPurchased.publish(this);
    ...
}

delete() {
    ...
    topics.itemDeleted.publish(this);
    ...
}

And the services can register themselves to listen for events. For example, the modified notification.js might look like this:

// A relative path is needed so that Node.js loads the local module
const topics = require('./topics.js');

topics.itemCreated.subscribe(item => {
    console.log(`---------------------------------------------`);
    console.log(`To: Everyone`);
    console.log(`From: shopping list app`);
    console.log(`Please buy this item: ${item.description}`);
});

// The Item class publishes the whole item, so read its description here too
topics.itemDeleted.subscribe(item => {
    console.log(`---------------------------------------------`);
    console.log(`To: Everyone`);
    console.log(`From: shopping list app`);
    console.log(`Warning! Please don't buy this item: ${item.description}`);
});

By implementing this simple interpretation of the publish/subscribe architecture, the project separates the logic of the Item class from the other services that handle triggered events. More sophisticated publish/subscribe frameworks can provide additional features to improve the application:

  • Error handling with automatic retry

  • Queuing messages and processing them at a later time (asynchronous execution rather than synchronous execution)

  • Running multiple subscribers simultaneously (parallel execution rather than sequential execution)

  • Logging
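To give a feel for two of these features, here is a minimal sketch of a topic that queues messages for later delivery and isolates subscriber errors. The QueuedTopic class and its flush method are hypothetical names invented for this illustration; real frameworks differ in both API and behaviour (for example, this sketch records errors but does not retry).

```javascript
// Hypothetical QueuedTopic: publish() only records the message;
// flush() delivers everything that has been queued so far.
class QueuedTopic {
    constructor() {
        this.subscribers = [];
        this.queue = [];
        this.errors = [];
    }

    subscribe(callback) {
        this.subscribers.push(callback);
    }

    publish(message) {
        this.queue.push(message);   // no subscriber runs yet
    }

    flush() {
        const pending = this.queue;
        this.queue = [];            // messages published during a flush wait for the next one
        for (const message of pending) {
            for (const callback of this.subscribers) {
                try {
                    callback(message);
                } catch (error) {
                    // A failing subscriber must not block the others.
                    this.errors.push({ message, error });
                }
            }
        }
        return pending.length;
    }
}

const itemCreated = new QueuedTopic();
const sent = [];
itemCreated.subscribe(item => { throw new Error('mail server down'); });
itemCreated.subscribe(item => sent.push(item.description));

itemCreated.publish({ description: 'Milk' });
itemCreated.publish({ description: 'Tea' });
const sentBeforeFlush = sent.length;   // nothing has been delivered yet
const flushed = itemCreated.flush();   // delivers both queued messages

console.log(sentBeforeFlush, flushed, sent, itemCreated.errors.length);
```

The publisher returns immediately from publish, and the application decides when to call flush (for example, on a timer).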

Reactive programming and RxJS

The reactive programming framework RxJS includes elements of publish/subscribe architectures. Angular code requires RxJS. React code may also use RxJS.

In Angular code using RxJS, Observables are objects that allow observers to subscribe to notifications. Observable objects are ‘topics’. Subscribing to the observable is as simple as supplying a callback to the .subscribe(callback) method.

The Subject and BehaviorSubject classes of RxJS are similar to topics of publish/subscribe architectures. You can publish a message by calling .next(message) on an RxJS subject.
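The sketch below does not use RxJS itself. It is a hand-written stand-in (the class name MiniBehaviorSubject is hypothetical) that mimics two behaviours of a BehaviorSubject: calling .next(message) publishes to every subscriber, and a new subscriber immediately receives the most recent value.

```javascript
// Hypothetical stand-in that imitates the shape of an RxJS BehaviorSubject.
class MiniBehaviorSubject {
    constructor(initialValue) {
        this.current = initialValue;
        this.observers = new Set();
    }

    // Publish: remember the value and notify every observer.
    next(value) {
        this.current = value;
        for (const observer of this.observers) {
            observer(value);
        }
    }

    // Subscribe: replay the current value, then listen for later ones.
    subscribe(observer) {
        this.observers.add(observer);
        observer(this.current);
        return { unsubscribe: () => this.observers.delete(observer) };
    }
}

const itemCount = new MiniBehaviorSubject(0);
const seen = [];

const subscription = itemCount.subscribe(count => seen.push(count)); // receives 0 at once
itemCount.next(1);
itemCount.next(2);
subscription.unsubscribe();
itemCount.next(3);   // no longer delivered to the callback

console.log(seen);
```

In real RxJS code the same steps would use the library's own BehaviorSubject class, which offers many more features (operators, completion, error channels) than this stand-in.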

Content-based filtering

In this discussion, I have assumed separate topics for different kinds of events. A variation on this idea is the event bus architecture. Instead of delivering messages based on topics, a publish/subscribe architecture could use just one single topic for all messages. In this case, the architecture filters messages to subscribers based on the message content.

For example, rather than publishing to a specific topic, an event bus architecture might have only a single publish method that accepts every kind of message:

const EVENT_DELETED = 'EVENT_DELETED';

delete() {
    ...
    eventBus.publish({
        type: EVENT_DELETED,
        description: this.description,
        quantity: this.quantity
    });
    ...
}

A filter expression specifies whether to deliver the message:

eventBus.subscribe(
    // Filter expression: decides whether the message is delivered
    event => event.type == EVENT_DELETED && event.quantity > 10,
    // Callback: runs only for messages that pass the filter
    () => {
        console.log(`---------------------------------------------`);
        console.log(`To: Manager`);
        console.log(`From: shopping list app`);
        console.log(`Warning! A large order was deleted.`);
    }
);

The subscriber’s callback is invoked only if the filter expression event.type == EVENT_DELETED && event.quantity > 10 is true. That is, the callback subscribes to events for deleted items where the quantity is greater than ten.
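The event bus itself can be sketched in a few lines. The EventBus class below is a hypothetical minimal implementation written for this illustration, assuming that subscribe takes a filter function followed by a callback; the warnings array stands in for sending a message to the manager.

```javascript
// Hypothetical event bus: one publish method, subscribers supply a filter.
class EventBus {
    constructor() {
        this.subscriptions = [];
    }

    subscribe(filter, callback) {
        this.subscriptions.push({ filter, callback });
    }

    publish(event) {
        for (const { filter, callback } of this.subscriptions) {
            // Deliver the message only if the subscriber's filter accepts it.
            if (filter(event)) {
                callback(event);
            }
        }
    }
}

const EVENT_DELETED = 'EVENT_DELETED';
const eventBus = new EventBus();
const warnings = [];

eventBus.subscribe(
    event => event.type == EVENT_DELETED && event.quantity > 10,
    event => warnings.push(`Large order deleted: ${event.description}`)
);

eventBus.publish({ type: EVENT_DELETED, description: 'Pens', quantity: 3 });    // filtered out
eventBus.publish({ type: EVENT_DELETED, description: 'Paper', quantity: 50 });  // delivered
eventBus.publish({ type: 'EVENT_CREATED', description: 'Ink', quantity: 99 });  // filtered out

console.log(warnings);
```

Every message passes through the same publish method, and the routing decision moves from the topic name into the filter function.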

Redux

Many React applications use the Redux framework to manage application data. The design of Redux is a combination of a state machine and a publish/subscribe or event bus architecture.

In Redux, every change in an application occurs when components call dispatch to publish actions (i.e., messages) to a store (i.e., an event bus). In Redux, reducers are subscribers.

Redux extends the idea of an event bus by having a state machine act as a single ‘source of truth’. Reducers are triggered to update the store when actions are published.

For example, consider a simple counter application created in React with Redux. Every time the user clicks ‘Increase by one’, a counter should increase.

In Redux, we can write an action creator to create a message:

function increase() {
    return { type: 'INCREASE', amount: 1 };
}

The "Increase by one" button would publish the message to the event bus by calling dispatch:

<button onClick={() => store.dispatch(increase())}>Increase by one</button>

A store is configured with a reducer (i.e., a subscriber) and an initial value:

const store = createStore(
    increaseReducer,
    { count: 0 }
)

Finally, the reducer is a subscriber responsible for updating the state whenever it sees an 'INCREASE' message:

function increaseReducer(state, action) {
    switch (action.type) {
        case 'INCREASE':
            // state.count reads the property; state[count] would refer to an undefined variable
            return { ...state, count: state.count + action.amount };
        default:
            return state;
    }
}

In this simple example, Redux adds a great deal of complexity for little benefit. However, the framework has become a well-accepted approach to managing the complexity of large web applications. Redux is one approach to separating presentation logic from action creation and command processing.
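To make the flow from dispatch to reducer concrete without React or the Redux library, the following sketch uses a hand-written store. The createMiniStore function is a hypothetical stand-in that imitates only the getState, dispatch and subscribe methods of a Redux store; it is not how Redux is implemented.

```javascript
// Hypothetical stand-in for a Redux-style store (not the Redux library).
function createMiniStore(reducer, initialState) {
    let state = initialState;
    const listeners = [];
    return {
        getState: () => state,
        subscribe: listener => listeners.push(listener),
        dispatch(action) {
            state = reducer(state, action);             // the reducer computes the next state
            listeners.forEach(listener => listener());  // then listeners are told to re-read it
            return action;
        }
    };
}

// Action creator: builds the message.
function increase() {
    return { type: 'INCREASE', amount: 1 };
}

// Reducer: returns a new state object rather than mutating the old one.
function increaseReducer(state, action) {
    switch (action.type) {
        case 'INCREASE':
            return { ...state, count: state.count + action.amount };
        default:
            return state;
    }
}

const miniStore = createMiniStore(increaseReducer, { count: 0 });
const rendered = [];
miniStore.subscribe(() => rendered.push(miniStore.getState().count));

miniStore.dispatch(increase());
miniStore.dispatch(increase());
miniStore.dispatch({ type: 'UNKNOWN' });   // state is unchanged, listeners still run

console.log(miniStore.getState(), rendered);
```

The rendered array plays the part of the user interface: it re-reads the state after every dispatched action.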

CQRS and event sourcing

The terms command/query responsibility segregation (CQRS) and event sourcing are names of related concepts used in server-side logic.

Recall that in a three-layer architecture, the presentation layer calls the domain layer and the domain layer calls the persistence layer.

An event sourcing architecture applies a Redux-style event bus architecture to domain logic. Specifically, in an event sourcing architecture, domain logic code is separated into commands (i.e., actions/events) and command handling logic (i.e., reducers/subscribers). The presentation layer creates commands, and these commands are the interface to the rest of the domain logic.

A benefit of event sourcing is that an event store records a complete history of all commands. Usually, a database only stores the current state of the system. Event sourcing makes it easy to keep a detailed log of every change up to the current state of the database. This detailed log is an audit trail and a source for historical data analysis.
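The idea of keeping the complete history can be sketched with an in-memory array acting as the event store. All names here (eventStore, applyCommand, handle, stateAfter, and the ADD_ITEM and DELETE_ITEM command types) are hypothetical and chosen for this illustration; a real system would persist the log in a database.

```javascript
// Hypothetical event store: an append-only list of commands.
const eventStore = [];

// Command handling logic: a pure function from (state, command) to new state.
function applyCommand(items, command) {
    switch (command.type) {
        case 'ADD_ITEM':
            return { ...items, [command.description]: command.quantity };
        case 'DELETE_ITEM': {
            // Copy every entry except the deleted one.
            const { [command.description]: removed, ...rest } = items;
            return rest;
        }
        default:
            return items;
    }
}

// Handling a command means recording it; state is derived from the log.
function handle(command) {
    eventStore.push(command);
}

// Replaying the first `count` commands rebuilds the state at that point.
function stateAfter(count = eventStore.length) {
    return eventStore.slice(0, count).reduce(applyCommand, {});
}

handle({ type: 'ADD_ITEM', description: 'Milk', quantity: 2 });
handle({ type: 'ADD_ITEM', description: 'Tea', quantity: 1 });
handle({ type: 'DELETE_ITEM', description: 'Milk' });

const currentState = stateAfter();     // state after all three commands
const earlierState = stateAfter(2);    // state before the deletion

console.log(currentState, earlierState, eventStore.length);
```

Because the log is never overwritten, any earlier state can be reconstructed by replaying a prefix of it, which is what makes the audit trail possible.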

The following figure depicts event sourcing:

Updates in event sourcing

Querying and data retrieval do not change the database, so a query is distinct from a command. Command/query responsibility segregation refers to the idea that queries are kept separate from commands: the domain logic must provide direct methods for querying the data.

Thus, data retrieval operations are read-only database queries that follow the traditional layered architecture:

Querying a database
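The separation between the two paths can be sketched as follows. The names commandLog, database, executeCommand and findItemsWithQuantityAbove are hypothetical, and a Map stands in for a real database.

```javascript
// Hypothetical in-memory stand-ins for the event store and the database.
const commandLog = [];
const database = new Map();

// Command side: every change is expressed as a command and logged.
function executeCommand(command) {
    commandLog.push(command);
    if (command.type === 'ADD_ITEM') {
        database.set(command.description, command.quantity);
    } else if (command.type === 'DELETE_ITEM') {
        database.delete(command.description);
    }
}

// Query side: read-only, and it never touches the command log.
function findItemsWithQuantityAbove(minimum) {
    return [...database]
        .filter(([, quantity]) => quantity > minimum)
        .map(([description]) => description);
}

executeCommand({ type: 'ADD_ITEM', description: 'Paper', quantity: 50 });
executeCommand({ type: 'ADD_ITEM', description: 'Pens', quantity: 3 });

const logSizeBeforeQuery = commandLog.length;
const bigOrders = findItemsWithQuantityAbove(10);
const logUnchanged = commandLog.length === logSizeBeforeQuery;

console.log(bigOrders, logUnchanged);
```

Only executeCommand may modify the data; the query function can be called any number of times without adding to the history.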

When combined with event sourcing in command/query responsibility segregation, the resulting architecture is as follows:

Publish/subscribe architecture

Reflection: Disadvantages

What are some disadvantages of publish/subscribe architectures?