@@ -14,7 +14,7 @@ const {
14
14
} = process . env ;
15
15
16
16
describe ( 'e2e Streams' , function ( ) {
17
- this . timeout ( 60_000 ) ;
17
+ this . timeout ( 120_000 ) ;
18
18
let shell : TestShell ;
19
19
20
20
before ( function ( ) {
@@ -96,26 +96,30 @@ describe('e2e Streams', function () {
96
96
97
97
const aggPipeline = [ sourceStage , mergeStage ] ;
98
98
99
- const createResult = await shell . executeLine (
100
- `sp.createStreamProcessor("${ processorName } ", ${ JSON . stringify (
101
- aggPipeline
102
- ) } )`,
103
- { timeout : 45_000 }
104
- ) ;
105
- expect ( createResult ) . to . include (
106
- `Atlas Stream Processor: ${ processorName } `
107
- ) ;
99
+ await eventually ( async ( ) => {
100
+ const createResult = await shell . executeLine (
101
+ `sp.createStreamProcessor("${ processorName } ", ${ JSON . stringify (
102
+ aggPipeline
103
+ ) } )`,
104
+ { timeout : 45_000 }
105
+ ) ;
106
+ expect ( createResult ) . to . include (
107
+ `Atlas Stream Processor: ${ processorName } `
108
+ ) ;
109
+ } ) ;
108
110
} ) ;
109
111
110
112
afterEach ( async function ( ) {
111
113
try {
112
114
await db . dropDatabase ( ) ;
113
115
await client . close ( ) ;
114
116
115
- const result = await shell . executeLine ( `sp.${ processorName } .drop()` , {
116
- timeout : 45_000 ,
117
+ await eventually ( async ( ) => {
118
+ const result = await shell . executeLine ( `sp.${ processorName } .drop()` , {
119
+ timeout : 45_000 ,
120
+ } ) ;
121
+ expect ( result ) . to . include ( `{ ok: 1 }` ) ;
117
122
} ) ;
118
- expect ( result ) . to . include ( `{ ok: 1 }` ) ;
119
123
} catch ( err : any ) {
120
124
console . error (
121
125
`Could not clean up stream processor ${ processorName } :` ,
@@ -137,48 +141,64 @@ describe('e2e Streams', function () {
137
141
const initialDocsCount = await collection . countDocuments ( ) ;
138
142
expect ( initialDocsCount ) . to . eq ( 0 ) ;
139
143
140
- const startResult = await shell . executeLine (
141
- `sp.${ processorName } .start()` ,
142
- { timeout : 45_000 }
143
- ) ;
144
- expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
144
+ await eventually ( async ( ) => {
145
+ const startResult = await shell . executeLine (
146
+ `sp.${ processorName } .start()` ,
147
+ { timeout : 45_000 }
148
+ ) ;
149
+ expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
150
+ } ) ;
145
151
146
152
let updatedDocCount = 0 ;
147
153
await eventually ( async ( ) => {
148
154
updatedDocCount = await collection . countDocuments ( ) ;
149
155
expect ( updatedDocCount ) . to . be . greaterThan ( 0 ) ;
150
156
} ) ;
151
157
152
- const stopResult = await shell . executeLine ( `sp.${ processorName } .stop()` , {
153
- timeout : 45_000 ,
158
+ await eventually ( async ( ) => {
159
+ const stopResult = await shell . executeLine (
160
+ `sp.${ processorName } .stop()` ,
161
+ {
162
+ timeout : 45_000 ,
163
+ }
164
+ ) ;
165
+ expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
154
166
} ) ;
155
- expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
156
167
157
- const statsResult = await shell . executeLine (
158
- `sp.${ processorName } .stats()` ,
159
- { timeout : 45_000 }
160
- ) ;
161
- expect ( statsResult ) . to . include ( `state: 'STOPPED'` ) ;
168
+ await eventually ( async ( ) => {
169
+ const statsResult = await shell . executeLine (
170
+ `sp.${ processorName } .stats()` ,
171
+ { timeout : 45_000 }
172
+ ) ;
173
+ expect ( statsResult ) . to . include ( `state: 'STOPPED'` ) ;
174
+ } ) ;
162
175
} ) ;
163
176
164
177
it ( `can modify an existing stream processor's pipeline` , async function ( ) {
165
178
// this field is not present on any docs emit by the stream processor
166
179
// created in the beforeEach
167
180
const newField = 'newField' ;
168
181
169
- const startResult = await shell . executeLine (
170
- `sp.${ processorName } .start()` ,
171
- { timeout : 45_000 }
172
- ) ;
173
- expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
182
+ await eventually ( async ( ) => {
183
+ const startResult = await shell . executeLine (
184
+ `sp.${ processorName } .start()` ,
185
+ { timeout : 45_000 }
186
+ ) ;
187
+ expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
188
+ } ) ;
174
189
175
190
// sleep for a bit to let the processor do stuff
176
191
await sleep ( 500 ) ;
177
192
178
- const stopResult = await shell . executeLine ( `sp.${ processorName } .stop()` , {
179
- timeout : 45_000 ,
193
+ await eventually ( async ( ) => {
194
+ const stopResult = await shell . executeLine (
195
+ `sp.${ processorName } .stop()` ,
196
+ {
197
+ timeout : 45_000 ,
198
+ }
199
+ ) ;
200
+ expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
180
201
} ) ;
181
- expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
182
202
183
203
const initialDocsWithNewField = await collection . countDocuments ( {
184
204
[ newField ] : { $exists : true } ,
@@ -211,17 +231,21 @@ describe('e2e Streams', function () {
211
231
212
232
const updatedAggPipeline = [ sourceStage , addFieldStage , mergeStage ] ;
213
233
214
- const modifyResult = await shell . executeLine (
215
- `sp.${ processorName } .modify(${ JSON . stringify ( updatedAggPipeline ) } )` ,
216
- { timeout : 45_000 }
217
- ) ;
218
- expect ( modifyResult ) . to . include ( '{ ok: 1 }' ) ;
234
+ await eventually ( async ( ) => {
235
+ const modifyResult = await shell . executeLine (
236
+ `sp.${ processorName } .modify(${ JSON . stringify ( updatedAggPipeline ) } )` ,
237
+ { timeout : 45_000 }
238
+ ) ;
239
+ expect ( modifyResult ) . to . include ( '{ ok: 1 }' ) ;
240
+ } ) ;
219
241
220
- const secondStartResult = await shell . executeLine (
221
- `sp.${ processorName } .start()` ,
222
- { timeout : 45_000 }
223
- ) ;
224
- expect ( secondStartResult ) . to . include ( '{ ok: 1 }' ) ;
242
+ await eventually ( async ( ) => {
243
+ const secondStartResult = await shell . executeLine (
244
+ `sp.${ processorName } .start()` ,
245
+ { timeout : 45_000 }
246
+ ) ;
247
+ expect ( secondStartResult ) . to . include ( '{ ok: 1 }' ) ;
248
+ } ) ;
225
249
226
250
await eventually ( async ( ) => {
227
251
const updatedDocsWithNewField = await collection . countDocuments ( {
@@ -232,17 +256,19 @@ describe('e2e Streams', function () {
232
256
} ) ;
233
257
234
258
it ( 'can view stats for a stream processor' , async function ( ) {
235
- const statsResult = await shell . executeLine (
236
- `sp.${ processorName } .stats()` ,
237
- { timeout : 45_000 }
238
- ) ;
239
- expect ( statsResult ) . to . include ( `name: '${ processorName } '` ) ;
240
- expect ( statsResult ) . to . include ( `state: 'CREATED'` ) ;
241
- expect ( statsResult ) . to . include ( 'stats: {' ) ;
242
- expect ( statsResult ) . to . include ( `pipeline: [` ) ;
243
- expect ( statsResult ) . to . include (
244
- `{ '$source': { connectionName: 'sample_stream_solar' } },`
245
- ) ;
259
+ await eventually ( async ( ) => {
260
+ const statsResult = await shell . executeLine (
261
+ `sp.${ processorName } .stats()` ,
262
+ { timeout : 45_000 }
263
+ ) ;
264
+ expect ( statsResult ) . to . include ( `name: '${ processorName } '` ) ;
265
+ expect ( statsResult ) . to . include ( `state: 'CREATED'` ) ;
266
+ expect ( statsResult ) . to . include ( 'stats: {' ) ;
267
+ expect ( statsResult ) . to . include ( `pipeline: [` ) ;
268
+ expect ( statsResult ) . to . include (
269
+ `{ '$source': { connectionName: 'sample_stream_solar' } },`
270
+ ) ;
271
+ } ) ;
246
272
} ) ;
247
273
} ) ;
248
274
0 commit comments