src/eventstore/TCPConnection.ts
- import {EventstoreSettings} from './EventstoreSettings'
- import * as net from 'net'
- import * as tls from 'tls'
- import * as bunyan from 'bunyan'
- import {EventEmitter} from 'events'
- import {uuidToBuffer, uuidFromBuffer} from '../protobuf/uuidBufferConvert'
- import {EventstoreCommand} from '../protobuf/EventstoreCommand'
- import * as eventstoreError from '../errors'
- import {EventstoreError} from '../errors'
- import * as model from '../protobuf/model'
- import {
- Subscription,
- PersistentSubscription,
- SubscriptionDropReason,
- SubscriptionStatus
- } from '../subscription'
- import {Stream} from '../stream'
- import {UserCredentials} from '../eventstore/EventstoreSettings'
- import uuid = require('uuid/v4')
- import {Event} from '../event'
- import {getIpAndPort} from './getConnectInfo'
- import {Position} from './Position'
- import {WriteResult} from './Eventstore'
- import Long from 'long'
-
- /** protobuf shorthand */
- const protobuf = model.eventstore.proto
-
- /** typescript enumeration of connection states */
- enum connectionState {
- closed,
- init,
- connected,
- drain
- }
-
- /** raw tcp communication constant */
- const FLAGS_NONE = 0x00
- /** raw tcp communication constant */
- const FLAGS_AUTH = 0x01
- /** raw tcp communication constant */
- const UINT32_LENGTH = 4
- /** raw tcp communication constant */
- const GUID_LENGTH = 16
- /** raw tcp communication constant */
- const HEADER_LENGTH = 1 + 1 + GUID_LENGTH // Cmd + Flags + CorrelationId
- /** raw tcp communication constant */
- const COMMAND_OFFSET = UINT32_LENGTH
- /** raw tcp communication constant */
- const FLAGS_OFFSET = COMMAND_OFFSET + 1
- /** raw tcp communication constant */
- const CORRELATION_ID_OFFSET = FLAGS_OFFSET + 1
- /** raw tcp communication constant */
- const DATA_OFFSET = CORRELATION_ID_OFFSET + GUID_LENGTH // Length + Cmd + Flags + CorrelationId
-
- /**
- * Raw tcp communication to eventstore
- * This class handles basic communication with eventstore
- */
- export class TCPConnection extends EventEmitter {
- /** initial config */
- protected initialConfig: EventstoreSettings
- /** config after discovery process */
- protected connectionConfig: EventstoreSettings
- /** tcp socket */
- protected socket: net.Socket | tls.TLSSocket
- /** connection id */
- protected connectionId: string | null = null
- /** list of pending requests */
- protected pendingRequests: Map<
- string,
- {resolve: Function; reject: Function; sendTime: number}
- > = new Map()
- /** timeout interval for timed out pending requests */
- protected timeoutInterval: null | NodeJS.Timeout = null
- /** logger instance */
- public log: bunyan
- /** connection state */
- protected state: connectionState = connectionState.closed
- /** message offset of tcp data */
- protected messageCurrentOffset: number = 0
- /** message length of tcp data */
- protected messageCurrentLength: number = 0
- /** message buffer of tcp data */
- protected messageData: Buffer | null = null
- /** list of subscriptions */
- protected subscriptionList: Map<string, Subscription> = new Map()
- /** list of persistent subscriptions */
- protected persistentSubscriptionList: Map<string, PersistentSubscription> = new Map()
- /** indicates if connection close is wanted by user or not */
- protected isUnexpectedClosed: boolean = true
- /** counter for re-connections */
- protected reconnectCount: number = 0
-
- /**
- *Creates an instance of TCPConnection.
- */
- public constructor(connectionConfiguration: EventstoreSettings) {
- super()
- this.initialConfig = {...connectionConfiguration}
- this.connectionConfig = connectionConfiguration
- this.log = this.connectionConfig.logger.child
- ? this.connectionConfig.logger.child({module: 'TCPConnection'})
- : this.connectionConfig.logger
-
- this.socket = new net.Socket()
- }
-
- /**
- * Returns true if connected to eventstore otherwise false
- */
- public get isConnected(): boolean {
- return this.state === connectionState.connected
- }
-
- /**
- * Called to connect to eventstore
- */
- public async connect(): Promise<void> {
- let connected = false
- while (!connected && this.reconnectCount < this.initialConfig.maxReconnections) {
- this.state = connectionState.init
- this.connectionConfig = await getIpAndPort({...this.initialConfig}, this.log)
-
- try {
- await this.tryToConnect()
- connected = true
- } catch (err) {
- this.log.error({err, count: this.reconnectCount, fn: 'connect'}, 'Try to connect failed ')
- this.reconnectCount++
- this.emit('reconnect', this.reconnectCount)
- await new Promise(
- (resolve): void => {
- setTimeout(resolve, this.initialConfig.reconnectionDelay)
- }
- )
- }
- }
- }
-
- /**
- * Connect to eventstore
- */
- protected async tryToConnect(): Promise<void> {
- const port = this.connectionConfig.port
- const host = this.connectionConfig.host
-
- if (port === 0 || host === '') {
- throw eventstoreError.newConnectionError('Invalid connection settings on host and port')
- }
-
- this.log.debug(`Start connecting to ${host}:${port}`)
-
- await new Promise(
- (resolve, reject): void => {
- const errorListener = (err: Error | EventstoreError): void => {
- this.state = connectionState.closed
- if (err instanceof Error) {
- this.onError(eventstoreError.newConnectionError(err.message, err))
- } else {
- this.onError(err)
- }
-
- reject(err)
- }
-
- const successListener = (): void => {
- if (this.socket instanceof tls.TLSSocket) {
- if (!this.socket.authorized) {
- this.log.warn({err: this.socket.authorizationError}, 'SSL authorization warning')
- }
- }
- this.socket.removeListener('error', errorListener)
- this.socket.on('error', this.onError.bind(this))
- this.onConnect()
- resolve()
- }
-
- if (this.connectionConfig.useSSL) {
- let secureContext
- if (this.connectionConfig.secureContext) {
- try {
- secureContext = tls.createSecureContext(this.connectionConfig.secureContext)
- } catch (err) {
- const conErr = eventstoreError.newConnectionError(
- 'Error creating secure context',
- err
- )
- reject(conErr)
- }
- }
-
- const options = {
- port,
- host,
- servername: host,
- requestCert: this.connectionConfig.validateServer,
- rejectUnauthorized: this.connectionConfig.validateServer,
- timeout: this.connectionConfig.connectTimeout,
- secureContext
- }
-
- this.socket = tls.connect(options, successListener)
- } else {
- const options = {
- port,
- host,
- servername: host,
- timeout: this.connectionConfig.connectTimeout
- }
- this.socket = net.connect(options, successListener)
- }
-
- this.socket.once('error', errorListener.bind(this))
- this.socket.on('close', this.onClose.bind(this))
- this.socket.on('data', this.onData.bind(this))
- this.socket.on('secureConnect', this.onSecureConnect.bind(this))
- }
- )
- }
-
- /**
- * Disconnect from eventstore.
- * It tries to drain pending queue to prevent data loose before connection gets closed
- * If disconnect() is call no new outgoing requests accepted
- */
- public async disconnect(): Promise<void> {
- if (!this.isConnected) {
- return
- }
- this.isUnexpectedClosed = false
- await new Promise(
- (resolve): void => {
- this.onDrain()
- if (this.pendingRequests.size <= 0) {
- this.state = connectionState.closed
- this.socket.destroy()
- resolve()
- } else {
- this.log.debug(
- {
- pendingRequests: this.pendingRequests.size,
- timeout:
- this.initialConfig.operationTimeout + this.initialConfig.operationTimeoutCheckPeriod
- },
- 'Wait for pending requests'
- )
- // wait for pending requests/timeouts
- setTimeout((): void => {
- this.state = connectionState.closed
- this.socket.destroy()
- this.log.debug('Timeout finished')
- this.pendingRequests.forEach(
- (value, id): void => {
- this.rejectCommandPromise(
- id,
- eventstoreError.newConnectionError('Connection closed')
- )
- }
- )
- resolve()
- }, this.initialConfig.operationTimeout + this.initialConfig.operationTimeoutCheckPeriod)
- }
- }
- )
- }
-
- /**
- * Called by interval function to check if there are some pending requests which should be rejected with time out error
- */
- protected checkTimeout(): void {
- this.log.trace('Check timeout queue')
- const timeout: string[] = []
- const now = Date.now() - this.initialConfig.operationTimeout
- for (var [key, value] of this.pendingRequests) {
- if (value.sendTime < now) {
- timeout.push(key)
- }
- }
- for (let x = 0, xMax = timeout.length; x < xMax; x++) {
- try {
- this.rejectCommandPromise(
- timeout[x],
- eventstoreError.newTimeoutError('Timeout by eventstore-ts-client')
- )
- } catch (err) {
- this.log.error({err, fn: 'checkTimeout'}, 'Error on rejectCommandPromise')
- }
- }
- }
-
- /**
- * Creates and sends raw data message to eventstore and adds given promise to pending queue
- */
- public sendCommand(
- correlationId: string,
- command: EventstoreCommand,
- data: Buffer | null = null,
- credentials: UserCredentials | null = null,
- promise: {resolve: Function; reject: Function} | null = null
- ): void {
- this.log.trace(`Sending ${EventstoreCommand[command]} with ${correlationId}`)
- if (
- this.state !== connectionState.connected &&
- command !== EventstoreCommand.HeartbeatResponseCommand &&
- command !== EventstoreCommand.Pong
- ) {
- throw eventstoreError.newConnectionError(
- 'Connection to eventstore is: ' + connectionState[this.state]
- )
- }
-
- if (promise) {
- if (this.pendingRequests.size >= this.connectionConfig.maxQueueSize) {
- promise.reject(eventstoreError.newConnectionError('Maximum concurrent items reached'))
- throw eventstoreError.newConnectionError('Maximum concurrent items reached')
- }
- this.pendingRequests.set(correlationId, {...promise, sendTime: Date.now()})
- }
-
- try {
- let authLength = 0
- let flags = FLAGS_NONE
- if (credentials) {
- flags = FLAGS_AUTH
- authLength = 1 + credentials.username.length + 1 + credentials.password.length
- }
-
- let commandLength = HEADER_LENGTH + authLength
- if (data) {
- commandLength += data.length
- }
- const packetLength = 4 + commandLength
-
- const buf = Buffer.alloc(packetLength)
- buf.writeUInt32LE(commandLength, 0)
- buf[COMMAND_OFFSET] = command
- buf[FLAGS_OFFSET] = flags
-
- uuidToBuffer(correlationId).copy(buf, CORRELATION_ID_OFFSET, 0, GUID_LENGTH)
- if (credentials) {
- buf.writeUInt8(credentials.username.length, DATA_OFFSET)
- buf.write(credentials.username, DATA_OFFSET + 1)
- buf.writeUInt8(credentials.password.length, DATA_OFFSET + 1 + credentials.username.length)
- buf.write(credentials.password, DATA_OFFSET + 1 + credentials.username.length + 1)
- }
-
- if (data) {
- data.copy(buf, DATA_OFFSET + authLength, 0, data.length)
- }
-
- this.socket.write(buf)
- } catch (err) {
- const newErr = eventstoreError.newConnectionError(err.message, err)
- this.rejectCommandPromise(correlationId, newErr)
- this.onError(newErr)
- }
- }
-
- /**
- * Gets called as soon as new data over tcp connection arrives as raw buffer data
- * Checks if
- * - new received data is part of previously received data
- * - new data contains multiple responses
- * - new data is single response
- */
- protected handleNewResponseData(data: Buffer): Buffer | null {
- const commandLength = data.readUInt32LE(0)
- if (commandLength < HEADER_LENGTH) {
- this.log.error(
- {
- connectionId: this.connectionId,
- fn: 'handleNewResponseData'
- },
- 'Invalid command length of ' + commandLength + ' bytes'
- )
- throw eventstoreError.newProtocolError('Invalid command length')
- }
-
- const messageLength = UINT32_LENGTH + commandLength
- if (data.length === messageLength) {
- // A single packet message, no need to copy into another buffer
- this.handleSingleResponseData(data)
- return null
- } else if (data.length > messageLength) {
- // Multiple messages in one packet
- const firstMessage = data.slice(0, messageLength)
- this.messageCurrentLength = messageLength
- this.handleSingleResponseData(firstMessage)
- return data.slice(this.messageCurrentLength)
- } else {
- // The first packet of a multi-packet message
- this.messageData = Buffer.alloc(messageLength)
- const packetLength = data.copy(this.messageData, this.messageCurrentOffset, 0)
- this.messageCurrentOffset = packetLength
- return null
- }
- }
-
- /**
- * This function handles raw buffer responses received within multiple tcp data package
- */
- protected handleMultiPacketResponseData(data: Buffer): Buffer | null {
- this.log.trace({fn: 'handleMultiPacketResponseData'}, `MultipacketResponse`)
- if (this.messageData === null) {
- return null
- }
- const packetLength = data.copy(this.messageData, this.messageCurrentOffset, 0)
- this.messageCurrentOffset += packetLength
- if (this.messageCurrentOffset >= this.messageData.length) {
- this.handleSingleResponseData(this.messageData)
- this.messageData = null
- this.messageCurrentOffset = 0
- }
- return null
- }
-
- /**
- * This function handles a single raw buffer response
- */
- protected handleSingleResponseData(data: Buffer): void {
- const commandLength = data.readUInt32LE(0)
- if (commandLength < HEADER_LENGTH) {
- this.log.error(
- {
- connectionId: this.connectionId,
- fn: 'handleSingleResponseData'
- },
- 'Invalid command length of ' + commandLength + ' bytes'
- )
- throw eventstoreError.newProtocolError('Invalid command length')
- }
-
- const command = data[COMMAND_OFFSET]
-
- const correlationId = uuidFromBuffer(
- data.slice(CORRELATION_ID_OFFSET, CORRELATION_ID_OFFSET + GUID_LENGTH)
- )
-
- this.log.trace('Incoming response: ' + EventstoreCommand[command])
-
- //Answer Heartbeat directly without adding to promise queue
- if (command === EventstoreCommand.HeartbeatRequestCommand) {
- this.emit('heartbeat')
- this.sendCommand(correlationId, EventstoreCommand.HeartbeatResponseCommand)
- return
- }
-
- //Answer Ping directly without adding to promise queue
- if (command === EventstoreCommand.Ping) {
- this.sendCommand(correlationId, EventstoreCommand.Pong)
- return
- }
-
- const payloadLength = commandLength - HEADER_LENGTH
- const payload = Buffer.alloc(payloadLength)
- if (payloadLength > 0) {
- data.copy(payload, 0, DATA_OFFSET, DATA_OFFSET + payloadLength)
- }
-
- let err: eventstoreError.EventstoreError
- switch (command) {
- case EventstoreCommand.BadRequest:
- err = eventstoreError.newBadRequestError()
- this.rejectCommandPromise(correlationId, err)
- this.onError(err)
- break
- case EventstoreCommand.NotAuthenticated:
- err = eventstoreError.newNotAuthenticatedError()
- this.rejectCommandPromise(correlationId, err)
- this.onError(err)
- break
- case EventstoreCommand.NotHandled:
- const notHandled = protobuf.NotHandled.decode(payload)
- err = eventstoreError.newNotHandledError(`
- ${protobuf.NotHandled.NotHandledReason[notHandled.reason]}
- `)
- this.rejectCommandPromise(correlationId, err)
- this.onError(err)
- break
- case EventstoreCommand.CreatePersistentSubscriptionCompleted:
- this.handleCreatePersistentSubscriptionCompleted(correlationId, payload)
- break
- case EventstoreCommand.DeletePersistentSubscriptionCompleted:
- this.handleDeletePersistentSubscriptionCompleted(correlationId, payload)
- break
- case EventstoreCommand.DeleteStreamCompleted:
- this.handleDeleteStreamCompleted(correlationId, payload)
- break
- case EventstoreCommand.PersistentSubscriptionConfirmation:
- this.handlePersistentSubscriptionConfirmation(correlationId, payload)
- break
- case EventstoreCommand.PersistentSubscriptionStreamEventAppeared:
- this.handlePersistentSubscriptionStreamEventAppeared(correlationId, payload)
- break
-
- case EventstoreCommand.ReadAllEventsBackwardCompleted:
- this.handleReadAllEventsCompleted(correlationId, payload)
- break
- case EventstoreCommand.ReadAllEventsForwardCompleted:
- this.handleReadAllEventsCompleted(correlationId, payload)
- break
-
- case EventstoreCommand.ReadEventCompleted:
- this.handleReadEventCompleted(correlationId, payload)
- break
-
- case EventstoreCommand.ReadStreamEventsBackwardCompleted:
- this.handleReadStreamEventsCompleted(correlationId, payload)
- break
- case EventstoreCommand.ReadStreamEventsForwardCompleted:
- this.handleReadStreamEventsCompleted(correlationId, payload)
- break
- case EventstoreCommand.StreamEventAppeared:
- this.handleStreamEventAppeared(correlationId, payload)
- break
- case EventstoreCommand.SubscriptionConfirmation:
- this.handleSubscriptionConfirmation(correlationId, payload)
- break
- case EventstoreCommand.SubscriptionDropped:
- this.handleSubscriptionDropped(correlationId, payload)
- break
- case EventstoreCommand.TransactionCommitCompleted:
- this.handleTransactionCommitCompleted(correlationId, payload)
- break
- case EventstoreCommand.TransactionStartCompleted:
- this.handleTransactionStartCompleted(correlationId, payload)
- break
- case EventstoreCommand.TransactionWriteCompleted:
- this.handleTransactionWriteCompleted(correlationId, payload)
- break
- case EventstoreCommand.UpdatePersistentSubscriptionCompleted:
- this.handleUpdatePersistentSubscriptionCompleted(correlationId, payload)
- break
- case EventstoreCommand.WriteEventsCompleted:
- this.handleWriteEventsCompleted(correlationId, payload)
- break
- case EventstoreCommand.ClientIdentified:
- this.resolveCommandPromise(correlationId)
- break
- case EventstoreCommand.Pong:
- this.resolveCommandPromise(correlationId)
- break
- case EventstoreCommand.Authenticated:
- this.resolveCommandPromise(correlationId)
- break
- default:
- err = new eventstoreError.EventstoreError(
- 'Unhandled eventstore command : ' + EventstoreCommand[command] + ' -> ' + command,
- 'EventstoreImplementationError'
- )
- this.rejectCommandPromise(correlationId, err)
- this.onError(err)
- break
- }
- }
-
- /**
- * Handle response for command CreatePersistentSubscription
- */
- protected handleCreatePersistentSubscriptionCompleted(
- correlationId: string,
- payload: Buffer
- ): void {
- const decoded = protobuf.CreatePersistentSubscriptionCompleted.decode(payload)
- if (
- decoded.result ===
- protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Success
- ) {
- this.resolveCommandPromise(correlationId)
- } else {
- const errorMsg =
- `${
- protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult[
- decoded.result
- ]
- } ` + (decoded.reason || '')
- let err
- switch (decoded.result) {
- case protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult
- .AccessDenied:
- err = eventstoreError.newAccessDeniedError(errorMsg)
- break
- case protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult
- .AlreadyExists:
- err = eventstoreError.newAlreadyExistError(errorMsg)
- break
- default:
- err = eventstoreError.newUnspecificError(errorMsg)
- }
- this.rejectCommandPromise(correlationId, err)
- }
- }
-
- /**
- * Handle response for command DeletePersistentSubscription
- */
- protected handleDeletePersistentSubscriptionCompleted(
- correlationId: string,
- payload: Buffer
- ): void {
- const status = protobuf.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult
- const decoded = protobuf.DeletePersistentSubscriptionCompleted.decode(payload)
- if (decoded.result === status.Success) {
- this.resolveCommandPromise(correlationId)
- } else {
- let returnError
- switch (decoded.result) {
- case status.AccessDenied:
- returnError = eventstoreError.newAccessDeniedError(
- 'Delete of Subscription not allowed: ' + decoded.reason || ''
- )
- break
- case status.DoesNotExist:
- returnError = eventstoreError.newDoesNotExistError(
- 'Persistent subscription does not exist: ' + decoded.reason || ''
- )
- break
- default:
- returnError = eventstoreError.newUnspecificError(
- 'Delete persistent connection failed: ' + (decoded.reason || '')
- )
- }
- this.rejectCommandPromise(correlationId, returnError)
- }
- }
-
- /**
- * Handle response for command DeleteStreamCompleted
- */
- protected handleDeleteStreamCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.DeleteStreamCompleted.decode(payload)
- if (
- this.checkOperationResult(
- correlationId,
- decoded.result,
- 'handleDeleteStream: ' + decoded.message
- )
- ) {
- this.resolveCommandPromise(
- correlationId,
- new Position(decoded.commitPosition, decoded.preparePosition)
- )
- }
- }
-
- /**
- * Handle response for command ReadAllEvents
- */
- protected handleReadAllEventsCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.ReadAllEventsCompleted.decode(payload)
- let err: eventstoreError.EventstoreError
- const message: string = decoded.error || ''
- switch (decoded.result) {
- case protobuf.ReadAllEventsCompleted.ReadAllResult.Success:
- this.resolveCommandPromise(correlationId, decoded)
- return
- case protobuf.ReadAllEventsCompleted.ReadAllResult.AccessDenied:
- err = eventstoreError.newAccessDeniedError(message)
- break
- case protobuf.ReadAllEventsCompleted.ReadAllResult.NotModified:
- err = eventstoreError.newNotModifiedError(message)
- break
- default:
- err = eventstoreError.newUnspecificError(message)
- }
- this.rejectCommandPromise(correlationId, err)
- }
-
- /**
- * Handle response for command ReadStreamEvents
- */
- protected handleReadStreamEventsCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.ReadStreamEventsCompleted.decode(payload)
- let err: eventstoreError.EventstoreError
- const message: string = decoded.error || ''
- switch (decoded.result) {
- case protobuf.ReadStreamEventsCompleted.ReadStreamResult.Success:
- this.resolveCommandPromise(correlationId, decoded)
- return
- case protobuf.ReadStreamEventsCompleted.ReadStreamResult.NoStream:
- err = eventstoreError.newNoStreamError(message)
- break
- case protobuf.ReadStreamEventsCompleted.ReadStreamResult.NotModified:
- err = eventstoreError.newNotModifiedError(message)
- break
- case protobuf.ReadStreamEventsCompleted.ReadStreamResult.StreamDeleted:
- err = eventstoreError.newStreamDeletedError(message)
- break
- case protobuf.ReadStreamEventsCompleted.ReadStreamResult.AccessDenied:
- err = eventstoreError.newAccessDeniedError(message)
- break
-
- default:
- err = eventstoreError.newUnspecificError(message)
- }
- this.rejectCommandPromise(correlationId, err)
- }
-
- /**
- * Handle response for command ReadEvent
- */
- protected handleReadEventCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.ReadEventCompleted.decode(payload)
-
- let err: eventstoreError.EventstoreError
- const message: string = decoded.error || ''
- switch (decoded.result) {
- case protobuf.ReadEventCompleted.ReadEventResult.Success:
- this.resolveCommandPromise(correlationId, decoded.event)
- return
- case protobuf.ReadEventCompleted.ReadEventResult.NotFound:
- err = eventstoreError.newNotFoundError(message)
- break
- case protobuf.ReadEventCompleted.ReadEventResult.NoStream:
- err = eventstoreError.newNoStreamError(message)
- break
- case protobuf.ReadEventCompleted.ReadEventResult.StreamDeleted:
- err = eventstoreError.newStreamDeletedError(message)
- break
- case protobuf.ReadEventCompleted.ReadEventResult.AccessDenied:
- err = eventstoreError.newAccessDeniedError(message)
- break
- default:
- err = eventstoreError.newUnspecificError(message)
- }
- this.rejectCommandPromise(correlationId, err)
- }
-
- /**
- * Handle incoming event for subscription
- */
- protected handleStreamEventAppeared(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.StreamEventAppeared.decode(payload)
- const subscription = this.subscriptionList.get(correlationId)
- if (subscription) {
- const event = Event.fromRaw(decoded.event.event || decoded.event.link)
- subscription.eventAppeared(
- event,
- new Position(decoded.event.commitPosition, decoded.event.preparePosition)
- )
- } else {
- this.log.error(
- {subscriptionId: correlationId, fn: 'handleStreamEventAppeared'},
- 'Received StreamEventAppeared for unknown id'
- )
- this.emit(
- 'error',
- eventstoreError.newImplementationError(
- `Received StreamEventAppeared for unknown id ${correlationId}`
- )
- )
- }
- }
-
- /**
- * Handle response for command Subscription
- */
- protected handleSubscriptionConfirmation(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.SubscriptionConfirmation.decode(payload)
-
- this.resolveCommandPromise(correlationId, {
- subscriptionId: correlationId,
- lastCommitPosition: decoded.lastCommitPosition,
- lastEventNumber: decoded.lastEventNumber
- })
- }
-
- /**
- * Handle subscription drop
- */
- protected handleSubscriptionDropped(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.SubscriptionDropped.decode(payload)
- const subscription = this.subscriptionList.get(correlationId) || null
- if (subscription) {
- subscription.emit('dropped', SubscriptionDropReason[decoded.reason])
- }
- const persistentSubscription = this.persistentSubscriptionList.get(correlationId) || null
- if (persistentSubscription) {
- persistentSubscription.emit('dropped', SubscriptionDropReason[decoded.reason])
- }
- if (this.pendingRequests.has(correlationId)) {
- if (decoded.reason === SubscriptionDropReason.Unsubscribed) {
- this.resolveCommandPromise(correlationId, SubscriptionDropReason[decoded.reason])
- } else {
- this.rejectCommandPromise(
- correlationId,
- eventstoreError.newUnspecificError(
- 'Subscription dropped: ' + SubscriptionDropReason[decoded.reason]
- )
- )
- }
- }
- }
-
- /**
- * Handle response for command TransactionCommit
- */
- protected handleTransactionCommitCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.TransactionCommitCompleted.decode(payload)
- if (
- this.checkOperationResult(
- correlationId,
- decoded.result,
- 'handleTransactionCommit: ' + decoded.message
- )
- ) {
- const result: WriteResult = {
- firstEventNumber: decoded.firstEventNumber,
- lastEventNumber: decoded.lastEventNumber,
- position: new Position(decoded.commitPosition, decoded.preparePosition)
- }
- this.resolveCommandPromise(correlationId, result)
- }
- }
-
- /**
- * Handle response for command TransactionStart
- */
- protected handleTransactionStartCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.TransactionStartCompleted.decode(payload)
- if (
- this.checkOperationResult(
- correlationId,
- decoded.result,
- 'handleTransactionStart: ' + decoded.message
- )
- ) {
- this.resolveCommandPromise(correlationId, decoded.transactionId)
- }
- }
-
- /**
- * Handles transaction write completed
- */
- protected handleTransactionWriteCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.TransactionWriteCompleted.decode(payload)
- if (
- this.checkOperationResult(
- correlationId,
- decoded.result,
- 'handleTransactionWrite: ' + decoded.message
- )
- ) {
- this.resolveCommandPromise(correlationId, decoded.transactionId)
- }
- }
-
- /**
- * Handles update persistent subscription completed
- */
- protected handleUpdatePersistentSubscriptionCompleted(
- correlationId: string,
- payload: Buffer
- ): void {
- const decoded = protobuf.UpdatePersistentSubscriptionCompleted.decode(payload)
- const status = protobuf.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult
- const message = decoded.reason || ''
- switch (decoded.result) {
- case status.Success:
- this.resolveCommandPromise(correlationId)
- break
- case status.DoesNotExist:
- this.rejectCommandPromise(correlationId, eventstoreError.newDoesNotExistError(message))
- break
- case status.AccessDenied:
- this.rejectCommandPromise(correlationId, eventstoreError.newAccessDeniedError(message))
- break
- default:
- }
- }
-
- /**
- * Handles write events completed
- */
- protected handleWriteEventsCompleted(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.WriteEventsCompleted.decode(payload)
- if (
- this.checkOperationResult(
- correlationId,
- decoded.result,
- 'handleWriteEvents: ' + decoded.message
- )
- ) {
- this.resolveCommandPromise(correlationId, decoded)
- }
- }
-
- /**
- * Handles persistent subscription confirmation
- */
- protected handlePersistentSubscriptionConfirmation(correlationId: string, payload: Buffer): void {
- const decoded = protobuf.PersistentSubscriptionConfirmation.decode(payload)
- this.resolveCommandPromise(correlationId, decoded)
- }
-
- /**
- * Handles persistent subscription stream event appeared
- */
- protected handlePersistentSubscriptionStreamEventAppeared(
- correlationId: string,
- payload: Buffer
- ): void {
- const decoded = protobuf.PersistentSubscriptionStreamEventAppeared.decode(payload)
- const subscription = this.persistentSubscriptionList.get(correlationId)
- if (subscription) {
- const event = Event.fromRaw(decoded.event.event || decoded.event.link)
- subscription.eventAppeared(event)
- } else {
- this.log.error(
- {
- subscriptionId: correlationId,
- persistentSubscriptionList: this.persistentSubscriptionList,
- fn: 'handlePersistentSubscriptionStreamEventAppeared'
- },
- 'Received PersistentSubscriptionStreamEventAppeared for unknown id'
- )
- this.emit(
- 'error',
- eventstoreError.newImplementationError(
- `Received PersistentSubscriptionStreamEventAppeared for unknown id ${correlationId}`
- )
- )
- }
- }
-
- /**
- * CHecks if given result is an error code
- * It returns true for successful result otherwise it returns false.
- * If result is an error this function rejects corresponding command promise and remove it from command queue
- */
- protected checkOperationResult(
- correlationId: string,
- result: number,
- message: string = ''
- ): boolean {
- let err: eventstoreError.EventstoreError
- switch (result) {
- case protobuf.OperationResult.Success:
- return true
- case protobuf.OperationResult.AccessDenied:
- err = eventstoreError.newAccessDeniedError(message)
- break
- case protobuf.OperationResult.CommitTimeout:
- err = eventstoreError.newCommitTimeoutError(message)
- break
- case protobuf.OperationResult.ForwardTimeout:
- err = eventstoreError.newForwardTimeoutError(message)
- break
- case protobuf.OperationResult.InvalidTransaction:
- err = eventstoreError.newInvalidTransactionError(message)
- break
- case protobuf.OperationResult.PrepareTimeout:
- err = eventstoreError.newPrepareTimeoutError(message)
- break
- case protobuf.OperationResult.StreamDeleted:
- err = eventstoreError.newStreamDeletedError(message)
- break
- case protobuf.OperationResult.WrongExpectedVersion:
- err = eventstoreError.newWrongExpectedVersionError(message)
- break
- default:
- err = eventstoreError.newUnspecificError('Invalid operation result')
- break
- }
- this.rejectCommandPromise(correlationId, err)
- return false
- }
-
- /**
- * Will be called if a command send to eventstore was replied with an error
- * In this case corresponding promise will be rejected and removed from queue
- */
- protected rejectCommandPromise(
- correlationId: string,
- error: eventstoreError.EventstoreError
- ): void {
- const resultPromise = this.pendingRequests.get(correlationId)
- if (resultPromise) {
- resultPromise.reject(error)
- this.pendingRequests.delete(correlationId)
- } else {
- const err = eventstoreError.newImplementationError(
- `Could not find correlationId ${correlationId} on rejectCommandPromise`
- )
- this.onError(err)
- }
- }
-
- /**
- * Will be called if a command send to eventstore was replied with success response
- * In this case corresponding promise will be resolved with result received from eventstore
- */
- protected resolveCommandPromise<T>(correlationId: string, result: null | T = null): void {
- const resultPromise = this.pendingRequests.get(correlationId)
- if (resultPromise) {
- resultPromise.resolve(result)
- this.pendingRequests.delete(correlationId)
- } else {
- const err = eventstoreError.newImplementationError(
- `Could not find correlationId ${correlationId} on resolveCommandPromise`
- )
- this.onError(err)
- }
- }
-
- /**
- * Subscribes to stream
- */
- public subscribeToStream(
- stream: Stream,
- resolveLinkTos: boolean = true,
- credentials: UserCredentials | null
- ): Promise<Subscription> {
- const newSubscription = new Subscription(uuid(), this, stream, resolveLinkTos, credentials)
- this.subscriptionList.set(newSubscription.id, newSubscription)
- return new Promise(
- (resolve, reject): void => {
- const resolveFunction = (): void => {
- newSubscription.isSubscribed = true
- newSubscription.emit('subscribed')
- resolve(newSubscription)
- }
- const raw = protobuf.SubscribeToStream.fromObject({
- eventStreamId: stream.id,
- resolveLinkTos
- })
- this.sendCommand(
- newSubscription.id,
- EventstoreCommand.SubscribeToStream,
- Buffer.from(protobuf.SubscribeToStream.encode(raw).finish()),
- credentials,
- {
- resolve: resolveFunction,
- reject
- }
- )
- }
- )
- }
-
- /**
- * Unsubscribes from stream
- */
- public async unsubscribeFromStream(subscriptionId: string): Promise<void> {
- const subscription = this.subscriptionList.get(subscriptionId)
- if (!subscription) {
- throw eventstoreError.newImplementationError(
- `Can not unsubscribe - subscription ${subscriptionId} not found`
- )
- }
- const subscriptionList = this.subscriptionList
- await new Promise(
- (resolve, reject): void => {
- const resolveFunction = (): void => {
- subscription.isSubscribed = false
- subscriptionList.delete(subscriptionId)
- resolve()
- }
- this.sendCommand(
- subscription.id,
- EventstoreCommand.UnsubscribeFromStream,
- null,
- subscription.getCredentials,
- {
- resolve: resolveFunction,
- reject
- }
- )
- }
- )
- }
-
- /**
- * Connects to persistent subscription
- */
- public async connectToPersistentSubscription(
- subscription: PersistentSubscription,
- allowedInFlightMessages: number = 10,
- credentials?: UserCredentials | null
- ): Promise<model.eventstore.proto.PersistentSubscriptionConfirmation> {
- this.persistentSubscriptionList.set(subscription.id, subscription)
- const result: model.eventstore.proto.PersistentSubscriptionConfirmation = await new Promise(
- (resolve, reject): void => {
- const raw = protobuf.ConnectToPersistentSubscription.fromObject({
- subscriptionId: subscription.subscriptionGroupName,
- eventStreamId: subscription.stream.id,
- allowedInFlightMessages
- })
- this.sendCommand(
- subscription.id,
- EventstoreCommand.ConnectToPersistentSubscription,
- Buffer.from(protobuf.ConnectToPersistentSubscription.encode(raw).finish()),
- credentials,
- {
- resolve,
- reject
- }
- )
- }
- )
- subscription.emit('subscribed')
- subscription.lastCommitPosition = result.lastCommitPosition
- ? Long.fromValue(result.lastCommitPosition)
- : Long.fromValue(0)
- subscription.lastEventNumber = result.lastEventNumber
- ? Long.fromValue(result.lastEventNumber)
- : Long.fromValue(-1)
-
- return result
- }
-
- /**
- * Stop listening on persistent subscription
- */
- public async unsubscribeFromPersistentSubscription(
- subscriptionId: string,
- credentials?: UserCredentials | null
- ): Promise<void> {
- const subscription = this.persistentSubscriptionList.get(subscriptionId)
- if (!subscription) {
- throw eventstoreError.newImplementationError(
- `Can not unsubscribe - persistent subscription ${subscriptionId} not found`
- )
- }
- const subscriptionList = this.persistentSubscriptionList
- await new Promise(
- (resolve, reject): void => {
- const resolveFunction = (): void => {
- subscription.state = SubscriptionStatus.disconnected
- subscriptionList.delete(subscriptionId)
- resolve()
- }
- this.sendCommand(
- subscription.id,
- EventstoreCommand.UnsubscribeFromStream,
- null,
- credentials,
- {
- resolve: resolveFunction,
- reject
- }
- )
- }
- )
- }
-
- /**
- * Emit general low level connection errors (communication errors).
- * Will not emit errors on business level
- */
- protected onError(err?: Error): void {
- let errorMessage
- let error = err ? err : eventstoreError.newConnectionError('Eventstore connection error')
-
- if (error.name === 'Error') {
- error = eventstoreError.newConnectionError(error.message, err)
- }
- errorMessage = error.message
- this.log.error({err: error}, errorMessage)
- this.emit('error', error)
- }
-
- /**
- * Emit as soon as connection to eventstore was established successfully
- */
- protected onConnect(): void {
- this.reconnectCount = 0
- this.isUnexpectedClosed = true
- this.log.debug('Connected to eventstore')
- this.state = connectionState.connected
- this.emit('connected')
-
- this.timeoutInterval = setInterval(
- this.checkTimeout.bind(this),
- this.initialConfig.operationTimeoutCheckPeriod
- )
- }
-
- /**
- * Emitted as soon as data arrives over tcp connection
- */
- protected onData(data: Buffer | null): void {
- while (data != null) {
- if (this.messageData === null) {
- data = this.handleNewResponseData(data)
- } else {
- data = this.handleMultiPacketResponseData(data)
- }
- }
- }
-
- /**
- * Emit as soon as connection to eventstore is closed
- */
- protected onClose(): void {
- this.log.debug('Connection to eventstore closed')
- this.state = connectionState.closed
- this.emit('close')
- if (this.isUnexpectedClosed) {
- this.emit('error', eventstoreError.newConnectionError('Connection closed unexpected'))
- this.connect()
- }
-
- // stop timeout interval
- if (this.timeoutInterval) {
- clearInterval(this.timeoutInterval)
- this.timeoutInterval = null
- }
-
- // reject all pending promises
- this.pendingRequests.forEach(
- (value): void => {
- value.reject(eventstoreError.newConnectionError('Connection closed'))
- }
- )
- this.pendingRequests = new Map()
-
- //drop all subscriptions
- this.subscriptionList.forEach(
- (subscription): void => {
- subscription.emit('dropped', 'Connection closed')
- }
- )
-
- //drop all persistent subscriptions
- this.persistentSubscriptionList.forEach(
- (subscription): void => {
- subscription.emit('dropped', 'Connection closed')
- }
- )
- }
-
- /**
- * Emit when connection starts draining
- */
- protected onDrain(): void {
- this.log.debug('Eventstore connection draining')
- this.state = connectionState.drain
- this.emit('drain')
- }
-
- /**
- * Emit when connection secured
- */
- protected onSecureConnect(): void {
- this.log.debug('Eventstore connection secured')
- this.emit('secureConnect')
- }
- }