Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified assets/diagram.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ jest.mock('./services/linkedin-profile.service', () => ({
jest.mock('./services/queue.client', () => ({
QueueClient: {
sendToResultQueue: jest.fn().mockResolvedValue(undefined),
resendMessage: jest.fn().mockResolvedValue(undefined)
resendMessage: jest.fn().mockResolvedValue(undefined),
removeMessage: jest.fn().mockResolvedValue(undefined)
}
}));

Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { logger } from './util/logger';
export const handler: Handler = async (event: SQSEvent): Promise<LinkedinProfileResponse | undefined> => {
const request = LinkedinProfileRequestMapper.toDomain(event);
const env = Environment.setupEnvironment(request);
const receiptHandle = event.Records[0].receiptHandle;

try {
logger.info(`⌛️ [handler] Starting Linkedin profile request for linkedinProfileUrl: ${request.linkedinProfileUrl}`, request);
Expand All @@ -38,6 +39,7 @@ export const handler: Handler = async (event: SQSEvent): Promise<LinkedinProfile
logger.error(`❌ [handler] Error processing Linkedin profile request`, { error, errorType, errorMessage, event });
const result = LinkedinProfileResponseMapper.toErrorResponse(errorType, errorMessage, request);
await QueueClient.sendToResultQueue(result, env);
await QueueClient.removeMessage(receiptHandle, env);
throw error;
}
};
17 changes: 15 additions & 2 deletions src/services/queue.client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ describe('QueueClient', () => {
);
});

it('should remove a message from the queue', async () => {
const receiptHandle = 'fake-receipt-handle';
const spyDeleteMessageCommand = jest.spyOn(require('@aws-sdk/client-sqs'), 'DeleteMessageCommand');

await QueueClient.removeMessage(receiptHandle, mockEnvironment);

expect(spyDeleteMessageCommand).toHaveBeenCalledWith(
expect.objectContaining({
QueueUrl: mockEnvironment.AWS_QUEUE_URL,
ReceiptHandle: receiptHandle
})
);
});

it('should resend a message to the queue', async () => {
const request = createMockedLinkedinProfileRequest();
const spySendMessageCommand = jest.spyOn(require('@aws-sdk/client-sqs'), 'SendMessageCommand');
Expand All @@ -55,8 +69,7 @@ describe('QueueClient', () => {
MessageBody: JSON.stringify({ ...request, attempt: 2 }),
QueueUrl: mockEnvironment.AWS_QUEUE_URL,
MessageGroupId: expect.any(String),
MessageDeduplicationId: expect.any(String),
DelaySeconds: 120
MessageDeduplicationId: expect.any(String)
})
);
});
Expand Down
19 changes: 16 additions & 3 deletions src/services/queue.client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SendMessageCommand, SendMessageCommandInput, SQSClient } from '@aws-sdk/client-sqs';
import { DeleteMessageCommand, SendMessageCommand, SendMessageCommandInput, SQSClient } from '@aws-sdk/client-sqs';
import { v4 as uuid } from 'uuid';
import { LinkedinProfileRequest } from '../contracts/linkedin-profile.request';
import { LinkedinProfileResponse } from '../contracts/linkedin-profile.response';
Expand Down Expand Up @@ -40,8 +40,7 @@ export class QueueClient {
MessageBody: JSON.stringify({ ...request, attempt }),
QueueUrl: queueUrl,
MessageGroupId: uuid(),
MessageDeduplicationId: uuid(),
DelaySeconds: 60 * attempt // 1-2-3 minutes
MessageDeduplicationId: uuid()
};

if (queueUrl) {
Expand All @@ -52,4 +51,18 @@ export class QueueClient {
logger.warn(`💌 [QueueClient] Sending message again to queue: no queue provided`, message);
}
}

public static async removeMessage(receiptHandle: string, environment: Environment): Promise<void> {
const queueUrl = environment.AWS_QUEUE_URL;
const region = environment.AWS_REGION;
const endpoint = environment.AWS_SQS_ENDPOINT;
const messageClient = new SQSClient({ region, endpoint });

if (queueUrl) {
logger.info(`💌 [QueueClient] Remove message with receiptHandle: ${receiptHandle} from queue: ${queueUrl}`);
await messageClient.send(new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: receiptHandle }));
} else {
logger.warn(`💌 [QueueClient] Remove message with receiptHandle: ${receiptHandle} from queue: no queue provided`);
}
}
}
Loading