Run concurrent transactions

You can use Goroutines and channels to run concurrent queries, or to delegate the processing of a query’s result to multiple Goroutines. The examples below also use the Go sync package to coordinate the different routines. If you are not familiar with concurrency in Go, check out The Go Programming Language → Go Concurrency Patterns: Pipelines and cancellation.

If you need causal consistency across different transactions, use bookmarks.

Concurrent processing of a query result set (using sessions)

The following example shows how you can stream a query result to a channel, and have its records concurrently processed by several consumers.

package main

import (
    "fmt"
    "context"
    "time"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "<URI for Neo4j database>"
    dbUser := "<Username>"
    dbPassword := "<Password>"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    // Run a query and get results in a channel
    recordsC := queryToChannel(ctx, driver)  // (1)

    // Spawn some consumers that will process records
    // They communicate back on the log channel
    // WaitGroup keeps track of progress, so that the log channel can be closed when all consumers are done
    log := make(chan string)  // (4)
    wg := &sync.WaitGroup{}  // (5)
    for i := 1; i < 10; i++ {  // i starts from 1 because receiver 0 would not sleep at all
        wg.Add(1)
        go consumer(wg, recordsC, log, i)  // (6)
    }
    // When all consumers are done, close log channel
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log as it comes
    for v := range log {
        fmt.Println(v)
    }
}

func queryToChannel(ctx context.Context, driver neo4j.DriverWithContext) chan *neo4j.Record {
    recordsC := make(chan *neo4j.Record, 10)  // (2)
    go func() {
        // The session lives inside the routine, as sessions are not thread-safe
        session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
        defer session.Close(ctx)
        // Close the channel when streaming ends, so that consumers can exit
        defer close(recordsC)
        _, err := session.ExecuteWrite(ctx,
            func(tx neo4j.ManagedTransaction) (any, error) {
                // Neo4j query to create and retrieve some nodes
                result, err := tx.Run(ctx, `
                    UNWIND range(1,25) AS id
                    MERGE (p:Person {id: id})
                    RETURN p
                    `, nil)
                if err != nil {
                    return nil, err
                }
                // Stream results to channel as they come from the server
                for result.Next(ctx) {  // (3)
                    recordsC <- result.Record()
                }
                return nil, result.Err()
            })
        if err != nil {
            panic(err)
        }
    }()
    return recordsC
}

func consumer(wg *sync.WaitGroup, records <-chan *neo4j.Record, log chan string, n int) {
    defer wg.Done()  // will communicate that routine is done
    for record := range records {
        log <- fmt.Sprintf("Receiver %v processed %v", n, record)
        time.Sleep(time.Duration(n) * time.Second)  // proxy for a time-consuming processing
    }
}
1 A Goroutine runs the query to the Neo4j server with a managed transaction. Notice that the driver session is created inside the routine, as sessions are not thread-safe.
2 The channel recordsC is where the query result records get streamed to. The transaction function from ExecuteWrite() writes to it, and the various consumers read from it. It is buffered so that the driver does not retrieve records faster than what the consumers can handle.
3 Each result record coming from the server is sent over the recordsC channel. The streaming continues so long as there are records to be processed, after which the channel gets closed and the routine exits.
4 The channel log is where the consumers report their progress.
5 A sync.WaitGroup is needed to know when all consumers are done, so that the log channel can be closed.
6 A number of consumers get started in separate Goroutines. Each consumer reads and processes records from the recordsC channel. Each consumer simulates a lengthy operation with a sleeping timer.
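
The producer/consumer pattern described in the callouts above can be tried without a database. The following is a minimal sketch using only the standard library: produce and consumeAll are hypothetical names, and a stream of integers stands in for the query records. It assumes nothing about the driver; it only illustrates how a buffered channel holds the producer back when consumers are slow, and how closing the channel lets all consumers exit.

```go
package main

import (
	"fmt"
	"sync"
)

// produce streams the integers 1..n to a buffered channel and closes the
// channel when done. It is a stand-in for a query streaming its records.
func produce(n, buffer int) <-chan int {
	out := make(chan int, buffer)
	go func() {
		defer close(out) // closing ends the consumers' range loops
		for i := 1; i <= n; i++ {
			out <- i // blocks while the buffer is full
		}
	}()
	return out
}

// consumeAll starts the given number of workers on the channel and returns
// the sum of all the values they received.
func consumeAll(in <-chan int, workers int) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex // protects total
		total int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				mu.Lock()
				total += v
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // returns once the channel is closed and drained
	return total
}

func main() {
	// 25 values, a buffer of 10, and 9 consumers, as in the example above.
	fmt.Println(consumeAll(produce(25, 10), 9)) // prints 325
}
```

Each value is delivered to exactly one consumer, so the sum is the same regardless of how the work gets distributed among the workers.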

Concurrent run of multiple queries (using ExecuteQuery())

The following example shows how you can run multiple queries concurrently.

package main

import (
    "fmt"
    "context"
    "sync"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    ctx := context.Background()

    // Connection to database
    dbUri := "<URI for Neo4j database>"
    dbUser := "<Username>"
    dbPassword := "<Password>"
    driver, err := neo4j.NewDriverWithContext(
        dbUri,
        neo4j.BasicAuth(dbUser, dbPassword, ""))
    if err != nil {
        panic(err)
    }
    defer driver.Close(ctx)
    err = driver.VerifyConnectivity(ctx)
    if err != nil {
        panic(err)
    }

    log := make(chan string)  // (1)
    wg := &sync.WaitGroup{}  // (2)
    // Spawn 10 concurrent queries
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go runQuery(wg, ctx, driver, log)  // (3)
    }
    // Wait for all runner routines to be done before closing log
    go func() {
        wg.Wait()
        close(log)
    }()
    // Print log
    for msg := range log {
        fmt.Println(msg)
    }
}

// Run a Neo4j query that sleeps for a random time (up to 2000 ms) and report the sleep time on the log channel
func runQuery(wg *sync.WaitGroup, ctx context.Context, driver neo4j.DriverWithContext, log chan string) {
    defer wg.Done()  // will communicate that routine is done
    result, err := neo4j.ExecuteQuery(ctx, driver, `
        WITH round(rand()*2000) AS waitTime
        CALL apoc.util.sleep(toInteger(waitTime)) RETURN waitTime AS time
        `, nil, neo4j.EagerResultTransformer,
        neo4j.ExecuteQueryWithDatabase("neo4j"))
    if err != nil {
        log <- fmt.Sprintf("ERROR: %v", err)
    } else {
        waitTime, _ := result.Records[0].Get("time")
        log <- fmt.Sprintf("Query returned %v", waitTime)
    }
}
1 The log channel is where all query routines report their results.
2 A sync.WaitGroup is needed to know when all query routines are done, so that the log channel can be closed.
3 Ten different queries are run, each in its own Goroutine. They run independently and concurrently, reporting to the shared log channel.

Glossary

LTS

A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.

Aura

Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.

Cypher

Cypher is Neo4j’s graph query language that lets you retrieve data from the database. It is like SQL, but for graphs.

APOC

Awesome Procedures On Cypher (APOC) is a library of (many) procedures and functions that cannot be easily expressed in Cypher itself.

Bolt

Bolt is the protocol used for interaction between Neo4j instances and drivers. It listens on port 7687 by default.

ACID

Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.

eventual consistency

A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.

causal consistency

A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.

NULL

The null marker is not a type but a placeholder for absence of value. For more information, see Cypher → Working with null.

transaction

A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.

backpressure

Backpressure is a force opposing the flow of data. It ensures that the client is not overwhelmed by data arriving faster than it can handle.

transaction function

A transaction function is a callback executed by an ExecuteRead or ExecuteWrite call. The driver automatically re-executes the callback in case of server failure.

DriverWithContext

A DriverWithContext object holds the details required to establish connections with a Neo4j database.