Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/live-status-gateway/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ let deviceToken: string = process.env.DEVICE_TOKEN || ''
let disableWatchdog: boolean = process.env.DISABLE_WATCHDOG === '1' || false
let unsafeSSL: boolean = process.env.UNSAFE_SSL === '1' || false
const certs: string[] = (process.env.CERTIFICATES || '').split(';') || []
let healthPort: number | undefined = parseInt(process.env.HEALTH_PORT + '') || undefined

let prevProcessArg = ''
process.argv.forEach((val) => {
Expand All @@ -37,12 +38,14 @@ process.argv.forEach((val) => {
} else if (val.match(/-unsafeSSL/i)) {
// Will cause the Node applocation to blindly accept all certificates. Not recommenced unless in local, controlled networks.
unsafeSSL = true
} else if (prevProcessArg.match(/-healthPort/i)) {
healthPort = parseInt(val)
}
prevProcessArg = nextPrevProcessArg + ''
})

const config: Config = {
process: {
certificates: {
unsafeSSL: unsafeSSL,
certificates: certs.filter((c) => c !== undefined && c !== null && c.length !== 0),
},
Expand All @@ -55,6 +58,9 @@ const config: Config = {
port: port,
watchdog: !disableWatchdog,
},
health: {
port: healthPort,
},
}

export { config, logPath, logLevel, disableWatchdog }
38 changes: 23 additions & 15 deletions packages/live-status-gateway/src/connector.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
import { CoreHandler, CoreConfig } from './coreHandler.js'
import { Logger } from 'winston'
import { Process } from './process.js'
import { PeripheralDeviceId } from '@sofie-automation/shared-lib/dist/core/model/Ids'
import { LiveStatusServer } from './liveStatusServer.js'
import {
CertificatesConfig,
HealthConfig,
HealthEndpoints,
IConnector,
loadCertificatesFromDisk,
stringifyError,
} from '@sofie-automation/server-core-integration'

export interface Config {
process: ProcessConfig
certificates: CertificatesConfig
device: DeviceConfig
core: CoreConfig
health: HealthConfig
}
export interface ProcessConfig {
/** Will cause the Node applocation to blindly accept all certificates. Not recommenced unless in local, controlled networks. */
unsafeSSL: boolean
/** Paths to certificates to load, for SSL-connections */
certificates: string[]
}

export interface DeviceConfig {
deviceId: PeripheralDeviceId
deviceToken: string
}
export class Connector {
export class Connector implements IConnector {
public initialized = false
public initializedError: string | undefined = undefined

private coreHandler: CoreHandler | undefined
private _logger: Logger
private _process: Process | undefined
private _liveStatusServer: LiveStatusServer | undefined

constructor(logger: Logger) {
Expand All @@ -31,14 +36,15 @@ export class Connector {

public async init(config: Config): Promise<void> {
try {
this._logger.info('Initializing Process...')
this._process = new Process(this._logger)
this._process.init(config.process)
this._logger.info('Process initialized')
this._logger.info('Initializing Certificates...')
const certificates = loadCertificatesFromDisk(this._logger, config.certificates)
this._logger.info('Certificates initialized')

this._logger.info('Initializing Core...')
this.coreHandler = new CoreHandler(this._logger, config.device)
await this.coreHandler.init(config.core, this._process)
new HealthEndpoints(this, this.coreHandler, config.health)

await this.coreHandler.init(config.core, certificates)
this._logger.info('Core initialized')

if (!this.coreHandler.studioId) throw new Error('Device has no studioId')
Expand All @@ -53,6 +59,8 @@ export class Connector {
this._logger.error(e)
this._logger.error(e.stack)

this.initializedError = stringifyError(e)

try {
if (this.coreHandler) {
this.coreHandler.destroy().catch(this._logger.error)
Expand Down
45 changes: 19 additions & 26 deletions packages/live-status-gateway/src/coreHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
CoreConnection,
CoreOptions,
DDPConnectorOptions,
ICoreHandler,
Observer,
PeripheralDevicePubSub,
PeripheralDevicePubSubCollections,
Expand All @@ -13,7 +14,6 @@ import {
} from '@sofie-automation/server-core-integration'
import { DeviceConfig } from './connector.js'
import { Logger } from 'winston'
import { Process } from './process.js'
import { LIVE_STATUS_DEVICE_CONFIG } from './configManifest.js'
import {
PeripheralDeviceCategory,
Expand All @@ -36,7 +36,7 @@ export interface CoreConfig {
/**
* Represents a connection between the Gateway and Core
*/
export class CoreHandler {
export class CoreHandler implements ICoreHandler {
core!: CoreConnection<
CorelibPubSubTypes & PeripheralDevicePubSubTypes,
CorelibPubSubCollections & PeripheralDevicePubSubCollections
Expand All @@ -45,30 +45,27 @@ export class CoreHandler {
public _observers: Array<any> = []
public deviceSettings: LiveStatusGatewayConfig = {}

public errorReporting = false
public multithreading = false
public reportAllCommands = false

private _deviceOptions: DeviceConfig
private _onConnected?: () => any
private _executedFunctions = new Set<PeripheralDeviceCommandId>()
private _coreConfig?: CoreConfig
private _process?: Process

private _studioId: StudioId | undefined

private _statusInitialized = false
private _statusDestroyed = false

public get connectedToCore(): boolean {
return this.core && this.core.connected
}

constructor(logger: Logger, deviceOptions: DeviceConfig) {
this.logger = logger
this._deviceOptions = deviceOptions
}

async init(config: CoreConfig, process: Process): Promise<void> {
async init(config: CoreConfig, certificates: Buffer[]): Promise<void> {
this._statusInitialized = false
this._coreConfig = config
this._process = process

this.core = new CoreConnection<CorelibPubSubTypes & PeripheralDevicePubSubTypes>(
this.getCoreConnectionOptions()
Expand All @@ -79,7 +76,6 @@ export class CoreHandler {
this.setupObserversAndSubscriptions().catch((e) => {
this.logger.error('Core Error during setupObserversAndSubscriptions:', e)
})
if (this._onConnected) this._onConnected()
})
this.core.onDisconnected(() => {
this.logger.warn('Core Disconnected!')
Expand All @@ -91,19 +87,14 @@ export class CoreHandler {
const ddpConfig: DDPConnectorOptions = {
host: config.host,
port: config.port,
}
if (this._process && this._process.certificates.length) {
ddpConfig.tlsOpts = {
ca: this._process.certificates,
}
tlsOpts: certificates.length ? { ca: certificates } : undefined,
}

await this.core.init(ddpConfig)
await this.updateCoreStatus()

this.logger.info('Core id: ' + this.core.deviceId)
await this.setupObserversAndSubscriptions()
if (this._onConnected) this._onConnected()

this._statusInitialized = true
await this.updateCoreStatus()
Expand Down Expand Up @@ -186,9 +177,6 @@ export class CoreHandler {

return options
}
onConnected(fcn: () => any): void {
this._onConnected = fcn
}

onDeviceChanged(): void {
const col = this.core.getCollection(PeripheralDevicePubSubCollectionsNames.peripheralDeviceForDevice)
Expand Down Expand Up @@ -325,7 +313,10 @@ export class CoreHandler {
this.logger.info('getDevicesInfo')
return []
}
async updateCoreStatus(): Promise<any> {
getCoreStatus(): {
statusCode: StatusCode
messages: string[]
} {
let statusCode = StatusCode.GOOD
const messages: Array<string> = []

Expand All @@ -337,11 +328,13 @@ export class CoreHandler {
statusCode = StatusCode.BAD
messages.push('Shut down')
}

return this.core.setStatus({
statusCode: statusCode,
messages: messages,
})
return {
statusCode,
messages,
}
}
async updateCoreStatus(): Promise<any> {
return this.core.setStatus(this.getCoreStatus())
}
private _getVersions() {
const versions: { [packageName: string]: string } = {}
Expand Down
3 changes: 3 additions & 0 deletions packages/live-status-gateway/src/liveStatusServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { NotificationsHandler } from './collections/notifications/notificationsH
import { NotificationsTopic } from './topics/notificationsTopic.js'
import { PlaylistNotificationsHandler } from './collections/notifications/playlistNotificationsHandler.js'
import { RundownNotificationsHandler } from './collections/notifications/rundownNotificationsHandler.js'
import { wsConnectionsGauge } from './wsMetrics.js'

export interface CollectionHandlers {
studioHandler: StudioHandler
Expand Down Expand Up @@ -153,8 +154,10 @@ export class LiveStatusServer {
this._logger.info(`Closing websocket`)
rootChannel.removeSubscriber(ws)
this._clients.delete(ws)
wsConnectionsGauge.set(this._clients.size)
})
this._clients.add(ws)
wsConnectionsGauge.set(this._clients.size)

if (typeof request.url === 'string' && request.url === '/') {
rootChannel.addSubscriber(ws)
Expand Down
31 changes: 0 additions & 31 deletions packages/live-status-gateway/src/process.ts

This file was deleted.

14 changes: 14 additions & 0 deletions packages/live-status-gateway/src/topics/root.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
SubscriptionStatus,
SubscriptionName,
} from '@sofie-automation/live-status-gateway-api'
import { activeSubscriptionsGauge, subscriptionSubscribersGauge } from '../wsMetrics.js'

enum PublishMsg {
ping = 'ping',
Expand Down Expand Up @@ -41,6 +42,7 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic {
removeSubscriber(ws: WebSocket): void {
super.removeSubscriber(ws)
this._topics.forEach((h) => h.removeSubscriber(ws))
this._updateSubscriptionMetrics()
}

processMessage(ws: WebSocket, msg: object): void {
Expand Down Expand Up @@ -74,6 +76,16 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic {
if (Object.values<string>(SubscriptionName).includes(channel)) this._topics.set(channel, topic)
}

private _updateSubscriptionMetrics(): void {
let total = 0
for (const [name, topic] of this._topics) {
const count = topic.subscriberCount
subscriptionSubscribersGauge.set({ subscription: name }, count)
total += count
}
activeSubscriptionsGauge.set(total)
}

subscribe(ws: WebSocket, name: SubscriptionName, reqid: number): void {
const topic = this._topics.get(name)
const curUnsubscribed =
Expand All @@ -91,6 +103,7 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic {
})
)
topic.addSubscriber(ws)
this._updateSubscriptionMetrics()
} else {
this.sendMessage(
ws,
Expand All @@ -112,6 +125,7 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic {
const curSubscribed = topic && topic.hasSubscriber(ws) && Object.values<string>(SubscriptionName).includes(name)
if (curSubscribed) {
topic.removeSubscriber(ws)
this._updateSubscriptionMetrics()
this.sendMessage(
ws,
literal<SubscriptionStatusSuccess>({
Expand Down
5 changes: 5 additions & 0 deletions packages/live-status-gateway/src/wsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ export abstract class WebSocketTopicBase {
: this.sendStatusToAll
}

get subscriberCount(): number {
return this._subscribers.size
}

addSubscriber(ws: WebSocket): void {
this._logger.info(`${this._name} adding a websocket subscriber`)
this._subscribers.add(ws)
Expand Down Expand Up @@ -77,6 +81,7 @@ export abstract class WebSocketTopicBase {
}

export interface WebSocketTopic {
subscriberCount: number
addSubscriber(ws: WebSocket): void
hasSubscriber(ws: WebSocket): boolean
removeSubscriber(ws: WebSocket): void
Expand Down
17 changes: 17 additions & 0 deletions packages/live-status-gateway/src/wsMetrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { MetricsGauge } from '@sofie-automation/server-core-integration/dist/lib/prometheus'

export const wsConnectionsGauge = new MetricsGauge({
name: 'sofie_lsg_websocket_connections',
help: 'Number of open WebSocket connections',
})

export const activeSubscriptionsGauge = new MetricsGauge({
name: 'sofie_lsg_active_subscriptions_total',
help: 'Total number of active subscriptions across all topics',
})

export const subscriptionSubscribersGauge = new MetricsGauge({
name: 'sofie_lsg_subscription_subscribers',
help: 'Number of subscribers per subscription',
labelNames: ['subscription'] as const,
})
Loading
Loading