Home Reference Source Test

src/subscription/Subscription.ts

  1. import {EventEmitter} from 'events'
  2. import * as model from '../protobuf/model'
  3. import {UserCredentials, Position} from '../eventstore'
  4. import {Stream} from '../stream'
  5. import {TCPConnection} from '../eventstore/TCPConnection'
  6. import * as bunyan from 'bunyan'
  7. import {Event} from '../event'
  8.  
  /**
   * Base class for handling subscriptions.
   *
   * The constructor registers the instance's own `onSubscribed`, `onDropped`,
   * `onEvent` and `onError` handlers for the matching events, so every
   * subscription logs its lifecycle and remembers the position of the last
   * event it received. Because an `error` listener is always attached, emitting
   * `error` on a subscription is logged instead of being thrown by EventEmitter.
   *
   * @emits {subscribed}
   * @emits {dropped}
   * @emits {event}
   * @emits {event-eventnametolowercase}
   * @emits {error}
   */
  export class Subscription extends EventEmitter {
    /** uuid4 of subscription */
    protected subscriptionId: string
    /**
     * indicates if subscription is running
     * (only initialised here; this class never changes it - presumably it is
     * maintained by the code driving the subscription, TODO confirm)
     */
    public isSubscribed: boolean = false
    /** connection to use */
    protected tcpConnection: TCPConnection
    /** credentials for subscription */
    protected credentials: UserCredentials | null = null
    /** instance of corresponding stream */
    protected stream: Stream
    /** indicates if events should be fully resolved */
    protected resolveLinkTos: boolean
    /** logger (child of the stream logger, tagged with module and subscriptionId) */
    public log: bunyan
    /** position handed over with the most recently received event, null until the first event */
    protected position: Position | null = null

    /**
     * Creates an instance of subscription.
     *
     * @param subscriptionId id of the subscription
     * @param tcpConnection connection used for unsubscribing
     * @param stream stream this subscription belongs to; its logger is used as parent logger
     * @param resolveLinkTos flag returned by {@link getResolveLinkTos}
     * @param credentials credentials for the subscription or null
     */
    public constructor(
      subscriptionId: string,
      tcpConnection: TCPConnection,
      stream: Stream,
      resolveLinkTos: boolean,
      credentials: UserCredentials | null
    ) {
      super()
      this.subscriptionId = subscriptionId
      this.tcpConnection = tcpConnection
      this.credentials = credentials
      this.stream = stream
      this.resolveLinkTos = resolveLinkTos
      // EventEmitter invokes listeners with `this` set to the emitter,
      // so the handlers can be registered without binding.
      this.on('subscribed', this.onSubscribed)
      this.on('dropped', this.onDropped)
      this.on('event', this.onEvent)
      this.on('error', this.onError)
      this.log = stream.log.child({module: 'Subscription', subscriptionId: this.id})
    }

    /**
     * Returns subscriptionId
     */
    public get id(): string {
      return this.subscriptionId
    }

    /**
     * Returns subscription name (`'Subscription: '` followed by the subscriptionId)
     */
    public get name(): string {
      return 'Subscription: ' + this.subscriptionId
    }

    /**
     * Gets the credentials of this subscription (null if none were given).
     *
     * Note: this is a property getter - read it as `subscription.getCredentials`,
     * without parentheses.
     */
    public get getCredentials(): UserCredentials | null {
      return this.credentials
    }

    /**
     * Gets the resolveLinkTos flag given at construction
     */
    public getResolveLinkTos(): boolean {
      return this.resolveLinkTos
    }

    /**
     * Unsubscribe from stream
     *
     * Delegates to the connection's `unsubscribeFromStream` and resolves when
     * that call has completed.
     */
    public async unsubscribe(): Promise<void> {
      this.log.debug({fn: 'unsubscribe'}, 'unsubscribe subscription')
      await this.tcpConnection.unsubscribeFromStream(this.id)
    }

    /**
     * Called when event from eventstore arrives
     *
     * Emits the generic `event` and then `event-<lowercased event name>`,
     * both with the event and its position as arguments.
     */
    public eventAppeared(event: Event, position: Position): void {
      this.emit('event', event, position)
      // toLowerCase() instead of toLocaleLowerCase(): the channel name must not
      // depend on the host locale (e.g. a Turkish locale maps 'I' to dotless 'ı'),
      // otherwise listeners registered as `event-<name>` would be missed.
      this.emit(`event-${event.name.toLowerCase()}`, event, position)
    }

    /**
     * Called when subscription was dropped
     */
    protected onDropped(
      reason: model.eventstore.proto.SubscriptionDropped.SubscriptionDropReason
    ): void {
      this.log.debug({reason}, 'Subscription dropped')
    }

    /**
     * Called for every emitted `event`: stores the given position and logs the event
     */
    protected onEvent(event: Event, position: Position): void {
      this.position = position
      this.log.debug({eventName: event.name, eventId: event.id}, 'Event received')
    }

    /**
     * Called when subscription is established
     */
    protected onSubscribed(): void {
      this.log.debug(
        {subscriptionId: this.subscriptionId, stream: this.stream.id},
        'Subscription started'
      )
    }

    /**
     * Called when error appears
     */
    protected onError(err: Error): void {
      this.log.error({err, subscriptionId: this.subscriptionId}, 'Error on subscription')
    }
  }