diff --git a/src/assertion/assertion.controller.ts b/src/assertion/assertion.controller.ts index 11c7634..53d1a00 100644 --- a/src/assertion/assertion.controller.ts +++ b/src/assertion/assertion.controller.ts @@ -176,15 +176,12 @@ export class AssertionController { delete session.challenge; session.wallet = user.wallet; // Emit the signin event for the given request id - if ( - typeof body?.clientExtensionResults?.liquid?.requestId !== 'undefined' - ) { - this.client.emit('auth', { - requestId: body.clientExtensionResults.liquid.requestId, - wallet: user.wallet, - credId: body.id, - }); - } + this.client.emit('auth', { + requestId: body?.clientExtensionResults?.liquid?.requestId, + wallet: user.wallet, + credId: body.id, + sessionId: session.id, + }); this.logger.debug('User', user); return user; } diff --git a/src/attestation/attestation.controller.ts b/src/attestation/attestation.controller.ts index 1091108..f8ad704 100644 --- a/src/attestation/attestation.controller.ts +++ b/src/attestation/attestation.controller.ts @@ -133,15 +133,14 @@ export class AttestationController { // Authorize user with a wallet session session.wallet = username; // Handle Liquid Extension - if ( - typeof body?.clientExtensionResults?.liquid?.requestId !== 'undefined' - ) { - this.client.emit('auth', { - requestId: body.clientExtensionResults.liquid.requestId, - wallet: user.wallet, - credId: credential.credId, - }); - } + this.client.emit('auth', { + requestId: body?.clientExtensionResults?.liquid?.requestId, + wallet: user.wallet, + credId: credential.credId, + sessionId: session.id, + }); + + console.log('session', session); this.logger.debug('User', user); return user; diff --git a/src/signals/signals.gateway.spec.ts b/src/signals/signals.gateway.spec.ts index 785fb6f..17b7ddc 100644 --- a/src/signals/signals.gateway.spec.ts +++ b/src/signals/signals.gateway.spec.ts @@ -15,6 +15,7 @@ import { Session } from 'express-session'; const clientMock = { request: { session: sessionFixtures.authorized, + sessionID: 'authorized-session-id', }, rooms: new Set(), join: jest.fn(), @@ -35,6 +36,7 @@ jest.mock('socket.io', () => { return { emit: jest.fn(), in: jest.fn().mockReturnThis(), + socketsJoin: jest.fn(), sockets: { adapter: ioAdapterMock, }, @@ -99,7 +101,9 @@ describe('SignalsGateway', () => { await gateway.handleConnection({ request: { session: sessionFixtures.unauthorized, + sessionID: 'unauthorized-session-id', }, + join: jest.fn(), } as unknown as Socket); // @ts-expect-error, testing purposes expect(gateway.logger.debug).toHaveBeenCalled(); @@ -110,13 +114,22 @@ describe('SignalsGateway', () => { expect(gateway.logger.debug).toHaveBeenCalled(); }); it('should handle a link event', async () => { - await gateway.link( + const obs = await gateway.link( { requestId: '019097ff-bb8c-7d5d-9822-7c9eb2c0d419' }, clientMock, ); - expect(clientMock.join).toHaveBeenCalledWith( - sessionFixtures.authorized.wallet, + obs.subscribe(); + + await linkEventFn( + 'auth', + JSON.stringify({ + data: { + requestId: '019097ff-bb8c-7d5d-9822-7c9eb2c0d419', + wallet: sessionFixtures.authorized.wallet, + }, + }), ); + expect((sessionFixtures.authorized as any).reload).toHaveBeenCalled(); expect(globalThis.handleObserver).toBeInstanceOf(Function); expect( globalThis.handleObserver({ next: jest.fn(), complete: jest.fn() }), diff --git a/src/signals/signals.gateway.ts b/src/signals/signals.gateway.ts index 404df99..5679213 100644 --- a/src/signals/signals.gateway.ts +++ b/src/signals/signals.gateway.ts @@ -49,6 +49,23 @@ export class SignalsGateway */ afterInit(server: Server) { this.ioAdapter = server.sockets.adapter as unknown as RedisIoAdapter; + this.ioAdapter.subClient.subscribe('auth'); + this.ioAdapter.subClient.on('message', (channel, message) => { + if (channel === 'auth') { + try { + const parsed = JSON.parse(message); + const data = parsed.data || parsed; + if (data.sessionId && data.wallet) { + this.logger.debug( + `(*) Global Auth Event: Joining Sockets for Session ${data.sessionId} to Room ${data.wallet}`, + ); + server.in(data.sessionId).socketsJoin(data.wallet); + } + } catch (e) { + this.logger.error('Failed to handle global auth message', e); + } + } + }); } /** @@ -68,6 +85,9 @@ export class SignalsGateway session.wallet ? ` and PublicKey: ${session.wallet}` : '' }`, ); + if (typeof request.sessionID === 'string') { + await socket.join(request.sessionID); + } if ( typeof session.wallet === 'string' && !socket.rooms.has(session.wallet) @@ -105,20 +125,33 @@ export class SignalsGateway if (session) { await this.ioAdapter.subClient.subscribe('auth'); const handleObserver = (observer: Subscriber) => { - const handleAuthMessage = async (_: any, eventMessage: string) => { - const { data } = JSON.parse(eventMessage); - if (body.requestId === data.requestId) { - this.logger.debug( - `(*) Linking Wallet: ${data.wallet} to Session: ${request.sessionID}`, - ); - await this.authService.updateSessionWallet(session, data.wallet); - this.logger.debug( - `(*) Joining Room: ${data.wallet} with Session: ${request.sessionID}`, - ); - await client.join(data.wallet); - observer.next(data); + const handleAuthMessage = async ( + channel: string, + eventMessage: string, + ) => { + if (channel !== 'auth') { + return; + } + try { + const parsed = JSON.parse(eventMessage); + const data = parsed.data || parsed; + if (data && body.requestId === data.requestId) { + this.logger.debug( + `(*) Linking Wallet: ${data.wallet} to Session: ${request.sessionID}`, + ); + await this.authService.updateSessionWallet(session, data.wallet); + await reloadSession(request.session); + this.logger.debug( + `(*) Joining Room: ${data.wallet} with Session: ${request.sessionID}`, + ); + await client.join(data.wallet); + this.ioAdapter.subClient.off('message', handleAuthMessage); + observer.next(data); + observer.complete(); + } + } catch (e) { + this.logger.error('Failed to handle auth message in link', e); this.ioAdapter.subClient.off('message', handleAuthMessage); - observer.complete(); } };