Skip to content

Commit 98530d0

Browse files
authored
Merge pull request #698 from ButterflyNetwork/kinesis-error
[Kinesis] Ensure submitAllRecords returns an error on failure to submit
2 parents 97df0b4 + a3cfcc8 commit 98530d0

File tree

5 files changed

+79
-23
lines changed

5 files changed

+79
-23
lines changed

AWSKinesis/AWSAbstractKinesisRecorder.m

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,19 @@ - (AWSTask *)submitAllRecords {
279279

280280
NSString *streamName = temporaryRecords[0][@"stream_name"];
281281

282-
[[self.recorderHelper submitRecordsForStream:streamName
283-
records:temporaryRecords
284-
partitionKeys:partitionKeys
285-
putPartitionKeys:putPartitionKeys
286-
retryPartitionKeys:retryPartitionKeys
287-
stop:&stop] waitUntilFinished];
282+
AWSTask *submitTask = \
283+
[self.recorderHelper submitRecordsForStream:streamName
284+
records:temporaryRecords
285+
partitionKeys:partitionKeys
286+
putPartitionKeys:putPartitionKeys
287+
retryPartitionKeys:retryPartitionKeys
288+
stop:&stop];
289+
290+
[submitTask waitUntilFinished];
291+
292+
if (submitTask.error) {
293+
error = submitTask.error;
294+
}
288295

289296
for (NSString *partitionKey in putPartitionKeys) {
290297
BOOL result = [db executeUpdate:@"DELETE FROM record WHERE partition_key = :partition_key"

AWSKinesis/AWSFirehoseRecorder.m

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,11 @@ - (AWSTask *)submitRecordsForStream:(NSString *)streamName
177177
return [[self.firehose putRecordBatch:putRecordBatchInput] continueWithBlock:^id(AWSTask *task) {
178178
if (task.error) {
179179
AWSDDLogError(@"Error: [%@]", task.error);
180-
if ([task.error.domain isEqualToString:NSURLErrorDomain]) {
180+
const NSArray *stopErrorDomains = @[NSURLErrorDomain, AWSCognitoIdentityErrorDomain];
181+
182+
if (task.error && [stopErrorDomains containsObject:task.error.domain]) {
181183
*stop = YES;
184+
return [AWSTask taskWithError:task.error];
182185
}
183186
}
184187
if (task.result) {

AWSKinesis/AWSKinesisRecorder.m

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ - (AWSTask *)submitRecordsForStream:(NSString *)streamName
181181
if ([task.error.domain isEqualToString:NSURLErrorDomain]) {
182182
*stop = YES;
183183
}
184+
185+
return [AWSTask taskWithError:task.error];
184186
}
185187
if (task.result) {
186188
AWSKinesisPutRecordsOutput *putRecordsOutput = task.result;

AWSKinesisTests/AWSFirehoseRecorderTests.m

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -191,23 +191,35 @@ - (void)testDiskAgeLimit {
191191
FirehoseRecorder.diskAgeLimit = 0.0;
192192
}
193193

194-
- (void)testAll {
195-
AWSFirehoseRecorder *firehoseRecorder = [AWSFirehoseRecorder defaultFirehoseRecorder];
196-
197-
NSMutableArray *tasks = [NSMutableArray new];
198-
for (int32_t i = 0; i < 1234; i++) {
199-
[tasks addObject:[firehoseRecorder saveRecord:[[NSString stringWithFormat:@"TestString-%02d\n", i] dataUsingEncoding:NSUTF8StringEncoding]
200-
streamName:AWSFirehoseRecorderTestStream]];
201-
}
202-
203-
[[[[AWSTask taskForCompletionOfAllTasks:tasks] continueWithSuccessBlock:^id(AWSTask *task) {
204-
sleep(10);
205-
return [firehoseRecorder submitAllRecords];
206-
}] continueWithBlock:^id(AWSTask *task) {
207-
XCTAssertNil(task.error);
208-
194+
- (void)testSubmitAllRecordsReturnsErrorOnInvalidPoolId {
195+
XCTestExpectation *expectation = [self expectationWithDescription:@"Test finished running."];
196+
197+
NSString *poolId = @"invalidPoolId";
198+
AWSCognitoCredentialsProvider *invalidCreds = \
199+
[[AWSCognitoCredentialsProvider alloc] initWithRegionType:AWSRegionUSEast1
200+
identityPoolId:poolId];
201+
202+
AWSServiceConfiguration *configuration = \
203+
[[AWSServiceConfiguration alloc] initWithRegion:AWSRegionUSEast1
204+
credentialsProvider:invalidCreds];
205+
206+
[AWSFirehoseRecorder registerFirehoseRecorderWithConfiguration:configuration
207+
forKey:poolId];
208+
AWSFirehoseRecorder *firehoseRecorder = [AWSFirehoseRecorder FirehoseRecorderForKey:poolId];
209+
[firehoseRecorder saveRecord:[@"testString" dataUsingEncoding:NSUTF8StringEncoding]
210+
streamName:@"streamName"];
211+
212+
AWSTask *submitTask = firehoseRecorder.submitAllRecords;
213+
214+
[submitTask continueWithBlock:^id(AWSTask *task) {
215+
XCTAssertNotNil(task.error, @"Task should have an error due to invalid pool id.");
216+
[expectation fulfill];
209217
return nil;
210-
}] waitUntilFinished];
218+
}];
219+
220+
[self waitForExpectationsWithTimeout:5 handler:^(NSError * _Nullable error) {
221+
XCTAssertNil(error);
222+
}];
211223
}
212224

213225
@end

AWSKinesisTests/AWSKinesisRecorderTests.m

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,38 @@ - (void)testAllWithPartitionKey {
397397
}];
398398
}
399399

400+
- (void)testSubmitAllRecordsReturnsError {
401+
XCTestExpectation *expectation = [self expectationWithDescription:@"Test finished running."];
402+
403+
NSString *poolId = @"invalidPoolId";
404+
AWSCognitoCredentialsProvider *invalidCreds = \
405+
[[AWSCognitoCredentialsProvider alloc] initWithRegionType:AWSRegionUSEast1
406+
identityPoolId:poolId];
407+
408+
AWSServiceConfiguration *configuration = \
409+
[[AWSServiceConfiguration alloc] initWithRegion:AWSRegionUSEast1
410+
credentialsProvider:invalidCreds];
411+
412+
[AWSKinesisRecorder registerKinesisRecorderWithConfiguration:configuration
413+
forKey:poolId];
414+
415+
AWSKinesisRecorder *kinesisRecorder = [AWSKinesisRecorder KinesisRecorderForKey:poolId];
416+
[kinesisRecorder saveRecord:[@"testString" dataUsingEncoding:NSUTF8StringEncoding]
417+
streamName:testStreamName];
418+
419+
AWSTask *submitTask = kinesisRecorder.submitAllRecords;
420+
421+
[submitTask continueWithBlock:^id(AWSTask *task) {
422+
XCTAssertNotNil(task.error, @"Task should have an error due to invalid pool id.");
423+
[expectation fulfill];
424+
return nil;
425+
}];
426+
427+
[self waitForExpectationsWithTimeout:5 handler:^(NSError * _Nullable error) {
428+
XCTAssertNil(error);
429+
}];
430+
}
431+
400432
- (AWSTask *)getRecords:(NSMutableArray *)returnedRecords shardIterator:(NSString *)shardIterator counter:(int32_t)counter {
401433
AWSKinesis *kinesis = [AWSKinesis defaultKinesis];
402434
AWSKinesisGetRecordsInput *getRecordsInput = [AWSKinesisGetRecordsInput new];

0 commit comments

Comments
 (0)