Skip to content

Commit 58638d1

Browse files
committed
Fix input & not-serialized output on reconnect
1 parent d840bcf commit 58638d1

File tree

10 files changed

+126
-53
lines changed

10 files changed

+126
-53
lines changed

packages/adapters/src/process-instance-adapter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,9 @@ class ProcessInstanceAdapter implements
254254
process.kill(this.processPID, 0);
255255
} catch (e) {
256256
this.logger.error("Runner process not exists", e);
257-
/** process not exists */
257+
258+
clearInterval(interval);
259+
258260
reject("pid not exists");
259261
}
260262
}

packages/host/src/lib/csi-controller.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export class CSIController extends TypedEmitter<Events> {
116116
apiInputEnabled = true;
117117

118118
executionTime: number = -1;
119+
inputHeadersSent = false;
119120

120121
/**
121122
* Topic to which the output stream should be routed
@@ -403,9 +404,10 @@ export class CSIController extends TypedEmitter<Events> {
403404
.pipe(this.upStreams[CC.CONTROL]);
404405

405406
this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PING, async (message) => {
406-
const { status, payload } = message[1];
407+
const { status, payload, inputHeadersSent } = message[1];
407408

408409
this.status = status || InstanceStatus.RUNNING;
410+
this.inputHeadersSent = inputHeadersSent;
409411

410412
if (!payload) {
411413
this.emit("error", "No payload in ping!");
@@ -485,6 +487,10 @@ export class CSIController extends TypedEmitter<Events> {
485487
this.logger.trace("Received a PING message with ports config");
486488
}
487489

490+
this.inputHeadersSent = !!message[1].inputHeadersSent;
491+
492+
this.logger.info("Headers already sent for input?", this.inputHeadersSent);
493+
488494
if (this.instanceAdapter.setRunner) {
489495
await this.instanceAdapter.setRunner({
490496
...message[1].payload.system,
@@ -537,8 +543,6 @@ export class CSIController extends TypedEmitter<Events> {
537543
}
538544

539545
createInstanceAPIRouter() {
540-
let inputHeadersSent = false;
541-
542546
if (!this.upStreams) {
543547
throw new AppError("UNATTACHED_STREAMS");
544548
}
@@ -551,11 +555,11 @@ export class CSIController extends TypedEmitter<Events> {
551555
* @experimental
552556
*/
553557
this.router.duplex("/inout", (duplex, _headers) => {
554-
if (!inputHeadersSent) {
558+
if (!this.inputHeadersSent) {
555559
this.downStreams![CC.IN].write(`Content-Type: ${_headers["content-type"]}\r\n`);
556560
this.downStreams![CC.IN].write("\r\n");
557561

558-
inputHeadersSent = true;
562+
this.inputHeadersSent = true;
559563
}
560564

561565
(duplex as unknown as DuplexStream).input.pipe(this.downStreams![CC.IN], { end: false });
@@ -597,15 +601,15 @@ export class CSIController extends TypedEmitter<Events> {
597601
const contentType = req.headers["content-type"];
598602

599603
// @TODO: Check if subsequent requests have the same content-type.
600-
if (!inputHeadersSent) {
604+
if (!this.inputHeadersSent) {
601605
if (contentType === undefined) {
602606
return { opStatus: ReasonPhrases.NOT_ACCEPTABLE, error: "Content-Type must be defined" };
603607
}
604608

605609
stream.write(`Content-Type: ${contentType}\r\n`);
606610
stream.write("\r\n");
607611

608-
inputHeadersSent = true;
612+
this.inputHeadersSent = true;
609613
}
610614

611615
return stream;

packages/host/src/lib/csi-dispatcher.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,14 @@ export class CSIDispatcher extends TypedEmitter<Events> {
6666
id,
6767
sequenceInfo,
6868
payload,
69-
status: InstanceStatus.INITIALIZING
69+
status: InstanceStatus.INITIALIZING,
70+
inputHeadersSent: false
7071
}, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter);
7172

7273
this.logger.trace("CSIController created", id, sequenceInfo);
7374

7475
csiController.logger.pipe(this.logger, { end: false });
76+
7577
communicationHandler.logger.pipe(this.logger, { end: false });
7678

7779
csiController
@@ -99,15 +101,15 @@ export class CSIDispatcher extends TypedEmitter<Events> {
99101
this.logger.warn("Missing topic content-type");
100102
}
101103

102-
if (data.requires && !csiController.inputRouted && data.contentType) {
103-
this.logger.trace("Routing topic to Sequence input", data.requires);
104+
if (data.requires && data.contentType) {
105+
this.logger.trace("Routing topic to Instance input", data.requires);
104106

105107
await this.serviceDiscovery.routeTopicToStream(
106108
{ topic: new TopicId(data.requires), contentType: data.contentType as ContentType },
107109
csiController.getInputStream()
108110
);
109111

110-
csiController.inputRouted = true;
112+
csiController.inputHeadersSent = true;
111113

112114
await this.serviceDiscovery.update({
113115
requires: data.requires, contentType: data.contentType, topicName: data.requires, status: "add"

packages/host/src/lib/host.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ export class Host implements IComponent {
340340

341341
const seq = this.sequenceStore.getById(instance.sequence.id);
342342

343-
if (!seq) {
343+
if (!seq && this.cpmConnector?.connected) {
344344
this.logger.info("Sequence not found. Checking Store...");
345345

346346
try {

packages/host/src/lib/serviceDiscovery/sd-adapter.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,8 @@ export class ServiceDiscovery {
179179
}
180180

181181
async update(data: STHTopicEventData) {
182-
this.logger.trace("Topic update. Send topic info to CPM", data);
183-
184182
if (this.cpmConnector?.connected) {
183+
this.logger.trace("Topic update. Send topic info to CPM", data);
185184
await this.cpmConnector?.sendTopicInfo(data);
186185
}
187186
}

packages/host/src/lib/socket-server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export class SocketServer extends TypedEmitter<Events> implements IComponent {
7777
this.server!
7878
.listen(this.port, this.hostname, () => {
7979
this.logger.info("SocketServer on", this.server?.address());
80+
8081
res();
8182
})
8283
.on("error", rej);

packages/runner/src/host-client.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import { ObjLogger } from "@scramjet/obj-logger";
33
import { CommunicationChannel as CC } from "@scramjet/symbols";
44
import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types";
5+
import { defer } from "@scramjet/utility";
56
import { Agent } from "http";
67
import net, { Socket, createConnection } from "net";
8+
import { PassThrough } from "stream";
79

810
type HostOpenConnections = [
911
net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket
@@ -42,20 +44,24 @@ class HostClient implements IHostClient {
4244
async init(id: string): Promise<void> {
4345
const openConnections = await Promise.all(
4446
Array.from(Array(9))
45-
.map(() => {
47+
.map((_e: any, i: number) => {
4648
// Error handling for each connection is process crash for now
4749
let connection: Socket;
4850

4951
try {
5052
connection = net.createConnection(this.instancesServerPort, this.instancesServerHost);
51-
connection.on("error", () => {});
53+
connection.on("error", () => {
54+
this.logger.warn(`${i} Stream error`);
55+
});
5256
connection.setNoDelay(true);
5357
} catch (e) {
5458
return Promise.reject(e);
5559
}
5660

5761
return new Promise<net.Socket>(res => {
58-
connection.on("connect", () => res(connection));
62+
connection.on("connect", () => {
63+
res(connection);
64+
});
5965
});
6066
})
6167
.map((connPromised, index) => {
@@ -74,6 +80,26 @@ class HostClient implements IHostClient {
7480

7581
this._streams = openConnections as HostOpenConnections;
7682

83+
const input = this._streams[CC.IN];
84+
85+
const inputTarget = new PassThrough({ emitClose: false });
86+
87+
input.on("end", async () => {
88+
await defer(500);
89+
90+
if ((this._streams![CC.CONTROL] as net.Socket).readableEnded) {
91+
this.logger.info("Input end. Control is also ended... We are disconnected.");
92+
} else {
93+
this.logger.info("Input end. Control not ended. We are online. Desired input end.");
94+
inputTarget.end();
95+
}
96+
});
97+
98+
input.pipe(inputTarget, { end: false });
99+
100+
this._streams[CC.IN] = inputTarget;
101+
//this._streams[CC.STDIN] = this._streams[CC.STDIN].pipe(new PassThrough({ emitClose: false }), { end: false });
102+
77103
try {
78104
this.bpmux = new BPMux(this._streams[CC.PACKAGE]);
79105
} catch (e) {
@@ -118,6 +144,11 @@ class HostClient implements IHostClient {
118144
const streamsExitedPromised: Promise<void>[] = this.streams.map((stream, i) =>
119145
new Promise(
120146
(res) => {
147+
if ([CC.IN, CC.STDIN, CC.CONTROL].includes(i)) {
148+
res();
149+
return;
150+
}
151+
121152
if (!hard && "writable" in stream!) {
122153
stream
123154
.on("error", (e) => {

0 commit comments

Comments
 (0)