Home Reference Source Test

src/stream/Stream.ts

import {Eventstore, WriteResult, Position} from '../eventstore'
import {Event} from '../event'
import uuid = require('uuid/v4')
import * as bunyan from 'bunyan'
import * as model from '../protobuf/model'
import {EventstoreCommand} from '../protobuf/EventstoreCommand'
import {ExpectedVersion} from '../protobuf/ExpectedVersion'
import {StreamPosition} from './StreamPosition'
import {Transaction} from './Transaction'
import {
  Subscription,
  PersistentSubscriptionConfig,
  setPersistentSubscriptionConfig,
  PersistentSubscription
} from '../subscription'
import * as eventstoreError from '../errors'
import {UserCredentials} from '../eventstore/EventstoreSettings'
import Long = require('long')
import {JSONValue} from '../JSON'
import {StreamWalker} from '../StreamWalker'

/** protobuf shorthand */
const protobuf = model.eventstore.proto

/**
 * @typedef {object} StreamOptions
 * @property {boolean} requireMaster indicates if operations require master node
 * @property {boolean} resolveLinks indicates if event links should be resolved on read operations
 * @property {UserCredentials | null} credentials user credentials if others than default connection credentials
 */
export interface StreamOptions {
  requireMaster: boolean
  resolveLinks: boolean
  credentials: UserCredentials | null
}

/**
 * Base class for handling a stream
 */
export class Stream {
  /** eventstore instance */
  protected esConnection: Eventstore
  /** bunyan logger */
  public log: bunyan
  /** id of stream */
  protected streamId: string
  /** stream options */
  protected options: StreamOptions
  /** default read slice size */
  protected defaultSliceSize: number = 100

  /**
   * Creates an instance of Stream.
   */
  public constructor(eventstore: Eventstore, streamId: string, options: StreamOptions) {
    this.esConnection = eventstore
    this.streamId = streamId
    this.log = this.esConnection.logger.child
      ? this.esConnection.logger.child({module: 'Stream'})
      : this.esConnection.logger

    this.options = options
  }

  /**
   * Return name of stream instance
   */
  public get name(): string {
    return 'Stream: ' + this.streamId
  }

  /**
   * Gets id  of stream
   */
  public get id(): string {
    return this.streamId
  }

  /**
   * Enforces to use master node for any read/write operation
   */
  public requiresMaster(): Stream {
    this.options.requireMaster = true
    return this
  }

  /**
   * Set credentials for any read/write operation
   */
  public withCredentials(credentials: UserCredentials): Stream {
    this.options.credentials = credentials
    return this
  }

  /**
   * Enforce to resolve links on read operations
   */
  public resolveAllLinks(): Stream {
    this.options.resolveLinks = true
    return this
  }

  /**
   * Appends array of events to stream
   */
  protected appendEvents(
    events: Event[],
    expectedVersion: ExpectedVersion | number | Long,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<WriteResult> {
    if (requireMaster === undefined) {
      requireMaster = this.options.requireMaster
    }
    const eventArrayTransformed: model.eventstore.proto.NewEvent[] = events.map(
      (event): model.eventstore.proto.NewEvent => {
        if (!event.isNew()) {
          throw eventstoreError.newOperationError(
            `Event ${event.name} is already stored in eventstore`
          )
        }
        return event.toRaw()
      }
    )

    const raw = protobuf.WriteEvents.fromObject({
      eventStreamId: this.streamId,
      expectedVersion,
      events: eventArrayTransformed,
      requireMaster: requireMaster === undefined ? this.options.requireMaster : requireMaster
    })
    return new Promise(
      (resolve, reject): void => {
        const setToWritten = (): void => {
          events.forEach((event): void => event.freeze())
          resolve()
        }

        this.esConnection
          .getConnection()
          .sendCommand(
            uuid(),
            EventstoreCommand.WriteEvents,
            Buffer.from(protobuf.WriteEvents.encode(raw).finish()),
            credentials || this.options.credentials,
            {
              resolve: setToWritten,
              reject
            }
          )
      }
    )
  }

  /**
   * Indicates if given stream is a metadata stream or a regular steam
   */
  public isMetaStream(): boolean {
    return this.streamId.startsWith('$$')
  }

  /**
   * Append single event or array of events to stream
   */
  public async append(
    event: Event | Event[],
    expectedVersion: ExpectedVersion | number | Long = -2,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<WriteResult> {
    if (Array.isArray(event)) {
      return await this.appendEvents(event, expectedVersion, requireMaster, credentials)
    } else {
      return await this.appendEvents([event], expectedVersion, requireMaster, credentials)
    }
  }

  /**
   * Hard deletes a stream - stream with same name can not be used in future
   */
  public async hardDelete(
    expectedVersion: ExpectedVersion = ExpectedVersion.Any,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<Position> {
    this.log.debug(
      {fn: 'hardDelete', streamId: this.streamId},
      `Hard delete Stream ${this.streamId}`
    )
    return await this.delete(true, expectedVersion, requireMaster, credentials)
  }

  /**
   * Soft deletes a stream - stream with same name can be used in future and indexes are preserved
   */
  public async softDelete(
    expectedVersion: ExpectedVersion = ExpectedVersion.Any,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<Position> {
    this.log.debug(
      {fn: 'softDelete', streamId: this.streamId},
      `Soft delete Stream ${this.streamId}`
    )
    return await this.delete(false, expectedVersion, requireMaster, credentials)
  }

  /**
   * Delete a stream - can't be called directly
   * Use softDelete or hardDelete instead
   */
  protected delete(
    hardDelete: boolean,
    expectedVersion: ExpectedVersion,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<Position> {
    if (this.isMetaStream()) {
      throw eventstoreError.newBadRequestError(
        `You can not delete metadata stream ${this.streamId}`
      )
    }
    if (requireMaster === undefined) {
      requireMaster = this.options.requireMaster
    }
    return new Promise(
      (resolve, reject): void => {
        const raw = protobuf.DeleteStream.fromObject({
          eventStreamId: this.streamId,
          expectedVersion,
          requireMaster: requireMaster === undefined ? this.requiresMaster : requireMaster,
          hardDelete
        })
        this.esConnection
          .getConnection()
          .sendCommand(
            uuid(),
            EventstoreCommand.DeleteStream,
            Buffer.from(protobuf.DeleteStream.encode(raw).finish()),
            credentials || this.options.credentials,
            {
              resolve,
              reject
            }
          )
      }
    )
  }

  /**
   * Get event at specified position from stream
   */
  public async getEventByNumber(
    eventNumber: Long | number,
    resolveLinks?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<Event | null> {
    if (requireMaster === undefined) {
      requireMaster = this.options.requireMaster
    }
    const result: model.eventstore.proto.IResolvedIndexedEvent = await new Promise(
      (resolve, reject): void => {
        const raw = protobuf.ReadEvent.fromObject({
          eventStreamId: this.streamId,
          eventNumber: eventNumber,
          resolveLinkTos: resolveLinks === undefined ? this.options.resolveLinks : resolveLinks,
          requireMaster: requireMaster === undefined ? this.options.requireMaster : requireMaster
        })
        this.esConnection
          .getConnection()
          .sendCommand(
            uuid(),
            EventstoreCommand.ReadEvent,
            Buffer.from(protobuf.ReadEvent.encode(raw).finish()),
            credentials || this.options.credentials,
            {
              resolve,
              reject
            }
          )
      }
    )
    if (!result.event && !result.link) {
      return null
    }

    return Event.fromRaw(result.event || result.link)
  }

  /**
   * Returns first event from stream
   */
  public async getFirstEvent(
    resolveLinks?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<Event | null> {
    return await this.getEventByNumber(
      StreamPosition.Start,
      resolveLinks,
      requireMaster,
      credentials
    )
  }

  /**
   * Returns last event from stream
   */
  public async getLastEvent(
    resolveLinks?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<Event | null> {
    return await this.getEventByNumber(StreamPosition.End, resolveLinks, requireMaster, credentials)
  }

  /**
   * Returns stream metadata if set or
   */
  public async getMetadata(
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<
    | {
        $correlationId?: string
      } & {[k: string]: JSONValue}
    | null
  > {
    if (this.isMetaStream()) {
      throw eventstoreError.newBadRequestError(
        `You can not get metadata of metadata stream ${this.streamId}`
      )
    }
    try {
      const metadataEvent = await this.esConnection
        .fromStream(`$$${this.streamId}`, {
          resolveLinks: true,
          requireMaster: requireMaster === undefined ? this.options.requireMaster : requireMaster,
          credentials: credentials || this.options.credentials
        })
        .getLastEvent()

      if (metadataEvent) {
        metadataEvent.freeze()
        // eslint-disable-next-line @typescript-eslint/no-angle-bracket-type-assertion
        return {...(<object>metadataEvent.data)}
      } else return null
    } catch (err) {
      if (err.name === 'EventstoreNoStreamError') {
        return null
      }
      throw err
    }
  }

  /**
   * Set metadata for stream
   */
  public async setMetadata(
    newMetadata: {},
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<void> {
    if (this.isMetaStream()) {
      throw eventstoreError.newBadRequestError(
        `You can not set metadata for metadata stream ${this.streamId}`
      )
    }
    const newMetaEvent = new Event('$metadata', newMetadata)
    await this.esConnection
      .fromStream(`$$${this.streamId}`, {
        resolveLinks: false,
        requireMaster: requireMaster === undefined ? this.options.requireMaster : requireMaster,
        credentials: credentials || this.options.credentials
      })
      .append(newMetaEvent, ExpectedVersion.Any, requireMaster, credentials)
  }

  /**
   * Creates a new instance of {Transaction} for current stream
   */
  public async startTransaction(
    expectedVersion: ExpectedVersion = ExpectedVersion.Any,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<Transaction> {
    if (this.isMetaStream()) {
      throw eventstoreError.newBadRequestError(
        `Transactions fpr metadata stream ${this.streamId} not supported`
      )
    }
    const transactionId: Long = await new Promise(
      (resolve, reject): void => {
        const raw = protobuf.TransactionStart.fromObject({
          eventStreamId: this.streamId,
          expectedVersion: expectedVersion,
          requireMaster: requireMaster === undefined ? this.options.requireMaster : requireMaster
        })
        this.esConnection
          .getConnection()
          .sendCommand(
            uuid(),
            EventstoreCommand.TransactionStart,
            Buffer.from(protobuf.TransactionStart.encode(raw).finish()),
            credentials || this.options.credentials,
            {
              resolve,
              reject
            }
          )
      }
    )

    return new Transaction(
      this,
      transactionId,
      this.esConnection,
      requireMaster === undefined ? this.options.requireMaster : requireMaster,
      credentials
    )
  }

  /**
   * Reads a slice from current stream in given direction starting at given position
   */
  protected async readSlice(
    direction: EventstoreCommand,
    fromEventNumber: number | Long,
    maxSliceCount?: number,
    resolveLinks?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<model.eventstore.proto.ReadStreamEventsCompleted> {
    return await new Promise(
      (resolve, reject): void => {
        const raw = protobuf.ReadStreamEvents.fromObject({
          eventStreamId: this.streamId,
          fromEventNumber,
          maxCount: maxSliceCount || this.defaultSliceSize,
          resolveLinkTos: resolveLinks === undefined ? this.options.resolveLinks : resolveLinks,
          requireMaster: requireMaster === undefined ? this.options.requireMaster : requireMaster
        })
        this.esConnection
          .getConnection()
          .sendCommand(
            uuid(),
            direction,
            Buffer.from(protobuf.ReadStreamEvents.encode(raw).finish()),
            credentials || this.options.credentials,
            {
              resolve,
              reject
            }
          )
      }
    )
  }

  /**
   * Read a slice from stream in forward direction starting at given position
   */
  public async readSliceForward(
    fromEventNumber: number | Long,
    maxSliceCount?: number,
    resolveLinks?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<model.eventstore.proto.ReadStreamEventsCompleted> {
    return await this.readSlice(
      EventstoreCommand.ReadStreamEventsForward,
      fromEventNumber,
      maxSliceCount,
      resolveLinks,
      requireMaster,
      credentials
    )
  }

  /**
   * Read a slice from stream in backward direction starting at given position
   */
  public async readSliceBackward(
    fromEventNumber: number | Long,
    maxSliceCount?: number,
    resolveLinks?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<model.eventstore.proto.ReadStreamEventsCompleted> {
    return await this.readSlice(
      EventstoreCommand.ReadStreamEventsBackward,
      fromEventNumber,
      maxSliceCount,
      resolveLinks,
      requireMaster,
      credentials
    )
  }

  /**
   * Walk through all events of stream
   */
  protected async walkStream(
    forward: boolean,
    start: Long | number,
    resolveLinks?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<StreamWalker> {
    const that = this
    const resolveLinksTos = resolveLinks === undefined ? this.options.resolveLinks : resolveLinks
    const maxCount = this.defaultSliceSize
    const getSlice = forward ? 'readSliceForward' : 'readSliceBackward'
    if (requireMaster === undefined) {
      requireMaster = this.options.requireMaster
    }
    // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
    const asyncGenerator = async function*(begin: Long | number) {
      let index = 0
      //fetch first slice
      let readResult = that[getSlice](begin, maxCount, resolveLinksTos, requireMaster, credentials)
      let result = await readResult
      if (!result.isEndOfStream) {
        //we have more so start fetching in background
        begin = result.nextEventNumber
        readResult = that[getSlice](begin, maxCount, resolveLinksTos, requireMaster, credentials)
      }
      while (true) {
        if (index < result.events.length) {
          const entry = result.events[index++]
          yield Event.fromRaw(entry.event || entry.link)
        } else if (result.isEndOfStream) {
          return null
        } else {
          index = 0
          //wait for background fetch and grab result
          result = await readResult
          if (!result.isEndOfStream) {
            //if there are more events start fetching in background
            begin = result.nextEventNumber
            readResult = that[getSlice](
              begin,
              maxCount,
              resolveLinksTos,
              requireMaster,
              credentials
            )
          }
        }
      }
    }

    return new StreamWalker(asyncGenerator(start))
  }

  /**
   * Walk through all events in stream forward
   */
  public async walkStreamForward(
    start: Long | number = StreamPosition.Start,
    resolveLinkTos?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<StreamWalker> {
    return await this.walkStream(true, start, resolveLinkTos, requireMaster, credentials)
  }

  /**
   * Walk through all events in stream backward
   */
  public async walkStreamBackward(
    start: Long | number = StreamPosition.End,
    resolveLinkTos?: boolean,
    requireMaster?: boolean,
    credentials?: UserCredentials
  ): Promise<StreamWalker> {
    return await this.walkStream(false, start, resolveLinkTos, requireMaster, credentials)
  }

  /**
   * Subscribe to current stream and return a subscription
   */
  public async subscribe(
    resolveLinkTos?: boolean,
    credentials?: UserCredentials
  ): Promise<Subscription> {
    return await this.esConnection
      .getConnection()
      .subscribeToStream(this, resolveLinkTos, credentials || this.options.credentials || null)
  }

  /**
   * Creates a persistent subscription for current stream
   * This operation needs admin rights and a master connection
   */
  public async createPersistentSubscription(
    subscriptionGroupName: string,
    customConfig: PersistentSubscriptionConfig | {} = {},
    credentials?: UserCredentials
  ): Promise<PersistentSubscription> {
    const settings = setPersistentSubscriptionConfig(customConfig)

    await new Promise(
      (resolve, reject): void => {
        const raw = protobuf.CreatePersistentSubscription.fromObject({
          subscriptionGroupName,
          eventStreamId: this.id,
          ...settings
        })
        this.esConnection
          .getConnection()
          .sendCommand(
            uuid(),
            EventstoreCommand.CreatePersistentSubscription,
            Buffer.from(protobuf.CreatePersistentSubscription.encode(raw).finish()),
            credentials || this.options.credentials,
            {
              resolve,
              reject
            }
          )
      }
    )
    return new PersistentSubscription(
      this,
      this.esConnection,
      this.options.credentials,
      subscriptionGroupName
    )
  }

  /**
   * Returns a instance of persistance subscription given by group name
   */
  public getPersistentSubscription(
    subscriptionGroupName: string,
    credentials?: UserCredentials
  ): PersistentSubscription {
    return new PersistentSubscription(
      this,
      this.esConnection,
      credentials || this.options.credentials,
      subscriptionGroupName
    )
  }
}