diff --git a/packages/live-status-gateway/src/config.ts b/packages/live-status-gateway/src/config.ts index 95366edebd..ef71d6892b 100644 --- a/packages/live-status-gateway/src/config.ts +++ b/packages/live-status-gateway/src/config.ts @@ -11,6 +11,7 @@ let deviceToken: string = process.env.DEVICE_TOKEN || '' let disableWatchdog: boolean = process.env.DISABLE_WATCHDOG === '1' || false let unsafeSSL: boolean = process.env.UNSAFE_SSL === '1' || false const certs: string[] = (process.env.CERTIFICATES || '').split(';') || [] +let healthPort: number | undefined = parseInt(process.env.HEALTH_PORT + '') || undefined let prevProcessArg = '' process.argv.forEach((val) => { @@ -37,12 +38,14 @@ process.argv.forEach((val) => { } else if (val.match(/-unsafeSSL/i)) { // Will cause the Node applocation to blindly accept all certificates. Not recommenced unless in local, controlled networks. unsafeSSL = true + } else if (prevProcessArg.match(/-healthPort/i)) { + healthPort = parseInt(val) } prevProcessArg = nextPrevProcessArg + '' }) const config: Config = { - process: { + certificates: { unsafeSSL: unsafeSSL, certificates: certs.filter((c) => c !== undefined && c !== null && c.length !== 0), }, @@ -55,6 +58,9 @@ const config: Config = { port: port, watchdog: !disableWatchdog, }, + health: { + port: healthPort, + }, } export { config, logPath, logLevel, disableWatchdog } diff --git a/packages/live-status-gateway/src/connector.ts b/packages/live-status-gateway/src/connector.ts index d38c3b2585..4b059a6444 100644 --- a/packages/live-status-gateway/src/connector.ts +++ b/packages/live-status-gateway/src/connector.ts @@ -1,28 +1,33 @@ import { CoreHandler, CoreConfig } from './coreHandler.js' import { Logger } from 'winston' -import { Process } from './process.js' import { PeripheralDeviceId } from '@sofie-automation/shared-lib/dist/core/model/Ids' import { LiveStatusServer } from './liveStatusServer.js' +import { + CertificatesConfig, + HealthConfig, + HealthEndpoints, + IConnector, + loadCertificatesFromDisk, + stringifyError, +} from '@sofie-automation/server-core-integration' export interface Config { - process: ProcessConfig + certificates: CertificatesConfig device: DeviceConfig core: CoreConfig + health: HealthConfig } -export interface ProcessConfig { - /** Will cause the Node applocation to blindly accept all certificates. Not recommenced unless in local, controlled networks. */ - unsafeSSL: boolean - /** Paths to certificates to load, for SSL-connections */ - certificates: string[] -} + export interface DeviceConfig { deviceId: PeripheralDeviceId deviceToken: string } -export class Connector { +export class Connector implements IConnector { + public initialized = false + public initializedError: string | undefined = undefined + private coreHandler: CoreHandler | undefined private _logger: Logger - private _process: Process | undefined private _liveStatusServer: LiveStatusServer | undefined constructor(logger: Logger) { @@ -31,14 +36,15 @@ export class Connector { public async init(config: Config): Promise { try { - this._logger.info('Initializing Process...') - this._process = new Process(this._logger) - this._process.init(config.process) - this._logger.info('Process initialized') + this._logger.info('Initializing Certificates...') + const certificates = loadCertificatesFromDisk(this._logger, config.certificates) + this._logger.info('Certificates initialized') this._logger.info('Initializing Core...') this.coreHandler = new CoreHandler(this._logger, config.device) - await this.coreHandler.init(config.core, this._process) + new HealthEndpoints(this, this.coreHandler, config.health) + + await this.coreHandler.init(config.core, certificates) this._logger.info('Core initialized') if (!this.coreHandler.studioId) throw new Error('Device has no studioId') @@ -53,6 +59,8 @@ export class Connector { this._logger.error(e) this._logger.error(e.stack) + this.initializedError = stringifyError(e) + try { if (this.coreHandler) { this.coreHandler.destroy().catch(this._logger.error) diff --git a/packages/live-status-gateway/src/coreHandler.ts b/packages/live-status-gateway/src/coreHandler.ts index a78966e33f..79767e4021 100644 --- a/packages/live-status-gateway/src/coreHandler.ts +++ b/packages/live-status-gateway/src/coreHandler.ts @@ -3,6 +3,7 @@ import { CoreConnection, CoreOptions, DDPConnectorOptions, + ICoreHandler, Observer, PeripheralDevicePubSub, PeripheralDevicePubSubCollections, @@ -13,7 +14,6 @@ import { } from '@sofie-automation/server-core-integration' import { DeviceConfig } from './connector.js' import { Logger } from 'winston' -import { Process } from './process.js' import { LIVE_STATUS_DEVICE_CONFIG } from './configManifest.js' import { PeripheralDeviceCategory, @@ -36,7 +36,7 @@ export interface CoreConfig { /** * Represents a connection between the Gateway and Core */ -export class CoreHandler { +export class CoreHandler implements ICoreHandler { core!: CoreConnection< CorelibPubSubTypes & PeripheralDevicePubSubTypes, CorelibPubSubCollections & PeripheralDevicePubSubCollections @@ -45,30 +45,27 @@ export class CoreHandler { public _observers: Array = [] public deviceSettings: LiveStatusGatewayConfig = {} - public errorReporting = false - public multithreading = false - public reportAllCommands = false - private _deviceOptions: DeviceConfig - private _onConnected?: () => any private _executedFunctions = new Set() private _coreConfig?: CoreConfig - private _process?: Process private _studioId: StudioId | undefined private _statusInitialized = false private _statusDestroyed = false + public get connectedToCore(): boolean { + return this.core && this.core.connected + } + constructor(logger: Logger, deviceOptions: DeviceConfig) { this.logger = logger this._deviceOptions = deviceOptions } - async init(config: CoreConfig, process: Process): Promise { + async init(config: CoreConfig, certificates: Buffer[]): Promise { this._statusInitialized = false this._coreConfig = config - this._process = process this.core = new CoreConnection( this.getCoreConnectionOptions() @@ -79,7 +76,6 @@ export class CoreHandler { this.setupObserversAndSubscriptions().catch((e) => { this.logger.error('Core Error during setupObserversAndSubscriptions:', e) }) - if (this._onConnected) this._onConnected() }) this.core.onDisconnected(() => { this.logger.warn('Core Disconnected!') @@ -91,11 +87,7 @@ export class CoreHandler { const ddpConfig: DDPConnectorOptions = { host: config.host, port: config.port, - } - if (this._process && this._process.certificates.length) { - ddpConfig.tlsOpts = { - ca: this._process.certificates, - } + tlsOpts: certificates.length ? { ca: certificates } : undefined, } await this.core.init(ddpConfig) @@ -103,7 +95,6 @@ export class CoreHandler { this.logger.info('Core id: ' + this.core.deviceId) await this.setupObserversAndSubscriptions() - if (this._onConnected) this._onConnected() this._statusInitialized = true await this.updateCoreStatus() @@ -186,9 +177,6 @@ export class CoreHandler { return options } - onConnected(fcn: () => any): void { - this._onConnected = fcn - } onDeviceChanged(): void { const col = this.core.getCollection(PeripheralDevicePubSubCollectionsNames.peripheralDeviceForDevice) @@ -325,7 +313,10 @@ export class CoreHandler { this.logger.info('getDevicesInfo') return [] } - async updateCoreStatus(): Promise { + getCoreStatus(): { + statusCode: StatusCode + messages: string[] + } { let statusCode = StatusCode.GOOD const messages: Array = [] @@ -337,11 +328,13 @@ export class CoreHandler { statusCode = StatusCode.BAD messages.push('Shut down') } - - return this.core.setStatus({ - statusCode: statusCode, - messages: messages, - }) + return { + statusCode, + messages, + } + } + async updateCoreStatus(): Promise { + return this.core.setStatus(this.getCoreStatus()) } private _getVersions() { const versions: { [packageName: string]: string } = {} diff --git a/packages/live-status-gateway/src/liveStatusServer.ts b/packages/live-status-gateway/src/liveStatusServer.ts index 00baa68fea..f36afb9195 100644 --- a/packages/live-status-gateway/src/liveStatusServer.ts +++ b/packages/live-status-gateway/src/liveStatusServer.ts @@ -34,6 +34,7 @@ import { NotificationsHandler } from './collections/notifications/notificationsH import { NotificationsTopic } from './topics/notificationsTopic.js' import { PlaylistNotificationsHandler } from './collections/notifications/playlistNotificationsHandler.js' import { RundownNotificationsHandler } from './collections/notifications/rundownNotificationsHandler.js' +import { wsConnectionsGauge } from './wsMetrics.js' export interface CollectionHandlers { studioHandler: StudioHandler @@ -153,8 +154,10 @@ export class LiveStatusServer { this._logger.info(`Closing websocket`) rootChannel.removeSubscriber(ws) this._clients.delete(ws) + wsConnectionsGauge.set(this._clients.size) }) this._clients.add(ws) + wsConnectionsGauge.set(this._clients.size) if (typeof request.url === 'string' && request.url === '/') { rootChannel.addSubscriber(ws) diff --git a/packages/live-status-gateway/src/process.ts b/packages/live-status-gateway/src/process.ts deleted file mode 100644 index 58eaef7cdd..0000000000 --- a/packages/live-status-gateway/src/process.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { Logger } from 'winston' -import * as fs from 'fs' -import { ProcessConfig } from './connector.js' - -export class Process { - logger: Logger - - public certificates: Buffer[] = [] - - constructor(logger: Logger) { - this.logger = logger - } - init(processConfig: ProcessConfig): void { - if (processConfig.unsafeSSL) { - this.logger.info('Disabling NODE_TLS_REJECT_UNAUTHORIZED, be sure to ONLY DO THIS ON A LOCAL NETWORK!') - process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = '0' - } - - if (processConfig.certificates.length) { - this.logger.info(`Loading certificates...`) - for (const certificate of processConfig.certificates) { - try { - this.certificates.push(fs.readFileSync(certificate)) - this.logger.info(`Using certificate "${certificate}"`) - } catch (error) { - this.logger.error(`Error loading certificate "${certificate}"`, error) - } - } - } - } -} diff --git a/packages/live-status-gateway/src/topics/root.ts b/packages/live-status-gateway/src/topics/root.ts index 807af7ad1c..5215ba6031 100644 --- a/packages/live-status-gateway/src/topics/root.ts +++ b/packages/live-status-gateway/src/topics/root.ts @@ -10,6 +10,7 @@ import { SubscriptionStatus, SubscriptionName, } from '@sofie-automation/live-status-gateway-api' +import { activeSubscriptionsGauge, subscriptionSubscribersGauge } from '../wsMetrics.js' enum PublishMsg { ping = 'ping', @@ -41,6 +42,7 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic { removeSubscriber(ws: WebSocket): void { super.removeSubscriber(ws) this._topics.forEach((h) => h.removeSubscriber(ws)) + this._updateSubscriptionMetrics() } processMessage(ws: WebSocket, msg: object): void { @@ -74,6 +76,16 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic { if (Object.values(SubscriptionName).includes(channel)) this._topics.set(channel, topic) } + private _updateSubscriptionMetrics(): void { + let total = 0 + for (const [name, topic] of this._topics) { + const count = topic.subscriberCount + subscriptionSubscribersGauge.set({ subscription: name }, count) + total += count + } + activeSubscriptionsGauge.set(total) + } + subscribe(ws: WebSocket, name: SubscriptionName, reqid: number): void { const topic = this._topics.get(name) const curUnsubscribed = @@ -91,6 +103,7 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic { }) ) topic.addSubscriber(ws) + this._updateSubscriptionMetrics() } else { this.sendMessage( ws, @@ -112,6 +125,7 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic { const curSubscribed = topic && topic.hasSubscriber(ws) && Object.values(SubscriptionName).includes(name) if (curSubscribed) { topic.removeSubscriber(ws) + this._updateSubscriptionMetrics() this.sendMessage( ws, literal({ diff --git a/packages/live-status-gateway/src/wsHandler.ts b/packages/live-status-gateway/src/wsHandler.ts index b3d7f36b76..fef08aa3c3 100644 --- a/packages/live-status-gateway/src/wsHandler.ts +++ b/packages/live-status-gateway/src/wsHandler.ts @@ -23,6 +23,10 @@ export abstract class WebSocketTopicBase { : this.sendStatusToAll } + get subscriberCount(): number { + return this._subscribers.size + } + addSubscriber(ws: WebSocket): void { this._logger.info(`${this._name} adding a websocket subscriber`) this._subscribers.add(ws) @@ -77,6 +81,7 @@ export abstract class WebSocketTopicBase { } export interface WebSocketTopic { + subscriberCount: number addSubscriber(ws: WebSocket): void hasSubscriber(ws: WebSocket): boolean removeSubscriber(ws: WebSocket): void diff --git a/packages/live-status-gateway/src/wsMetrics.ts b/packages/live-status-gateway/src/wsMetrics.ts new file mode 100644 index 0000000000..d9ce8983cd --- /dev/null +++ b/packages/live-status-gateway/src/wsMetrics.ts @@ -0,0 +1,17 @@ +import { MetricsGauge } from '@sofie-automation/server-core-integration/dist/lib/prometheus' + +export const wsConnectionsGauge = new MetricsGauge({ + name: 'sofie_lsg_websocket_connections', + help: 'Number of open WebSocket connections', +}) + +export const activeSubscriptionsGauge = new MetricsGauge({ + name: 'sofie_lsg_active_subscriptions_total', + help: 'Total number of active subscriptions across all topics', +}) + +export const subscriptionSubscribersGauge = new MetricsGauge({ + name: 'sofie_lsg_subscription_subscribers', + help: 'Number of subscribers per subscription', + labelNames: ['subscription'] as const, +}) diff --git a/packages/mos-gateway/src/CoreMosDeviceHandler.ts b/packages/mos-gateway/src/CoreMosDeviceHandler.ts index faff9a50a7..d9a5dfaa92 100644 --- a/packages/mos-gateway/src/CoreMosDeviceHandler.ts +++ b/packages/mos-gateway/src/CoreMosDeviceHandler.ts @@ -35,6 +35,12 @@ import { PartialDeep } from 'type-fest' import type { CoreHandler } from './coreHandler.js' import { CoreConnectionChild } from '@sofie-automation/server-core-integration/dist/lib/CoreConnectionChild' import { Queue } from '@sofie-automation/server-core-integration/dist/lib/queue' +import { + mosDeviceConnectedGauge, + mosMessagesFailedCounter, + mosMessagesReceivedCounter, + mosQueueDepthGauge, +} from './mosMetrics.js' function deepMatch(object: any, attrs: any, deep: boolean): boolean { const keys = Object.keys(attrs) @@ -197,6 +203,18 @@ export class CoreMosDeviceHandler { messages: messages, }) .catch((e) => this._coreParentHandler.logger.warn('Error when setting status:' + e)) + + const deviceId = this._mosDevice.idPrimary + mosDeviceConnectedGauge.set( + { device_id: deviceId, connection: 'primary' }, + connectionStatus.PrimaryConnected ? 1 : 0 + ) + if (this._mosDevice.idSecondary) { + mosDeviceConnectedGauge.set( + { device_id: deviceId, connection: 'secondary' }, + connectionStatus.SecondaryConnected ? 1 : 0 + ) + } } async getMachineInfo(): Promise { const info: IMOSListMachInfo = { @@ -456,8 +474,15 @@ export class CoreMosDeviceHandler { return this.fixMosData(attr) }) as any + const deviceId = this._mosDevice.idPrimary + const commandName = methodName as string + mosMessagesReceivedCounter.inc({ device_id: deviceId, command: commandName }) + mosQueueDepthGauge.inc({ device_id: deviceId }) + // Make the commands be sent sequantially: return this._messageQueue.putOnQueue(async () => { + mosQueueDepthGauge.dec({ device_id: deviceId }) + // Log info about the sent command: let msg = 'Command: ' + methodName const attr0 = attrs[0] as any | undefined @@ -476,6 +501,7 @@ export class CoreMosDeviceHandler { const res = (this.core.coreMethods[methodName] as any)(...attrs) return res.catch((e: any) => { this._coreParentHandler.logger.info('MOS command rejected: ' + ((e && JSON.stringify(e)) || e)) + mosMessagesFailedCounter.inc({ device_id: deviceId, command: commandName }) throw e }) }) diff --git a/packages/mos-gateway/src/connector.ts b/packages/mos-gateway/src/connector.ts index f7fa4d7dde..043b50fc75 100644 --- a/packages/mos-gateway/src/connector.ts +++ b/packages/mos-gateway/src/connector.ts @@ -39,9 +39,9 @@ export class Connector implements IConnector { this._config = config try { - this._logger.info('Initializing Process...') + this._logger.info('Initializing Certificates...') const certificates = loadCertificatesFromDisk(this._logger, config.certificates) - this._logger.info('Process initialized') + this._logger.info('Certificates initialized') this._logger.info('Initializing Core...') this.coreHandler = await CoreHandler.create( diff --git a/packages/mos-gateway/src/coreHandler.ts b/packages/mos-gateway/src/coreHandler.ts index 2fbbf479d0..bf90a0bebc 100644 --- a/packages/mos-gateway/src/coreHandler.ts +++ b/packages/mos-gateway/src/coreHandler.ts @@ -34,7 +34,6 @@ export class CoreHandler implements ICoreHandler { core: CoreConnection | undefined logger: Winston.Logger public _observers: Array> = [] - public connectedToCore = false private _deviceOptions: DeviceConfig private _coreMosHandlers: Array = [] private _onConnected?: () => any @@ -42,7 +41,10 @@ export class CoreHandler implements ICoreHandler { private _isDestroyed = false private _executedFunctions = new Set() private _coreConfig?: CoreConfig - private _certificates?: Buffer[] + + public get connectedToCore(): boolean { + return !!this.core && this.core.connected + } public static async create( logger: Winston.Logger, @@ -63,17 +65,14 @@ export class CoreHandler implements ICoreHandler { private async init(config: CoreConfig, certificates: Buffer[]): Promise { // this.logger.info('========') this._coreConfig = config - this._certificates = certificates this.core = new CoreConnection(this.getCoreConnectionOptions()) this.core.onConnected(() => { this.logger.info('Core Connected!') - this.connectedToCore = true if (this._isInitialized) this.onConnectionRestored() }) this.core.onDisconnected(() => { this.logger.info('Core Disconnected!') - this.connectedToCore = false }) this.core.onError((err) => { this.logger.error('Core Error: ' + (typeof err === 'string' ? err : err.message || err.toString())) @@ -82,11 +81,7 @@ export class CoreHandler implements ICoreHandler { const ddpConfig: DDPConnectorOptions = { host: config.host, port: config.port, - } - if (this._certificates?.length) { - ddpConfig.tlsOpts = { - ca: this._certificates, - } + tlsOpts: certificates.length ? { ca: certificates } : undefined, } await this.core.init(ddpConfig) diff --git a/packages/mos-gateway/src/mosHandler.ts b/packages/mos-gateway/src/mosHandler.ts index c6337a072b..b2ac62abac 100644 --- a/packages/mos-gateway/src/mosHandler.ts +++ b/packages/mos-gateway/src/mosHandler.ts @@ -40,6 +40,7 @@ import { PeripheralDeviceForDevice } from '@sofie-automation/server-core-integra import _ from 'underscore' import { MosStatusHandler } from './mosStatus/handler.js' import { isPromise } from 'util/types' +import { mosDevicesTotalGauge } from './mosMetrics.js' export interface MosConfig { self: IConnectionConfig @@ -539,6 +540,7 @@ export class MosHandler { mosDevice: mosDevice, deviceOptions, }) + mosDevicesTotalGauge.set(this._allMosDevices.size) await this.setupMosDevice(mosDevice) @@ -592,6 +594,7 @@ export class MosHandler { private async _removeDevice(deviceId: string): Promise { const deviceEntry = this._allMosDevices.get(deviceId) this._allMosDevices.delete(deviceId) + mosDevicesTotalGauge.set(this._allMosDevices.size) if (deviceEntry) { const mosDevice = deviceEntry.mosDevice diff --git a/packages/mos-gateway/src/mosMetrics.ts b/packages/mos-gateway/src/mosMetrics.ts new file mode 100644 index 0000000000..60992a4e73 --- /dev/null +++ b/packages/mos-gateway/src/mosMetrics.ts @@ -0,0 +1,51 @@ +import { + MetricsCounter, + MetricsGauge, +} from '@sofie-automation/server-core-integration/dist/lib/prometheus' + +export const mosDevicesTotalGauge = new MetricsGauge({ + name: 'sofie_mos_gateway_devices_total', + help: 'Number of configured MOS sub-devices', +}) + +export const mosDeviceConnectedGauge = new MetricsGauge({ + name: 'sofie_mos_gateway_device_connected', + help: 'Connection status of a MOS device (1 = connected, 0 = disconnected)', + labelNames: ['device_id', 'connection'] as const, +}) + +export const mosMessagesReceivedCounter = new MetricsCounter({ + name: 'sofie_mos_gateway_messages_received_total', + help: 'Total number of MOS commands received from the NRCS', + labelNames: ['device_id', 'command'] as const, +}) + +export const mosMessagesFailedCounter = new MetricsCounter({ + name: 'sofie_mos_gateway_messages_failed_total', + help: 'Total number of MOS commands that failed when forwarding to Core', + labelNames: ['device_id', 'command'] as const, +}) + +export const mosQueueDepthGauge = new MetricsGauge({ + name: 'sofie_mos_gateway_queue_depth', + help: 'Number of MOS commands currently waiting in the Core-forwarding queue', + labelNames: ['device_id'] as const, +}) + +export const mosStatusSentCounter = new MetricsCounter({ + name: 'sofie_mos_gateway_status_sent_total', + help: 'Total number of story/item status messages sent back to the NRCS', + labelNames: ['device_id', 'status_type', 'mos_status'] as const, +}) + +export const mosStatusSkippedCounter = new MetricsCounter({ + name: 'sofie_mos_gateway_status_skipped_total', + help: 'Total number of story/item status updates that were skipped', + labelNames: ['device_id', 'reason'] as const, +}) + +export const mosStatusQueueDepthGauge = new MetricsGauge({ + name: 'sofie_mos_gateway_status_queue_depth', + help: 'Number of status write-back operations currently waiting in the queue', + labelNames: ['device_id'] as const, +}) diff --git a/packages/mos-gateway/src/mosStatus/handler.ts b/packages/mos-gateway/src/mosStatus/handler.ts index 38b89616d9..bdc6feb6a9 100644 --- a/packages/mos-gateway/src/mosStatus/handler.ts +++ b/packages/mos-gateway/src/mosStatus/handler.ts @@ -21,6 +21,7 @@ import type { RundownId } from '@sofie-automation/shared-lib/dist/core/model/Ids import type * as winston from 'winston' import { Queue } from '@sofie-automation/server-core-integration/dist/lib/queue' import { diffStatuses } from './diff.js' +import { mosStatusQueueDepthGauge, mosStatusSentCounter, mosStatusSkippedCounter } from '../mosMetrics.js' export class MosStatusHandler { readonly #logger: winston.Logger @@ -98,10 +99,15 @@ export class MosStatusHandler { // New implementation 2022 only sends PLAY, never stop, after getting advice from AP // Reason 1: NRK ENPS "sendt tid" (elapsed time) stopped working in ENPS 8/9 when doing STOP prior to PLAY // Reason 2: there's a delay between the STOP (yellow line disappears) and PLAY (yellow line re-appears), which annoys the users - if (this.#config.onlySendPlay && status.mosStatus !== IMOSObjectStatus.PLAY) continue + if (this.#config.onlySendPlay && status.mosStatus !== IMOSObjectStatus.PLAY) { + mosStatusSkippedCounter.inc({ device_id: this.#mosDevice.idPrimary, reason: 'only_send_play' }) + continue + } + mosStatusQueueDepthGauge.inc({ device_id: this.#mosDevice.idPrimary }) this.#messageQueue .putOnQueue(async () => { + mosStatusQueueDepthGauge.dec({ device_id: this.#mosDevice.idPrimary }) if (this.#isDeviceConnected()) { if (status.type === 'item') { const newStatus: IMOSItemStatus = { @@ -115,6 +121,11 @@ export class MosStatusHandler { // Send status await this.#mosDevice.sendItemStatus(newStatus) + mosStatusSentCounter.inc({ + device_id: this.#mosDevice.idPrimary, + status_type: 'item', + mos_status: String(status.mosStatus), + }) } else if (status.type === 'story') { const newStatus: IMOSStoryStatus = { RunningOrderId: this.#mosTypes.mosString128.create(status.rundownExternalId), @@ -126,6 +137,11 @@ export class MosStatusHandler { // Send status await this.#mosDevice.sendStoryStatus(newStatus) + mosStatusSentCounter.inc({ + device_id: this.#mosDevice.idPrimary, + status_type: 'story', + mos_status: String(status.mosStatus), + }) } else { this.#logger.debug(`Discarding unknown queued status: ${JSON.stringify(status)}`) assertNever(status) @@ -133,8 +149,10 @@ export class MosStatusHandler { } else if (this.#config.onlySendPlay) { // No need to do anything. this.#logger.info(`Not connected, skipping play status: ${JSON.stringify(status)}`) + mosStatusSkippedCounter.inc({ device_id: this.#mosDevice.idPrimary, reason: 'not_connected' }) } else { this.#logger.info(`Not connected, discarding status: ${JSON.stringify(status)}`) + mosStatusSkippedCounter.inc({ device_id: this.#mosDevice.idPrimary, reason: 'not_connected' }) } }) .catch((e) => { diff --git a/packages/playout-gateway/src/connector.ts b/packages/playout-gateway/src/connector.ts index 71dbaf6eed..aa88fda008 100644 --- a/packages/playout-gateway/src/connector.ts +++ b/packages/playout-gateway/src/connector.ts @@ -32,7 +32,6 @@ export class Connector implements IConnector { private tsrHandler: TSRHandler | undefined private coreHandler: CoreHandler | undefined private _logger: Logger - private _certificates: Buffer[] | undefined constructor(logger: Logger) { this._logger = logger @@ -41,14 +40,14 @@ export class Connector implements IConnector { public async init(config: Config): Promise { try { this._logger.info('Initializing Certificates...') - this._certificates = loadCertificatesFromDisk(this._logger, config.certificates) + const certificates = loadCertificatesFromDisk(this._logger, config.certificates) this._logger.info('Certificates initialized') this._logger.info('Initializing Core...') this.coreHandler = new CoreHandler(this._logger, config.device) new HealthEndpoints(this, this.coreHandler, config.health) - await this.coreHandler.init(config.core, this._certificates) + await this.coreHandler.init(config.core, certificates) this._logger.info('Core initialized') this._logger.info('Initializing TSR...') diff --git a/packages/playout-gateway/src/coreHandler.ts b/packages/playout-gateway/src/coreHandler.ts index fe6cbdecf2..48be2f3fc5 100644 --- a/packages/playout-gateway/src/coreHandler.ts +++ b/packages/playout-gateway/src/coreHandler.ts @@ -55,12 +55,13 @@ export class CoreHandler implements ICoreHandler { private _executedFunctions = new Set() private _tsrHandler?: TSRHandler private _coreConfig?: CoreConfig - private _certificates?: Buffer[] private _statusInitialized = false private _statusDestroyed = false - public connectedToCore = false + public get connectedToCore(): boolean { + return this.core && this.core.connected + } constructor(logger: Logger, deviceOptions: DeviceConfig) { this.logger = logger @@ -70,19 +71,16 @@ export class CoreHandler implements ICoreHandler { async init(config: CoreConfig, certificates: Buffer[]): Promise { this._statusInitialized = false this._coreConfig = config - this._certificates = certificates this.core = new CoreConnection(this.getCoreConnectionOptions()) this.core.onConnected(() => { this.logger.info('Core Connected!') - this.connectedToCore = true if (this._onConnected) this._onConnected() }) this.core.onDisconnected(() => { this.logger.warn('Core Disconnected!') - this.connectedToCore = false }) this.core.onError((err: any) => { this.logger.error('Core Error: ' + (typeof err === 'string' ? err : err.message || err.toString() || err)) @@ -91,11 +89,7 @@ export class CoreHandler implements ICoreHandler { const ddpConfig: DDPConnectorOptions = { host: config.host, port: config.port, - } - if (this._certificates.length) { - ddpConfig.tlsOpts = { - ca: this._certificates, - } + tlsOpts: certificates.length ? { ca: certificates } : undefined, } await this.core.init(ddpConfig) diff --git a/packages/playout-gateway/src/playoutMetrics.ts b/packages/playout-gateway/src/playoutMetrics.ts new file mode 100644 index 0000000000..271f135996 --- /dev/null +++ b/packages/playout-gateway/src/playoutMetrics.ts @@ -0,0 +1,57 @@ +import { + MetricsCounter, + MetricsGauge, + MetricsHistogram, +} from '@sofie-automation/server-core-integration/dist/lib/prometheus' + +export const playoutDevicesTotalGauge = new MetricsGauge({ + name: 'sofie_playout_gateway_devices_total', + help: 'Total number of TSR devices under management', +}) + +export const playoutDeviceConnectedGauge = new MetricsGauge({ + name: 'sofie_playout_gateway_device_connected', + help: 'Whether a TSR device is connected (1) or not (0)', + labelNames: ['device_id', 'device_type'] as const, +}) + +export const playoutResolveDurationHistogram = new MetricsHistogram({ + name: 'sofie_playout_gateway_resolve_duration_seconds', + help: 'Time spent resolving the timeline, in seconds', + buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10], +}) + +export const playoutTimelineAgeGauge = new MetricsGauge({ + name: 'sofie_playout_gateway_timeline_age_seconds', + help: 'Age of the timeline at the moment it finished resolving, measured from when Sofie Core generated it', +}) + +export const playoutSlowSentCommandsCounter = new MetricsCounter({ + name: 'sofie_playout_gateway_slow_sent_commands_total', + help: 'Number of commands that were slow to be sent', + labelNames: ['device_id', 'device_type'] as const, +}) + +export const playoutSlowFulfilledCommandsCounter = new MetricsCounter({ + name: 'sofie_playout_gateway_slow_fulfilled_commands_total', + help: 'Number of commands that were slow to be fulfilled by the device', + labelNames: ['device_id', 'device_type'] as const, +}) + +export const playoutCommandErrorsCounter = new MetricsCounter({ + name: 'sofie_playout_gateway_command_errors_total', + help: 'Number of commands that resulted in an error', + labelNames: ['device_id', 'device_type'] as const, +}) + +export const playoutCommandsSentCounter = new MetricsCounter({ + name: 'sofie_playout_gateway_commands_sent_total', + help: 'Number of commands sent to devices (only increments when reportAllCommands is enabled per device)', + labelNames: ['device_id', 'device_type'] as const, +}) + +export const playoutPlaybackCallbacksCounter = new MetricsCounter({ + name: 'sofie_playout_gateway_playback_callbacks_total', + help: 'Number of playback timeline callbacks received from TSR', + labelNames: ['type'] as const, +}) diff --git a/packages/playout-gateway/src/tsrHandler.ts b/packages/playout-gateway/src/tsrHandler.ts index 617f3c96b6..7fcd6fb356 100644 --- a/packages/playout-gateway/src/tsrHandler.ts +++ b/packages/playout-gateway/src/tsrHandler.ts @@ -48,6 +48,17 @@ import { } from '@sofie-automation/server-core-integration' import { BaseRemoteDeviceIntegration } from 'timeline-state-resolver/dist/service/remoteDeviceInstance' import { TSRDeviceRegistry } from './tsrDeviceRegistry.js' +import { + playoutDevicesTotalGauge, + playoutDeviceConnectedGauge, + playoutResolveDurationHistogram, + playoutTimelineAgeGauge, + playoutSlowSentCommandsCounter, + playoutSlowFulfilledCommandsCounter, + playoutCommandErrorsCounter, + playoutCommandsSentCounter, + playoutPlaybackCallbacksCounter, +} from './playoutMetrics.js' const debug = Debug('playout-gateway') @@ -85,6 +96,7 @@ export class TSRHandler { private _triggerUpdateDevicesTimeout: NodeJS.Timeout | undefined private _debugStates: Map = new Map() + private _pendingTimelineGeneratedAt: Map = new Map() constructor(logger: Logger) { this.logger = logger @@ -165,6 +177,13 @@ export class TSRHandler { this.handleTSRTimelineCallback(time, objId, callbackName, data) }) this.tsr.on('resolveDone', (timelineHash: string, resolveDuration: number) => { + playoutResolveDurationHistogram.observe(resolveDuration / 1000) + const generatedAt = this._pendingTimelineGeneratedAt.get(timelineHash) + if (generatedAt !== undefined) { + playoutTimelineAgeGauge.set((Date.now() - generatedAt) / 1000) + this._pendingTimelineGeneratedAt.delete(timelineHash) + } + // Make sure we only report back once, per update timeline if (this._lastReportedObjHashes.includes(timelineHash)) return @@ -216,6 +235,7 @@ export class TSRHandler { this.tsr.connectionManager.on('connectionAdded', (id, container) => { const coreTsrHandler = new CoreTSRDeviceHandler(this._coreHandler, Promise.resolve(container), id) this._coreTsrHandlers[id] = coreTsrHandler + playoutDevicesTotalGauge.set(Object.keys(this._coreTsrHandlers).length) // set the status to uninitialized for now: coreTsrHandler.statusChanged( @@ -248,10 +268,16 @@ export class TSRHandler { return } + const removedDeviceType = coreTsrHandler._device?.deviceType + const removedDeviceTypeName = + removedDeviceType !== undefined ? (DeviceType[removedDeviceType] ?? 'unknown') : 'unknown' + coreTsrHandler.dispose('removeSubDevice').catch((e) => { this.logger.error('Failed to dispose of coreTsrHandler for ' + id + ': ' + e) }) delete this._coreTsrHandlers[id] + playoutDevicesTotalGauge.set(Object.keys(this._coreTsrHandlers).length) + playoutDeviceConnectedGauge.set({ device_id: id, device_type: removedDeviceTypeName }, 0) }) const fixLog = (id: string, e: string): string => { @@ -286,6 +312,13 @@ export class TSRHandler { if (!coreTsrHandler) return if (!coreTsrHandler._device) return // Not initialized yet + const changedDeviceType = coreTsrHandler._device.deviceType + const changedDeviceTypeName = DeviceType[changedDeviceType] ?? 'unknown' + playoutDeviceConnectedGauge.set( + { device_id: id, device_type: changedDeviceTypeName }, + status.statusCode <= StatusCode.WARNING_MAJOR ? 1 : 0 + ) + coreTsrHandler.statusChanged(status) if (!coreTsrHandler._device) return @@ -315,6 +348,12 @@ export class TSRHandler { } }) this.tsr.connectionManager.on('connectionEvent:slowSentCommand', (id, info) => { + const deviceType0 = this._coreTsrHandlers[id]?._device?.deviceType + playoutSlowSentCommandsCounter.inc({ + device_id: id, + device_type: deviceType0 !== undefined ? (DeviceType[deviceType0] ?? 'unknown') : 'unknown', + }) + // If the internalDelay is too large, it should be logged as an error, // since something took too long internally. @@ -331,6 +370,12 @@ export class TSRHandler { } }) this.tsr.connectionManager.on('connectionEvent:slowFulfilledCommand', (id, info) => { + const deviceType1 = this._coreTsrHandlers[id]?._device?.deviceType + playoutSlowFulfilledCommandsCounter.inc({ + device_id: id, + device_type: deviceType1 !== undefined ? (DeviceType[deviceType1] ?? 'unknown') : 'unknown', + }) + // Note: we don't emit slow fulfilled commands as error, since // the fulfillment of them lies on the device being controlled, not on us. @@ -340,10 +385,22 @@ export class TSRHandler { }) }) this.tsr.connectionManager.on('connectionEvent:commandError', (id, error, context) => { + const deviceType2 = this._coreTsrHandlers[id]?._device?.deviceType + playoutCommandErrorsCounter.inc({ + device_id: id, + device_type: deviceType2 !== undefined ? (DeviceType[deviceType2] ?? 'unknown') : 'unknown', + }) + // todo: handle this better this.logger.error(fixError(id, error), { context }) }) - this.tsr.connectionManager.on('connectionEvent:commandReport', (_id, commandReport) => { + this.tsr.connectionManager.on('connectionEvent:commandReport', (id, commandReport) => { + const deviceType3 = this._coreTsrHandlers[id]?._device?.deviceType + playoutCommandsSentCounter.inc({ + device_id: id, + device_type: deviceType3 !== undefined ? (DeviceType[deviceType3] ?? 'unknown') : 'unknown', + }) + if (this._reportAllCommands) { // Todo: send these to Core this.logger.info('commandReport', { @@ -589,6 +646,7 @@ export class TSRHandler { } const transformedTimeline = this._transformTimeline(deserializeTimelineBlob(timeline.timelineBlob)) + this._pendingTimelineGeneratedAt.set(unprotectString(timeline.timelineHash), timeline.generated) this.tsr.timelineHash = unprotectString(timeline.timelineHash) this.tsr.setTimelineAndMappings(transformedTimeline, unprotectObject(mappingsObject.mappings)) } @@ -882,6 +940,7 @@ export class TSRHandler { return } const callbackName = callbackName0 as PeripheralDeviceAPI.PlayoutChangedType + playoutPlaybackCallbacksCounter.inc({ type: callbackName }) // debounce if (this.changedResults && this.changedResults.rundownPlaylistId !== data.rundownPlaylistId) { // The playlistId changed. Send what we have right away and reset: diff --git a/packages/server-core-integration/package.json b/packages/server-core-integration/package.json index ff8e3f3ed9..ba28063d5c 100644 --- a/packages/server-core-integration/package.json +++ b/packages/server-core-integration/package.json @@ -77,6 +77,7 @@ "faye-websocket": "^0.11.4", "got": "^11.8.6", "koa": "^3.1.1", + "prom-client": "^15.1.3", "tslib": "^2.8.1", "underscore": "^1.13.7" }, diff --git a/packages/server-core-integration/src/lib/coreConnection.ts b/packages/server-core-integration/src/lib/coreConnection.ts index c5f59ab703..ec832f2920 100644 --- a/packages/server-core-integration/src/lib/coreConnection.ts +++ b/packages/server-core-integration/src/lib/coreConnection.ts @@ -148,11 +148,11 @@ export class CoreConnection< }) this._ddp.on('connected', () => { // this.emit('connected') - if (this._watchDog) this._watchDog.addCheck(async () => this._watchDogCheck()) + if (this._watchDog) this._watchDog.addCheck(this._watchDogCheck) }) this._ddp.on('disconnected', () => { // this.emit('disconnected') - if (this._watchDog) this._watchDog.removeCheck(async () => this._watchDogCheck()) + if (this._watchDog) this._watchDog.removeCheck(this._watchDogCheck) }) this._ddp.on('message', () => { if (this._watchDog) this._watchDog.receivedData() @@ -434,7 +434,7 @@ export class CoreConnection< return this.coreMethods.initialize(options) } - private async _watchDogCheck() { + private _watchDogCheck = async () => { /* Randomize a message and send it to Core. Core should then reply with triggering executeFunction with the "pingResponse" method. diff --git a/packages/server-core-integration/src/lib/health.ts b/packages/server-core-integration/src/lib/health.ts index 05c2e6c9c9..62e46cb8b6 100644 --- a/packages/server-core-integration/src/lib/health.ts +++ b/packages/server-core-integration/src/lib/health.ts @@ -3,6 +3,7 @@ import Router from '@koa/router' import { StatusCode } from '@sofie-automation/shared-lib/dist/lib/status' import { assertNever } from '@sofie-automation/shared-lib/dist/lib/lib' import { IConnector, ICoreHandler } from './gateway-types.js' +import { getPrometheusMetricsString, PrometheusHTTPContentType, setupPrometheusMetrics } from './prometheus.js' export interface HealthConfig { /** If set, exposes health HTTP endpoints on the given port */ @@ -18,10 +19,14 @@ export class HealthEndpoints { constructor( private connector: IConnector, private coreHandler: ICoreHandler, - private config: HealthConfig + private config: HealthConfig, + private customMetrics?: () => Promise ) { if (!config.port) return // disabled + // Setup default prometheus metrics when endpoints are enabled + setupPrometheusMetrics() + const router = new Router() router.get('/healthz', async (ctx) => { @@ -64,6 +69,22 @@ export class HealthEndpoints { ctx.body = 'READY' }) + router.get('/metrics', async (ctx) => { + try { + ctx.response.type = PrometheusHTTPContentType + + const [meteorMetrics, workerMetrics] = await Promise.all([ + getPrometheusMetricsString(), + this.customMetrics?.(), + ]) + + ctx.body = [meteorMetrics, ...(workerMetrics || [])].join('\n\n') + } catch (ex) { + ctx.response.status = 500 + ctx.body = ex + '' + } + }) + this.app.use(router.routes()).use(router.allowedMethods()) this.app.listen(this.config.port) } diff --git a/packages/server-core-integration/src/lib/prometheus.ts b/packages/server-core-integration/src/lib/prometheus.ts new file mode 100644 index 0000000000..4d20a535c5 --- /dev/null +++ b/packages/server-core-integration/src/lib/prometheus.ts @@ -0,0 +1,34 @@ +import { register, collectDefaultMetrics } from 'prom-client' + +// Re-export types, to ensure the correct 'instance' of 'prom-client' is used +export { + Gauge as MetricsGauge, + Counter as MetricsCounter, + Histogram as MetricsHistogram, + Summary as MetricsSummary, +} from 'prom-client' + +/** + * HTTP Content-type header for the metrics + */ +export const PrometheusHTTPContentType = register.contentType + +/** + * Stringified metrics for serving over HTTP + */ +export async function getPrometheusMetricsString(): Promise { + return register.metrics() +} + +/** + * Setup metric reporting for this app + */ +export function setupPrometheusMetrics(): void { + // Label all metrics with the source 'thread' + // register.setDefaultLabels({ + // threadName: threadName, + // }) + + // Collect the default metrics nodejs metrics + collectDefaultMetrics() +} diff --git a/packages/yarn.lock b/packages/yarn.lock index 81b254972a..29ceb8092b 100644 --- a/packages/yarn.lock +++ b/packages/yarn.lock @@ -7131,6 +7131,7 @@ __metadata: faye-websocket: "npm:^0.11.4" got: "npm:^11.8.6" koa: "npm:^3.1.1" + prom-client: "npm:^15.1.3" tslib: "npm:^2.8.1" underscore: "npm:^1.13.7" languageName: unknown