Home Reference Source Test

src/eventstore/Eventstore.ts

import {setConnectionSettings, EventstoreSettings, UserCredentials} from './EventstoreSettings'
import {EventEmitter} from 'events'
import {Stream, StreamOptions, StreamPosition} from '../stream'
import * as bunyan from 'bunyan'
import {TCPConnection} from './TCPConnection'
import uuid = require('uuid/v4')
import Long = require('long')
import {EventstoreCommand} from '../protobuf/EventstoreCommand'
import * as model from '../protobuf/model'
import {Position} from './Position'
import {StreamWalker} from '../StreamWalker'
import {Event} from '../event'
import * as eventstoreErrors from '../errors'

/** Shorthand alias for the protobuf message namespace `model.eventstore.proto` */
const protobuf = model.eventstore.proto

/**
 * Result shape holding an event number range and a position.
 * NOTE(review): presumably describes the outcome of a write operation; this type
 * is not referenced in this file, so confirm exact semantics at the producing code.
 *
 * @typedef {object} WriteResult
 * @property {Long | number} firstEventNumber
 * @property {Long | number} lastEventNumber
 * @property {Position} position
 */
export interface WriteResult {
  /** first event number of the range */
  firstEventNumber: Long | number
  /** last event number of the range */
  lastEventNumber: Long | number
  /** position associated with the result */
  position: Position
}

/**
 * Base class to communicate with eventstore
 *
 * Owns a TCPConnection, forwards its connection events and offers helpers to
 * read/walk all events and to obtain stream instances.
 *
 * @emits{connected} emitted as soon as connection is established
 * @emits{secureConnect} emitted when a secure connection is established
 * @emits{ready} emitted after connect, authentication and client identification succeeded
 * @emits{reconnect} emitted as soon as lib tries to reconnect (param: reconnect count)
 * @emits{close} emitted as soon as connection is closed
 * @emits{drain} emitted when connection drains existing requests before connection close
 * @emits{error} emitted on connection errors (param: error)
 */
export class Eventstore extends EventEmitter {
  /** connection config */
  protected connectionConfig: EventstoreSettings
  /** logger */
  public log: bunyan
  /** connection base */
  protected connection: TCPConnection
  /** default read slice size */
  protected defaultSliceSize: number = 100

  /**
   * Creates an instance of Eventstore.
   *
   * @param connectionConfiguration settings passed through setConnectionSettings
   */
  public constructor(connectionConfiguration: EventstoreSettings | object = {}) {
    super()
    this.connectionConfig = setConnectionSettings(connectionConfiguration)
    this.log = this.connectionConfig.logger
    this.connection = new TCPConnection({...this.connectionConfig})

    // An EventEmitter throws on 'error' when nobody listens - always keep one listener
    this.on(
      'error',
      (err): void => {
        //prevent throwing error
        this.log.error({err}, 'Eventstore error')
      }
    )
  }

  /**
   * Ensure to use up-to-date settings, logger and a fresh connection socket
   *
   * Merges the given settings over the current ones, creates a new TCPConnection
   * and forwards its events through this instance.
   *
   * @param connectionConfiguration settings overriding the current connection config
   */
  protected init(connectionConfiguration: EventstoreSettings | {} = {}): void {
    this.connectionConfig = {...this.connectionConfig, ...connectionConfiguration}
    this.log = this.connectionConfig.logger
    this.connection = new TCPConnection(this.connectionConfig)
    this.connection.on(
      'error',
      (err): void => {
        this.log.error({err}, err.name)
        this.emit('error', err)
      }
    )
    this.connection.on(
      'secureConnect',
      (): void => {
        this.log.debug('secure connected')
        this.emit('secureConnect')
      }
    )
    this.connection.on(
      'drain',
      (): void => {
        this.log.debug('connection is draining')
        this.emit('drain')
      }
    )
    this.connection.on(
      'close',
      (): void => {
        this.log.debug('connection is closed')
        this.emit('close')
      }
    )
    this.connection.on(
      'connected',
      (): void => {
        this.log.debug('connected to eventstore')
        this.emit('connected')
      }
    )
    this.connection.on(
      'reconnect',
      (reconnectCount: number): void => {
        this.log.debug({reconnectCount}, 'reconnecting to eventstore')
        this.emit('reconnect', reconnectCount)
      }
    )
  }

  /**
   * Returns client id - name of eventstore connection
   */
  public get name(): string {
    return this.connectionConfig.clientId
  }

  /**
   * Connect to eventstore
   *
   * Re-initializes the connection, connects, authenticates and identifies the client.
   * Emits `ready` on success. If authentication or identification fails the
   * connection is closed again and the error is re-thrown.
   *
   * @param connectionConfiguration optional settings overriding the current connection config
   */
  public async connect(connectionConfiguration: EventstoreSettings | object = {}): Promise<void> {
    this.init(connectionConfiguration)
    await this.connection.connect()

    try {
      await this.authenticate()
      await this.identifyClient()
      this.log.debug('connection ready')
      this.emit('ready')
    } catch (err) {
      await this.disconnect()
      throw err
    }
  }

  /**
   * Disconnect from eventstore and try to drain pending requests
   */
  public disconnect(): Promise<void> {
    return this.connection.disconnect()
  }

  /**
   * Indicates if connection to eventstore is available
   */
  public get isConnected(): boolean {
    return this.connection.isConnected
  }

  /**
   * Returns current connection
   */
  public getConnection(): TCPConnection {
    return this.connection
  }

  /**
   * Get current logger instance
   */
  public get logger(): bunyan {
    return this.log
  }

  /**
   * Set logger instance
   *
   * Stores the logger in the connection config (used when the connection is
   * re-initialized on connect) and makes it the active logger right away, so
   * that the `logger` getter returns the instance that was just set.
   */
  public set logger(newLogger: bunyan) {
    this.connectionConfig.logger = newLogger
    this.log = newLogger
  }

  /**
   * Get a stream instance specified by streamName
   * You can also use one of the alias functions fromStream or atStream
   *
   * @param streamName name of the stream
   * @param streamOptions options merged over the defaults
   *        (requireMaster: false, resolveLinks: true, credentials: null)
   */
  public stream(streamName: string, streamOptions?: StreamOptions): Stream {
    const defaultOptions: StreamOptions = {
      requireMaster: false,
      resolveLinks: true,
      credentials: null
    }
    return new Stream(this, streamName, {...defaultOptions, ...streamOptions})
  }

  /**
   * Get a stream instance specified by streamName
   * Alias for method stream
   */
  public fromStream(streamName: string, streamOptions?: StreamOptions): Stream {
    return this.stream(streamName, streamOptions)
  }

  /**
   * Get a stream instance specified by streamName
   * Alias for method stream
   */
  public atStream(streamName: string, streamOptions?: StreamOptions): Stream {
    return this.stream(streamName, streamOptions)
  }

  /**
   * Ping eventstore
   *
   * Sends a Ping command with the connection credentials; settles when the
   * connection resolves or rejects the command.
   */
  public async ping(): Promise<void> {
    await new Promise(
      (resolve, reject): void => {
        this.connection.sendCommand(
          uuid(),
          EventstoreCommand.Ping,
          null,
          this.connectionConfig.credentials,
          {
            resolve,
            reject
          }
        )
      }
    )
  }

  /**
   * Called directly after connecting to eventstore
   * Identifies connection against eventstore
   * Identification can be set in connection settings field clientId
   */
  protected async identifyClient(): Promise<void> {
    await new Promise(
      (resolve, reject): void => {
        this.log.debug(`Identify as ${this.connectionConfig.clientId}`)
        const raw = protobuf.IdentifyClient.fromObject({
          version: 1,
          connectionName: this.connectionConfig.clientId
        })
        this.connection.sendCommand(
          uuid(),
          EventstoreCommand.IdentifyClient,
          Buffer.from(protobuf.IdentifyClient.encode(raw).finish()),
          this.connectionConfig.credentials,
          {
            resolve,
            reject
          }
        )
      }
    )
  }

  /**
   * Authenticate with credentials from settings
   */
  protected async authenticate(): Promise<void> {
    await new Promise(
      (resolve, reject): void => {
        this.log.debug(`Authenticate`)
        this.connection.sendCommand(
          uuid(),
          EventstoreCommand.Authenticate,
          null,
          this.connectionConfig.credentials,
          {
            resolve,
            reject
          }
        )
      }
    )
  }

  /**
   * Reads a slice of all events by sending a ReadAllEvents request
   *
   * @param direction command to send (callers in this class pass
   *        ReadAllEventsForward or ReadAllEventsBackward)
   * @param position commit/prepare position to read from
   * @param maxSliceCount maximum number of events in the slice
   * @param resolveLinks forwarded to the request
   * @param requireMaster forwarded to the request
   * @param credentials credentials sent along with the command
   */
  protected async readSlice(
    direction: EventstoreCommand,
    position: Position,
    maxSliceCount: number,
    resolveLinks: boolean,
    requireMaster: boolean,
    credentials: UserCredentials | null
  ): Promise<model.eventstore.proto.ReadAllEventsCompleted> {
    return await new Promise(
      (resolve, reject): void => {
        const raw = protobuf.ReadAllEvents.fromObject({
          commitPosition: position.commitPosition,
          preparePosition: position.preparePosition,
          maxCount: maxSliceCount,
          resolveLinks,
          requireMaster
        })
        this.connection.sendCommand(
          uuid(),
          direction,
          Buffer.from(protobuf.ReadAllEvents.encode(raw).finish()),
          credentials,
          {
            resolve,
            reject
          }
        )
      }
    )
  }

  /**
   * Reads a slice of all events in forward direction
   *
   * @param position position to start reading from
   * @param maxSliceCount maximum number of events (defaults to defaultSliceSize)
   * @param resolveLinks forwarded to the request (default true)
   * @param requireMaster when omitted the connection setting is used;
   *        an explicit `false` is respected
   * @param credentials when omitted or null the connection credentials are used
   */
  public async readSliceForward(
    position: Position,
    maxSliceCount?: number,
    resolveLinks: boolean = true,
    requireMaster?: boolean,
    credentials?: UserCredentials | null
  ): Promise<model.eventstore.proto.ReadAllEventsCompleted> {
    if (maxSliceCount === undefined) {
      maxSliceCount = this.defaultSliceSize
    }
    return await this.readSlice(
      EventstoreCommand.ReadAllEventsForward,
      position,
      maxSliceCount,
      resolveLinks,
      // only fall back when nothing was passed - `||` would turn an explicit false into the default
      requireMaster === undefined ? this.connectionConfig.requireMaster : requireMaster,
      credentials || this.connectionConfig.credentials
    )
  }

  /**
   * Reads a slice of all events in backward direction
   *
   * @param position position to start reading from
   * @param maxSliceCount maximum number of events (defaults to defaultSliceSize)
   * @param resolveLinks forwarded to the request (default true)
   * @param requireMaster when omitted the connection setting is used;
   *        an explicit `false` is respected
   * @param credentials when omitted or null the connection credentials are used
   */
  public async readSliceBackward(
    position: Position,
    maxSliceCount?: number,
    resolveLinks: boolean = true,
    requireMaster?: boolean,
    credentials?: UserCredentials | null
  ): Promise<model.eventstore.proto.ReadAllEventsCompleted> {
    if (maxSliceCount === undefined) {
      maxSliceCount = this.defaultSliceSize
    }
    return await this.readSlice(
      EventstoreCommand.ReadAllEventsBackward,
      position,
      maxSliceCount,
      resolveLinks,
      // only fall back when nothing was passed - `||` would turn an explicit false into the default
      requireMaster === undefined ? this.connectionConfig.requireMaster : requireMaster,
      credentials || this.connectionConfig.credentials
    )
  }

  /**
   * Walks all events forward
   *
   * Returns a StreamWalker over an async generator which reads slices of
   * defaultSliceSize events and already requests the next slice while the
   * current one is being consumed.
   *
   * @param start position to start from (default Position.Start)
   * @param resolveLinks forwarded to every slice read (default true)
   * @param requireMaster defaults to the connection setting when omitted
   * @param credentials defaults to the connection credentials when omitted
   */
  public async walkAllForward(
    start: Position = Position.Start,
    resolveLinks: boolean = true,
    requireMaster?: boolean,
    credentials?: UserCredentials | null
  ): Promise<StreamWalker> {
    const that = this
    if (requireMaster === undefined) {
      requireMaster = this.connectionConfig.requireMaster
    }
    if (credentials === undefined) {
      credentials = this.connectionConfig.credentials
    }
    // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
    const asyncGenerator = async function*(begin: Position) {
      let index = 0
      //fetch first slice
      let readResult = that.readSliceForward(
        begin,
        that.defaultSliceSize,
        resolveLinks,
        requireMaster,
        credentials
      )
      let result = await readResult
      let maxSlicePosition = new Position(result.commitPosition, result.preparePosition)
      begin = new Position(result.nextCommitPosition, result.nextPreparePosition)
      // Same "more to read" test as used inside the loop below. Prefetching when both
      // positions are equal would send a request whose result is never awaited,
      // because the loop ends on `compareTo(...) <= 0`.
      if (begin.compareTo(maxSlicePosition) > 0) {
        //we have more so start fetching in background
        readResult = that.readSliceForward(
          begin,
          that.defaultSliceSize,
          resolveLinks,
          requireMaster,
          credentials
        )
      }
      while (true) {
        if (index < result.events.length) {
          //hand out next event of current slice
          const entry = result.events[index++]
          yield Event.fromRaw(entry.event || entry.link)
        } else if (begin.compareTo(maxSlicePosition) <= 0) {
          //current slice is consumed and next position did not advance - done
          return null
        } else {
          index = 0
          //wait for background fetch and grab result
          result = await readResult
          maxSlicePosition = new Position(result.commitPosition, result.preparePosition)
          begin = new Position(result.nextCommitPosition, result.nextPreparePosition)
          if (begin.compareTo(maxSlicePosition) > 0) {
            //if there are more events start fetching in background

            readResult = that.readSliceForward(
              begin,
              that.defaultSliceSize,
              resolveLinks,
              requireMaster,
              credentials
            )
          }
        }
      }
    }

    return new StreamWalker(asyncGenerator(start))
  }

  /**
   * Walks all events backward
   *
   * Returns a StreamWalker over an async generator which reads slices of
   * defaultSliceSize events and already requests the next slice while the
   * current one is being consumed.
   *
   * @param start position to start from (default Position.End)
   * @param resolveLinks forwarded to every slice read (default true)
   * @param requireMaster defaults to the connection setting when omitted
   * @param credentials defaults to the connection credentials when omitted
   */
  public async walkAllBackward(
    start: Position = Position.End,
    resolveLinks: boolean = true,
    requireMaster?: boolean,
    credentials?: UserCredentials | null
  ): Promise<StreamWalker> {
    const that = this
    if (requireMaster === undefined) {
      requireMaster = this.connectionConfig.requireMaster
    }
    if (credentials === undefined) {
      credentials = this.connectionConfig.credentials
    }
    // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
    const asyncGenerator = async function*(begin: Position) {
      let index = 0
      //fetch first slice
      let readResult = that.readSliceBackward(
        begin,
        that.defaultSliceSize,
        resolveLinks,
        requireMaster,
        credentials
      )
      let result = await readResult

      let maxSlicePosition = new Position(result.commitPosition, result.preparePosition)
      begin = new Position(result.nextCommitPosition, result.nextPreparePosition)
      if (begin.compareTo(maxSlicePosition) < 0) {
        //we have more so start fetching in background
        readResult = that.readSliceBackward(
          begin,
          that.defaultSliceSize,
          resolveLinks,
          requireMaster,
          credentials
        )
      }
      while (true) {
        if (index < result.events.length) {
          //hand out next event of current slice
          const entry = result.events[index++]
          yield Event.fromRaw(entry.event || entry.link)
        } else if (begin.compareTo(maxSlicePosition) >= 0) {
          //current slice is consumed and next position did not move back - done
          return null
        } else {
          index = 0
          //wait for background fetch and grab result
          result = await readResult
          maxSlicePosition = new Position(result.commitPosition, result.preparePosition)
          begin = new Position(result.nextCommitPosition, result.nextPreparePosition)
          if (begin.compareTo(maxSlicePosition) < 0) {
            //if there are more events start fetching in background

            readResult = that.readSliceBackward(
              begin,
              that.defaultSliceSize,
              resolveLinks,
              requireMaster,
              credentials
            )
          }
        }
      }
    }

    return new StreamWalker(asyncGenerator(start))
  }

  /**
   * Resolves a link
   *
   * Link data is expected as `<eventNumber>@<streamName>`. Everything after the
   * first `@` is taken as stream name, so stream names containing `@` stay intact.
   *
   * @param link event to resolve; returned unchanged if it is not a link
   * @param requireMaster defaults to the connection setting when omitted
   * @param credentials optional credentials used for reading the linked event
   * @returns the linked event, or null if the link points into a meta stream
   * @throws protocol error if link data is not a string of the expected form
   * @throws not found error if the linked event could not be read
   */
  public async resolveLink(
    link: Event,
    requireMaster?: boolean,
    credentials?: UserCredentials | null
  ): Promise<Event | null> {
    if (!link.isLink()) {
      return link
    }

    if (requireMaster === undefined) {
      requireMaster = this.connectionConfig.requireMaster
    }

    if (typeof link.data !== 'string') {
      throw eventstoreErrors.newProtocolError('Invalid link data')
    }

    // split at the FIRST separator only; both parts must be non-empty
    const separatorIndex = link.data.indexOf('@')
    if (separatorIndex <= 0 || separatorIndex === link.data.length - 1) {
      throw eventstoreErrors.newProtocolError('Invalid link data')
    }
    const eventNumber = Long.fromValue(link.data.substring(0, separatorIndex))
    const streamName = link.data.substring(separatorIndex + 1)

    const stream = this.stream(streamName)
    if (stream.isMetaStream()) {
      this.log.debug({streamName}, 'Getting specific event from meta is not supported')
      return null
    }
    if (requireMaster) {
      stream.requiresMaster()
    }
    if (credentials) {
      stream.withCredentials(credentials)
    }
    const event = await stream.getEventByNumber(eventNumber)
    if (!event) {
      throw eventstoreErrors.newNotFoundError('Event could not be found')
    }
    return event
  }

  /**
   * Get all events from given stream category
   * (eventstore system projections must be enabled)
   *
   * Walks stream `$ce-<category>` forward.
   */
  public async walkEventsByStreamCategory(
    category: string,
    start: Long | number = StreamPosition.Start,
    resolveLinks: boolean = true,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<StreamWalker> {
    const stream = this.stream(`$ce-${category}`)
    return stream.walkStreamForward(start, resolveLinks, requireMaster, credentials)
  }

  /**
   * Get all events by event type (event name)
   * (eventstore system projections must be enabled)
   *
   * Walks stream `$et-<eventType>` forward.
   */
  public async walkEventsByType(
    eventType: string,
    start: Long | number = StreamPosition.Start,
    resolveLinks: boolean = true,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<StreamWalker> {
    const stream = this.stream(`$et-${eventType}`)
    return stream.walkStreamForward(start, resolveLinks, requireMaster, credentials)
  }

  /**
   * Get all events by correlation id
   * (eventstore system projections must be enabled and eventstore server version >=5 must be installed)
   *
   * Walks stream `$bc-<correlationId>` forward.
   */
  public async walkEventsByCorrelationId(
    correlationId: string,
    start: Long | number = StreamPosition.Start,
    resolveLinks: boolean = true,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<StreamWalker> {
    const stream = this.stream(`$bc-${correlationId}`)

    return stream.walkStreamForward(start, resolveLinks, requireMaster, credentials)
  }

  /**
   * Get all stream names by stream category
   * (eventstore system projections must be enabled)
   *
   * Walks stream `$category-<category>` forward without resolving links.
   */
  public async streamNamesByCategory(
    category: string,
    start: Long | number = StreamPosition.Start,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<StreamWalker> {
    const stream = this.stream(`$category-${category}`)
    return stream.walkStreamForward(start, false, requireMaster, credentials)
  }
}