Subscription engines

This page describes the different ways to set up GraphQL subscriptions with a @neo4j/graphql server.

Default

The default engine is used automatically when the subscriptions feature is set to true, as described in Getting Started:

new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: true
    },
});

This behavior enables a simple subscription system that works on a single instance. It is ideal for development, testing, and servers that do not require horizontal scaling.

Change Data Capture (Beta)

If your database supports Change Data Capture (CDC), you can use it as the mechanism for subscriptions through Neo4jGraphQLSubscriptionsCDCEngine. Make sure to follow the steps described in the CDC documentation to enable it for your Neo4j instance.

Note that CDC-based subscriptions behave differently from the other subscription mechanisms, because the engine uses the native CDC events from the Neo4j database. This has the following implications:

  • Any database change, including changes made outside of GraphQL, is reported.

  • Relationship events are not supported at the moment.

  • No additional broker mechanism is required. All the events are received by all the instances of @neo4j/graphql.

  • Events are not triggered immediately. Instead, they are polled from the database.

Usage

Neo4jGraphQLSubscriptionsCDCEngine can be imported directly from the library. The Neo4j driver is the only required argument:

import { Neo4jGraphQL, Neo4jGraphQLSubscriptionsCDCEngine } from '@neo4j/graphql';

const engine = new Neo4jGraphQLSubscriptionsCDCEngine({
    driver,
});

const neoSchema = new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: engine,
    },
});

API

The following options can be passed to the constructor:

  • driver: The driver to be used for CDC queries.

  • pollTime: The interval, in milliseconds, between queries to CDC. Defaults to 100ms. Note that the poll time is the period between one request finishing and the next one starting. The actual time it takes for CDC events to trigger the subscription also depends on your network.

  • queryConfig: An object with the driver query options to be passed to CDC requests. Use the db field to define the target database for CDC.
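The pollTime semantics described above (the delay runs from the end of one request to the start of the next) can be illustrated with a small polling loop. This is only a sketch of that scheduling pattern, not the engine's actual implementation: startPolling, fetchEvents, and onEvents are hypothetical names, and fetchEvents stands in for whatever request the engine sends to CDC.

```javascript
// Hypothetical helper for illustration only; it is not part of @neo4j/graphql.
// The next request is scheduled only after the previous one has finished, so
// the time between two request starts is (request duration + pollTime).
function startPolling(fetchEvents, onEvents, pollTime = 100) {
    let stopped = false;
    let timer = null;

    async function poll() {
        try {
            const events = await fetchEvents(); // stand-in for a CDC query
            if (!stopped && events.length > 0) {
                onEvents(events);
            }
        } catch (error) {
            console.error("Polling request failed:", error);
        }
        if (!stopped) {
            // Schedule the next poll only now, after this request has completed.
            timer = setTimeout(poll, pollTime);
        }
    }

    poll();

    // Returns a function that stops the loop.
    return function stop() {
        stopped = true;
        clearTimeout(timer);
    };
}
```

Scheduling with setTimeout after each request, rather than with setInterval, means slow requests never overlap. It also means that a slow network stretches the effective interval beyond pollTime.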

AMQP

Using subscriptions on a server with multiple instances can be complex, as described in Horizontal scaling. Therefore, the recommended approach is to use a PubSub system, which can be achieved with an AMQP broker such as RabbitMQ. This is supported by the @neo4j/graphql-amqp-subscriptions-engine package.

The @neo4j/graphql-amqp-subscriptions-engine plugin connects to message brokers through the AMQP 0-9-1 protocol to distribute subscription events across all server instances.

Brokers supporting this protocol include RabbitMQ.

The plugin can be installed with npm:

npm install @neo4j/graphql-amqp-subscriptions-engine

AMQP 1.0 is not supported by this plugin.

Usage

The AMQP plugin must be instantiated and passed to the subscriptions field in features. This automatically enables subscriptions with the AMQP broker as a message queue:

const { Neo4jGraphQL } = require("@neo4j/graphql");
const { Neo4jGraphQLAMQPSubscriptionsEngine } = require("@neo4j/graphql-amqp-subscriptions-engine");

const amqpSubscription = new Neo4jGraphQLAMQPSubscriptionsEngine({
    connection: {
        hostname: "localhost",
        username: "guest",
        password: "guest",
    }
});

const neoSchema = new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: amqpSubscription,
    },
});

API

The following options can be passed to the constructor:

  • connection: an AMQP URI as a string or a configuration object.

    • hostname: hostname to be used. Defaults to localhost.

    • username: defaults to guest.

    • password: defaults to guest.

    • port: port of the AMQP broker. Defaults to 5672.

  • exchange: the exchange to be used in the broker. Defaults to neo4j.graphql.subscriptions.fx.

  • version: the AMQP version to be used. Currently only 0-9-1 is supported.

Additionally, any option supported by amqplib can be passed to connection. The engine also exposes the following method:

  • close(): Promise<void>: Closes the connection and channel created, and unbinds the event emitter.
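As an illustration of the connection option described above, the documented defaults of the configuration object correspond to a URI string of the following shape. This is a minimal sketch: toAmqpUri is a hypothetical helper that is not exported by the plugin, and it assumes the usual amqp://user:password@host:port URI layout accepted by amqplib.

```javascript
// Hypothetical helper for illustration only; it is not part of
// @neo4j/graphql-amqp-subscriptions-engine. The default values mirror the
// ones documented for the connection object.
function toAmqpUri({
    hostname = "localhost",
    username = "guest",
    password = "guest",
    port = 5672,
} = {}) {
    // Credentials are percent-encoded so special characters stay valid in a URI.
    const user = encodeURIComponent(username);
    const pass = encodeURIComponent(password);
    return `amqp://${user}:${pass}@${hostname}:${port}`;
}

// With no overrides, every documented default is applied.
const defaultUri = toAmqpUri();

// Only the overridden fields change.
const customUri = toAmqpUri({ hostname: "rabbit.internal", port: 5673 });
```

Either form, the string or the object, can then be passed as connection.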

Custom subscription engine

If none of the existing engines suits your use case, you can create a new engine to connect to any broker you need. To do so, create a class that defines your messaging behavior. The class must contain:

  • An EventEmitter property called events that should emit an event every time the broker sends a message.

  • A publish method that should publish a new event to the broker.

  • Optionally, an init method returning a promise, which is called on getSchema. This is useful for setting up the connection to a broker.

For example, to handle subscriptions using Redis:

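// Note: This is an example of a custom subscription behavior and not a production-ready Redis implementation.
import { EventEmitter } from "events";
import { createClient } from "redis";
import { Neo4jGraphQL } from "@neo4j/graphql";

class CustomRedisSubscriptionEngine {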
    constructor(redisClient) {
        this.client = redisClient;
        this.events = new EventEmitter();
    }

    // This method connects to Redis and sends messages to the eventEmitter when receiving events.
    async init(){
        await this.client.connect();
        this.subscriber = this.client.duplicate();
        this.publisher = this.client.duplicate();
        await this.subscriber.connect();
        await this.publisher.connect();

        await this.subscriber.subscribe("graphql-subscriptions", (message) => {
          const eventMeta = JSON.parse(message);
          this.events.emit(eventMeta.event, eventMeta); // Emits a new event when receiving a new message from redis
        });
    }

    async publish(eventMeta) {
        await this.publisher.publish("graphql-subscriptions", JSON.stringify(eventMeta)); // Sends a message to redis
    }
}

const client = createClient(); // From https://www.npmjs.com/package/redis
const redisSubscriptions = new CustomRedisSubscriptionEngine(client);

const neoSchema = new Neo4jGraphQL({
    typeDefs,
    driver,
    features: {
        subscriptions: redisSubscriptions,
    },
});

Note that extra properties and methods are often needed to handle the connection to the broker. However, as long as messages are sent to the broker in the publish method, and received messages are emitted through the events property, subscriptions are handled properly.
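The contract described above can also be exercised without any broker. The following is a minimal sketch of an in-memory engine intended for local tests only: InMemorySubscriptionsEngine is a hypothetical name, the payload contains only illustrative fields, and the JSON round trip merely imitates the serialization a real broker would require.

```javascript
const { EventEmitter } = require("events");

// Hypothetical in-memory engine for local testing. Events never leave this
// process, so it does not help with horizontal scaling.
class InMemorySubscriptionsEngine {
    constructor() {
        this.events = new EventEmitter();
        this.ready = false;
    }

    // Optional setup hook; a real engine would connect to its broker here.
    async init() {
        this.ready = true;
    }

    // "Sends" the event and immediately "receives" it again.
    async publish(eventMeta) {
        const message = JSON.stringify(eventMeta); // what would go to a broker
        const received = JSON.parse(message); // what would come back
        this.events.emit(received.event, received);
    }
}

// Usage: listen on the events property, then publish an illustrative payload.
const engine = new InMemorySubscriptionsEngine();
const seen = [];
engine.events.on("create", (eventMeta) => seen.push(eventMeta));
engine.publish({ event: "create", timestamp: 1 });
```

Because the listener is keyed by the event field, the same pattern as in the Redis example applies: whatever is passed to publish must come back through events under the same event name.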

Using TypeScript

If you are using TypeScript, you can import the Neo4jGraphQLSubscriptionsEngine interface and implement it in your own class. This ensures that the API is correctly defined:

class CustomRedisEngine implements Neo4jGraphQLSubscriptionsEngine {}

Events are sent to the class in order. However, that order is not guaranteed once the events have been broadcast through a broker. When ordering matters, include the timestamp field in the subscription payload and use it to order events on the receiving side.
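As a sketch of how the timestamp field can restore ordering on the receiving side, the helper below sorts a batch of received payloads. It assumes that timestamp is numeric. sortByTimestamp and the sample payloads are hypothetical and not part of the library.

```javascript
// Hypothetical client-side helper: returns a new array ordered by timestamp,
// leaving the input untouched. Assumes each payload has a numeric timestamp.
function sortByTimestamp(payloads) {
    return [...payloads].sort((a, b) => a.timestamp - b.timestamp);
}

// Payloads as they might arrive after being broadcast through a broker.
const arrived = [
    { event: "update", timestamp: 30 },
    { event: "create", timestamp: 10 },
    { event: "update", timestamp: 20 },
];

const ordered = sortByTimestamp(arrived);
```

Sorting a batch only orders the events already received. A late event with an older timestamp can still arrive after the batch has been processed, so consumers that need strict ordering may have to buffer events for a short window.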