Skip to content

Commit ea65d38

Browse files
Merge pull request #748 from scramjetorg/fix/topics
Fix/topics
2 parents 7250213 + 5e79b55 commit ea65d38

File tree

10 files changed

+46
-73
lines changed

10 files changed

+46
-73
lines changed

packages/host/src/lib/csi-controller.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ export class CSIController extends TypedEmitter<Events> {
125125
*/
126126
public inputTopic?: string;
127127

128+
public outputRouted = false;
129+
public inputRouted = false;
130+
128131
/**
129132
* Logger.
130133
*
@@ -470,17 +473,18 @@ export class CSIController extends TypedEmitter<Events> {
470473
this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PANG, async (message) => {
471474
const pangData = message[1];
472475

473-
if (pangData.provides) {
474-
this.provides ||= pangData.provides;
475-
} else if (pangData.requires) {
476-
this.requires ||= pangData.requires;
476+
this.provides ||= this.outputTopic || pangData.provides;
477+
this.requires ||= this.inputTopic || pangData.requires;
477478

478-
if (pangData.requires !== "") {
479-
this.apiInputEnabled = false;
480-
}
479+
if (this.requires) {
480+
this.apiInputEnabled = false;
481481
}
482482

483-
this.emit("pang", message[1]);
483+
this.emit("pang", {
484+
provides: this.provides,
485+
requires: this.requires,
486+
contentType: pangData.contentType
487+
});
484488
});
485489

486490
this.communicationHandler.addMonitoringHandler(RunnerMessageCode.MONITORING, async message => {

packages/host/src/lib/host.ts

Lines changed: 16 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ export class Host implements IComponent {
360360
await this.startCSIController(sequence, {
361361
appConfig: seqenceConfig.appConfig || {},
362362
args: seqenceConfig.args,
363+
instanceId: seqenceConfig.instanceId
363364
});
364365
this.logger.debug("Starting sequence based on config", seqenceConfig);
365366
})
@@ -488,9 +489,6 @@ export class Host implements IComponent {
488489
}
489490

490491
topicsMiddleware(req: ParsedMessage, res: ServerResponse, next: NextCallback) {
491-
req.socket?.setTimeout(0);
492-
req.socket?.setNoDelay(true);
493-
494492
req.url = req.url?.substring(this.topicsBase.length);
495493

496494
return this.serviceDiscovery.router.lookup(req, res, next);
@@ -861,7 +859,7 @@ export class Host implements IComponent {
861859
*/
862860
async startCSIController(sequence: SequenceInfo, payload: STHRestAPI.StartSequencePayload): Promise<CSIController> {
863861
const communicationHandler = new CommunicationHandler();
864-
const id = IDProvider.generate();
862+
const id = payload.instanceId || IDProvider.generate();
865863

866864
this.logger.debug("CSIC start payload", payload);
867865

@@ -880,64 +878,33 @@ export class Host implements IComponent {
880878
});
881879

882880
csic.on("pang", async (data) => {
883-
// @TODO REFACTOR possibly send only one PANG in Runner and throw on more pangs
884881
this.logger.trace("PANG received", data);
885882

886-
// On First empty PANG
887-
if (!data.requires && !data.provides) {
888-
if (csic.inputTopic) {
889-
this.logger.trace("Routing topic to Sequence input, name from API:", csic.inputTopic);
890-
891-
csic.requires = csic.inputTopic;
892-
893-
await this.serviceDiscovery.routeTopicToStream(
894-
{ topic: csic.inputTopic, contentType: "" },
895-
csic.getInputStream()
896-
);
897-
}
898-
899-
if (csic.outputTopic) {
900-
this.logger.trace("Routing Sequence output to topic, name from API", csic.outputTopic);
901-
902-
csic.provides = csic.outputTopic;
903-
904-
// @TODO use pang data for contentType, right now it's a bit tricky bc there are multiple pangs
905-
await this.serviceDiscovery.routeStreamToTopic(
906-
csic.getOutputStream(),
907-
{ topic: csic.outputTopic, contentType: "" },
908-
csic.id
909-
);
910-
}
911-
}
912-
913-
// Do not route original topic to input stream, if --input-topic is specified
914-
if (!csic.inputTopic && data.requires) {
915-
this.logger.trace("Routing topic to sequence input, name from Sequence:", data.requires);
916-
917-
csic.requires = data.requires;
883+
if (data.requires && !csic.inputRouted) {
884+
this.logger.trace("Routing Sequence input to topic", data.requires);
918885

919886
await this.serviceDiscovery.routeTopicToStream(
920887
{ topic: data.requires, contentType: data.contentType! },
921888
csic.getInputStream()
922889
);
923890

891+
csic.inputRouted = true;
892+
924893
await this.serviceDiscovery.update({
925894
requires: data.requires, contentType: data.contentType!, topicName: data.requires
926895
});
927896
}
928897

929-
// Do not route output stream to original topic if --output-topic is specified
930-
if (!csic.outputTopic && data.provides) {
931-
this.logger.trace("Routing sequence output to topic, name from Sequence", data.provides);
932-
933-
csic.provides = data.provides;
934-
898+
if (data.provides && !csic.outputRouted) {
899+
this.logger.trace("Routing Sequence output to topic", data.requires);
935900
await this.serviceDiscovery.routeStreamToTopic(
936901
csic.getOutputStream(),
937-
{ topic: data.provides, contentType: "" },
902+
{ topic: data.provides, contentType: data.contentType! },
938903
csic.id
939904
);
940905

906+
csic.outputRouted = true;
907+
941908
await this.serviceDiscovery.update({
942909
provides: data.provides, contentType: data.contentType!, topicName: data.provides
943910
});
@@ -1039,6 +1006,8 @@ export class Host implements IComponent {
10391006
* @returns {STHRestAPI.GetSequencesResponse} List of Sequences.
10401007
*/
10411008
getSequences(): STHRestAPI.GetSequencesResponse {
1009+
this.logger.info("List Sequences");
1010+
10421011
return Array.from(this.sequencesStore.values()).map((sequence) => ({
10431012
id: sequence.id,
10441013
name: sequence.name,
@@ -1067,10 +1036,9 @@ export class Host implements IComponent {
10671036
}
10681037

10691038
getTopics() {
1070-
return this.serviceDiscovery.getTopics().map((topic) => ({
1071-
name: topic.topic,
1072-
contentType: topic.contentType,
1073-
}));
1039+
this.logger.info("List topics");
1040+
1041+
return this.serviceDiscovery.getTopics();
10741042
}
10751043

10761044
getStatus(): STHRestAPI.GetStatusResponse {

packages/host/src/lib/sd-adapter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ export class ServiceDiscovery {
8686
this.logger.debug(`Incoming topic '${topic}' request`);
8787
let target = this.getByTopic(topic);
8888

89+
req.on("close", () => {
90+
//@TODO: Should remove this actor"
91+
});
92+
8993
if (!target) {
9094
target = this.addData(
9195
{ contentType, topic },

packages/runner/src/host-client.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ class HostClient implements IHostClient {
3535
// Error handling for each connection is process crash for now
3636
const connection = net.createConnection(this.instancesServerPort, this.instancesServerHost);
3737

38-
connection.setNoDelay();
39-
4038
return new Promise<net.Socket>(res => {
4139
connection.on("connect", () => res(connection));
4240
});
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { AppConfig } from "../app-config";
22

33
export type StartSequenceDTO = {
4-
id: string,
5-
appConfig?: AppConfig,
6-
args?: string[]
4+
id: string;
5+
appConfig?: AppConfig;
6+
args?: string[];
7+
instanceId?: string;
78
}

packages/types/src/rest-api-sth/start-sequence.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ export type StartSequencePayload = {
88
args?: any[],
99
outputTopic?: string,
1010
inputTopic?: string,
11-
limits?: InstanceLimits
11+
limits?: InstanceLimits,
12+
instanceId?: string
1213
}

packages/utility/src/typeguards/dto/sequence-start.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
import { StartSequenceDTO } from "@scramjet/types";
22

3+
// eslint-disable-next-line complexity
34
export function isStartSequenceDTO(arg: any): arg is StartSequenceDTO {
45
if (typeof arg !== "object") throw new Error("DTO is not an object");
5-
const { id, appConfig, args, ...rest } = arg;
6+
const { id, appConfig, args, instanceId, ...rest } = arg;
67

78
if (typeof id !== "string") throw new Error("DTO id is not string");
89
if (!["object", "undefined"].includes(typeof appConfig)) throw new Error(`DTO appConfig is ${typeof appConfig}, not an object`);
910
if (typeof args !== "undefined") {
1011
if (!Array.isArray(args)) throw new Error("DTO args are not an array");
1112
if ((args as string[]).some(x => typeof x !== "string")) throw new Error("DTO args are all strings");
1213
}
14+
if (instanceId !== undefined && typeof instanceId === "string" && instanceId.length !== 36) throw new Error("DTO instanceId is 36 long");
1315
if (Object.keys(rest).length > 0) throw new Error(`DTO has unknown ${Object.keys(rest)} keys`);
1416

1517
return true;

packages/verser/src/lib/verser-client.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,6 @@ export class VerserClient extends TypedEmitter<Events> {
103103
ca: typeof this.opts.https === "object" ? this.opts.https.ca : undefined,
104104
});
105105

106-
connectRequest.socket?.setTimeout(0);
107-
connectRequest.socket?.setNoDelay(true);
108-
109106
connectRequest.on("error", (err) => {
110107
this.logger.error("Connect error", err);
111108
reject(err);
@@ -143,7 +140,7 @@ export class VerserClient extends TypedEmitter<Events> {
143140
this.emit("error", err);
144141
});
145142

146-
this._verserAgent = new HttpAgent({ keepAlive: true }) as HttpAgent & {
143+
this._verserAgent = new HttpAgent() as HttpAgent & {
147144
createConnection: typeof createConnection
148145
}; // lack of types?
149146

packages/verser/src/lib/verser-connection.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export class VerserConnection {
2929
private channelListeners: ((socket: Duplex, data?: any) => any)[] = [];
3030

3131
get connected() {
32-
return !this._socket.destroyed && this.bpmux;
32+
return !(this._socket.destroyed && this.bpmux);
3333
}
3434

3535
get socket(): Duplex {
@@ -163,16 +163,17 @@ export class VerserConnection {
163163
* @returns {Promise<VerserRequestResult>} Promise resolving to Response and Request objects.
164164
*/
165165
public async makeRequest(options: RequestOptions): Promise<VerserRequestResult> {
166-
if (!this.connected) throw new Error("BPMux not connected");
166+
if (!this.connected) throw new Error("Not connected");
167167

168+
this.logger.debug("making request", options);
168169
return new Promise((resolve, reject) => {
169170
const clientRequest = httpRequest({ ...options, agent: this.agent })
170171
.on("response", (incomingMessage: IncomingMessage) => {
171172
this.logger.debug("Got Response", options);
172173
resolve({ incomingMessage, clientRequest });
173174
})
174175
.on("error", (error: Error) => {
175-
this.logger.error("Error making request", options);
176+
this.logger.error("Request error", options, error);
176177
reject(error);
177178
});
178179

packages/verser/src/lib/verser.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ export class Verser extends TypedEmitter<Events> {
2727
this.server.on("connect", (req, socket: Socket) => {
2828
this.logger.info("New connection:", req.url);
2929

30-
socket.setTimeout(0);
31-
socket.setNoDelay(true);
32-
3330
const connection = new VerserConnection(req, socket);
3431

3532
this.connections.push(connection);

0 commit comments

Comments
 (0)