src/subscription/PersistentSubscription.ts
import {Stream} from '../stream'
import {UserCredentials, Eventstore} from '../eventstore'
import {EventEmitter} from 'events'
import {EventstoreCommand} from '../protobuf/EventstoreCommand'
import * as model from '../protobuf/model'
import uuid = require('uuid/v4')
import {PersistentSubscriptionConfig, setPersistentSubscriptionConfig, SubscriptionStatus} from '.'
import Long from 'long'
import {Event} from '../event'
import {uuidToBuffer} from '../protobuf/uuidBufferConvert'
/** protobuf shorthand */
const protobuf = model.eventstore.proto
/**
* Represents a persistent subscription
*
* @emits {dropped} emitted when subscription is disconnected
* @emits {event}
* @emits {event-eventnametolowercase}
*/
export class PersistentSubscription extends EventEmitter {
/** corresponding stream */
public stream: Stream
/** connection to use */
protected esConnection: Eventstore
/** name of subscription */
public subscriptionGroupName: string
/** user credentials */
protected credentials: UserCredentials | null = null
/** last commit position */
public lastCommitPosition: Long = Long.fromNumber(0)
/** last event number */
public lastEventNumber: Long | null = null
/** id of persistent subscription (uuid)*/
public id: string = uuid()
/** subscription id send back from eventstore */
public subscriptionId: string
/** count of max concurrent events */
public allowedInFlightMessages: number = 10
/** status of subscription */
protected status: SubscriptionStatus = SubscriptionStatus.disconnected
/** indicate if incoming events should automatically acknowledged */
public autoAcknowledge: boolean = true
/**
* Creates an instance of persistent subscription.
*/
public constructor(
stream: Stream,
esConnection: Eventstore,
credentials: UserCredentials | null,
subscriptionGroupName: string
) {
super()
this.stream = stream
this.esConnection = esConnection
this.subscriptionGroupName = subscriptionGroupName
this.credentials = credentials
this.subscriptionId = `${this.stream.id} :: ${this.subscriptionGroupName}`
this.on(
'dropped',
(): void => {
this.state = SubscriptionStatus.disconnected
}
)
}
/**
* Gets name
*/
public get name(): string {
return `PersistentSubscription: ${this.stream.id} :: ${this.subscriptionGroupName}`
}
/**
* Gets state
*/
public get state(): SubscriptionStatus {
return this.status
}
/**
* Sets state
*/
public set state(newStatus: SubscriptionStatus) {
this.emit(SubscriptionStatus[newStatus])
this.status = newStatus
}
/**
* Connects persistent subscription
*/
public async subscribe(
allowedInFlightMessages: number = 10,
credentials?: UserCredentials | null
): Promise<PersistentSubscription> {
this.allowedInFlightMessages = allowedInFlightMessages
if (credentials) {
this.credentials = credentials
}
const result = await this.esConnection
.getConnection()
.connectToPersistentSubscription(this, allowedInFlightMessages, this.credentials)
this.subscriptionId = result.subscriptionId
this.state = SubscriptionStatus.connected
return this
}
/**
* Unsubscribe from stream
*/
public async unsubscribe(credentials?: UserCredentials | null): Promise<void> {
await this.esConnection
.getConnection()
.unsubscribeFromPersistentSubscription(this.id, credentials || this.credentials)
}
/**
* Deletes persistent subscription
*/
public async delete(credentials?: UserCredentials | null): Promise<PersistentSubscription> {
const result: PersistentSubscription = await new Promise(
(resolve, reject): void => {
const raw = protobuf.UpdatePersistentSubscription.fromObject({
subscriptionGroupName: this.subscriptionGroupName,
eventStreamId: this.stream.id
})
this.esConnection
.getConnection()
.sendCommand(
uuid(),
EventstoreCommand.DeletePersistentSubscription,
Buffer.from(protobuf.DeletePersistentSubscription.encode(raw).finish()),
credentials || this.credentials,
{
resolve,
reject
}
)
}
)
this.state = SubscriptionStatus.disconnected
return result
}
/**
* Updates persistent subscription
*/
public async update(
customConfig: PersistentSubscriptionConfig | {},
credentials?: UserCredentials
): Promise<PersistentSubscription> {
const settings = setPersistentSubscriptionConfig(customConfig)
return await new Promise(
(resolve, reject): void => {
const raw = protobuf.UpdatePersistentSubscription.fromObject({
subscriptionGroupName: this.subscriptionGroupName,
eventStreamId: this.stream.id,
...settings
})
this.esConnection
.getConnection()
.sendCommand(
uuid(),
EventstoreCommand.UpdatePersistentSubscription,
Buffer.from(protobuf.UpdatePersistentSubscription.encode(raw).finish()),
credentials || this.credentials,
{
resolve,
reject
}
)
}
)
}
/**
* Called when event from eventstore arrives
*/
public eventAppeared(event: Event): void {
this.emit('event', event)
this.emit(`event-${event.name.toLocaleLowerCase()}`, event)
}
/**
* Acknowledges single event
*/
public acknowledgeEvent(event: Event, credentials?: UserCredentials): void {
return this.acknowledgeEvents([event], credentials)
}
/**
* Acknowledges array of events
*/
public acknowledgeEvents(events: Event[], credentials?: UserCredentials): void {
const processedEventIds = events.map(
(event): Buffer => {
return uuidToBuffer(event.id)
}
)
const raw = protobuf.PersistentSubscriptionAckEvents.fromObject({
subscriptionId: this.subscriptionId,
processedEventIds
})
this.esConnection
.getConnection()
.sendCommand(
this.id,
EventstoreCommand.PersistentSubscriptionAckEvents,
Buffer.from(protobuf.PersistentSubscriptionAckEvents.encode(raw).finish()),
credentials || this.credentials
)
}
/**
* Not acknowledge single event
*/
public notAcknowledgeEvent(
event: Event,
reason: model.eventstore.proto.PersistentSubscriptionNakEvents.NakAction = model.eventstore
.proto.PersistentSubscriptionNakEvents.NakAction.Unknown,
message?: string,
credentials?: UserCredentials
): void {
return this.notAcknowledgeEvents([event], reason, message, credentials)
}
/**
* Not acknowledge array of events
*/
public notAcknowledgeEvents(
events: Event[],
reason: model.eventstore.proto.PersistentSubscriptionNakEvents.NakAction = model.eventstore
.proto.PersistentSubscriptionNakEvents.NakAction.Unknown,
message?: string,
credentials?: UserCredentials
): void {
const processedEventIds = events.map(
(event): Buffer => {
return uuidToBuffer(event.id)
}
)
const raw = protobuf.PersistentSubscriptionNakEvents.fromObject({
subscriptionId: this.subscriptionId,
processedEventIds,
message,
action: reason
})
this.esConnection
.getConnection()
.sendCommand(
this.id,
EventstoreCommand.PersistentSubscriptionNakEvents,
Buffer.from(protobuf.PersistentSubscriptionNakEvents.encode(raw).finish()),
credentials || this.credentials
)
}
}