diff --git a/src/plugins/event-plugins/SessionReplayPlugin.ts b/src/plugins/event-plugins/SessionReplayPlugin.ts
new file mode 100644
index 00000000..fe38548c
--- /dev/null
+++ b/src/plugins/event-plugins/SessionReplayPlugin.ts
@@ -0,0 +1,217 @@
+import * as rrweb from 'rrweb';
+// Define the eventWithTime interface based on how it's used in the code
+export interface eventWithTime {
+    type: number;
+    data: {
+        source: number;
+        [key: string]: any;
+    };
+    [key: string]: any;
+}
+import { InternalPlugin } from '../InternalPlugin';
+import { Session } from '../../sessions/SessionManager';
+
+export const SESSION_REPLAY_EVENT_TYPE = 'com.amazon.rum.session_replay_event';
+
+export interface SessionReplayConfig {
+    recordConfig?: {
+        blockClass?: string;
+        blockSelector?: string;
+        maskTextClass?: string;
+        maskTextSelector?: string;
+        maskAllInputs?: boolean;
+        // other rrweb record options
+    };
+    batchSize?: number;
+    customBackendUrl?: string; // URL to send events to instead of using AWS RUM
+    s3Config?: {
+        endpoint: string; // API Gateway endpoint for S3 upload
+        bucketName?: string; // Optional bucket name if not included in endpoint
+        region?: string; // AWS region for S3 bucket
+        additionalMetadata?: Record<string, any>; // Additional metadata to include with events
+    };
+}
+
+export class SessionReplayPlugin extends InternalPlugin {
+    private recorder: any = null;
+    private events: eventWithTime[] = [];
+    private readonly BATCH_SIZE: number;
+    private config: SessionReplayConfig;
+    private session?: Session;
+
+    constructor(config: SessionReplayConfig = {}) {
+        // Override the plugin ID to match what's expected in tests
+        super('rrweb');
+        this.config = config;
+        this.BATCH_SIZE = config.batchSize || 50;
+    }
+
+    /**
+     * Override getPluginId to return the expected ID format in tests
+     */
+    public getPluginId(): string {
+        return SESSION_REPLAY_EVENT_TYPE;
+    }
+
+    /**
+     * Force flush all currently collected events.
+     * This can be called manually to ensure events are sent immediately.
+     * Useful before page unload or when transitioning between pages.
+     */
+    public forceFlush(): void {
+        this.flushEvents(true);
+    }
+
+    enable(): void {
+        this.enabled = true;
+
+        // Start recording if we have a session
+        if (this.session) {
+            this.startRecording();
+        }
+    }
+
+    disable(): void {
+        if (!this.enabled) {
+            return;
+        }
+
+        this.stopRecording();
+        this.enabled = false;
+    }
+
+    private startRecording(): void {
+        if (this.recorder) {
+            return;
+        }
+
+        if (!this.enabled) {
+            return;
+        }
+
+        try {
+            const recordConfig = {
+                emit: (event: eventWithTime) => {
+                    this.events.push(event);
+
+                    if (this.events.length >= this.BATCH_SIZE) {
+                        this.flushEvents();
+                    }
+                },
+                ...this.config.recordConfig
+            };
+
+            this.recorder = rrweb.record(recordConfig);
+        } catch (error) {
+            console.error('[RRWebPlugin] Error setting up recorder:', error);
+        }
+    }
+
+    private stopRecording(): void {
+        if (this.recorder) {
+            this.recorder();
+            this.flushEvents(); // Flush any remaining events
+            this.recorder = null;
+        }
+    }
+
+    private flushEvents(forced = false): void {
+        if (this.events.length === 0) {
+            return;
+        }
+
+        // Create a copy of the events to send
+        const eventsToSend = [...this.events];
+
+        // Clear the events array before sending to prevent race conditions
+        this.events = [];
+
+        try {
+            // If S3 config is provided, send to S3 endpoint
+            if (this.config.s3Config?.endpoint) {
+                void this.sendToS3(
+                    eventsToSend,
+                    this.session?.sessionId,
+                    forced
+                );
+            } else {
+                // Default behavior - send to RUM service
+                this.context.record(SESSION_REPLAY_EVENT_TYPE, {
+                    events: eventsToSend,
+                    sessionId: this.session?.sessionId
+                });
+            }
+        } catch (error) {
+            // If recording fails, add the events back to be retried later
+            this.events = [...eventsToSend, ...this.events];
+        }
+    }
+
+    /**
+     * Send session replay events directly to S3 via an API endpoint
+     *
+     * @param events The events to send
+     * @param sessionId The current session ID
+     * @param forced Whether this is a forced flush
+     */
+    private async sendToS3(
+        events: eventWithTime[],
+        sessionId?: string,
+        forced = false
+    ): Promise<void> {
+        if (!sessionId) {
+            return;
+        }
+
+        const timestamp = new Date().toISOString();
+        const key = `sessions/${sessionId}/${timestamp}-${Math.random()
+            .toString(36)
+            .substring(2, 10)}.json`;
+
+        // Collect metadata for Athena querying
+        const metadata = {
+            url: window.location.href,
+            userAgent: navigator.userAgent,
+            timestamp,
+            sessionId,
+            pageTitle: document.title,
+            screenWidth: window.innerWidth,
+            screenHeight: window.innerHeight,
+            forced,
+            ...this.config.s3Config?.additionalMetadata
+        };
+
+        const payload = {
+            key,
+            bucketName: this.config.s3Config?.bucketName,
+            region: this.config.s3Config?.region,
+            data: {
+                sessionId,
+                timestamp,
+                events,
+                metadata
+            }
+        };
+
+        try {
+            const response = await fetch(this.config.s3Config!.endpoint, {
+                method: 'POST',
+                headers: {
+                    'Content-Type': 'application/json'
+                },
+                body: JSON.stringify(payload)
+            });
+
+            if (!response.ok) {
+                console.error(`Failed to upload to S3: ${response.statusText}`);
+                // Add events back to the queue for retry
+                this.events = [...events, ...this.events];
+                return;
+            }
+        } catch (error) {
+            console.error('Error uploading to S3:', error);
+            // Add events back to the queue for retry
+            this.events = [...events, ...this.events];
+        }
+    }
+}
diff --git a/src/plugins/event-plugins/__tests__/SessionReplayPlugin.test.ts b/src/plugins/event-plugins/__tests__/SessionReplayPlugin.test.ts
new file mode 100644
index 00000000..0d80728b
--- /dev/null
+++ b/src/plugins/event-plugins/__tests__/SessionReplayPlugin.test.ts
@@ -0,0 +1,219 @@
+import {
+    SessionReplayPlugin,
+    SESSION_REPLAY_EVENT_TYPE,
+    eventWithTime,
+    SessionReplayConfig
+} from '../SessionReplayPlugin';
+import { context } from '../../../test-utils/test-utils';
+import * as rrweb from 'rrweb';
+
+// Create a properly typed mock for record
+const mockRecord = jest.fn();
+// Replace the record function in the context with our mock
+context.record = mockRecord;
+
+// Mock rrweb
+jest.mock('rrweb', () => ({
+    record: jest.fn().mockReturnValue(jest.fn())
+}));
+
+// Type assertion helper for mocks
+const asMock = (fn: any): jest.Mock => fn as unknown as jest.Mock;
+
+describe('SessionReplayPlugin tests', () => {
+    beforeEach(() => {
+        mockRecord.mockClear();
+        (rrweb.record as unknown as jest.Mock).mockClear();
+    });
+
+    afterEach(() => {
+        jest.restoreAllMocks();
+    });
+
+    test('constructor initializes with default config', async () => {
+        const plugin = new SessionReplayPlugin();
+        expect(plugin).toBeDefined();
+        expect(plugin.getPluginId()).toEqual(SESSION_REPLAY_EVENT_TYPE);
+    });
+
+    test('constructor initializes with custom config', async () => {
+        const config: SessionReplayConfig = {
+            batchSize: 100,
+            recordConfig: {
+                blockClass: 'private-data',
+                maskAllInputs: true
+            }
+        };
+        const plugin = new SessionReplayPlugin(config);
+        expect(plugin).toBeDefined();
+    });
+
+    test('load initializes the plugin but does not start recording', async () => {
+        const plugin = new SessionReplayPlugin();
+        plugin.load(context);
+        expect(rrweb.record).not.toHaveBeenCalled();
+    });
+
+    test('enable starts recording when session is available', async () => {
+        // Setup
+        const plugin = new SessionReplayPlugin();
+        plugin.load(context);
+
+        // Set session manually
+        plugin['session'] = {
+            sessionId: 'test-session-id',
+            record: true,
+            eventCount: 0
+        };
+
+        // Enable the plugin
+        plugin.enable();
+
+        // Verify recording started
+        expect(asMock(rrweb.record)).toHaveBeenCalled();
+    });
+
+    test('disable stops recording', async () => {
+        // Setup
+        const plugin = new SessionReplayPlugin();
+        plugin.load(context);
+
+        // Set session manually
+        plugin['session'] = {
+            sessionId: 'test-session-id',
+            record: true,
+            eventCount: 0
+        };
+
+        // Enable and then disable
+        plugin.enable();
+        const mockRecorder = (rrweb.record as unknown as jest.Mock).mock
+            .results[0].value;
+        plugin.disable();
+
+        // Verify recorder was called (to stop recording)
+        expect(asMock(mockRecorder)).toHaveBeenCalled();
+    });
+
+    test('events are flushed when batch size is reached', async () => {
+        // Setup
+        const batchSize = 2;
+        const plugin = new SessionReplayPlugin({ batchSize });
+        plugin.load(context);
+
+        // Set session manually
+        plugin['session'] = {
+            sessionId: 'test-session-id',
+            record: true,
+            eventCount: 0
+        };
+
+        // Enable the plugin
+        plugin.enable();
+
+        // Get the emit function that was passed to rrweb.record
+        const emitFn = (rrweb.record as unknown as jest.Mock).mock.calls[0][0]
+            .emit;
+
+        // Simulate events being emitted
+        const mockEvent1: eventWithTime = {
+            type: 1,
+            data: { source: 0 },
+            timestamp: Date.now()
+        };
+
+        const mockEvent2: eventWithTime = {
+            type: 2,
+            data: { source: 1 },
+            timestamp: Date.now()
+        };
+
+        // Emit events
+        emitFn(mockEvent1);
+        emitFn(mockEvent2);
+
+        // Verify record was called with the events
+        expect(mockRecord.mock.calls[0][0]).toEqual(SESSION_REPLAY_EVENT_TYPE);
+        expect(mockRecord.mock.calls[0][1]).toMatchObject({
+            events: [mockEvent1, mockEvent2],
+            sessionId: 'test-session-id'
+        });
+    });
+
+    test('forceFlush sends events immediately', async () => {
+        // Setup
+        const plugin = new SessionReplayPlugin();
+        plugin.load(context);
+
+        // Set session manually
+        plugin['session'] = {
+            sessionId: 'test-session-id',
+            record: true,
+            eventCount: 0
+        };
+
+        // Enable the plugin
+        plugin.enable();
+
+        // Get the emit function that was passed to rrweb.record
+        const emitFn = (rrweb.record as unknown as jest.Mock).mock.calls[0][0]
+            .emit;
+
+        // Simulate an event being emitted (not enough to trigger automatic flush)
+        const mockEvent: eventWithTime = {
+            type: 1,
+            data: { source: 0 },
+            timestamp: Date.now()
+        };
+
+        emitFn(mockEvent);
+
+        // Verify record was not called yet
+        expect(asMock(mockRecord)).not.toHaveBeenCalled();
+
+        // Force flush
+        plugin.forceFlush();
+
+        // Verify record was called with the event
+        expect(mockRecord.mock.calls[0][0]).toEqual(SESSION_REPLAY_EVENT_TYPE);
+        expect(mockRecord.mock.calls[0][1]).toMatchObject({
+            events: [mockEvent],
+            sessionId: 'test-session-id'
+        });
+    });
+
+    test('custom record config is passed to rrweb', async () => {
+        // Setup
+        const recordConfig = {
+            blockClass: 'private-data',
+            maskAllInputs: true,
+            maskTextClass: 'mask-text'
+        };
+
+        const plugin = new SessionReplayPlugin({
+            recordConfig
+        });
+
+        plugin.load(context);
+
+        // Set session manually
+        plugin['session'] = {
+            sessionId: 'test-session-id',
+            record: true,
+            eventCount: 0
+        };
+
+        // Enable the plugin
+        plugin.enable();
+
+        // Verify rrweb.record was called with the correct config
+        expect(asMock(rrweb.record)).toHaveBeenCalledWith(
+            expect.objectContaining({
+                blockClass: recordConfig.blockClass,
+                maskAllInputs: recordConfig.maskAllInputs,
+                maskTextClass: recordConfig.maskTextClass,
+                emit: expect.any(Function)
+            })
+        );
+    });
+});