src/eventstore/TCPConnection.ts
import {EventstoreSettings} from './EventstoreSettings'
import * as net from 'net'
import * as tls from 'tls'
import * as bunyan from 'bunyan'
import {EventEmitter} from 'events'
import {uuidToBuffer, uuidFromBuffer} from '../protobuf/uuidBufferConvert'
import {EventstoreCommand} from '../protobuf/EventstoreCommand'
import * as eventstoreError from '../errors'
import {EventstoreError} from '../errors'
import * as model from '../protobuf/model'
import {
Subscription,
PersistentSubscription,
SubscriptionDropReason,
SubscriptionStatus
} from '../subscription'
import {Stream} from '../stream'
import {UserCredentials} from '../eventstore/EventstoreSettings'
import uuid = require('uuid/v4')
import {Event} from '../event'
import {getIpAndPort} from './getConnectInfo'
import {Position} from './Position'
import {WriteResult} from './Eventstore'
import Long from 'long'
/** protobuf shorthand */
const protobuf = model.eventstore.proto
/** typescript enumeration of connection states */
enum connectionState {
closed,
init,
connected,
drain
}
/** raw tcp communication constant */
const FLAGS_NONE = 0x00
/** raw tcp communication constant */
const FLAGS_AUTH = 0x01
/** raw tcp communication constant */
const UINT32_LENGTH = 4
/** raw tcp communication constant */
const GUID_LENGTH = 16
/** raw tcp communication constant */
const HEADER_LENGTH = 1 + 1 + GUID_LENGTH // Cmd + Flags + CorrelationId
/** raw tcp communication constant */
const COMMAND_OFFSET = UINT32_LENGTH
/** raw tcp communication constant */
const FLAGS_OFFSET = COMMAND_OFFSET + 1
/** raw tcp communication constant */
const CORRELATION_ID_OFFSET = FLAGS_OFFSET + 1
/** raw tcp communication constant */
const DATA_OFFSET = CORRELATION_ID_OFFSET + GUID_LENGTH // Length + Cmd + Flags + CorrelationId
/**
* Raw tcp communication to eventstore
* This class handles basic communication with eventstore
*/
export class TCPConnection extends EventEmitter {
/** initial config */
protected initialConfig: EventstoreSettings
/** config after discovery process */
protected connectionConfig: EventstoreSettings
/** tcp socket */
protected socket: net.Socket | tls.TLSSocket
/** connection id */
protected connectionId: string | null = null
/** list of pending requests */
protected pendingRequests: Map<
string,
{resolve: Function; reject: Function; sendTime: number}
> = new Map()
/** timeout interval for timed out pending requests */
protected timeoutInterval: null | NodeJS.Timeout = null
/** logger instance */
public log: bunyan
/** connection state */
protected state: connectionState = connectionState.closed
/** message offset of tcp data */
protected messageCurrentOffset: number = 0
/** message length of tcp data */
protected messageCurrentLength: number = 0
/** message buffer of tcp data */
protected messageData: Buffer | null = null
/** list of subscriptions */
protected subscriptionList: Map<string, Subscription> = new Map()
/** list of persistent subscriptions */
protected persistentSubscriptionList: Map<string, PersistentSubscription> = new Map()
/** indicates if connection close is wanted by user or not */
protected isUnexpectedClosed: boolean = true
/** counter for re-connections */
protected reconnectCount: number = 0
/**
*Creates an instance of TCPConnection.
*/
public constructor(connectionConfiguration: EventstoreSettings) {
super()
this.initialConfig = {...connectionConfiguration}
this.connectionConfig = connectionConfiguration
this.log = this.connectionConfig.logger.child
? this.connectionConfig.logger.child({module: 'TCPConnection'})
: this.connectionConfig.logger
this.socket = new net.Socket()
}
/**
* Returns true if connected to eventstore otherwise false
*/
public get isConnected(): boolean {
return this.state === connectionState.connected
}
/**
* Called to connect to eventstore
*/
public async connect(): Promise<void> {
let connected = false
while (!connected && this.reconnectCount < this.initialConfig.maxReconnections) {
this.state = connectionState.init
this.connectionConfig = await getIpAndPort({...this.initialConfig}, this.log)
try {
await this.tryToConnect()
connected = true
} catch (err) {
this.log.error({err, count: this.reconnectCount, fn: 'connect'}, 'Try to connect failed ')
this.reconnectCount++
this.emit('reconnect', this.reconnectCount)
await new Promise(
(resolve): void => {
setTimeout(resolve, this.initialConfig.reconnectionDelay)
}
)
}
}
}
/**
* Connect to eventstore
*/
protected async tryToConnect(): Promise<void> {
const port = this.connectionConfig.port
const host = this.connectionConfig.host
if (port === 0 || host === '') {
throw eventstoreError.newConnectionError('Invalid connection settings on host and port')
}
this.log.debug(`Start connecting to ${host}:${port}`)
await new Promise(
(resolve, reject): void => {
const errorListener = (err: Error | EventstoreError): void => {
this.state = connectionState.closed
if (err instanceof Error) {
this.onError(eventstoreError.newConnectionError(err.message, err))
} else {
this.onError(err)
}
reject(err)
}
const successListener = (): void => {
if (this.socket instanceof tls.TLSSocket) {
if (!this.socket.authorized) {
this.log.warn({err: this.socket.authorizationError}, 'SSL authorization warning')
}
}
this.socket.removeListener('error', errorListener)
this.socket.on('error', this.onError.bind(this))
this.onConnect()
resolve()
}
if (this.connectionConfig.useSSL) {
let secureContext
if (this.connectionConfig.secureContext) {
try {
secureContext = tls.createSecureContext(this.connectionConfig.secureContext)
} catch (err) {
const conErr = eventstoreError.newConnectionError(
'Error creating secure context',
err
)
reject(conErr)
}
}
const options = {
port,
host,
servername: host,
requestCert: this.connectionConfig.validateServer,
rejectUnauthorized: this.connectionConfig.validateServer,
timeout: this.connectionConfig.connectTimeout,
secureContext
}
this.socket = tls.connect(options, successListener)
} else {
const options = {
port,
host,
servername: host,
timeout: this.connectionConfig.connectTimeout
}
this.socket = net.connect(options, successListener)
}
this.socket.once('error', errorListener.bind(this))
this.socket.on('close', this.onClose.bind(this))
this.socket.on('data', this.onData.bind(this))
this.socket.on('secureConnect', this.onSecureConnect.bind(this))
}
)
}
/**
* Disconnect from eventstore.
* It tries to drain pending queue to prevent data loose before connection gets closed
* If disconnect() is call no new outgoing requests accepted
*/
public async disconnect(): Promise<void> {
if (!this.isConnected) {
return
}
this.isUnexpectedClosed = false
await new Promise(
(resolve): void => {
this.onDrain()
if (this.pendingRequests.size <= 0) {
this.state = connectionState.closed
this.socket.destroy()
resolve()
} else {
this.log.debug(
{
pendingRequests: this.pendingRequests.size,
timeout:
this.initialConfig.operationTimeout + this.initialConfig.operationTimeoutCheckPeriod
},
'Wait for pending requests'
)
// wait for pending requests/timeouts
setTimeout((): void => {
this.state = connectionState.closed
this.socket.destroy()
this.log.debug('Timeout finished')
this.pendingRequests.forEach(
(value, id): void => {
this.rejectCommandPromise(
id,
eventstoreError.newConnectionError('Connection closed')
)
}
)
resolve()
}, this.initialConfig.operationTimeout + this.initialConfig.operationTimeoutCheckPeriod)
}
}
)
}
/**
* Called by interval function to check if there are some pending requests which should be rejected with time out error
*/
protected checkTimeout(): void {
this.log.trace('Check timeout queue')
const timeout: string[] = []
const now = Date.now() - this.initialConfig.operationTimeout
for (var [key, value] of this.pendingRequests) {
if (value.sendTime < now) {
timeout.push(key)
}
}
for (let x = 0, xMax = timeout.length; x < xMax; x++) {
try {
this.rejectCommandPromise(
timeout[x],
eventstoreError.newTimeoutError('Timeout by eventstore-ts-client')
)
} catch (err) {
this.log.error({err, fn: 'checkTimeout'}, 'Error on rejectCommandPromise')
}
}
}
/**
* Creates and sends raw data message to eventstore and adds given promise to pending queue
*/
public sendCommand(
correlationId: string,
command: EventstoreCommand,
data: Buffer | null = null,
credentials: UserCredentials | null = null,
promise: {resolve: Function; reject: Function} | null = null
): void {
this.log.trace(`Sending ${EventstoreCommand[command]} with ${correlationId}`)
if (
this.state !== connectionState.connected &&
command !== EventstoreCommand.HeartbeatResponseCommand &&
command !== EventstoreCommand.Pong
) {
throw eventstoreError.newConnectionError(
'Connection to eventstore is: ' + connectionState[this.state]
)
}
if (promise) {
if (this.pendingRequests.size >= this.connectionConfig.maxQueueSize) {
promise.reject(eventstoreError.newConnectionError('Maximum concurrent items reached'))
throw eventstoreError.newConnectionError('Maximum concurrent items reached')
}
this.pendingRequests.set(correlationId, {...promise, sendTime: Date.now()})
}
try {
let authLength = 0
let flags = FLAGS_NONE
if (credentials) {
flags = FLAGS_AUTH
authLength = 1 + credentials.username.length + 1 + credentials.password.length
}
let commandLength = HEADER_LENGTH + authLength
if (data) {
commandLength += data.length
}
const packetLength = 4 + commandLength
const buf = Buffer.alloc(packetLength)
buf.writeUInt32LE(commandLength, 0)
buf[COMMAND_OFFSET] = command
buf[FLAGS_OFFSET] = flags
uuidToBuffer(correlationId).copy(buf, CORRELATION_ID_OFFSET, 0, GUID_LENGTH)
if (credentials) {
buf.writeUInt8(credentials.username.length, DATA_OFFSET)
buf.write(credentials.username, DATA_OFFSET + 1)
buf.writeUInt8(credentials.password.length, DATA_OFFSET + 1 + credentials.username.length)
buf.write(credentials.password, DATA_OFFSET + 1 + credentials.username.length + 1)
}
if (data) {
data.copy(buf, DATA_OFFSET + authLength, 0, data.length)
}
this.socket.write(buf)
} catch (err) {
const newErr = eventstoreError.newConnectionError(err.message, err)
this.rejectCommandPromise(correlationId, newErr)
this.onError(newErr)
}
}
/**
* Gets called as soon as new data over tcp connection arrives as raw buffer data
* Checks if
* - new received data is part of previously received data
* - new data contains multiple responses
* - new data is single response
*/
protected handleNewResponseData(data: Buffer): Buffer | null {
const commandLength = data.readUInt32LE(0)
if (commandLength < HEADER_LENGTH) {
this.log.error(
{
connectionId: this.connectionId,
fn: 'handleNewResponseData'
},
'Invalid command length of ' + commandLength + ' bytes'
)
throw eventstoreError.newProtocolError('Invalid command length')
}
const messageLength = UINT32_LENGTH + commandLength
if (data.length === messageLength) {
// A single packet message, no need to copy into another buffer
this.handleSingleResponseData(data)
return null
} else if (data.length > messageLength) {
// Multiple messages in one packet
const firstMessage = data.slice(0, messageLength)
this.messageCurrentLength = messageLength
this.handleSingleResponseData(firstMessage)
return data.slice(this.messageCurrentLength)
} else {
// The first packet of a multi-packet message
this.messageData = Buffer.alloc(messageLength)
const packetLength = data.copy(this.messageData, this.messageCurrentOffset, 0)
this.messageCurrentOffset = packetLength
return null
}
}
/**
* This function handles raw buffer responses received within multiple tcp data package
*/
protected handleMultiPacketResponseData(data: Buffer): Buffer | null {
this.log.trace({fn: 'handleMultiPacketResponseData'}, `MultipacketResponse`)
if (this.messageData === null) {
return null
}
const packetLength = data.copy(this.messageData, this.messageCurrentOffset, 0)
this.messageCurrentOffset += packetLength
if (this.messageCurrentOffset >= this.messageData.length) {
this.handleSingleResponseData(this.messageData)
this.messageData = null
this.messageCurrentOffset = 0
}
return null
}
/**
* This function handles a single raw buffer response
*/
protected handleSingleResponseData(data: Buffer): void {
const commandLength = data.readUInt32LE(0)
if (commandLength < HEADER_LENGTH) {
this.log.error(
{
connectionId: this.connectionId,
fn: 'handleSingleResponseData'
},
'Invalid command length of ' + commandLength + ' bytes'
)
throw eventstoreError.newProtocolError('Invalid command length')
}
const command = data[COMMAND_OFFSET]
const correlationId = uuidFromBuffer(
data.slice(CORRELATION_ID_OFFSET, CORRELATION_ID_OFFSET + GUID_LENGTH)
)
this.log.trace('Incoming response: ' + EventstoreCommand[command])
//Answer Heartbeat directly without adding to promise queue
if (command === EventstoreCommand.HeartbeatRequestCommand) {
this.emit('heartbeat')
this.sendCommand(correlationId, EventstoreCommand.HeartbeatResponseCommand)
return
}
//Answer Ping directly without adding to promise queue
if (command === EventstoreCommand.Ping) {
this.sendCommand(correlationId, EventstoreCommand.Pong)
return
}
const payloadLength = commandLength - HEADER_LENGTH
const payload = Buffer.alloc(payloadLength)
if (payloadLength > 0) {
data.copy(payload, 0, DATA_OFFSET, DATA_OFFSET + payloadLength)
}
let err: eventstoreError.EventstoreError
switch (command) {
case EventstoreCommand.BadRequest:
err = eventstoreError.newBadRequestError()
this.rejectCommandPromise(correlationId, err)
this.onError(err)
break
case EventstoreCommand.NotAuthenticated:
err = eventstoreError.newNotAuthenticatedError()
this.rejectCommandPromise(correlationId, err)
this.onError(err)
break
case EventstoreCommand.NotHandled:
const notHandled = protobuf.NotHandled.decode(payload)
err = eventstoreError.newNotHandledError(`
${protobuf.NotHandled.NotHandledReason[notHandled.reason]}
`)
this.rejectCommandPromise(correlationId, err)
this.onError(err)
break
case EventstoreCommand.CreatePersistentSubscriptionCompleted:
this.handleCreatePersistentSubscriptionCompleted(correlationId, payload)
break
case EventstoreCommand.DeletePersistentSubscriptionCompleted:
this.handleDeletePersistentSubscriptionCompleted(correlationId, payload)
break
case EventstoreCommand.DeleteStreamCompleted:
this.handleDeleteStreamCompleted(correlationId, payload)
break
case EventstoreCommand.PersistentSubscriptionConfirmation:
this.handlePersistentSubscriptionConfirmation(correlationId, payload)
break
case EventstoreCommand.PersistentSubscriptionStreamEventAppeared:
this.handlePersistentSubscriptionStreamEventAppeared(correlationId, payload)
break
case EventstoreCommand.ReadAllEventsBackwardCompleted:
this.handleReadAllEventsCompleted(correlationId, payload)
break
case EventstoreCommand.ReadAllEventsForwardCompleted:
this.handleReadAllEventsCompleted(correlationId, payload)
break
case EventstoreCommand.ReadEventCompleted:
this.handleReadEventCompleted(correlationId, payload)
break
case EventstoreCommand.ReadStreamEventsBackwardCompleted:
this.handleReadStreamEventsCompleted(correlationId, payload)
break
case EventstoreCommand.ReadStreamEventsForwardCompleted:
this.handleReadStreamEventsCompleted(correlationId, payload)
break
case EventstoreCommand.StreamEventAppeared:
this.handleStreamEventAppeared(correlationId, payload)
break
case EventstoreCommand.SubscriptionConfirmation:
this.handleSubscriptionConfirmation(correlationId, payload)
break
case EventstoreCommand.SubscriptionDropped:
this.handleSubscriptionDropped(correlationId, payload)
break
case EventstoreCommand.TransactionCommitCompleted:
this.handleTransactionCommitCompleted(correlationId, payload)
break
case EventstoreCommand.TransactionStartCompleted:
this.handleTransactionStartCompleted(correlationId, payload)
break
case EventstoreCommand.TransactionWriteCompleted:
this.handleTransactionWriteCompleted(correlationId, payload)
break
case EventstoreCommand.UpdatePersistentSubscriptionCompleted:
this.handleUpdatePersistentSubscriptionCompleted(correlationId, payload)
break
case EventstoreCommand.WriteEventsCompleted:
this.handleWriteEventsCompleted(correlationId, payload)
break
case EventstoreCommand.ClientIdentified:
this.resolveCommandPromise(correlationId)
break
case EventstoreCommand.Pong:
this.resolveCommandPromise(correlationId)
break
case EventstoreCommand.Authenticated:
this.resolveCommandPromise(correlationId)
break
default:
err = new eventstoreError.EventstoreError(
'Unhandled eventstore command : ' + EventstoreCommand[command] + ' -> ' + command,
'EventstoreImplementationError'
)
this.rejectCommandPromise(correlationId, err)
this.onError(err)
break
}
}
/**
* Handle response for command CreatePersistentSubscription
*/
protected handleCreatePersistentSubscriptionCompleted(
correlationId: string,
payload: Buffer
): void {
const decoded = protobuf.CreatePersistentSubscriptionCompleted.decode(payload)
if (
decoded.result ===
protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Success
) {
this.resolveCommandPromise(correlationId)
} else {
const errorMsg =
`${
protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult[
decoded.result
]
} ` + (decoded.reason || '')
let err
switch (decoded.result) {
case protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult
.AccessDenied:
err = eventstoreError.newAccessDeniedError(errorMsg)
break
case protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult
.AlreadyExists:
err = eventstoreError.newAlreadyExistError(errorMsg)
break
default:
err = eventstoreError.newUnspecificError(errorMsg)
}
this.rejectCommandPromise(correlationId, err)
}
}
/**
* Handle response for command DeletePersistentSubscription
*/
protected handleDeletePersistentSubscriptionCompleted(
correlationId: string,
payload: Buffer
): void {
const status = protobuf.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult
const decoded = protobuf.DeletePersistentSubscriptionCompleted.decode(payload)
if (decoded.result === status.Success) {
this.resolveCommandPromise(correlationId)
} else {
let returnError
switch (decoded.result) {
case status.AccessDenied:
returnError = eventstoreError.newAccessDeniedError(
'Delete of Subscription not allowed: ' + decoded.reason || ''
)
break
case status.DoesNotExist:
returnError = eventstoreError.newDoesNotExistError(
'Persistent subscription does not exist: ' + decoded.reason || ''
)
break
default:
returnError = eventstoreError.newUnspecificError(
'Delete persistent connection failed: ' + (decoded.reason || '')
)
}
this.rejectCommandPromise(correlationId, returnError)
}
}
/**
* Handle response for command DeleteStreamCompleted
*/
protected handleDeleteStreamCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.DeleteStreamCompleted.decode(payload)
if (
this.checkOperationResult(
correlationId,
decoded.result,
'handleDeleteStream: ' + decoded.message
)
) {
this.resolveCommandPromise(
correlationId,
new Position(decoded.commitPosition, decoded.preparePosition)
)
}
}
/**
* Handle response for command ReadAllEvents
*/
protected handleReadAllEventsCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.ReadAllEventsCompleted.decode(payload)
let err: eventstoreError.EventstoreError
const message: string = decoded.error || ''
switch (decoded.result) {
case protobuf.ReadAllEventsCompleted.ReadAllResult.Success:
this.resolveCommandPromise(correlationId, decoded)
return
case protobuf.ReadAllEventsCompleted.ReadAllResult.AccessDenied:
err = eventstoreError.newAccessDeniedError(message)
break
case protobuf.ReadAllEventsCompleted.ReadAllResult.NotModified:
err = eventstoreError.newNotModifiedError(message)
break
default:
err = eventstoreError.newUnspecificError(message)
}
this.rejectCommandPromise(correlationId, err)
}
/**
* Handle response for command ReadStreamEvents
*/
protected handleReadStreamEventsCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.ReadStreamEventsCompleted.decode(payload)
let err: eventstoreError.EventstoreError
const message: string = decoded.error || ''
switch (decoded.result) {
case protobuf.ReadStreamEventsCompleted.ReadStreamResult.Success:
this.resolveCommandPromise(correlationId, decoded)
return
case protobuf.ReadStreamEventsCompleted.ReadStreamResult.NoStream:
err = eventstoreError.newNoStreamError(message)
break
case protobuf.ReadStreamEventsCompleted.ReadStreamResult.NotModified:
err = eventstoreError.newNotModifiedError(message)
break
case protobuf.ReadStreamEventsCompleted.ReadStreamResult.StreamDeleted:
err = eventstoreError.newStreamDeletedError(message)
break
case protobuf.ReadStreamEventsCompleted.ReadStreamResult.AccessDenied:
err = eventstoreError.newAccessDeniedError(message)
break
default:
err = eventstoreError.newUnspecificError(message)
}
this.rejectCommandPromise(correlationId, err)
}
/**
* Handle response for command ReadEvent
*/
protected handleReadEventCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.ReadEventCompleted.decode(payload)
let err: eventstoreError.EventstoreError
const message: string = decoded.error || ''
switch (decoded.result) {
case protobuf.ReadEventCompleted.ReadEventResult.Success:
this.resolveCommandPromise(correlationId, decoded.event)
return
case protobuf.ReadEventCompleted.ReadEventResult.NotFound:
err = eventstoreError.newNotFoundError(message)
break
case protobuf.ReadEventCompleted.ReadEventResult.NoStream:
err = eventstoreError.newNoStreamError(message)
break
case protobuf.ReadEventCompleted.ReadEventResult.StreamDeleted:
err = eventstoreError.newStreamDeletedError(message)
break
case protobuf.ReadEventCompleted.ReadEventResult.AccessDenied:
err = eventstoreError.newAccessDeniedError(message)
break
default:
err = eventstoreError.newUnspecificError(message)
}
this.rejectCommandPromise(correlationId, err)
}
/**
* Handle incoming event for subscription
*/
protected handleStreamEventAppeared(correlationId: string, payload: Buffer): void {
const decoded = protobuf.StreamEventAppeared.decode(payload)
const subscription = this.subscriptionList.get(correlationId)
if (subscription) {
const event = Event.fromRaw(decoded.event.event || decoded.event.link)
subscription.eventAppeared(
event,
new Position(decoded.event.commitPosition, decoded.event.preparePosition)
)
} else {
this.log.error(
{subscriptionId: correlationId, fn: 'handleStreamEventAppeared'},
'Received StreamEventAppeared for unknown id'
)
this.emit(
'error',
eventstoreError.newImplementationError(
`Received StreamEventAppeared for unknown id ${correlationId}`
)
)
}
}
/**
* Handle response for command Subscription
*/
protected handleSubscriptionConfirmation(correlationId: string, payload: Buffer): void {
const decoded = protobuf.SubscriptionConfirmation.decode(payload)
this.resolveCommandPromise(correlationId, {
subscriptionId: correlationId,
lastCommitPosition: decoded.lastCommitPosition,
lastEventNumber: decoded.lastEventNumber
})
}
/**
* Handle subscription drop
*/
protected handleSubscriptionDropped(correlationId: string, payload: Buffer): void {
const decoded = protobuf.SubscriptionDropped.decode(payload)
const subscription = this.subscriptionList.get(correlationId) || null
if (subscription) {
subscription.emit('dropped', SubscriptionDropReason[decoded.reason])
}
const persistentSubscription = this.persistentSubscriptionList.get(correlationId) || null
if (persistentSubscription) {
persistentSubscription.emit('dropped', SubscriptionDropReason[decoded.reason])
}
if (this.pendingRequests.has(correlationId)) {
if (decoded.reason === SubscriptionDropReason.Unsubscribed) {
this.resolveCommandPromise(correlationId, SubscriptionDropReason[decoded.reason])
} else {
this.rejectCommandPromise(
correlationId,
eventstoreError.newUnspecificError(
'Subscription dropped: ' + SubscriptionDropReason[decoded.reason]
)
)
}
}
}
/**
* Handle response for command TransactionCommit
*/
protected handleTransactionCommitCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.TransactionCommitCompleted.decode(payload)
if (
this.checkOperationResult(
correlationId,
decoded.result,
'handleTransactionCommit: ' + decoded.message
)
) {
const result: WriteResult = {
firstEventNumber: decoded.firstEventNumber,
lastEventNumber: decoded.lastEventNumber,
position: new Position(decoded.commitPosition, decoded.preparePosition)
}
this.resolveCommandPromise(correlationId, result)
}
}
/**
* Handle response for command TransactionStart
*/
protected handleTransactionStartCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.TransactionStartCompleted.decode(payload)
if (
this.checkOperationResult(
correlationId,
decoded.result,
'handleTransactionStart: ' + decoded.message
)
) {
this.resolveCommandPromise(correlationId, decoded.transactionId)
}
}
/**
* Handles transaction write completed
*/
protected handleTransactionWriteCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.TransactionWriteCompleted.decode(payload)
if (
this.checkOperationResult(
correlationId,
decoded.result,
'handleTransactionWrite: ' + decoded.message
)
) {
this.resolveCommandPromise(correlationId, decoded.transactionId)
}
}
/**
* Handles update persistent subscription completed
*/
protected handleUpdatePersistentSubscriptionCompleted(
correlationId: string,
payload: Buffer
): void {
const decoded = protobuf.UpdatePersistentSubscriptionCompleted.decode(payload)
const status = protobuf.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult
const message = decoded.reason || ''
switch (decoded.result) {
case status.Success:
this.resolveCommandPromise(correlationId)
break
case status.DoesNotExist:
this.rejectCommandPromise(correlationId, eventstoreError.newDoesNotExistError(message))
break
case status.AccessDenied:
this.rejectCommandPromise(correlationId, eventstoreError.newAccessDeniedError(message))
break
default:
}
}
/**
* Handles write events completed
*/
protected handleWriteEventsCompleted(correlationId: string, payload: Buffer): void {
const decoded = protobuf.WriteEventsCompleted.decode(payload)
if (
this.checkOperationResult(
correlationId,
decoded.result,
'handleWriteEvents: ' + decoded.message
)
) {
this.resolveCommandPromise(correlationId, decoded)
}
}
/**
* Handles persistent subscription confirmation
*/
protected handlePersistentSubscriptionConfirmation(correlationId: string, payload: Buffer): void {
const decoded = protobuf.PersistentSubscriptionConfirmation.decode(payload)
this.resolveCommandPromise(correlationId, decoded)
}
/**
* Handles persistent subscription stream event appeared
*/
protected handlePersistentSubscriptionStreamEventAppeared(
correlationId: string,
payload: Buffer
): void {
const decoded = protobuf.PersistentSubscriptionStreamEventAppeared.decode(payload)
const subscription = this.persistentSubscriptionList.get(correlationId)
if (subscription) {
const event = Event.fromRaw(decoded.event.event || decoded.event.link)
subscription.eventAppeared(event)
} else {
this.log.error(
{
subscriptionId: correlationId,
persistentSubscriptionList: this.persistentSubscriptionList,
fn: 'handlePersistentSubscriptionStreamEventAppeared'
},
'Received PersistentSubscriptionStreamEventAppeared for unknown id'
)
this.emit(
'error',
eventstoreError.newImplementationError(
`Received PersistentSubscriptionStreamEventAppeared for unknown id ${correlationId}`
)
)
}
}
/**
* CHecks if given result is an error code
* It returns true for successful result otherwise it returns false.
* If result is an error this function rejects corresponding command promise and remove it from command queue
*/
protected checkOperationResult(
correlationId: string,
result: number,
message: string = ''
): boolean {
let err: eventstoreError.EventstoreError
switch (result) {
case protobuf.OperationResult.Success:
return true
case protobuf.OperationResult.AccessDenied:
err = eventstoreError.newAccessDeniedError(message)
break
case protobuf.OperationResult.CommitTimeout:
err = eventstoreError.newCommitTimeoutError(message)
break
case protobuf.OperationResult.ForwardTimeout:
err = eventstoreError.newForwardTimeoutError(message)
break
case protobuf.OperationResult.InvalidTransaction:
err = eventstoreError.newInvalidTransactionError(message)
break
case protobuf.OperationResult.PrepareTimeout:
err = eventstoreError.newPrepareTimeoutError(message)
break
case protobuf.OperationResult.StreamDeleted:
err = eventstoreError.newStreamDeletedError(message)
break
case protobuf.OperationResult.WrongExpectedVersion:
err = eventstoreError.newWrongExpectedVersionError(message)
break
default:
err = eventstoreError.newUnspecificError('Invalid operation result')
break
}
this.rejectCommandPromise(correlationId, err)
return false
}
/**
* Will be called if a command send to eventstore was replied with an error
* In this case corresponding promise will be rejected and removed from queue
*/
protected rejectCommandPromise(
correlationId: string,
error: eventstoreError.EventstoreError
): void {
const resultPromise = this.pendingRequests.get(correlationId)
if (resultPromise) {
resultPromise.reject(error)
this.pendingRequests.delete(correlationId)
} else {
const err = eventstoreError.newImplementationError(
`Could not find correlationId ${correlationId} on rejectCommandPromise`
)
this.onError(err)
}
}
/**
* Will be called if a command send to eventstore was replied with success response
* In this case corresponding promise will be resolved with result received from eventstore
*/
protected resolveCommandPromise<T>(correlationId: string, result: null | T = null): void {
const resultPromise = this.pendingRequests.get(correlationId)
if (resultPromise) {
resultPromise.resolve(result)
this.pendingRequests.delete(correlationId)
} else {
const err = eventstoreError.newImplementationError(
`Could not find correlationId ${correlationId} on resolveCommandPromise`
)
this.onError(err)
}
}
/**
* Subscribes to stream
*/
public subscribeToStream(
stream: Stream,
resolveLinkTos: boolean = true,
credentials: UserCredentials | null
): Promise<Subscription> {
const newSubscription = new Subscription(uuid(), this, stream, resolveLinkTos, credentials)
this.subscriptionList.set(newSubscription.id, newSubscription)
return new Promise(
(resolve, reject): void => {
const resolveFunction = (): void => {
newSubscription.isSubscribed = true
newSubscription.emit('subscribed')
resolve(newSubscription)
}
const raw = protobuf.SubscribeToStream.fromObject({
eventStreamId: stream.id,
resolveLinkTos
})
this.sendCommand(
newSubscription.id,
EventstoreCommand.SubscribeToStream,
Buffer.from(protobuf.SubscribeToStream.encode(raw).finish()),
credentials,
{
resolve: resolveFunction,
reject
}
)
}
)
}
/**
* Unsubscribes from stream
*/
public async unsubscribeFromStream(subscriptionId: string): Promise<void> {
const subscription = this.subscriptionList.get(subscriptionId)
if (!subscription) {
throw eventstoreError.newImplementationError(
`Can not unsubscribe - subscription ${subscriptionId} not found`
)
}
const subscriptionList = this.subscriptionList
await new Promise(
(resolve, reject): void => {
const resolveFunction = (): void => {
subscription.isSubscribed = false
subscriptionList.delete(subscriptionId)
resolve()
}
this.sendCommand(
subscription.id,
EventstoreCommand.UnsubscribeFromStream,
null,
subscription.getCredentials,
{
resolve: resolveFunction,
reject
}
)
}
)
}
/**
* Connects to persistent subscription
*/
public async connectToPersistentSubscription(
subscription: PersistentSubscription,
allowedInFlightMessages: number = 10,
credentials?: UserCredentials | null
): Promise<model.eventstore.proto.PersistentSubscriptionConfirmation> {
this.persistentSubscriptionList.set(subscription.id, subscription)
const result: model.eventstore.proto.PersistentSubscriptionConfirmation = await new Promise(
(resolve, reject): void => {
const raw = protobuf.ConnectToPersistentSubscription.fromObject({
subscriptionId: subscription.subscriptionGroupName,
eventStreamId: subscription.stream.id,
allowedInFlightMessages
})
this.sendCommand(
subscription.id,
EventstoreCommand.ConnectToPersistentSubscription,
Buffer.from(protobuf.ConnectToPersistentSubscription.encode(raw).finish()),
credentials,
{
resolve,
reject
}
)
}
)
subscription.emit('subscribed')
subscription.lastCommitPosition = result.lastCommitPosition
? Long.fromValue(result.lastCommitPosition)
: Long.fromValue(0)
subscription.lastEventNumber = result.lastEventNumber
? Long.fromValue(result.lastEventNumber)
: Long.fromValue(-1)
return result
}
/**
* Stop listening on persistent subscription
*/
public async unsubscribeFromPersistentSubscription(
subscriptionId: string,
credentials?: UserCredentials | null
): Promise<void> {
const subscription = this.persistentSubscriptionList.get(subscriptionId)
if (!subscription) {
throw eventstoreError.newImplementationError(
`Can not unsubscribe - persistent subscription ${subscriptionId} not found`
)
}
const subscriptionList = this.persistentSubscriptionList
await new Promise(
(resolve, reject): void => {
const resolveFunction = (): void => {
subscription.state = SubscriptionStatus.disconnected
subscriptionList.delete(subscriptionId)
resolve()
}
this.sendCommand(
subscription.id,
EventstoreCommand.UnsubscribeFromStream,
null,
credentials,
{
resolve: resolveFunction,
reject
}
)
}
)
}
/**
* Emit general low level connection errors (communication errors).
* Will not emit errors on business level
*/
protected onError(err?: Error): void {
let errorMessage
let error = err ? err : eventstoreError.newConnectionError('Eventstore connection error')
if (error.name === 'Error') {
error = eventstoreError.newConnectionError(error.message, err)
}
errorMessage = error.message
this.log.error({err: error}, errorMessage)
this.emit('error', error)
}
/**
* Emit as soon as connection to eventstore was established successfully
*/
protected onConnect(): void {
this.reconnectCount = 0
this.isUnexpectedClosed = true
this.log.debug('Connected to eventstore')
this.state = connectionState.connected
this.emit('connected')
this.timeoutInterval = setInterval(
this.checkTimeout.bind(this),
this.initialConfig.operationTimeoutCheckPeriod
)
}
/**
* Emitted as soon as data arrives over tcp connection
*/
protected onData(data: Buffer | null): void {
while (data != null) {
if (this.messageData === null) {
data = this.handleNewResponseData(data)
} else {
data = this.handleMultiPacketResponseData(data)
}
}
}
/**
* Emit as soon as connection to eventstore is closed
*/
protected onClose(): void {
this.log.debug('Connection to eventstore closed')
this.state = connectionState.closed
this.emit('close')
if (this.isUnexpectedClosed) {
this.emit('error', eventstoreError.newConnectionError('Connection closed unexpected'))
this.connect()
}
// stop timeout interval
if (this.timeoutInterval) {
clearInterval(this.timeoutInterval)
this.timeoutInterval = null
}
// reject all pending promises
this.pendingRequests.forEach(
(value): void => {
value.reject(eventstoreError.newConnectionError('Connection closed'))
}
)
this.pendingRequests = new Map()
//drop all subscriptions
this.subscriptionList.forEach(
(subscription): void => {
subscription.emit('dropped', 'Connection closed')
}
)
//drop all persistent subscriptions
this.persistentSubscriptionList.forEach(
(subscription): void => {
subscription.emit('dropped', 'Connection closed')
}
)
}
/**
* Emit when connection starts draining
*/
protected onDrain(): void {
this.log.debug('Eventstore connection draining')
this.state = connectionState.drain
this.emit('drain')
}
/**
* Emit when connection secured
*/
protected onSecureConnect(): void {
this.log.debug('Eventstore connection secured')
this.emit('secureConnect')
}
}