From 38db7c1b8261f34b254fd2b98ddf3937682c5058 Mon Sep 17 00:00:00 2001 From: Vraj Routu Date: Thu, 13 Nov 2025 12:39:24 -0500 Subject: [PATCH] Add proactive messaging helper and endpoints --- compat/baseline/agents-hosting.api.md | 83 ++++-- packages/agents-hosting/README.md | 71 +++-- .../src/app/agentApplication.ts | 12 + .../src/app/agentApplicationOptions.ts | 8 + packages/agents-hosting/src/app/index.ts | 1 + .../agents-hosting/src/app/proactive/index.ts | 3 + .../src/app/proactive/proactiveActions.ts | 256 ++++++++++++++++++ .../proactive/proactiveServiceExtensions.ts | 124 +++++++++ .../src/app/proactive/proactiveTypes.ts | 58 ++++ .../test/hosting/app/proactiveActions.test.ts | 136 ++++++++++ 10 files changed, 715 insertions(+), 37 deletions(-) create mode 100644 packages/agents-hosting/src/app/proactive/index.ts create mode 100644 packages/agents-hosting/src/app/proactive/proactiveActions.ts create mode 100644 packages/agents-hosting/src/app/proactive/proactiveServiceExtensions.ts create mode 100644 packages/agents-hosting/src/app/proactive/proactiveTypes.ts create mode 100644 packages/agents-hosting/test/hosting/app/proactiveActions.test.ts diff --git a/compat/baseline/agents-hosting.api.md b/compat/baseline/agents-hosting.api.md index 008333e1..eaa9b93c 100644 --- a/compat/baseline/agents-hosting.api.md +++ b/compat/baseline/agents-hosting.api.md @@ -154,11 +154,12 @@ export const adaptiveCardsSearchParamsZodSchema: z.ZodObject<{ }>; // @public -export class AgentApplication { - constructor(options?: Partial>); - get adapter(): BaseAdapter; - get adaptiveCards(): AdaptiveCardsActions; - addRoute(selector: RouteSelector, handler: RouteHandler, isInvokeRoute?: boolean, rank?: number, authHandlers?: string[]): this; +export class AgentApplication { + constructor(options?: Partial>); + get adapter(): BaseAdapter; + get adaptiveCards(): AdaptiveCardsActions; + get proactive(): ProactiveActions; + addRoute(selector: RouteSelector, handler: RouteHandler, isInvokeRoute?: boolean, rank?: number, authHandlers?: string[]): this; // (undocumented) protected readonly _afterTurn: ApplicationEventHandler[]; get authorization(): Authorization; @@ -203,21 +204,63 @@ export class AgentApplicationBuilder { } // @public -export interface AgentApplicationOptions { - adapter?: CloudAdapter; - adaptiveCardsOptions?: AdaptiveCardsOptions; - agentAppId?: string; - authorization?: AuthorizationOptions; - fileDownloaders?: InputFileDownloader[]; - headerPropagation?: HeaderPropagationDefinition; - longRunningMessages: boolean; - normalizeMentions?: boolean; - removeRecipientMention?: boolean; - startTypingTimer: boolean; - storage?: Storage_2; - transcriptLogger?: TranscriptLogger; - turnStateFactory: () => TState; -} +export interface AgentApplicationOptions { + adapter?: CloudAdapter; + adaptiveCardsOptions?: AdaptiveCardsOptions; + agentAppId?: string; + authorization?: AuthorizationOptions; + fileDownloaders?: InputFileDownloader[]; + headerPropagation?: HeaderPropagationDefinition; + longRunningMessages: boolean; + normalizeMentions?: boolean; + proactiveOptions?: ProactiveOptions; + removeRecipientMention?: boolean; + startTypingTimer: boolean; + storage?: Storage_2; + transcriptLogger?: TranscriptLogger; + turnStateFactory: () => TState; +} + +// @public +export class ProactiveActions { + constructor(app: AgentApplication, options?: ProactiveOptions); + deleteReference(conversationId: string, channelId: string): Promise; + getReference(conversationId: string, channelId: string): Promise; + saveReference(conversationId: string, channelId: string, identity: JwtPayload, reference: ConversationReference, ttlOverrideSeconds?: number): Promise; + sendActivities(conversationId: string, channelId: string, activities: (Activity | Partial)[]): Promise; + sendToReference(identity: JwtPayload, reference: ConversationReference, activities: (Activity | Partial)[]): Promise; +} + +// @public +export interface ProactiveHttpOptions { + prefix?: string; +} + +// @public +export interface ProactiveOptions { + autoPersistReferences?: boolean; + keyFactory?: (channelId: string, conversationId: string) => string | Promise; + referenceTtlSeconds?: number; + storage?: Storage_2; +} + +// @public +export interface ProactiveReferenceRecord extends StoreItem { + channelId: string; + conversationId: string; + expiresUtc?: string; + identity: JwtPayload; + reference: ConversationReference; + updatedUtc: string; +} + +// @public +export interface ProactiveSendResult { + activityIds: string[]; +} + +// @public +export function registerProactiveRoutes(app: Application, agent: AgentApplication, options?: ProactiveHttpOptions): void; // @public export const AgentCallbackHandlerKey = "agentCallbackHandler"; diff --git a/packages/agents-hosting/README.md b/packages/agents-hosting/README.md index 3d741b94..09d8e607 100644 --- a/packages/agents-hosting/README.md +++ b/packages/agents-hosting/README.md @@ -54,20 +54,57 @@ Host the bot with express // index.ts import express, { Response } from 'express' import { Request, CloudAdapter, authorizeJWT, AuthConfiguration, loadAuthConfigFromEnv } from '@microsoft/agents-hosting' -import { EchoBot } from './myHandler' - -const authConfig: AuthConfiguration = loadAuthConfigFromEnv() - -const adapter = new CloudAdapter(authConfig) -const myHandler = new MyHandler() - -const app = express() - -app.use(express.json()) -app.use(authorizeJWT(authConfig)) - -app.post('/api/messages', async (req: Request, res: Response) => { - await adapter.process(req, res, async (context) => await myHandler.run(context)) -}) - -``` +import { EchoBot } from './myHandler' + +const authConfig: AuthConfiguration = loadAuthConfigFromEnv() + +const adapter = new CloudAdapter(authConfig) +const myHandler = new MyHandler() + +const app = express() + +app.use(express.json()) +app.use(authorizeJWT(authConfig)) + +app.post('/api/messages', async (req: Request, res: Response) => { + await adapter.process(req, res, async (context) => await myHandler.run(context)) +}) + +``` + +## Proactive messaging + +`AgentApplication.proactive` simplifies persisting conversation references and sending activities outside the normal turn flow. + +```ts +import { Activity, AgentApplication, MemoryStorage } from '@microsoft/agents-hosting' + +const app = new AgentApplication({ + storage: new MemoryStorage(), + proactiveOptions: { autoPersistReferences: true } +}) + +app.onMessage(async (context) => { + await context.sendActivity('Thanks, I will keep you posted!') +}) + +await app.proactive.sendActivities( + 'conversation-id', + 'msteams', + [Activity.fromObject({ type: 'message', text: 'Here is a proactive update.' })] +) +``` + +To integrate with external schedulers or services, register the optional HTTP endpoints: + +```ts +import express from 'express' +import { registerProactiveRoutes } from '@microsoft/agents-hosting' + +const server = express() +server.use(express.json()) + +registerProactiveRoutes(server, app) +``` + +The extension adds `/api/sendactivity` and `/api/sendtoreference` endpoints that call the proactive helper internally. diff --git a/packages/agents-hosting/src/app/agentApplication.ts b/packages/agents-hosting/src/app/agentApplication.ts index ed823038..86cef70d 100644 --- a/packages/agents-hosting/src/app/agentApplication.ts +++ b/packages/agents-hosting/src/app/agentApplication.ts @@ -8,6 +8,7 @@ import { ResourceResponse } from '../connector-client' import { debug } from '@microsoft/agents-activity/logger' import { TurnContext } from '../turnContext' import { AdaptiveCardsActions } from './adaptiveCards' +import { ProactiveActions } from './proactive' import { AgentApplicationOptions } from './agentApplicationOptions' import { ConversationUpdateEvents } from './conversationUpdateEvents' import { AgentExtension } from './extensions' @@ -79,6 +80,7 @@ export class AgentApplication { private _typingTimer: NodeJS.Timeout | undefined protected readonly _extensions: AgentExtension[] = [] private readonly _adaptiveCards: AdaptiveCardsActions + private readonly _proactive: ProactiveActions /** * Creates a new instance of AgentApplication. @@ -118,6 +120,7 @@ export class AgentApplication { } this._adaptiveCards = new AdaptiveCardsActions(this) + this._proactive = new ProactiveActions(this, this._options.proactiveOptions) if (this._options.adapter) { this._adapter = this._options.adapter @@ -195,6 +198,15 @@ export class AgentApplication { return this._adaptiveCards } + /** + * Gets the proactive messaging helper for the application. + * + * @returns The proactive actions instance. + */ + public get proactive (): ProactiveActions { + return this._proactive + } + /** * Sets an error handler for the application. * diff --git a/packages/agents-hosting/src/app/agentApplicationOptions.ts b/packages/agents-hosting/src/app/agentApplicationOptions.ts index 660cea69..d71dd551 100644 --- a/packages/agents-hosting/src/app/agentApplicationOptions.ts +++ b/packages/agents-hosting/src/app/agentApplicationOptions.ts @@ -7,6 +7,7 @@ import { CloudAdapter } from '../cloudAdapter' import { Storage } from '../storage' import { TranscriptLogger } from '../transcript' import { AdaptiveCardsOptions } from './adaptiveCards' +import { ProactiveOptions } from './proactive' import { InputFileDownloader } from './inputFileDownloader' import { TurnState } from './turnState' import { HeaderPropagationDefinition } from '../headerPropagation' @@ -107,6 +108,13 @@ export interface AgentApplicationOptions { */ adaptiveCardsOptions?: AdaptiveCardsOptions; + /** + * Configuration for proactive messaging support. + * + * @default undefined (no proactive helper configured) + */ + proactiveOptions?: ProactiveOptions; + /** * Whether to automatically remove mentions of the bot's name from incoming messages. * When enabled, if a user mentions the bot by name (e.g., "@BotName hello"), the mention diff --git a/packages/agents-hosting/src/app/index.ts b/packages/agents-hosting/src/app/index.ts index da9825ff..a22e8972 100644 --- a/packages/agents-hosting/src/app/index.ts +++ b/packages/agents-hosting/src/app/index.ts @@ -16,5 +16,6 @@ export * from './inputFileDownloader' export * from './appMemory' export * from './extensions' export * from './adaptiveCards' +export * from './proactive' export * from './streaming/streamingResponse' export * from './streaming/citation' diff --git a/packages/agents-hosting/src/app/proactive/index.ts b/packages/agents-hosting/src/app/proactive/index.ts new file mode 100644 index 00000000..06070cd8 --- /dev/null +++ b/packages/agents-hosting/src/app/proactive/index.ts @@ -0,0 +1,3 @@ +export * from './proactiveActions' +export * from './proactiveTypes' +export * from './proactiveServiceExtensions' diff --git a/packages/agents-hosting/src/app/proactive/proactiveActions.ts b/packages/agents-hosting/src/app/proactive/proactiveActions.ts new file mode 100644 index 00000000..5b179e02 --- /dev/null +++ b/packages/agents-hosting/src/app/proactive/proactiveActions.ts @@ -0,0 +1,256 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import { Activity, ActivityTypes, ConversationReference } from '@microsoft/agents-activity' +import { debug } from '@microsoft/agents-activity/logger' +import { JwtPayload } from 'jsonwebtoken' +import { AgentApplication } from '../agentApplication' +import { TurnContext } from '../../turnContext' +import { TurnState } from '../turnState' +import { Storage } from '../../storage' +import { ProactiveOptions, ProactiveReferenceRecord, ProactiveSendResult } from './proactiveTypes' + +const logger = debug('agents:app:proactive') + +interface StoredReferenceMap { + [key: string]: ProactiveReferenceRecord | undefined; +} + +const DEFAULT_KEY_PREFIX = 'proactive' + +type ActivityLike = Activity | Partial + +/** + * Provides helper APIs for proactive communication scenarios. + * + * @typeParam TState - The turn state type used by the parent application. + */ +export class ProactiveActions { + private readonly _app: AgentApplication + private readonly _options: ProactiveOptions + private readonly _storage?: Storage + + constructor (app: AgentApplication, options?: ProactiveOptions) { + this._app = app + this._options = options ?? {} + this._storage = this._options.storage ?? app.options.storage + + if (this._options.autoPersistReferences) { + if (!this._storage) { + logger.warn('Proactive auto persistence requested but no storage was configured. Auto persistence disabled.') + } else { + this.registerAutoPersistence() + } + } + } + + /** + * Saves the conversation reference and identity for a conversation. + * + * @param conversationId - Conversation identifier. + * @param channelId - Channel identifier. + * @param identity - Identity associated with the conversation. + * @param reference - Conversation reference to persist. + * @param ttlOverrideSeconds - Optional TTL override in seconds. + */ + public async saveReference ( + conversationId: string, + channelId: string, + identity: JwtPayload, + reference: ConversationReference, + ttlOverrideSeconds?: number + ): Promise { + const storage = this.ensureStorage() + const key = await this.getStorageKey(channelId, conversationId) + const existing = await storage.read([key]) as StoredReferenceMap + const now = new Date() + + const expiresUtc = this.computeExpiry(now, ttlOverrideSeconds ?? this._options.referenceTtlSeconds) + + const record: ProactiveReferenceRecord = { + conversationId, + channelId, + identity, + reference, + updatedUtc: now.toISOString(), + expiresUtc, + eTag: existing?.[key]?.eTag + } + + await storage.write({ [key]: { ...record, eTag: record.eTag ?? '*' } }) + } + + /** + * Retrieves a stored conversation reference. + * + * @param conversationId - Conversation identifier. + * @param channelId - Channel identifier. + * @returns The stored record or undefined when not found/expired. + */ + public async getReference ( + conversationId: string, + channelId: string + ): Promise { + const storage = this.ensureStorage() + const key = await this.getStorageKey(channelId, conversationId) + const data = await storage.read([key]) as StoredReferenceMap + const record = data[key] + + if (!record) { + return undefined + } + + if (this.isExpired(record)) { + logger.debug(`Discarding expired proactive reference for ${channelId}:${conversationId}`) + await storage.delete([key]) + return undefined + } + + return record + } + + /** + * Deletes a stored conversation reference. + * + * @param conversationId - Conversation identifier. + * @param channelId - Channel identifier. + */ + public async deleteReference ( + conversationId: string, + channelId: string + ): Promise { + const storage = this.ensureStorage() + const key = await this.getStorageKey(channelId, conversationId) + await storage.delete([key]) + } + + /** + * Sends activities to a previously stored conversation reference. + * + * @param conversationId - Conversation identifier used as storage key. + * @param channelId - Channel identifier used as storage key. + * @param activities - Activities to send. + * @returns Activity IDs returned from the channel. + */ + public async sendActivities ( + conversationId: string, + channelId: string, + activities: ActivityLike[] + ): Promise { + if (activities.length === 0) { + return { activityIds: [] } + } + + const record = await this.getReference(conversationId, channelId) + if (!record) { + throw new Error(`No proactive reference found for conversation ${channelId}:${conversationId}`) + } + + return await this.sendToReference(record.identity, record.reference, activities) + } + + /** + * Sends activities directly using the provided identity and conversation reference. + * + * @param identity - Claims identity to use. + * @param reference - Conversation reference to continue. + * @param activities - Activities to send. + * @returns Activity IDs returned from the channel. + */ + public async sendToReference ( + identity: JwtPayload, + reference: ConversationReference, + activities: ActivityLike[] + ): Promise { + if (!identity) { + throw new TypeError('identity is required to send proactive activities.') + } + + if (!reference) { + throw new TypeError('reference is required to send proactive activities.') + } + + if (!this._app.adapter) { + throw new Error('Cannot send proactive activities because no adapter was configured.') + } + + const activityIds: string[] = [] + + await this._app.adapter.continueConversation(identity, reference, async (turnContext: TurnContext) => { + const toSend = activities.map((activity) => Activity.fromObject(activity)) + + const responses = await turnContext.sendActivities(toSend) + responses.forEach((response) => { + if (response?.id) { + activityIds.push(response.id) + } else { + activityIds.push('') + } + }) + }) + + return { activityIds } + } + + private registerAutoPersistence (): void { + this._app.onTurn('afterTurn', async (context) => { + try { + if (!context.activity?.conversation?.id || !context.activity.channelId) { + return true + } + + const identity = context.identity + if (!identity) { + logger.debug('Unable to persist proactive reference because context identity is missing.') + return true + } + + if (context.activity.type === ActivityTypes.EndOfConversation) { + await this.deleteReference(context.activity.conversation.id, context.activity.channelId) + return true + } + + const reference = context.activity.getConversationReference() + await this.saveReference(context.activity.conversation.id, context.activity.channelId, identity, reference) + } catch (err) { + const message = err instanceof Error ? err.stack ?? err.message : String(err) + logger.error(message) + } + + return true + }) + } + + private ensureStorage (): Storage { + if (!this._storage) { + throw new Error('Proactive messaging requires storage to be configured.') + } + return this._storage + } + + private async getStorageKey (channelId: string, conversationId: string): Promise { + if (this._options.keyFactory) { + return await this._options.keyFactory(channelId, conversationId) + } + return `${DEFAULT_KEY_PREFIX}:${channelId}:${conversationId}` + } + + private computeExpiry (now: Date, ttlSeconds?: number): string | undefined { + if (!ttlSeconds || ttlSeconds <= 0) { + return undefined + } + + return new Date(now.getTime() + ttlSeconds * 1000).toISOString() + } + + private isExpired (record: ProactiveReferenceRecord): boolean { + if (!record.expiresUtc) { + return false + } + + const expiry = new Date(record.expiresUtc).getTime() + return !isNaN(expiry) && expiry <= Date.now() + } +} diff --git a/packages/agents-hosting/src/app/proactive/proactiveServiceExtensions.ts b/packages/agents-hosting/src/app/proactive/proactiveServiceExtensions.ts new file mode 100644 index 00000000..40c354a7 --- /dev/null +++ b/packages/agents-hosting/src/app/proactive/proactiveServiceExtensions.ts @@ -0,0 +1,124 @@ +import { Application, Request, Response } from 'express' +import { Activity, ConversationReference } from '@microsoft/agents-activity' +import { AgentApplication } from '../agentApplication' +import { JwtPayload } from 'jsonwebtoken' +import { ProactiveSendResult } from './proactiveTypes' +import { TurnState } from '../turnState' + +/** + * Options for configuring the proactive HTTP endpoints. + */ +export interface ProactiveHttpOptions { + /** + * Optional path prefix applied to all generated routes. + * + * @default '/api' + */ + prefix?: string; +} + +interface SendActivityRequestBody { + conversationId?: string; + channelId?: string; + activity?: Activity; + activities?: Activity[]; +} + +interface SendToReferenceRequestBody { + identity?: JwtPayload; + reference?: ConversationReference; + activities?: Activity[]; +} + +/** + * Registers HTTP endpoints that expose the proactive messaging helper. + * + * @param app - Express application to augment. + * @param agent - Agent application instance. + * @param options - Optional configuration. + */ +export const registerProactiveRoutes = ( + app: Application, + agent: AgentApplication, + options?: ProactiveHttpOptions +): void => { + const prefix = options?.prefix ?? '/api' + + app.post(`${prefix}/sendactivity`, async (req: Request, res: Response) => { + const body = req.body as SendActivityRequestBody | undefined + + if (!body?.conversationId || !body.channelId) { + return res.status(400).json({ + status: 'Error', + error: { code: 'Validation', message: 'Both conversationId and channelId are required.' } + }) + } + + const activities = asActivityArray(body) + if (activities.length === 0) { + return res.status(400).json({ + status: 'Error', + error: { code: 'Validation', message: 'At least one activity must be supplied.' } + }) + } + + try { + const result = await agent.proactive.sendActivities(body.conversationId, body.channelId, activities) + respondWithResult(res, result, body.conversationId) + } catch (err: any) { + res.status(500).json({ + status: 'Error', + error: { code: 'ProactiveSendFailure', message: err?.message ?? 'Unable to send activities.' } + }) + } + }) + + app.post(`${prefix}/sendtoreference`, async (req: Request, res: Response) => { + const body = req.body as SendToReferenceRequestBody | undefined + + if (!body?.identity || !body.reference) { + return res.status(400).json({ + status: 'Error', + error: { code: 'Validation', message: 'Both identity and reference are required.' } + }) + } + + const activities = Array.isArray(body.activities) ? body.activities : [] + if (activities.length === 0) { + return res.status(400).json({ + status: 'Error', + error: { code: 'Validation', message: 'At least one activity must be supplied.' } + }) + } + + try { + const result = await agent.proactive.sendToReference(body.identity, body.reference, activities) + respondWithResult(res, result) + } catch (err: any) { + res.status(500).json({ + status: 'Error', + error: { code: 'ProactiveSendFailure', message: err?.message ?? 'Unable to send activities.' } + }) + } + }) +} + +const asActivityArray = (body: SendActivityRequestBody): Activity[] => { + if (Array.isArray(body.activities) && body.activities.length > 0) { + return body.activities.map((activity) => Activity.fromObject(activity)) + } + + if (body.activity) { + return [Activity.fromObject(body.activity)] + } + + return [] +} + +const respondWithResult = (res: Response, result: ProactiveSendResult, conversationId?: string) => { + res.status(200).json({ + conversationId, + status: 'Delivered', + activityIds: result.activityIds + }) +} diff --git a/packages/agents-hosting/src/app/proactive/proactiveTypes.ts b/packages/agents-hosting/src/app/proactive/proactiveTypes.ts new file mode 100644 index 00000000..edb80547 --- /dev/null +++ b/packages/agents-hosting/src/app/proactive/proactiveTypes.ts @@ -0,0 +1,58 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import { ConversationReference } from '@microsoft/agents-activity' +import { JwtPayload } from 'jsonwebtoken' +import { Storage, StoreItem } from '../../storage' + +/** + * Options for configuring proactive messaging support on an {@link AgentApplication}. + */ +export interface ProactiveOptions { + /** + * Storage provider used to persist conversation references. + * Defaults to the application's configured storage. + */ + storage?: Storage; + + /** + * When true, the SDK automatically persists conversation references + * after each turn (defaults to false). + */ + autoPersistReferences?: boolean; + + /** + * Optional time-to-live in seconds for stored references. + * When set, references expiring in the past are automatically removed. + */ + referenceTtlSeconds?: number; + + /** + * Optional factory to customize how a storage key is derived for a conversation. + */ + keyFactory?: (channelId: string, conversationId: string) => string | Promise; +} + +/** + * Represents a stored conversation reference and associated identity. + */ +export interface ProactiveReferenceRecord extends StoreItem { + conversationId: string; + channelId: string; + identity: JwtPayload; + reference: ConversationReference; + updatedUtc: string; + expiresUtc?: string; +} + +/** + * Result of sending proactive activities. + */ +export interface ProactiveSendResult { + /** + * Activity identifiers returned from the connector. + */ + activityIds: string[]; +} diff --git a/packages/agents-hosting/test/hosting/app/proactiveActions.test.ts b/packages/agents-hosting/test/hosting/app/proactiveActions.test.ts new file mode 100644 index 00000000..26065d7c --- /dev/null +++ b/packages/agents-hosting/test/hosting/app/proactiveActions.test.ts @@ -0,0 +1,136 @@ +import { strict as assert } from 'assert' +import { describe, it, beforeEach } from 'node:test' +import { Activity, ActivityTypes, ConversationReference } from '@microsoft/agents-activity' +import { BaseAdapter } from '../../../src/baseAdapter' +import { AgentApplication, MemoryStorage, TurnContext, ResourceResponse } from '../../../src' +import { JwtPayload } from 'jsonwebtoken' +import { AttachmentData, AttachmentInfo } from '../../../src' + +class ProactiveTestAdapter extends BaseAdapter { + authConfig = { clientId: 'test-app-id', issuers: [] } + public sentBatches: Activity[][] = [] + public continueCalls: Array<{ identity: JwtPayload; reference: ConversationReference }> = [] + + async sendActivities (_context: TurnContext, activities: Activity[]): Promise { + this.sentBatches.push(activities) + return activities.map((_, index) => ({ id: `activity-${this.sentBatches.length}-${index}` })) + } + + async continueConversation ( + botAppIdOrIdentity: string | JwtPayload, + reference: Partial, + logic: (revocableContext: TurnContext) => Promise + ): Promise { + const identity = typeof botAppIdOrIdentity === 'string' + ? { aud: botAppIdOrIdentity } + : botAppIdOrIdentity + + const activity = Activity.fromObject({ + type: ActivityTypes.Event, + channelId: reference.channelId, + serviceUrl: reference.serviceUrl, + conversation: { id: reference.conversation?.id }, + recipient: reference.bot ?? { id: 'bot' }, + from: reference.user ?? { id: 'user' } + }) + + const turnContext = new TurnContext(this, activity, identity as JwtPayload) + this.continueCalls.push({ identity: identity as JwtPayload, reference: reference as ConversationReference }) + await logic(turnContext) + } + + // Unused abstract members + updateActivity (): Promise { throw new Error('Not implemented') } + deleteActivity (): Promise { throw new Error('Not implemented') } + uploadAttachment (_context: TurnContext, _conversationId: string, _attachmentData: AttachmentData): Promise { throw new Error('Not implemented') } + getAttachmentInfo (): Promise { throw new Error('Not implemented') } + getAttachment (): Promise { throw new Error('Not implemented') } +} + +const createMessageActivity = () => Activity.fromObject({ + type: ActivityTypes.Message, + text: 'hello world', + channelId: 'msteams', + serviceUrl: 'https://example.org', + conversation: { id: 'conversation-1' }, + recipient: { id: 'bot-id' }, + from: { id: 'user-id' } +}) + +describe('ProactiveActions', () => { + let adapter: ProactiveTestAdapter + let storage: MemoryStorage + let app: AgentApplication + const identity: JwtPayload = { aud: 'bot-app-id' } as JwtPayload + + beforeEach(() => { + adapter = new ProactiveTestAdapter() + storage = new MemoryStorage() + app = new AgentApplication({ + adapter, + storage, + proactiveOptions: { + autoPersistReferences: true, + referenceTtlSeconds: 3600 + } + }) + + app.onActivity(ActivityTypes.Message, async (context) => { + await context.sendActivity('ack') + }) + }) + + it('automatically persists conversation references after a turn', async () => { + const activity = createMessageActivity() + const context = new TurnContext(adapter, activity, identity) + + const handled = await app.runInternal(context) + assert.equal(handled, true) + + const record = await app.proactive.getReference('conversation-1', 'msteams') + assert.ok(record, 'reference should be stored') + assert.equal(record?.identity.aud, 'bot-app-id') + assert.equal(typeof record?.updatedUtc, 'string') + assert.equal(record?.channelId, 'msteams') + }) + + it('sends proactive activities using stored references', async () => { + const activity = createMessageActivity() + const context = new TurnContext(adapter, activity, identity) + await app.runInternal(context) + + const proactiveActivity = Activity.fromObject({ + type: ActivityTypes.Message, + text: 'proactive message' + }) + + const result = await app.proactive.sendActivities('conversation-1', 'msteams', [proactiveActivity]) + assert.deepEqual(result.activityIds.length, 1) + assert.equal(adapter.sentBatches.length, 2) // one from turn, one proactive + assert.equal(adapter.sentBatches.at(-1)?.[0].text, 'proactive message') + }) + + it('removes expired references on retrieval', async () => { + const activity = createMessageActivity() + const reference = activity.getConversationReference() + await app.proactive.saveReference('conversation-expired', 'msteams', identity, reference, 1) + + const key = 'proactive:msteams:conversation-expired' + const stored = await storage.read([key]) as any + stored[key].expiresUtc = new Date(Date.now() - 1000).toISOString() + await storage.write({ [key]: stored[key] }) + + const result = await app.proactive.getReference('conversation-expired', 'msteams') + assert.equal(result, undefined) + + const after = await storage.read([key]) as any + assert.equal(after[key], undefined) + }) + + it('throws when sending to an unknown conversation', async () => { + await assert.rejects( + () => app.proactive.sendActivities('missing', 'msteams', [Activity.fromObject({ type: ActivityTypes.Message })]), + /No proactive reference found/ + ) + }) +})