Eventstore typescript / ES6 javascript client lib

Eventstore node.js javascript client library written in typescript with ES6 async/await syntax.

This repository is in BETA status

The main goals of this lib are:

  • available for typescript
  • available for ES6 with async/await syntax
  • well documented
  • clean code
  • proper code readability
  • proper testing
  • nice error handling

Documentation

Full documentation is available at https://sebastianwessel.github.io/eventstore-ts-client/

Requirements

You need node.js version >= 10 to use this lib, because it is based on async iterators.
It is tested against eventstore version 5, but should generally work with lower versions as well.
The exception is some shortcut functions for accessing standard projections, which are not part of lower eventstore versions.

Installation

Installation is as simple as for most packages.
Just install the package in your project's root with:

npm i --save eventstore-ts-client

Quick-Start

const {Eventstore, Event} = require('eventstore-ts-client')
const es = new Eventstore({
  uri: 'tcp://admin:changeit@127.0.0.1:1113'
})
await es.connect()


const eventA = new Event('EventA',{
  some: 'string data',
  num : 1
})
await es.atStream('mystream').append(eventA)

const eventB = new Event('EventB',{
  text: 'other string',
  count : 2
})

eventB.correlationId = eventA.id
await es.atStream('mystream').append(eventB)

const eventC = new Event('EventC')
const eventD = new Event('EventD')

await es.atStream('mystream').append([eventC, eventD])

const events = await es
      .stream('mystream')
      .walkStreamForward()

for await (const event of events) {
  console.log(event.name)
}

await es.close()

For full documentation please visit: https://sebastianwessel.github.io/eventstore-ts-client/

Building

To build this lib just clone this repo and run:

npm install
npm run build

You may need to change the file attributes of the scripts inside the scripts/ folder to make them executable.

Test lib

The eventstore-ts-client tests rely heavily on integration tests.

The tests run against a 3-node eventstore cluster, which is configured and filled with some test data and settings.
You need docker installed on your machine to run the tests.
The setup contains a script which does everything for you. Just type:

npm run test

Running the tests also generates a code coverage report that can be used in tools like sonarqube.

ToDo

  • catch-up subscription
  • ACL handling
  • code review and cleanup
  • improve tests and documentation

Apache 2.0 License

see LICENSE

General

This library tries to help you communicate with eventstore in a smooth way.

A couple of things, like error handling and function naming, should help you write clean and expressive ("speaking") code.

It follows the async/await style to avoid "callback hell".

Speaking code

Instead of a few base functions with lots of parameters, this lib provides fluent syntax and alias functions.
Instead of writing:

var esConnection = esClient.createConnection(connSettings, "tcp://localhost:1113");
esConnection.connect();

var eventId = uuid.v4();
var eventData = {
  a : Math.random(),
  b : uuid.v4()
};
var event = esClient.createJsonEventData(eventId, eventData, null, 'TestWasDone');

esConnection.appendToStream('streamName', esClient.expectedVersion.any, event,someDifferentCredentials)
  .then(function(result) {
      console.log("Stored event:", eventId);
  })

you should write something like:

const eventstore = new Eventstore('tcp://localhost:1113')
await eventstore.connect()

const eventTestWasDone = new Event('TestWasDone',{
  a : Math.random(),
  b : uuid.v4()
})

await eventstore
  .atStream('streamName')
  .withCredentials(someDifferentCredentials)
  .requiresMaster()
  .append(eventTestWasDone)

console.log("Stored event:", eventTestWasDone.id)

As you can see, your code becomes a lot more self-explanatory. Even someone who isn't familiar with your program, or with javascript/typescript at all, is able to understand what these lines of code are doing.

Of course, you can use function parameters as well if you prefer, but it's not recommended.

Error handling

This client lib uses named errors, so there is no need to parse error messages.
Just use error.name to identify different errors.
The error instance may also contain an additional field causedBy, which holds the earlier error that caused the current one, if there was any.

List of possible errors:

  • EventstoreAccessDeniedError
  • EventstoreAlreadyExistError
  • EventstoreBadRequestError
  • EventstoreCommitTimeoutError
  • EventstoreConnectionError
  • EventstoreDoesNotExistError
  • EventstoreForwardTimeoutError
  • EventstoreImplementationError
  • EventstoreInvalidTransactionError
  • EventstoreNoStreamError
  • EventstoreNotAuthenticatedError
  • EventstoreNotFoundError
  • EventstoreNotHandledError
  • EventstoreNotModifiedError
  • EventstoreOperationError
  • EventstorePrepareTimeoutError
  • EventstoreProtocolError
  • EventstoreStreamDeletedError
  • EventstoreTimeoutError
  • EventstoreUnspecificError
  • EventstoreWrongExpectedVersionError
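
The following is a minimal sketch of how branching on error.name and following causedBy could look in application code. The class FakeEventstoreError and the helpers describeError and errorChain are hypothetical stand-ins written for this example; only the error names and the name and causedBy fields come from the description above.

```javascript
// Stand-in for an error thrown by the library. Only the `name` and `causedBy`
// fields are taken from the text above; the class itself is hypothetical.
class FakeEventstoreError extends Error {
  constructor(name, message, causedBy = null) {
    super(message)
    this.name = name
    this.causedBy = causedBy
  }
}

// Decide what to do with a failed request by looking at error.name only.
function describeError(error) {
  switch (error.name) {
    case 'EventstoreWrongExpectedVersionError':
      return 'conflict: reload the stream and retry the command'
    case 'EventstoreAccessDeniedError':
    case 'EventstoreNotAuthenticatedError':
      return 'auth: check credentials and stream ACL'
    case 'EventstoreTimeoutError':
      return 'timeout: no response within the configured timeout'
    default:
      return `unhandled: ${error.name}`
  }
}

// Walk the causedBy chain to collect the names of all underlying errors.
function errorChain(error) {
  const names = []
  for (let current = error; current; current = current.causedBy) {
    names.push(current.name)
  }
  return names
}

const cause = new FakeEventstoreError('EventstoreConnectionError', 'socket closed')
const failure = new FakeEventstoreError('EventstoreTimeoutError', 'no response', cause)

console.log(describeError(failure))
console.log(errorChain(failure).join(' <- '))
```

In real code the error would come from a catch block around an awaited request instead of being constructed by hand.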

Request response queue

This lib does not use a queue for outgoing requests and commands.
Most outgoing requests return promises, which are resolved as soon as a valid response without an error code arrives.
If the connection to eventstore gets lost while sending a request, you get a connection error immediately, and this library will not try to resend the request for you.

I strongly believe it's up to the program logic to handle unexpectedly lost connections while sending requests.
How to handle such failures, and whether it's possible to reconnect at all, depends very much on the use case.
To avoid unsolvable questions like "What to do with requests when..." and because tcp connections are duplex connections, there is no request queue. This library tries to do as much as possible in real time and/or in an asynchronous fashion.

If the correlating response for a request contains an error code, the request promise is rejected with the corresponding error.
If no response arrives within the given timeout setting, the request promise is rejected with an EventstoreTimeoutError.
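
Since the library does not resend requests, any retry logic lives in the application. The sketch below shows one possible caller-side helper. The function withRetry, its options, and the flakyAppend stand-in are assumptions made for this example and are not part of the library. Whether resending a request is safe at all depends on your use case.

```javascript
// Caller-side retry helper. The library itself does not resend requests,
// so this logic lives in the application. All names here are illustrative.
async function withRetry(operation, options = {}) {
  const {
    retries = 3,
    delayMs = 100,
    retryOn = ['EventstoreConnectionError', 'EventstoreTimeoutError']
  } = options
  let lastError
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await operation(attempt)
    } catch (error) {
      // Only retry errors that are plausibly transient; rethrow the rest.
      if (!retryOn.includes(error.name)) throw error
      lastError = error
      if (attempt < retries) {
        await new Promise((resolve) => setTimeout(resolve, delayMs))
      }
    }
  }
  throw lastError
}

// Stand-in for a request such as an append:
// fails twice with a connection error, then succeeds.
let calls = 0
async function flakyAppend() {
  calls++
  if (calls < 3) {
    const error = new Error('connection lost')
    error.name = 'EventstoreConnectionError'
    throw error
  }
  return 'appended'
}

const retryResult = withRetry(flakyAppend, { retries: 5, delayMs: 10 })
retryResult.then((value) => console.log(value, 'after', calls, 'attempts'))
```

Errors whose name is not listed in retryOn are rethrown on the first attempt, so failures such as a wrong expected version are never retried blindly.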

Eventstore

Connecting

The main connection parameters are set inside the connection uri.
This lib supports connecting to a single instance via tcp as well as auto-discovery of clusters.
Connections can be unsecured or encrypted.

// connection to single instance at local at port 1113 with username and password
const es = new Eventstore({
  uri: 'tcp://username:userpassword@127.0.0.1:1113'
})

await es.connect()

// connection to cluster at domain escluster.net at port 2112 with username and password
const es = new Eventstore({
  uri: 'discover://username:password@escluster.net:2112'
})

await es.connect()

// ...same with some additional properties like encryption and connect to master only
const fs = require('fs')
const es = new Eventstore({
  uri: 'discover://username:password@escluster.net:2112',
  requireMaster:true,
  useSSL: true,
  validateServer: true,
  secureContext: {
    ca: fs.readFileSync('./rootCA.crt'),
    key: fs.readFileSync('./domain.key'),
    cert: fs.readFileSync('./domain.crt')
  }
})

await es.connect()
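
To make the layout of the connection uri explicit, the sketch below splits the two example uris into their parts with Node's built-in URL class. The helper describeConnectionUri is hypothetical and only illustrates the uri structure; it is not how the library itself parses the uri.

```javascript
// Inspect the parts of a connection uri with Node's built-in WHATWG URL class.
// This only illustrates the uri layout; it is not the library's own parser.
function describeConnectionUri(uri) {
  const parsed = new URL(uri)
  return {
    // 'tcp' = single instance, 'discover' = cluster discovery
    scheme: parsed.protocol.replace(':', ''),
    username: decodeURIComponent(parsed.username),
    password: decodeURIComponent(parsed.password),
    host: parsed.hostname,
    port: Number(parsed.port)
  }
}

const single = describeConnectionUri('tcp://username:userpassword@127.0.0.1:1113')
const cluster = describeConnectionUri('discover://username:password@escluster.net:2112')

console.log(single)
console.log(cluster)
```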

Disconnecting

It's highly recommended to close a connection in a safe way.
You should use es.disconnect() for a proper shutdown, because this function sets the current connection to state drain. This means the connection does not accept outgoing requests any longer and waits, if necessary, for outstanding responses from eventstore.

Emitted events

  • connected emitted as soon as connection is established
  • secureConnect emitted when a secured connection is established
  • ready emitted after connected, once authentication and client identification are done
  • reconnect emitted as soon as lib tries to reconnect (param: reconnect count)
  • close emitted as soon as connection is closed
  • drain emitted when connection drains existing requests before connection close
  • error emitted on connection errors (param: error)
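
A minimal sketch of listening to these events follows. It uses a plain Node EventEmitter as a stand-in and emits the events by hand. That the Eventstore instance exposes the standard on() method is an assumption here, based on how subscriptions are used later in this document.

```javascript
const { EventEmitter } = require('events')

// Stand-in for an Eventstore instance (assumption: the real instance
// exposes the standard EventEmitter `on()` method).
const es = new EventEmitter()
const seen = []

es.on('connected', () => seen.push('connected'))
es.on('ready', () => seen.push('ready'))
es.on('reconnect', (count) => seen.push(`reconnect #${count}`))
es.on('drain', () => seen.push('drain'))
es.on('close', () => seen.push('close'))
// Always attach an 'error' listener: an EventEmitter throws on an
// 'error' event that has no listener.
es.on('error', (error) => seen.push(`error: ${error.name}`))

// Simulate a typical lifecycle so the listeners above fire.
es.emit('connected')
es.emit('ready')
es.emit('reconnect', 1)
es.emit('drain')
es.emit('close')

console.log(seen.join(' -> '))
```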

Streams

Accessing streams

You can use different methods to get a stream instance:

  • stream('streamId')
  • fromStream('streamId')
  • atStream('streamId')

Technically they all do the same, but these differently named functions let you improve the readability of your code. You can use them like this:

const eventstore = new Eventstore()
await eventstore.connect()

await eventstore
      .stream('streamId')
      .softDelete()

or if you know you will do some read operation:

const eventstore = new Eventstore()
await eventstore.connect()

const metadata = await eventstore
      .fromStream('userstream')
      .getMetadata()

or if you do some write operation do it this way:

const eventstore = new Eventstore()

const eventUsernameChanged = new Event('UsernameChanged')
await eventstore.connect()

await eventstore
      .atStream('userstream')
      .append(eventUsernameChanged)

Writing to streams

Writing to a stream is simple: use the .append() function of a stream instance.

const eventstore = new Eventstore()
await eventstore.connect()

const eventA = new Event('EventA',{
  some: 'string data',
  num : 1
})
await eventstore.atStream('mystream').append(eventA)

const eventB = new Event('EventB',{
  text: 'other string',
  count : 2
})

eventB.correlationId = eventA.id
await eventstore.atStream('mystream').append(eventB)

const eventC = new Event('EventC')
const eventD = new Event('EventD')

await eventstore.atStream('mystream').append([eventC, eventD])

Reading from stream

It's recommended to use an async iterator to fetch events from streams.

const eventstore = new Eventstore()
await eventstore.connect()

const events = await eventstore
      .stream('streamId')
      .walkStreamForward()

for await (const event of events) {
  console.log(event.name)
}

The async iterator returned by .walkStreamForward() and .walkStreamBackward() offers some handy functions. They are similar to array functions:

  • map()
  • filter()
  • forEach()
  • reduce()
  • toArray()
  • every()

const walker = await eventstore
  .stream('streamId')
  .walkStreamForward()

const result = await walker
  .filter((event) => event.name !== 'EventB')
  .map((event) => {
    return event.name
  })
  .toArray()

console.log(result)
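
To show how such chainable helpers can work on top of an async iterator, here is a small self-contained sketch. The class AsyncWalker and the generator fakeStream are hypothetical and written only for this example; the sketch is not the library's implementation and covers just filter(), map() and toArray().

```javascript
// Minimal chainable wrapper around an async iterable with lazily
// evaluated filter() and map(), plus toArray() to collect the results.
class AsyncWalker {
  constructor(source) {
    this.source = source
  }

  [Symbol.asyncIterator]() {
    return this.source[Symbol.asyncIterator]()
  }

  filter(predicate) {
    const source = this.source
    return new AsyncWalker((async function* () {
      for await (const item of source) {
        if (predicate(item)) yield item
      }
    })())
  }

  map(transform) {
    const source = this.source
    return new AsyncWalker((async function* () {
      for await (const item of source) yield transform(item)
    })())
  }

  async toArray() {
    const items = []
    for await (const item of this.source) items.push(item)
    return items
  }
}

// Stand-in for a stream: yields three fake events.
async function* fakeStream() {
  yield { name: 'EventA' }
  yield { name: 'EventB' }
  yield { name: 'EventC' }
}

const names = new AsyncWalker(fakeStream())
  .filter((event) => event.name !== 'EventB')
  .map((event) => event.name)
  .toArray()

names.then((result) => console.log(result)) // [ 'EventA', 'EventC' ]
```

Because filter() and map() wrap async generators, no event is pulled from the source until toArray() or a for await loop starts consuming.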

Reading all events

It's possible to read all events from eventstore.
This works the same way as the regular async iterator for streams, but it needs admin rights.

const eventstore = new Eventstore()
await eventstore.connect()

const events = await eventstore.walkAllForward()

for await (const event of events) {
  console.log(event.name)
}

or in reverse, from end to beginning:

const eventstore = new Eventstore()
await eventstore.connect()

const events = await eventstore.walkAllBackward()

for await (const event of events) {
  console.log(event.name)
}

System projections

Eventstore comes with some handy projections, which are available if system projections are enabled in your server config and your current user's ACL allows access to them.

You can use some functions to access these streams:

  • walkEventsByStreamCategory
  • walkEventsByType
  • walkEventsByCorrelationId
  • streamNamesByCategory

const eventstore = new Eventstore()
await eventstore.connect()

const events = await eventstore.walkEventsByType('sometype')

for await (const event of events) {
  console.log(event.name)
}

Subscriptions

There are 3 different subscription types: simple stream subscriptions, persistent subscriptions and catchUp subscriptions.

Subscribe to stream

You can call subscribe() on a stream instance and simply add listener(s) to the returned subscription.
There are two options.

Listen to all events:

const eventstore = new Eventstore()
await eventstore.connect()

const subscription = await eventstore
      .stream('streamId')
      .subscribe()

subscription.on('event', (event) => {
  console.log(event.name)
})

Listen for specific event:

const eventstore = new Eventstore()
await eventstore.connect()

const subscription = await eventstore
      .stream('streamId')
      .subscribe()

// listen for "event-"+ lowercase event name
subscription.on('event-specificeventa', (event) => {
  console.log('log only SpecificEventA')
})

With this approach you're able to add different listeners for different needs to one single stream subscription.
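
The naming rule for specific listeners ("event-" plus the lowercase event name) can be sketched as below. The helper listenerNameFor and the deliver function are hypothetical, and a plain Node EventEmitter stands in for a real subscription.

```javascript
const { EventEmitter } = require('events')

// Build the listener name for one specific event type, following the
// "event-" + lowercase event name rule described above.
function listenerNameFor(eventName) {
  return 'event-' + eventName.toLowerCase()
}

// Stand-in for a subscription: emits the generic and the specific event.
const subscription = new EventEmitter()
function deliver(event) {
  subscription.emit('event', event)
  subscription.emit(listenerNameFor(event.name), event)
}

const received = []
subscription.on('event', (event) => received.push(`any:${event.name}`))
subscription.on(listenerNameFor('SpecificEventA'), (event) => {
  received.push(`specific:${event.name}`)
})

deliver({ name: 'SpecificEventA' })
deliver({ name: 'OtherEvent' })

console.log(received)
```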

Persistent subscription

You can create persistent subscriptions (needs admin rights):

const eventstore = new Eventstore()
await eventstore.connect()

const persistentSubscription = await eventstore
  .atStream('myStream')
  .withCredentials({username: 'admin', password: 'changeit'})
  .createPersistentSubscription('persistentsubscription')

You can update an existing persistent subscription (needs admin rights):

const eventstore = new Eventstore()
await eventstore.connect()

const newConfig = {
    messageTimeoutMilliseconds : 30000
  }

const persistentSubscription = await eventstore
  .atStream('myStream')
  .withCredentials({username: 'admin', password: 'changeit'})
  .getPersistentSubscription('persistentsubscription')
  .update(newConfig)

You can delete an existing persistent subscription (needs admin rights):

const eventstore = new Eventstore()
await eventstore.connect()

const persistentSubscription = eventstore
  .atStream('myStream')
  .withCredentials({username: 'admin', password: 'changeit'})
  .getPersistentSubscription('persistentsubscription')

await persistentSubscription.delete()

You can connect to an existing persistent subscription:

const eventstore = new Eventstore()
await eventstore.connect()

const persistentSubscription = eventstore
  .atStream('myStream')
  .getPersistentSubscription('persistentsubscription')

await persistentSubscription.subscribe()

Unsubscribing from an existing persistent subscription is also pretty easy:

await persistentSubscription.unsubscribe()

CatchUp subscriptions

Not implemented yet (see ToDo).

Emitted events

  • subscribed emitted after subscription is established
  • dropped emitted when subscription is dropped
  • event emitted when event appeared (param: Event)
  • event-eventnametolowercase emitted when an event with that (lowercased) name appeared (param: Event)
  • error emitted in error case (param: Error)

Events

Events hold your stored information and business data.
A single event is written to a stream and contains a name, a uuid, a creation timestamp and, optionally, metadata and data.

Be aware that events you receive as the result of read operations are immutable, and that an event you write to a stream becomes immutable as well. The reason for this is quite simple: You can't change the past.
Trying to change an already written event throws an EventstoreOperationError.
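
The following sketch illustrates this "mutable until written" behavior with a hypothetical stand-in class. SketchEvent and its freeze() method are assumptions made for this example and do not show the library's Event class or its internals; only the error name is taken from the text above.

```javascript
// Hypothetical stand-in, not the library's Event class: data can be
// changed until the event is marked as written, afterwards the setter throws.
class SketchEvent {
  constructor(name, data = {}) {
    this.name = name
    this._data = data
    this._frozen = false
  }

  get data() {
    return this._data
  }

  set data(value) {
    if (this._frozen) {
      const error = new Error('event is immutable once written or read')
      error.name = 'EventstoreOperationError'
      throw error
    }
    this._data = value
  }

  // Called by the (imaginary) client after a successful append or read.
  freeze() {
    this._frozen = true
    Object.freeze(this._data)
  }
}

const sketchEvent = new SketchEvent('MyEventName')
sketchEvent.data = { some: 'data' } // fine: not written yet
sketchEvent.freeze()                // simulate "written to a stream"

let caughtName = null
try {
  sketchEvent.data = { some: 'other data' }
} catch (error) {
  caughtName = error.name
}

console.log(caughtName)
```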

Creating event

You can create an event like this:

const newEvent = new Event('MyEventName')

newEvent.data = {some:'data'}
newEvent.metadata = {some:'metadata'}

// SAME AS

const newEvent = new Event('MyEventName',{some:'data'},{some:'metadata'})

Correlation id

In the real world, events are often created because of a previously appeared event.
For example, because the event SomeRequest appeared, you will have an event ResponseForRequestSend. To avoid losing this logical connection between events, you can use the $correlationId field in the event's metadata.

Starting with eventstore version 5, there is a new system projection which uses this special field.

You can use handy functions for setting a correlation id on new events:

const someRequestEvent = new Event('SomeRequest')
someRequestEvent.correlationId = someRequestEvent.id


await eventstore
      .atStream('somestream')
      .append(someRequestEvent)

const someResponseEvent = new Event('ResponseForRequestSend',null,{$correlationId: someRequestEvent.correlationId})

await eventstore
      .atStream('somestream')
      .append(someResponseEvent)


// OR IN NICE WAY

const someRequestEvent = new Event('SomeRequest')
someRequestEvent.correlationId = someRequestEvent.id


await eventstore
      .atStream('somestream')
      .append(someRequestEvent)

const someResponseEvent = someRequestEvent.causesEvent('ResponseForRequestSend')

await eventstore
      .atStream('somestream')
      .append(someResponseEvent)


// AND READING (eventstore version >= 5 & system projections enabled):

const events = await eventstore.walkEventsByCorrelationId(someRequestEvent.id)

for await (const event of events) {
  console.log(event.name)
}

//outputs
//SomeRequest
//ResponseForRequestSend

Testing

Tests are implemented as integration tests against a real eventstore cluster.
You can find the code coverage analysis at sonarcloud.io and at codecov.io.

Requirements

  • node >= 10
  • linux or mac for running bash scripts
  • installed Docker

Running tests

  • install all dependencies: npm install
  • run tests: npm run test

Test suite

The whole test run is configured and started by scripts/allTest.sh, which performs these steps:

  • starting eventstore with 3 cluster instances
  • building test container - a docker container containing this repository
  • adding test users to eventstore
  • setting up acl for eventstore
  • adding test streams to eventstore
  • starting test container and running tests within that container
  • shutting down the eventstore cluster