Home Reference Source Test

src/subscription/Subscription.ts

import {EventEmitter} from 'events'
import * as model from '../protobuf/model'
import {UserCredentials, Position} from '../eventstore'
import {Stream} from '../stream'
import {TCPConnection} from '../eventstore/TCPConnection'
import * as bunyan from 'bunyan'
import {Event} from '../event'

/**
 * Base class for handling subscriptions.
 *
 * The constructor wires the instance's own `onSubscribed`, `onDropped`,
 * `onEvent` and `onError` handlers to the matching events, so these run
 * in addition to any listeners added from outside.
 *
 * @emits {subscribed}
 * @emits {dropped}
 * @emits {event}
 * @emits {event-eventnametolowercase}
 * @emits {error}
 */
export class Subscription extends EventEmitter {
  /** uuid4 of subscription */
  protected subscriptionId: string
  /**
   * Indicates if subscription is running.
   * NOTE(review): never modified inside this class; presumably maintained by
   * the connection handling the subscription - confirm against TCPConnection.
   */
  public isSubscribed: boolean = false
  /** connection to use */
  protected tcpConnection: TCPConnection
  /** credentials for subscription (null if none were given) */
  protected credentials: UserCredentials | null = null
  /** instance of corresponding stream */
  protected stream: Stream
  /** indicates if events should be fully resolved (link events resolved to their targets) */
  protected resolveLinkTos: boolean
  /** child logger derived from the stream logger */
  public log: bunyan
  /** position handed over with the most recently received event, null until the first event */
  protected position: Position | null = null

  /**
   * Creates an instance of subscription.
   *
   * @param subscriptionId id of this subscription
   * @param tcpConnection connection used to unsubscribe
   * @param stream stream this subscription belongs to; its logger is the parent of `log`
   * @param resolveLinkTos whether link events should be resolved
   * @param credentials optional credentials for the subscription
   */
  public constructor(
    subscriptionId: string,
    tcpConnection: TCPConnection,
    stream: Stream,
    resolveLinkTos: boolean,
    credentials: UserCredentials | null
  ) {
    super()
    this.subscriptionId = subscriptionId
    this.tcpConnection = tcpConnection
    this.credentials = credentials
    this.stream = stream
    this.resolveLinkTos = resolveLinkTos
    // EventEmitter invokes plain function listeners with `this` set to the
    // emitter, so the unbound prototype methods can be registered directly.
    this.on('subscribed', this.onSubscribed)
    this.on('dropped', this.onDropped)
    this.on('event', this.onEvent)
    this.on('error', this.onError)
    this.log = stream.log.child({module: 'Subscription', subscriptionId: this.id})
  }

  /**
   * Returns subscriptionId
   */
  public get id(): string {
    return this.subscriptionId
  }

  /**
   * Returns subscription name (the subscription id prefixed with `Subscription: `)
   */
  public get name(): string {
    return 'Subscription: ' + this.subscriptionId
  }

  /**
   * Returns the credentials of this subscription or null if none are set.
   * Note: this is a property accessor (`subscription.getCredentials`), not a method.
   */
  public get getCredentials(): UserCredentials | null {
    return this.credentials
  }

  /**
   * Returns whether link events should be resolved
   */
  public getResolveLinkTos(): boolean {
    return this.resolveLinkTos
  }

  /**
   * Unsubscribe from stream.
   * Resolves once the connection's `unsubscribeFromStream` call has settled;
   * a rejection of that call is propagated to the caller.
   */
  public async unsubscribe(): Promise<void> {
    this.log.debug({fn: 'unsubscribe'}, 'unsubscribe subscription')
    await this.tcpConnection.unsubscribeFromStream(this.id)
  }

  /**
   * Called when event from eventstore arrives.
   * Emits the generic `event` first, then `event-<lowercased event name>`.
   *
   * @param event the received event
   * @param position position handed over together with the event
   */
  public eventAppeared(event: Event, position: Position): void {
    this.emit('event', event, position)
    // Use the locale-independent toLowerCase(): toLocaleLowerCase() follows the
    // host locale (e.g. Turkish maps "I" to dotless "ı"), which would change
    // the emitted event name and silently bypass registered listeners.
    this.emit(`event-${event.name.toLowerCase()}`, event, position)
  }

  /**
   * Called when subscription was dropped; logs the drop reason
   *
   * @param reason reason reported for the drop
   */
  protected onDropped(
    reason: model.eventstore.proto.SubscriptionDropped.SubscriptionDropReason
  ): void {
    this.log.debug({reason}, 'Subscription dropped')
  }

  /**
   * Called for every received event; remembers the position and logs the event
   *
   * @param event the received event
   * @param position position handed over together with the event
   */
  protected onEvent(event: Event, position: Position): void {
    this.position = position
    this.log.debug({eventName: event.name, eventId: event.id}, 'Event received')
  }

  /**
   * Called when subscription is established
   */
  protected onSubscribed(): void {
    this.log.debug(
      {subscriptionId: this.subscriptionId, stream: this.stream.id},
      'Subscription started'
    )
  }

  /**
   * Called when error appears; logs the error
   *
   * @param err the error emitted on this subscription
   */
  protected onError(err: Error): void {
    this.log.error({err, subscriptionId: this.subscriptionId}, 'Error on subscription')
  }
}