// test/2_integrationTests/5_subscription/persistentSubscription.test.ts
import {Eventstore, Event, SubscriptionStatus, NakAction} from '../../../src'
import * as assert from 'assert'
import {setPersistentSubscriptionConfig} from '../../../src/subscription'
describe('Persistent subscription test', (): void => {
describe('with admin user rights', (): void => {
// Connection settings for this suite; the URI carries the admin user's credentials.
const connectionSettings = {
  uri: 'discover://admin:changeit@cluster1.escluster.net:2112',
  clientId: 'ts-client-test'
}
const es = new Eventstore(connectionSettings)

// Connect once before any test of this suite runs.
before(async (): Promise<void> => {
  await es.connect()
})

// Disconnect once after the last test of this suite has finished.
after(async (): Promise<void> => {
  await es.disconnect()
})
// Creating the subscription with admin credentials must succeed; any error fails the test.
it('creates a persistent subscription', async (): Promise<void> => {
  const targetStream = es.stream('subscribestream')
  try {
    const adminStream = targetStream.withCredentials({username: 'admin', password: 'changeit'})
    await adminStream.createPersistentSubscription('persistentsubscription')
  } catch (error) {
    assert.fail(error)
  }
})
// Updating with a plain config object must succeed; any error fails the test.
it('updates a persistent subscription with object', async (): Promise<void> => {
  const targetStream = es.stream('subscribestream')
  try {
    const adminStream = targetStream.withCredentials({username: 'admin', password: 'changeit'})
    const persistent = adminStream.getPersistentSubscription('persistentsubscription')
    await persistent.update({resolveLinkTos: false})
  } catch (error) {
    assert.fail(error)
  }
})
// Same update as the object variant, but the config is built with
// setPersistentSubscriptionConfig and the credentials are passed to update() itself.
it('updates a persistent subscription with given subscription config and credentials', async (): Promise<void> => {
  const targetStream = es.stream('subscribestream')
  try {
    const persistent = targetStream.getPersistentSubscription('persistentsubscription')
    const updatedConfig = setPersistentSubscriptionConfig({resolveLinkTos: true})
    const adminCredentials = {username: 'admin', password: 'changeit'}
    await persistent.update(updatedConfig, adminCredentials)
  } catch (error) {
    assert.fail(error)
  }
})
// Creating a subscription under a name that is already taken must be rejected
// with EventstoreAlreadyExistError.
it('throws on creating persistent subscription with same names again', async (): Promise<void> => {
  const targetStream = es.stream('subscribestream')
  try {
    await targetStream.createPersistentSubscription('persistentsubscription')
    // Reached only when the call resolved: the AssertionError raised here lands in
    // the catch below, where its name does not match and the test fails.
    assert.fail('has not thrown')
  } catch (error) {
    assert.strictEqual(error.name, 'EventstoreAlreadyExistError')
  }
})
// Deleting the existing subscription with admin credentials must succeed; any error fails the test.
it('deletes a persistent subscription', async (): Promise<void> => {
  const targetStream = es.stream('subscribestream')
  try {
    const adminStream = targetStream.withCredentials({username: 'admin', password: 'changeit'})
    const persistent = adminStream.getPersistentSubscription('persistentsubscription')
    await persistent.delete()
  } catch (error) {
    assert.fail(error)
  }
})
// Deleting a subscription name that was never created ('unknown') must be rejected
// with EventstoreDoesNotExistError.
it('throws on delete', async (): Promise<void> => {
  const stream = es.stream('subscribestream')
  try {
    const subscription = stream
      .withCredentials({username: 'admin', password: 'changeit'})
      .getPersistentSubscription('unknown')
    await subscription.delete()
    // Guard against a vacuous pass: if delete() resolves, this AssertionError is
    // caught below, its name does not match, and the test fails.
    assert.fail('has not thrown')
  } catch (err) {
    assert.strictEqual(err.name, 'EventstoreDoesNotExistError')
  }
})
})
describe('without admin user rights', (): void => {
// Client for this suite; the URI carries the restricted (non-admin) user's credentials.
const es = new Eventstore({
  uri: 'discover://restrictedUser:restrictedOnlyUserPassword@cluster1.escluster.net:2112',
  clientId: 'ts-client-test'
})

// Connect, then create the fixture subscription with admin credentials so the
// update/delete tests below have an existing subscription to be denied access to.
before(async (): Promise<void> => {
  await es.connect()
  const stream = es
    .stream('subscribestream')
    .withCredentials({username: 'admin', password: 'changeit'})
  try {
    await stream.createPersistentSubscription('persistentsubscription2')
  } catch (err) {
    // Nothing in this suite deletes the fixture, so it may already exist from an
    // earlier run against the same server. Only that case is tolerated; every
    // other failure still aborts the suite.
    if (!err || err.name !== 'EventstoreAlreadyExistError') {
      throw err
    }
  }
})

// Disconnect once after the last test of this suite has finished.
after(async (): Promise<void> => {
  await es.disconnect()
})
// Creating a persistent subscription over the restricted connection must be
// rejected with EventstoreAccessDeniedError.
it('throws on create', async (): Promise<void> => {
  const targetStream = es.stream('subscribestream')
  try {
    await targetStream.createPersistentSubscription('persistentsubscription1')
    // Reached only when the call resolved: the AssertionError raised here lands in
    // the catch below, where its name does not match and the test fails.
    assert.fail('has not thrown')
  } catch (error) {
    assert.strictEqual(error.name, 'EventstoreAccessDeniedError')
  }
})
// Updating the fixture subscription over the restricted connection must be
// rejected with EventstoreAccessDeniedError.
it('throws on update', async (): Promise<void> => {
  const stream = es.stream('subscribestream')
  try {
    const subscription = stream.getPersistentSubscription('persistentsubscription2')
    await subscription.update({resolveLinkTos: false})
    // Guard against a vacuous pass: if update() resolves, this AssertionError is
    // caught below, its name does not match, and the test fails.
    assert.fail('has not thrown')
  } catch (err) {
    assert.strictEqual(err.name, 'EventstoreAccessDeniedError')
  }
})
// Deleting the fixture subscription over the restricted connection must be
// rejected with EventstoreAccessDeniedError.
it('throws on delete', async (): Promise<void> => {
  const stream = es.stream('subscribestream')
  try {
    const subscription = stream.getPersistentSubscription('persistentsubscription2')
    await subscription.delete()
    // Guard against a vacuous pass: if delete() resolves, this AssertionError is
    // caught below, its name does not match, and the test fails.
    assert.fail('has not thrown')
  } catch (err) {
    assert.strictEqual(err.name, 'EventstoreAccessDeniedError')
  }
})
})
describe('Persistent subscription get events', (): void => {
// Connection settings for this suite; the URI carries the restricted user's credentials.
const connectionSettings = {
  uri: 'discover://restrictedUser:restrictedOnlyUserPassword@cluster1.escluster.net:2112',
  clientId: 'ts-client-test'
}
const es = new Eventstore(connectionSettings)

// Connect, append one event to the stream and then try to create the persistent
// subscription with admin credentials.
before(async (): Promise<void> => {
  await es.connect()
  const eventStream = es.stream('persistentsubscribestream3')
  const firstEvent = new Event('SomeEvent')
  await eventStream.append(firstEvent)
  const adminCredentials = {username: 'admin', password: 'changeit'}
  try {
    await eventStream.createPersistentSubscription('persistentsubscription3', {}, adminCredentials)
  } catch (creationError) {
    // Best effort: a failed creation is only logged and the suite carries on.
    console.log(creationError)
  }
})

// Disconnect once after the last test of this suite has finished.
after(async (): Promise<void> => {
  await es.disconnect()
})
// Subscribes to the persistent subscription, acknowledges every delivered event and
// checks the subscription's name, its state transitions and the number of events seen.
it('can start a subscription on none empty stream', async (): Promise<void> => {
  const stream = es.stream('persistentsubscribestream3')
  const newEvent = new Event('SomeEvent')
  await stream.append(newEvent)
  let counter = 0
  const subscription = stream.getPersistentSubscription('persistentsubscription3')
  subscription.on(
    'event',
    (event): void => {
      counter++
      subscription.acknowledgeEvent(event)
    }
  )
  await subscription.subscribe(10, {
    username: 'restrictedUser',
    password: 'restrictedOnlyUserPassword'
  })
  assert.strictEqual(
    subscription.name,
    `PersistentSubscription: persistentsubscribestream3 :: persistentsubscription3`
  )
  assert.strictEqual(subscription.state, SubscriptionStatus.connected)
  // Append outside of any Promise executor: a rejected append now fails the test
  // directly instead of leaving a promise that never settles.
  await stream.append(new Event('SomeEvent'))
  // Give the server a second to deliver the outstanding events.
  await new Promise<void>(
    (resolve): void => {
      setTimeout(resolve, 1000)
    }
  )
  // Three appends precede this check: one in the before hook, one at the top of
  // this test and one after subscribing.
  // NOTE(review): assumes the subscription delivers the stream from its start - confirm.
  assert.strictEqual(counter, 3)
  await subscription.unsubscribe()
  assert.strictEqual(subscription.state, SubscriptionStatus.disconnected)
})
// Appends one event, subscribes and rejects every delivered event with an explicit
// NakAction.Unknown; passes when at least one event reached the handler.
it('can notAck single event', async (): Promise<void> => {
  // Timer-based pause with a synchronous executor (no async Promise executor).
  const pause = (ms: number): Promise<void> =>
    new Promise<void>(
      (resolve): void => {
        setTimeout(resolve, ms)
      }
    )
  const stream = es.stream('persistentsubscribestream3')
  const newEvent = new Event('SomeEvent444')
  await stream.append(newEvent)
  const subscription = stream.getPersistentSubscription('persistentsubscription3')
  let counter = 0
  subscription.on(
    'event',
    (event): void => {
      counter++
      // NOTE(review): this runs inside the event callback, so a failure here is not
      // awaited by the test body - confirm how the client surfaces it.
      assert.strictEqual(event.id, newEvent.id)
      subscription.notAcknowledgeEvent(event, NakAction.Unknown)
    }
  )
  await subscription.subscribe()
  // Leave time for delivery before unsubscribing, and again afterwards.
  await pause(1000)
  await subscription.unsubscribe()
  await pause(1000)
  assert.ok(counter > 0, 'expected at least one event to be delivered')
})
// Same scenario as the explicit NakAction.Unknown test, but notAcknowledgeEvent is
// called without an action argument; passes when at least one event reached the handler.
it('notAck single event with unknown by default', async (): Promise<void> => {
  // Timer-based pause with a synchronous executor (no async Promise executor).
  const pause = (ms: number): Promise<void> =>
    new Promise<void>(
      (resolve): void => {
        setTimeout(resolve, ms)
      }
    )
  const stream = es.stream('persistentsubscribestream3')
  const newEvent = new Event('SomeEvent444')
  await stream.append(newEvent)
  const subscription = stream.getPersistentSubscription('persistentsubscription3')
  let counter = 0
  subscription.on(
    'event',
    (event): void => {
      counter++
      // NOTE(review): this runs inside the event callback, so a failure here is not
      // awaited by the test body - confirm how the client surfaces it.
      assert.strictEqual(event.id, newEvent.id)
      subscription.notAcknowledgeEvent(event)
    }
  )
  await subscription.subscribe()
  // Leave time for delivery before unsubscribing, and again afterwards.
  await pause(1000)
  await subscription.unsubscribe()
  await pause(1000)
  assert.ok(counter > 0, 'expected at least one event to be delivered')
})
// Batch variant: each delivered event is rejected through notAcknowledgeEvents([event])
// without an action argument; passes when at least one event reached the handler.
it('notAck events with unknown by default', async (): Promise<void> => {
  // Timer-based pause with a synchronous executor (no async Promise executor).
  const pause = (ms: number): Promise<void> =>
    new Promise<void>(
      (resolve): void => {
        setTimeout(resolve, ms)
      }
    )
  const stream = es.stream('persistentsubscribestream3')
  const newEvent = new Event('SomeEvent444')
  await stream.append(newEvent)
  const subscription = stream.getPersistentSubscription('persistentsubscription3')
  let counter = 0
  subscription.on(
    'event',
    (event): void => {
      counter++
      // NOTE(review): this runs inside the event callback, so a failure here is not
      // awaited by the test body - confirm how the client surfaces it.
      assert.strictEqual(event.id, newEvent.id)
      subscription.notAcknowledgeEvents([event])
    }
  )
  await subscription.subscribe()
  // Leave time for delivery before unsubscribing, and again afterwards.
  await pause(1000)
  await subscription.unsubscribe()
  await pause(1000)
  assert.ok(counter > 0, 'expected at least one event to be delivered')
})
})
})