src/subscription/Subscription.ts
import {EventEmitter} from 'events'
import * as model from '../protobuf/model'
import {UserCredentials, Position} from '../eventstore'
import {Stream} from '../stream'
import {TCPConnection} from '../eventstore/TCPConnection'
import * as bunyan from 'bunyan'
import {Event} from '../event'
/**
* Base class for handling subscriptions
* @emits {subscribed}
* @emits {dropped}
* @emits {event}
* @emits {event-eventnametolowercase}
* @emits {error}
*/
export class Subscription extends EventEmitter {
/** uuid4 of subscription */
protected subscriptionId: string
/** indicates if subscription is running */
public isSubscribed: boolean = false
/** connection to use */
protected tcpConnection: TCPConnection
/** credentials for subscription */
protected credentials: UserCredentials | null = null
/** instance of corresponding stream */
protected stream: Stream
/** indicates if events should be full resolved */
protected resolveLinkTos: boolean
/** logger */
public log: bunyan
/** global log position */
protected position: Position | null = null
/**
* Creates an instance of subscription.
*/
public constructor(
subscriptionId: string,
tcpConnection: TCPConnection,
stream: Stream,
resolveLinkTos: boolean,
credentials: UserCredentials | null
) {
super()
this.subscriptionId = subscriptionId
this.tcpConnection = tcpConnection
this.credentials = credentials
this.stream = stream
this.resolveLinkTos = resolveLinkTos
this.on('subscribed', this.onSubscribed)
this.on('dropped', this.onDropped)
this.on('event', this.onEvent)
this.on('error', this.onError)
this.log = stream.log.child({module: 'Subscription', subscriptionId: this.id})
}
/**
* Returns subscriptionId
*/
public get id(): string {
return this.subscriptionId
}
/**
* Returns subscription nam
*/
public get name(): string {
return 'Subscription: ' + this.subscriptionId
}
/**
* Gets get credentials
*/
public get getCredentials(): UserCredentials | null {
return this.credentials
}
/**
* Gets resolve link tos
*/
public getResolveLinkTos(): boolean {
return this.resolveLinkTos
}
/**
* Unsubscribe from stream
*/
public async unsubscribe(): Promise<void> {
this.log.debug({fn: 'unsubscribe'}, 'unsubscribe subscription')
await this.tcpConnection.unsubscribeFromStream(this.id)
}
/**
* Called when event from eventstore arrives
*/
public eventAppeared(event: Event, position: Position): void {
this.emit('event', event, position)
this.emit(`event-${event.name.toLocaleLowerCase()}`, event, position)
}
/**
* Called when subscription was dropped
*/
protected onDropped(
reason: model.eventstore.proto.SubscriptionDropped.SubscriptionDropReason
): void {
this.log.debug({reason}, 'Subscription dropped')
}
/**
* Determines whether event on
*/
protected onEvent(event: Event, position: Position): void {
this.position = position
this.log.debug({eventName: event.name, eventId: event.id}, 'Event received')
}
/**
* Called when subscription is established
*/
protected onSubscribed(): void {
this.log.debug(
{subscriptionId: this.subscriptionId, stream: this.stream.id},
'Subscription started'
)
}
/**
* Called when error appears
*/
protected onError(err: Error): void {
this.log.error({err, subscriptionId: this.subscriptionId}, 'Error on subscription')
}
}