Home Reference Source Test

src/eventstore/TCPConnection.ts

  1. import {EventstoreSettings} from './EventstoreSettings'
  2. import * as net from 'net'
  3. import * as tls from 'tls'
  4. import * as bunyan from 'bunyan'
  5. import {EventEmitter} from 'events'
  6. import {uuidToBuffer, uuidFromBuffer} from '../protobuf/uuidBufferConvert'
  7. import {EventstoreCommand} from '../protobuf/EventstoreCommand'
  8. import * as eventstoreError from '../errors'
  9. import {EventstoreError} from '../errors'
  10. import * as model from '../protobuf/model'
  11. import {
  12. Subscription,
  13. PersistentSubscription,
  14. SubscriptionDropReason,
  15. SubscriptionStatus
  16. } from '../subscription'
  17. import {Stream} from '../stream'
  18. import {UserCredentials} from '../eventstore/EventstoreSettings'
  19. import uuid = require('uuid/v4')
  20. import {Event} from '../event'
  21. import {getIpAndPort} from './getConnectInfo'
  22. import {Position} from './Position'
  23. import {WriteResult} from './Eventstore'
  24. import Long from 'long'
  25.  
  26. /** protobuf shorthand */
  27. const protobuf = model.eventstore.proto
  28.  
  29. /** typescript enumeration of connection states */
  30. enum connectionState {
  31. closed,
  32. init,
  33. connected,
  34. drain
  35. }
  36.  
  37. /** raw tcp communication constant */
  38. const FLAGS_NONE = 0x00
  39. /** raw tcp communication constant */
  40. const FLAGS_AUTH = 0x01
  41. /** raw tcp communication constant */
  42. const UINT32_LENGTH = 4
  43. /** raw tcp communication constant */
  44. const GUID_LENGTH = 16
  45. /** raw tcp communication constant */
  46. const HEADER_LENGTH = 1 + 1 + GUID_LENGTH // Cmd + Flags + CorrelationId
  47. /** raw tcp communication constant */
  48. const COMMAND_OFFSET = UINT32_LENGTH
  49. /** raw tcp communication constant */
  50. const FLAGS_OFFSET = COMMAND_OFFSET + 1
  51. /** raw tcp communication constant */
  52. const CORRELATION_ID_OFFSET = FLAGS_OFFSET + 1
  53. /** raw tcp communication constant */
  54. const DATA_OFFSET = CORRELATION_ID_OFFSET + GUID_LENGTH // Length + Cmd + Flags + CorrelationId
  55.  
  56. /**
  57. * Raw tcp communication to eventstore
  58. * This class handles basic communication with eventstore
  59. */
  60. export class TCPConnection extends EventEmitter {
  61. /** initial config */
  62. protected initialConfig: EventstoreSettings
  63. /** config after discovery process */
  64. protected connectionConfig: EventstoreSettings
  65. /** tcp socket */
  66. protected socket: net.Socket | tls.TLSSocket
  67. /** connection id */
  68. protected connectionId: string | null = null
  69. /** list of pending requests */
  70. protected pendingRequests: Map<
  71. string,
  72. {resolve: Function; reject: Function; sendTime: number}
  73. > = new Map()
  74. /** timeout interval for timed out pending requests */
  75. protected timeoutInterval: null | NodeJS.Timeout = null
  76. /** logger instance */
  77. public log: bunyan
  78. /** connection state */
  79. protected state: connectionState = connectionState.closed
  80. /** message offset of tcp data */
  81. protected messageCurrentOffset: number = 0
  82. /** message length of tcp data */
  83. protected messageCurrentLength: number = 0
  84. /** message buffer of tcp data */
  85. protected messageData: Buffer | null = null
  86. /** list of subscriptions */
  87. protected subscriptionList: Map<string, Subscription> = new Map()
  88. /** list of persistent subscriptions */
  89. protected persistentSubscriptionList: Map<string, PersistentSubscription> = new Map()
  90. /** indicates if connection close is wanted by user or not */
  91. protected isUnexpectedClosed: boolean = true
  92. /** counter for re-connections */
  93. protected reconnectCount: number = 0
  94.  
  95. /**
  96. *Creates an instance of TCPConnection.
  97. */
  98. public constructor(connectionConfiguration: EventstoreSettings) {
  99. super()
  100. this.initialConfig = {...connectionConfiguration}
  101. this.connectionConfig = connectionConfiguration
  102. this.log = this.connectionConfig.logger.child
  103. ? this.connectionConfig.logger.child({module: 'TCPConnection'})
  104. : this.connectionConfig.logger
  105.  
  106. this.socket = new net.Socket()
  107. }
  108.  
  109. /**
  110. * Returns true if connected to eventstore otherwise false
  111. */
  112. public get isConnected(): boolean {
  113. return this.state === connectionState.connected
  114. }
  115.  
  116. /**
  117. * Called to connect to eventstore
  118. */
  119. public async connect(): Promise<void> {
  120. let connected = false
  121. while (!connected && this.reconnectCount < this.initialConfig.maxReconnections) {
  122. this.state = connectionState.init
  123. this.connectionConfig = await getIpAndPort({...this.initialConfig}, this.log)
  124.  
  125. try {
  126. await this.tryToConnect()
  127. connected = true
  128. } catch (err) {
  129. this.log.error({err, count: this.reconnectCount, fn: 'connect'}, 'Try to connect failed ')
  130. this.reconnectCount++
  131. this.emit('reconnect', this.reconnectCount)
  132. await new Promise(
  133. (resolve): void => {
  134. setTimeout(resolve, this.initialConfig.reconnectionDelay)
  135. }
  136. )
  137. }
  138. }
  139. }
  140.  
  141. /**
  142. * Connect to eventstore
  143. */
  144. protected async tryToConnect(): Promise<void> {
  145. const port = this.connectionConfig.port
  146. const host = this.connectionConfig.host
  147.  
  148. if (port === 0 || host === '') {
  149. throw eventstoreError.newConnectionError('Invalid connection settings on host and port')
  150. }
  151.  
  152. this.log.debug(`Start connecting to ${host}:${port}`)
  153.  
  154. await new Promise(
  155. (resolve, reject): void => {
  156. const errorListener = (err: Error | EventstoreError): void => {
  157. this.state = connectionState.closed
  158. if (err instanceof Error) {
  159. this.onError(eventstoreError.newConnectionError(err.message, err))
  160. } else {
  161. this.onError(err)
  162. }
  163.  
  164. reject(err)
  165. }
  166.  
  167. const successListener = (): void => {
  168. if (this.socket instanceof tls.TLSSocket) {
  169. if (!this.socket.authorized) {
  170. this.log.warn({err: this.socket.authorizationError}, 'SSL authorization warning')
  171. }
  172. }
  173. this.socket.removeListener('error', errorListener)
  174. this.socket.on('error', this.onError.bind(this))
  175. this.onConnect()
  176. resolve()
  177. }
  178.  
  179. if (this.connectionConfig.useSSL) {
  180. let secureContext
  181. if (this.connectionConfig.secureContext) {
  182. try {
  183. secureContext = tls.createSecureContext(this.connectionConfig.secureContext)
  184. } catch (err) {
  185. const conErr = eventstoreError.newConnectionError(
  186. 'Error creating secure context',
  187. err
  188. )
  189. reject(conErr)
  190. }
  191. }
  192.  
  193. const options = {
  194. port,
  195. host,
  196. servername: host,
  197. requestCert: this.connectionConfig.validateServer,
  198. rejectUnauthorized: this.connectionConfig.validateServer,
  199. timeout: this.connectionConfig.connectTimeout,
  200. secureContext
  201. }
  202.  
  203. this.socket = tls.connect(options, successListener)
  204. } else {
  205. const options = {
  206. port,
  207. host,
  208. servername: host,
  209. timeout: this.connectionConfig.connectTimeout
  210. }
  211. this.socket = net.connect(options, successListener)
  212. }
  213.  
  214. this.socket.once('error', errorListener.bind(this))
  215. this.socket.on('close', this.onClose.bind(this))
  216. this.socket.on('data', this.onData.bind(this))
  217. this.socket.on('secureConnect', this.onSecureConnect.bind(this))
  218. }
  219. )
  220. }
  221.  
  222. /**
  223. * Disconnect from eventstore.
  224. * It tries to drain pending queue to prevent data loose before connection gets closed
  225. * If disconnect() is call no new outgoing requests accepted
  226. */
  227. public async disconnect(): Promise<void> {
  228. if (!this.isConnected) {
  229. return
  230. }
  231. this.isUnexpectedClosed = false
  232. await new Promise(
  233. (resolve): void => {
  234. this.onDrain()
  235. if (this.pendingRequests.size <= 0) {
  236. this.state = connectionState.closed
  237. this.socket.destroy()
  238. resolve()
  239. } else {
  240. this.log.debug(
  241. {
  242. pendingRequests: this.pendingRequests.size,
  243. timeout:
  244. this.initialConfig.operationTimeout + this.initialConfig.operationTimeoutCheckPeriod
  245. },
  246. 'Wait for pending requests'
  247. )
  248. // wait for pending requests/timeouts
  249. setTimeout((): void => {
  250. this.state = connectionState.closed
  251. this.socket.destroy()
  252. this.log.debug('Timeout finished')
  253. this.pendingRequests.forEach(
  254. (value, id): void => {
  255. this.rejectCommandPromise(
  256. id,
  257. eventstoreError.newConnectionError('Connection closed')
  258. )
  259. }
  260. )
  261. resolve()
  262. }, this.initialConfig.operationTimeout + this.initialConfig.operationTimeoutCheckPeriod)
  263. }
  264. }
  265. )
  266. }
  267.  
  268. /**
  269. * Called by interval function to check if there are some pending requests which should be rejected with time out error
  270. */
  271. protected checkTimeout(): void {
  272. this.log.trace('Check timeout queue')
  273. const timeout: string[] = []
  274. const now = Date.now() - this.initialConfig.operationTimeout
  275. for (var [key, value] of this.pendingRequests) {
  276. if (value.sendTime < now) {
  277. timeout.push(key)
  278. }
  279. }
  280. for (let x = 0, xMax = timeout.length; x < xMax; x++) {
  281. try {
  282. this.rejectCommandPromise(
  283. timeout[x],
  284. eventstoreError.newTimeoutError('Timeout by eventstore-ts-client')
  285. )
  286. } catch (err) {
  287. this.log.error({err, fn: 'checkTimeout'}, 'Error on rejectCommandPromise')
  288. }
  289. }
  290. }
  291.  
  292. /**
  293. * Creates and sends raw data message to eventstore and adds given promise to pending queue
  294. */
  295. public sendCommand(
  296. correlationId: string,
  297. command: EventstoreCommand,
  298. data: Buffer | null = null,
  299. credentials: UserCredentials | null = null,
  300. promise: {resolve: Function; reject: Function} | null = null
  301. ): void {
  302. this.log.trace(`Sending ${EventstoreCommand[command]} with ${correlationId}`)
  303. if (
  304. this.state !== connectionState.connected &&
  305. command !== EventstoreCommand.HeartbeatResponseCommand &&
  306. command !== EventstoreCommand.Pong
  307. ) {
  308. throw eventstoreError.newConnectionError(
  309. 'Connection to eventstore is: ' + connectionState[this.state]
  310. )
  311. }
  312.  
  313. if (promise) {
  314. if (this.pendingRequests.size >= this.connectionConfig.maxQueueSize) {
  315. promise.reject(eventstoreError.newConnectionError('Maximum concurrent items reached'))
  316. throw eventstoreError.newConnectionError('Maximum concurrent items reached')
  317. }
  318. this.pendingRequests.set(correlationId, {...promise, sendTime: Date.now()})
  319. }
  320.  
  321. try {
  322. let authLength = 0
  323. let flags = FLAGS_NONE
  324. if (credentials) {
  325. flags = FLAGS_AUTH
  326. authLength = 1 + credentials.username.length + 1 + credentials.password.length
  327. }
  328.  
  329. let commandLength = HEADER_LENGTH + authLength
  330. if (data) {
  331. commandLength += data.length
  332. }
  333. const packetLength = 4 + commandLength
  334.  
  335. const buf = Buffer.alloc(packetLength)
  336. buf.writeUInt32LE(commandLength, 0)
  337. buf[COMMAND_OFFSET] = command
  338. buf[FLAGS_OFFSET] = flags
  339.  
  340. uuidToBuffer(correlationId).copy(buf, CORRELATION_ID_OFFSET, 0, GUID_LENGTH)
  341. if (credentials) {
  342. buf.writeUInt8(credentials.username.length, DATA_OFFSET)
  343. buf.write(credentials.username, DATA_OFFSET + 1)
  344. buf.writeUInt8(credentials.password.length, DATA_OFFSET + 1 + credentials.username.length)
  345. buf.write(credentials.password, DATA_OFFSET + 1 + credentials.username.length + 1)
  346. }
  347.  
  348. if (data) {
  349. data.copy(buf, DATA_OFFSET + authLength, 0, data.length)
  350. }
  351.  
  352. this.socket.write(buf)
  353. } catch (err) {
  354. const newErr = eventstoreError.newConnectionError(err.message, err)
  355. this.rejectCommandPromise(correlationId, newErr)
  356. this.onError(newErr)
  357. }
  358. }
  359.  
  360. /**
  361. * Gets called as soon as new data over tcp connection arrives as raw buffer data
  362. * Checks if
  363. * - new received data is part of previously received data
  364. * - new data contains multiple responses
  365. * - new data is single response
  366. */
  367. protected handleNewResponseData(data: Buffer): Buffer | null {
  368. const commandLength = data.readUInt32LE(0)
  369. if (commandLength < HEADER_LENGTH) {
  370. this.log.error(
  371. {
  372. connectionId: this.connectionId,
  373. fn: 'handleNewResponseData'
  374. },
  375. 'Invalid command length of ' + commandLength + ' bytes'
  376. )
  377. throw eventstoreError.newProtocolError('Invalid command length')
  378. }
  379.  
  380. const messageLength = UINT32_LENGTH + commandLength
  381. if (data.length === messageLength) {
  382. // A single packet message, no need to copy into another buffer
  383. this.handleSingleResponseData(data)
  384. return null
  385. } else if (data.length > messageLength) {
  386. // Multiple messages in one packet
  387. const firstMessage = data.slice(0, messageLength)
  388. this.messageCurrentLength = messageLength
  389. this.handleSingleResponseData(firstMessage)
  390. return data.slice(this.messageCurrentLength)
  391. } else {
  392. // The first packet of a multi-packet message
  393. this.messageData = Buffer.alloc(messageLength)
  394. const packetLength = data.copy(this.messageData, this.messageCurrentOffset, 0)
  395. this.messageCurrentOffset = packetLength
  396. return null
  397. }
  398. }
  399.  
  400. /**
  401. * This function handles raw buffer responses received within multiple tcp data package
  402. */
  403. protected handleMultiPacketResponseData(data: Buffer): Buffer | null {
  404. this.log.trace({fn: 'handleMultiPacketResponseData'}, `MultipacketResponse`)
  405. if (this.messageData === null) {
  406. return null
  407. }
  408. const packetLength = data.copy(this.messageData, this.messageCurrentOffset, 0)
  409. this.messageCurrentOffset += packetLength
  410. if (this.messageCurrentOffset >= this.messageData.length) {
  411. this.handleSingleResponseData(this.messageData)
  412. this.messageData = null
  413. this.messageCurrentOffset = 0
  414. }
  415. return null
  416. }
  417.  
  418. /**
  419. * This function handles a single raw buffer response
  420. */
  421. protected handleSingleResponseData(data: Buffer): void {
  422. const commandLength = data.readUInt32LE(0)
  423. if (commandLength < HEADER_LENGTH) {
  424. this.log.error(
  425. {
  426. connectionId: this.connectionId,
  427. fn: 'handleSingleResponseData'
  428. },
  429. 'Invalid command length of ' + commandLength + ' bytes'
  430. )
  431. throw eventstoreError.newProtocolError('Invalid command length')
  432. }
  433.  
  434. const command = data[COMMAND_OFFSET]
  435.  
  436. const correlationId = uuidFromBuffer(
  437. data.slice(CORRELATION_ID_OFFSET, CORRELATION_ID_OFFSET + GUID_LENGTH)
  438. )
  439.  
  440. this.log.trace('Incoming response: ' + EventstoreCommand[command])
  441.  
  442. //Answer Heartbeat directly without adding to promise queue
  443. if (command === EventstoreCommand.HeartbeatRequestCommand) {
  444. this.emit('heartbeat')
  445. this.sendCommand(correlationId, EventstoreCommand.HeartbeatResponseCommand)
  446. return
  447. }
  448.  
  449. //Answer Ping directly without adding to promise queue
  450. if (command === EventstoreCommand.Ping) {
  451. this.sendCommand(correlationId, EventstoreCommand.Pong)
  452. return
  453. }
  454.  
  455. const payloadLength = commandLength - HEADER_LENGTH
  456. const payload = Buffer.alloc(payloadLength)
  457. if (payloadLength > 0) {
  458. data.copy(payload, 0, DATA_OFFSET, DATA_OFFSET + payloadLength)
  459. }
  460.  
  461. let err: eventstoreError.EventstoreError
  462. switch (command) {
  463. case EventstoreCommand.BadRequest:
  464. err = eventstoreError.newBadRequestError()
  465. this.rejectCommandPromise(correlationId, err)
  466. this.onError(err)
  467. break
  468. case EventstoreCommand.NotAuthenticated:
  469. err = eventstoreError.newNotAuthenticatedError()
  470. this.rejectCommandPromise(correlationId, err)
  471. this.onError(err)
  472. break
  473. case EventstoreCommand.NotHandled:
  474. const notHandled = protobuf.NotHandled.decode(payload)
  475. err = eventstoreError.newNotHandledError(`
  476. ${protobuf.NotHandled.NotHandledReason[notHandled.reason]}
  477. `)
  478. this.rejectCommandPromise(correlationId, err)
  479. this.onError(err)
  480. break
  481. case EventstoreCommand.CreatePersistentSubscriptionCompleted:
  482. this.handleCreatePersistentSubscriptionCompleted(correlationId, payload)
  483. break
  484. case EventstoreCommand.DeletePersistentSubscriptionCompleted:
  485. this.handleDeletePersistentSubscriptionCompleted(correlationId, payload)
  486. break
  487. case EventstoreCommand.DeleteStreamCompleted:
  488. this.handleDeleteStreamCompleted(correlationId, payload)
  489. break
  490. case EventstoreCommand.PersistentSubscriptionConfirmation:
  491. this.handlePersistentSubscriptionConfirmation(correlationId, payload)
  492. break
  493. case EventstoreCommand.PersistentSubscriptionStreamEventAppeared:
  494. this.handlePersistentSubscriptionStreamEventAppeared(correlationId, payload)
  495. break
  496.  
  497. case EventstoreCommand.ReadAllEventsBackwardCompleted:
  498. this.handleReadAllEventsCompleted(correlationId, payload)
  499. break
  500. case EventstoreCommand.ReadAllEventsForwardCompleted:
  501. this.handleReadAllEventsCompleted(correlationId, payload)
  502. break
  503.  
  504. case EventstoreCommand.ReadEventCompleted:
  505. this.handleReadEventCompleted(correlationId, payload)
  506. break
  507.  
  508. case EventstoreCommand.ReadStreamEventsBackwardCompleted:
  509. this.handleReadStreamEventsCompleted(correlationId, payload)
  510. break
  511. case EventstoreCommand.ReadStreamEventsForwardCompleted:
  512. this.handleReadStreamEventsCompleted(correlationId, payload)
  513. break
  514. case EventstoreCommand.StreamEventAppeared:
  515. this.handleStreamEventAppeared(correlationId, payload)
  516. break
  517. case EventstoreCommand.SubscriptionConfirmation:
  518. this.handleSubscriptionConfirmation(correlationId, payload)
  519. break
  520. case EventstoreCommand.SubscriptionDropped:
  521. this.handleSubscriptionDropped(correlationId, payload)
  522. break
  523. case EventstoreCommand.TransactionCommitCompleted:
  524. this.handleTransactionCommitCompleted(correlationId, payload)
  525. break
  526. case EventstoreCommand.TransactionStartCompleted:
  527. this.handleTransactionStartCompleted(correlationId, payload)
  528. break
  529. case EventstoreCommand.TransactionWriteCompleted:
  530. this.handleTransactionWriteCompleted(correlationId, payload)
  531. break
  532. case EventstoreCommand.UpdatePersistentSubscriptionCompleted:
  533. this.handleUpdatePersistentSubscriptionCompleted(correlationId, payload)
  534. break
  535. case EventstoreCommand.WriteEventsCompleted:
  536. this.handleWriteEventsCompleted(correlationId, payload)
  537. break
  538. case EventstoreCommand.ClientIdentified:
  539. this.resolveCommandPromise(correlationId)
  540. break
  541. case EventstoreCommand.Pong:
  542. this.resolveCommandPromise(correlationId)
  543. break
  544. case EventstoreCommand.Authenticated:
  545. this.resolveCommandPromise(correlationId)
  546. break
  547. default:
  548. err = new eventstoreError.EventstoreError(
  549. 'Unhandled eventstore command : ' + EventstoreCommand[command] + ' -> ' + command,
  550. 'EventstoreImplementationError'
  551. )
  552. this.rejectCommandPromise(correlationId, err)
  553. this.onError(err)
  554. break
  555. }
  556. }
  557.  
  558. /**
  559. * Handle response for command CreatePersistentSubscription
  560. */
  561. protected handleCreatePersistentSubscriptionCompleted(
  562. correlationId: string,
  563. payload: Buffer
  564. ): void {
  565. const decoded = protobuf.CreatePersistentSubscriptionCompleted.decode(payload)
  566. if (
  567. decoded.result ===
  568. protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Success
  569. ) {
  570. this.resolveCommandPromise(correlationId)
  571. } else {
  572. const errorMsg =
  573. `${
  574. protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult[
  575. decoded.result
  576. ]
  577. } ` + (decoded.reason || '')
  578. let err
  579. switch (decoded.result) {
  580. case protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult
  581. .AccessDenied:
  582. err = eventstoreError.newAccessDeniedError(errorMsg)
  583. break
  584. case protobuf.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult
  585. .AlreadyExists:
  586. err = eventstoreError.newAlreadyExistError(errorMsg)
  587. break
  588. default:
  589. err = eventstoreError.newUnspecificError(errorMsg)
  590. }
  591. this.rejectCommandPromise(correlationId, err)
  592. }
  593. }
  594.  
  595. /**
  596. * Handle response for command DeletePersistentSubscription
  597. */
  598. protected handleDeletePersistentSubscriptionCompleted(
  599. correlationId: string,
  600. payload: Buffer
  601. ): void {
  602. const status = protobuf.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult
  603. const decoded = protobuf.DeletePersistentSubscriptionCompleted.decode(payload)
  604. if (decoded.result === status.Success) {
  605. this.resolveCommandPromise(correlationId)
  606. } else {
  607. let returnError
  608. switch (decoded.result) {
  609. case status.AccessDenied:
  610. returnError = eventstoreError.newAccessDeniedError(
  611. 'Delete of Subscription not allowed: ' + decoded.reason || ''
  612. )
  613. break
  614. case status.DoesNotExist:
  615. returnError = eventstoreError.newDoesNotExistError(
  616. 'Persistent subscription does not exist: ' + decoded.reason || ''
  617. )
  618. break
  619. default:
  620. returnError = eventstoreError.newUnspecificError(
  621. 'Delete persistent connection failed: ' + (decoded.reason || '')
  622. )
  623. }
  624. this.rejectCommandPromise(correlationId, returnError)
  625. }
  626. }
  627.  
  628. /**
  629. * Handle response for command DeleteStreamCompleted
  630. */
  631. protected handleDeleteStreamCompleted(correlationId: string, payload: Buffer): void {
  632. const decoded = protobuf.DeleteStreamCompleted.decode(payload)
  633. if (
  634. this.checkOperationResult(
  635. correlationId,
  636. decoded.result,
  637. 'handleDeleteStream: ' + decoded.message
  638. )
  639. ) {
  640. this.resolveCommandPromise(
  641. correlationId,
  642. new Position(decoded.commitPosition, decoded.preparePosition)
  643. )
  644. }
  645. }
  646.  
  647. /**
  648. * Handle response for command ReadAllEvents
  649. */
  650. protected handleReadAllEventsCompleted(correlationId: string, payload: Buffer): void {
  651. const decoded = protobuf.ReadAllEventsCompleted.decode(payload)
  652. let err: eventstoreError.EventstoreError
  653. const message: string = decoded.error || ''
  654. switch (decoded.result) {
  655. case protobuf.ReadAllEventsCompleted.ReadAllResult.Success:
  656. this.resolveCommandPromise(correlationId, decoded)
  657. return
  658. case protobuf.ReadAllEventsCompleted.ReadAllResult.AccessDenied:
  659. err = eventstoreError.newAccessDeniedError(message)
  660. break
  661. case protobuf.ReadAllEventsCompleted.ReadAllResult.NotModified:
  662. err = eventstoreError.newNotModifiedError(message)
  663. break
  664. default:
  665. err = eventstoreError.newUnspecificError(message)
  666. }
  667. this.rejectCommandPromise(correlationId, err)
  668. }
  669.  
  670. /**
  671. * Handle response for command ReadStreamEvents
  672. */
  673. protected handleReadStreamEventsCompleted(correlationId: string, payload: Buffer): void {
  674. const decoded = protobuf.ReadStreamEventsCompleted.decode(payload)
  675. let err: eventstoreError.EventstoreError
  676. const message: string = decoded.error || ''
  677. switch (decoded.result) {
  678. case protobuf.ReadStreamEventsCompleted.ReadStreamResult.Success:
  679. this.resolveCommandPromise(correlationId, decoded)
  680. return
  681. case protobuf.ReadStreamEventsCompleted.ReadStreamResult.NoStream:
  682. err = eventstoreError.newNoStreamError(message)
  683. break
  684. case protobuf.ReadStreamEventsCompleted.ReadStreamResult.NotModified:
  685. err = eventstoreError.newNotModifiedError(message)
  686. break
  687. case protobuf.ReadStreamEventsCompleted.ReadStreamResult.StreamDeleted:
  688. err = eventstoreError.newStreamDeletedError(message)
  689. break
  690. case protobuf.ReadStreamEventsCompleted.ReadStreamResult.AccessDenied:
  691. err = eventstoreError.newAccessDeniedError(message)
  692. break
  693.  
  694. default:
  695. err = eventstoreError.newUnspecificError(message)
  696. }
  697. this.rejectCommandPromise(correlationId, err)
  698. }
  699.  
  700. /**
  701. * Handle response for command ReadEvent
  702. */
  703. protected handleReadEventCompleted(correlationId: string, payload: Buffer): void {
  704. const decoded = protobuf.ReadEventCompleted.decode(payload)
  705.  
  706. let err: eventstoreError.EventstoreError
  707. const message: string = decoded.error || ''
  708. switch (decoded.result) {
  709. case protobuf.ReadEventCompleted.ReadEventResult.Success:
  710. this.resolveCommandPromise(correlationId, decoded.event)
  711. return
  712. case protobuf.ReadEventCompleted.ReadEventResult.NotFound:
  713. err = eventstoreError.newNotFoundError(message)
  714. break
  715. case protobuf.ReadEventCompleted.ReadEventResult.NoStream:
  716. err = eventstoreError.newNoStreamError(message)
  717. break
  718. case protobuf.ReadEventCompleted.ReadEventResult.StreamDeleted:
  719. err = eventstoreError.newStreamDeletedError(message)
  720. break
  721. case protobuf.ReadEventCompleted.ReadEventResult.AccessDenied:
  722. err = eventstoreError.newAccessDeniedError(message)
  723. break
  724. default:
  725. err = eventstoreError.newUnspecificError(message)
  726. }
  727. this.rejectCommandPromise(correlationId, err)
  728. }
  729.  
  730. /**
  731. * Handle incoming event for subscription
  732. */
  733. protected handleStreamEventAppeared(correlationId: string, payload: Buffer): void {
  734. const decoded = protobuf.StreamEventAppeared.decode(payload)
  735. const subscription = this.subscriptionList.get(correlationId)
  736. if (subscription) {
  737. const event = Event.fromRaw(decoded.event.event || decoded.event.link)
  738. subscription.eventAppeared(
  739. event,
  740. new Position(decoded.event.commitPosition, decoded.event.preparePosition)
  741. )
  742. } else {
  743. this.log.error(
  744. {subscriptionId: correlationId, fn: 'handleStreamEventAppeared'},
  745. 'Received StreamEventAppeared for unknown id'
  746. )
  747. this.emit(
  748. 'error',
  749. eventstoreError.newImplementationError(
  750. `Received StreamEventAppeared for unknown id ${correlationId}`
  751. )
  752. )
  753. }
  754. }
  755.  
  756. /**
  757. * Handle response for command Subscription
  758. */
  759. protected handleSubscriptionConfirmation(correlationId: string, payload: Buffer): void {
  760. const decoded = protobuf.SubscriptionConfirmation.decode(payload)
  761.  
  762. this.resolveCommandPromise(correlationId, {
  763. subscriptionId: correlationId,
  764. lastCommitPosition: decoded.lastCommitPosition,
  765. lastEventNumber: decoded.lastEventNumber
  766. })
  767. }
  768.  
  769. /**
  770. * Handle subscription drop
  771. */
  772. protected handleSubscriptionDropped(correlationId: string, payload: Buffer): void {
  773. const decoded = protobuf.SubscriptionDropped.decode(payload)
  774. const subscription = this.subscriptionList.get(correlationId) || null
  775. if (subscription) {
  776. subscription.emit('dropped', SubscriptionDropReason[decoded.reason])
  777. }
  778. const persistentSubscription = this.persistentSubscriptionList.get(correlationId) || null
  779. if (persistentSubscription) {
  780. persistentSubscription.emit('dropped', SubscriptionDropReason[decoded.reason])
  781. }
  782. if (this.pendingRequests.has(correlationId)) {
  783. if (decoded.reason === SubscriptionDropReason.Unsubscribed) {
  784. this.resolveCommandPromise(correlationId, SubscriptionDropReason[decoded.reason])
  785. } else {
  786. this.rejectCommandPromise(
  787. correlationId,
  788. eventstoreError.newUnspecificError(
  789. 'Subscription dropped: ' + SubscriptionDropReason[decoded.reason]
  790. )
  791. )
  792. }
  793. }
  794. }
  795.  
  796. /**
  797. * Handle response for command TransactionCommit
  798. */
  799. protected handleTransactionCommitCompleted(correlationId: string, payload: Buffer): void {
  800. const decoded = protobuf.TransactionCommitCompleted.decode(payload)
  801. if (
  802. this.checkOperationResult(
  803. correlationId,
  804. decoded.result,
  805. 'handleTransactionCommit: ' + decoded.message
  806. )
  807. ) {
  808. const result: WriteResult = {
  809. firstEventNumber: decoded.firstEventNumber,
  810. lastEventNumber: decoded.lastEventNumber,
  811. position: new Position(decoded.commitPosition, decoded.preparePosition)
  812. }
  813. this.resolveCommandPromise(correlationId, result)
  814. }
  815. }
  816.  
  817. /**
  818. * Handle response for command TransactionStart
  819. */
  820. protected handleTransactionStartCompleted(correlationId: string, payload: Buffer): void {
  821. const decoded = protobuf.TransactionStartCompleted.decode(payload)
  822. if (
  823. this.checkOperationResult(
  824. correlationId,
  825. decoded.result,
  826. 'handleTransactionStart: ' + decoded.message
  827. )
  828. ) {
  829. this.resolveCommandPromise(correlationId, decoded.transactionId)
  830. }
  831. }
  832.  
  833. /**
  834. * Handles transaction write completed
  835. */
  836. protected handleTransactionWriteCompleted(correlationId: string, payload: Buffer): void {
  837. const decoded = protobuf.TransactionWriteCompleted.decode(payload)
  838. if (
  839. this.checkOperationResult(
  840. correlationId,
  841. decoded.result,
  842. 'handleTransactionWrite: ' + decoded.message
  843. )
  844. ) {
  845. this.resolveCommandPromise(correlationId, decoded.transactionId)
  846. }
  847. }
  848.  
  849. /**
  850. * Handles update persistent subscription completed
  851. */
  852. protected handleUpdatePersistentSubscriptionCompleted(
  853. correlationId: string,
  854. payload: Buffer
  855. ): void {
  856. const decoded = protobuf.UpdatePersistentSubscriptionCompleted.decode(payload)
  857. const status = protobuf.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult
  858. const message = decoded.reason || ''
  859. switch (decoded.result) {
  860. case status.Success:
  861. this.resolveCommandPromise(correlationId)
  862. break
  863. case status.DoesNotExist:
  864. this.rejectCommandPromise(correlationId, eventstoreError.newDoesNotExistError(message))
  865. break
  866. case status.AccessDenied:
  867. this.rejectCommandPromise(correlationId, eventstoreError.newAccessDeniedError(message))
  868. break
  869. default:
  870. }
  871. }
  872.  
  873. /**
  874. * Handles write events completed
  875. */
  876. protected handleWriteEventsCompleted(correlationId: string, payload: Buffer): void {
  877. const decoded = protobuf.WriteEventsCompleted.decode(payload)
  878. if (
  879. this.checkOperationResult(
  880. correlationId,
  881. decoded.result,
  882. 'handleWriteEvents: ' + decoded.message
  883. )
  884. ) {
  885. this.resolveCommandPromise(correlationId, decoded)
  886. }
  887. }
  888.  
  889. /**
  890. * Handles persistent subscription confirmation
  891. */
  892. protected handlePersistentSubscriptionConfirmation(correlationId: string, payload: Buffer): void {
  893. const decoded = protobuf.PersistentSubscriptionConfirmation.decode(payload)
  894. this.resolveCommandPromise(correlationId, decoded)
  895. }
  896.  
  897. /**
  898. * Handles persistent subscription stream event appeared
  899. */
  900. protected handlePersistentSubscriptionStreamEventAppeared(
  901. correlationId: string,
  902. payload: Buffer
  903. ): void {
  904. const decoded = protobuf.PersistentSubscriptionStreamEventAppeared.decode(payload)
  905. const subscription = this.persistentSubscriptionList.get(correlationId)
  906. if (subscription) {
  907. const event = Event.fromRaw(decoded.event.event || decoded.event.link)
  908. subscription.eventAppeared(event)
  909. } else {
  910. this.log.error(
  911. {
  912. subscriptionId: correlationId,
  913. persistentSubscriptionList: this.persistentSubscriptionList,
  914. fn: 'handlePersistentSubscriptionStreamEventAppeared'
  915. },
  916. 'Received PersistentSubscriptionStreamEventAppeared for unknown id'
  917. )
  918. this.emit(
  919. 'error',
  920. eventstoreError.newImplementationError(
  921. `Received PersistentSubscriptionStreamEventAppeared for unknown id ${correlationId}`
  922. )
  923. )
  924. }
  925. }
  926.  
  927. /**
  928. * CHecks if given result is an error code
  929. * It returns true for successful result otherwise it returns false.
  930. * If result is an error this function rejects corresponding command promise and remove it from command queue
  931. */
  932. protected checkOperationResult(
  933. correlationId: string,
  934. result: number,
  935. message: string = ''
  936. ): boolean {
  937. let err: eventstoreError.EventstoreError
  938. switch (result) {
  939. case protobuf.OperationResult.Success:
  940. return true
  941. case protobuf.OperationResult.AccessDenied:
  942. err = eventstoreError.newAccessDeniedError(message)
  943. break
  944. case protobuf.OperationResult.CommitTimeout:
  945. err = eventstoreError.newCommitTimeoutError(message)
  946. break
  947. case protobuf.OperationResult.ForwardTimeout:
  948. err = eventstoreError.newForwardTimeoutError(message)
  949. break
  950. case protobuf.OperationResult.InvalidTransaction:
  951. err = eventstoreError.newInvalidTransactionError(message)
  952. break
  953. case protobuf.OperationResult.PrepareTimeout:
  954. err = eventstoreError.newPrepareTimeoutError(message)
  955. break
  956. case protobuf.OperationResult.StreamDeleted:
  957. err = eventstoreError.newStreamDeletedError(message)
  958. break
  959. case protobuf.OperationResult.WrongExpectedVersion:
  960. err = eventstoreError.newWrongExpectedVersionError(message)
  961. break
  962. default:
  963. err = eventstoreError.newUnspecificError('Invalid operation result')
  964. break
  965. }
  966. this.rejectCommandPromise(correlationId, err)
  967. return false
  968. }
  969.  
  970. /**
  971. * Will be called if a command send to eventstore was replied with an error
  972. * In this case corresponding promise will be rejected and removed from queue
  973. */
  974. protected rejectCommandPromise(
  975. correlationId: string,
  976. error: eventstoreError.EventstoreError
  977. ): void {
  978. const resultPromise = this.pendingRequests.get(correlationId)
  979. if (resultPromise) {
  980. resultPromise.reject(error)
  981. this.pendingRequests.delete(correlationId)
  982. } else {
  983. const err = eventstoreError.newImplementationError(
  984. `Could not find correlationId ${correlationId} on rejectCommandPromise`
  985. )
  986. this.onError(err)
  987. }
  988. }
  989.  
  990. /**
  991. * Will be called if a command send to eventstore was replied with success response
  992. * In this case corresponding promise will be resolved with result received from eventstore
  993. */
  994. protected resolveCommandPromise<T>(correlationId: string, result: null | T = null): void {
  995. const resultPromise = this.pendingRequests.get(correlationId)
  996. if (resultPromise) {
  997. resultPromise.resolve(result)
  998. this.pendingRequests.delete(correlationId)
  999. } else {
  1000. const err = eventstoreError.newImplementationError(
  1001. `Could not find correlationId ${correlationId} on resolveCommandPromise`
  1002. )
  1003. this.onError(err)
  1004. }
  1005. }
  1006.  
  1007. /**
  1008. * Subscribes to stream
  1009. */
  1010. public subscribeToStream(
  1011. stream: Stream,
  1012. resolveLinkTos: boolean = true,
  1013. credentials: UserCredentials | null
  1014. ): Promise<Subscription> {
  1015. const newSubscription = new Subscription(uuid(), this, stream, resolveLinkTos, credentials)
  1016. this.subscriptionList.set(newSubscription.id, newSubscription)
  1017. return new Promise(
  1018. (resolve, reject): void => {
  1019. const resolveFunction = (): void => {
  1020. newSubscription.isSubscribed = true
  1021. newSubscription.emit('subscribed')
  1022. resolve(newSubscription)
  1023. }
  1024. const raw = protobuf.SubscribeToStream.fromObject({
  1025. eventStreamId: stream.id,
  1026. resolveLinkTos
  1027. })
  1028. this.sendCommand(
  1029. newSubscription.id,
  1030. EventstoreCommand.SubscribeToStream,
  1031. Buffer.from(protobuf.SubscribeToStream.encode(raw).finish()),
  1032. credentials,
  1033. {
  1034. resolve: resolveFunction,
  1035. reject
  1036. }
  1037. )
  1038. }
  1039. )
  1040. }
  1041.  
  1042. /**
  1043. * Unsubscribes from stream
  1044. */
  1045. public async unsubscribeFromStream(subscriptionId: string): Promise<void> {
  1046. const subscription = this.subscriptionList.get(subscriptionId)
  1047. if (!subscription) {
  1048. throw eventstoreError.newImplementationError(
  1049. `Can not unsubscribe - subscription ${subscriptionId} not found`
  1050. )
  1051. }
  1052. const subscriptionList = this.subscriptionList
  1053. await new Promise(
  1054. (resolve, reject): void => {
  1055. const resolveFunction = (): void => {
  1056. subscription.isSubscribed = false
  1057. subscriptionList.delete(subscriptionId)
  1058. resolve()
  1059. }
  1060. this.sendCommand(
  1061. subscription.id,
  1062. EventstoreCommand.UnsubscribeFromStream,
  1063. null,
  1064. subscription.getCredentials,
  1065. {
  1066. resolve: resolveFunction,
  1067. reject
  1068. }
  1069. )
  1070. }
  1071. )
  1072. }
  1073.  
  1074. /**
  1075. * Connects to persistent subscription
  1076. */
  1077. public async connectToPersistentSubscription(
  1078. subscription: PersistentSubscription,
  1079. allowedInFlightMessages: number = 10,
  1080. credentials?: UserCredentials | null
  1081. ): Promise<model.eventstore.proto.PersistentSubscriptionConfirmation> {
  1082. this.persistentSubscriptionList.set(subscription.id, subscription)
  1083. const result: model.eventstore.proto.PersistentSubscriptionConfirmation = await new Promise(
  1084. (resolve, reject): void => {
  1085. const raw = protobuf.ConnectToPersistentSubscription.fromObject({
  1086. subscriptionId: subscription.subscriptionGroupName,
  1087. eventStreamId: subscription.stream.id,
  1088. allowedInFlightMessages
  1089. })
  1090. this.sendCommand(
  1091. subscription.id,
  1092. EventstoreCommand.ConnectToPersistentSubscription,
  1093. Buffer.from(protobuf.ConnectToPersistentSubscription.encode(raw).finish()),
  1094. credentials,
  1095. {
  1096. resolve,
  1097. reject
  1098. }
  1099. )
  1100. }
  1101. )
  1102. subscription.emit('subscribed')
  1103. subscription.lastCommitPosition = result.lastCommitPosition
  1104. ? Long.fromValue(result.lastCommitPosition)
  1105. : Long.fromValue(0)
  1106. subscription.lastEventNumber = result.lastEventNumber
  1107. ? Long.fromValue(result.lastEventNumber)
  1108. : Long.fromValue(-1)
  1109.  
  1110. return result
  1111. }
  1112.  
  1113. /**
  1114. * Stop listening on persistent subscription
  1115. */
  1116. public async unsubscribeFromPersistentSubscription(
  1117. subscriptionId: string,
  1118. credentials?: UserCredentials | null
  1119. ): Promise<void> {
  1120. const subscription = this.persistentSubscriptionList.get(subscriptionId)
  1121. if (!subscription) {
  1122. throw eventstoreError.newImplementationError(
  1123. `Can not unsubscribe - persistent subscription ${subscriptionId} not found`
  1124. )
  1125. }
  1126. const subscriptionList = this.persistentSubscriptionList
  1127. await new Promise(
  1128. (resolve, reject): void => {
  1129. const resolveFunction = (): void => {
  1130. subscription.state = SubscriptionStatus.disconnected
  1131. subscriptionList.delete(subscriptionId)
  1132. resolve()
  1133. }
  1134. this.sendCommand(
  1135. subscription.id,
  1136. EventstoreCommand.UnsubscribeFromStream,
  1137. null,
  1138. credentials,
  1139. {
  1140. resolve: resolveFunction,
  1141. reject
  1142. }
  1143. )
  1144. }
  1145. )
  1146. }
  1147.  
  1148. /**
  1149. * Emit general low level connection errors (communication errors).
  1150. * Will not emit errors on business level
  1151. */
  1152. protected onError(err?: Error): void {
  1153. let errorMessage
  1154. let error = err ? err : eventstoreError.newConnectionError('Eventstore connection error')
  1155.  
  1156. if (error.name === 'Error') {
  1157. error = eventstoreError.newConnectionError(error.message, err)
  1158. }
  1159. errorMessage = error.message
  1160. this.log.error({err: error}, errorMessage)
  1161. this.emit('error', error)
  1162. }
  1163.  
  1164. /**
  1165. * Emit as soon as connection to eventstore was established successfully
  1166. */
  1167. protected onConnect(): void {
  1168. this.reconnectCount = 0
  1169. this.isUnexpectedClosed = true
  1170. this.log.debug('Connected to eventstore')
  1171. this.state = connectionState.connected
  1172. this.emit('connected')
  1173.  
  1174. this.timeoutInterval = setInterval(
  1175. this.checkTimeout.bind(this),
  1176. this.initialConfig.operationTimeoutCheckPeriod
  1177. )
  1178. }
  1179.  
  1180. /**
  1181. * Emitted as soon as data arrives over tcp connection
  1182. */
  1183. protected onData(data: Buffer | null): void {
  1184. while (data != null) {
  1185. if (this.messageData === null) {
  1186. data = this.handleNewResponseData(data)
  1187. } else {
  1188. data = this.handleMultiPacketResponseData(data)
  1189. }
  1190. }
  1191. }
  1192.  
  1193. /**
  1194. * Emit as soon as connection to eventstore is closed
  1195. */
  1196. protected onClose(): void {
  1197. this.log.debug('Connection to eventstore closed')
  1198. this.state = connectionState.closed
  1199. this.emit('close')
  1200. if (this.isUnexpectedClosed) {
  1201. this.emit('error', eventstoreError.newConnectionError('Connection closed unexpected'))
  1202. this.connect()
  1203. }
  1204.  
  1205. // stop timeout interval
  1206. if (this.timeoutInterval) {
  1207. clearInterval(this.timeoutInterval)
  1208. this.timeoutInterval = null
  1209. }
  1210.  
  1211. // reject all pending promises
  1212. this.pendingRequests.forEach(
  1213. (value): void => {
  1214. value.reject(eventstoreError.newConnectionError('Connection closed'))
  1215. }
  1216. )
  1217. this.pendingRequests = new Map()
  1218.  
  1219. //drop all subscriptions
  1220. this.subscriptionList.forEach(
  1221. (subscription): void => {
  1222. subscription.emit('dropped', 'Connection closed')
  1223. }
  1224. )
  1225.  
  1226. //drop all persistent subscriptions
  1227. this.persistentSubscriptionList.forEach(
  1228. (subscription): void => {
  1229. subscription.emit('dropped', 'Connection closed')
  1230. }
  1231. )
  1232. }
  1233.  
  1234. /**
  1235. * Emit when connection starts draining
  1236. */
  1237. protected onDrain(): void {
  1238. this.log.debug('Eventstore connection draining')
  1239. this.state = connectionState.drain
  1240. this.emit('drain')
  1241. }
  1242.  
  1243. /**
  1244. * Emit when connection secured
  1245. */
  1246. protected onSecureConnect(): void {
  1247. this.log.debug('Eventstore connection secured')
  1248. this.emit('secureConnect')
  1249. }
  1250. }