Skip to content

Commit fc49cb6

Browse files
authored
Add actuating methods (#475)
1 parent 99d77f6 commit fc49cb6

File tree

6 files changed

+129
-19
lines changed

6 files changed

+129
-19
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ build-buf: $(node_modules) clean-buf
5757
$(buf) generate buf.build/googleapis/googleapis
5858
$(buf) generate buf.build/viamrobotics/api:$$(cat api_version.lock) --path common,component,robot,service,app,provisioning,tagger,stream
5959
$(buf) generate buf.build/viamrobotics/goutils
60+
$(buf) generate buf.build/grpc/grpc --path grpc/reflection/v1/reflection.proto
6061

6162
# js targets
6263

examples/vanilla/package-lock.json

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/robot/client.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,12 +168,15 @@ export class RobotClient extends EventDispatcher implements Robot {
168168
this.onDisconnect();
169169
}
170170
);
171-
this.sessionManager = new SessionManager((): Transport => {
172-
if (!this.transport) {
173-
throw new Error(RobotClient.notConnectedYetStr);
171+
this.sessionManager = new SessionManager(
172+
this.serviceHost,
173+
(): Transport => {
174+
if (!this.transport) {
175+
throw new Error(RobotClient.notConnectedYetStr);
176+
}
177+
return this.transport;
174178
}
175-
return this.transport;
176-
});
179+
);
177180

178181
// For each connection event type, add a listener to capture that
179182
// event and re-emit it with the 'connectionstatechange' event

src/robot/session-manager.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const mockGetHeartBeatWindow = new Duration({
2727

2828
describe('SessionManager', () => {
2929
beforeEach(() => {
30-
sm = new SessionManager(() => mockTransport);
30+
sm = new SessionManager('', () => mockTransport);
3131
});
3232

3333
it('no session initially', () => {

src/robot/session-manager.ts

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,23 @@
1+
/* eslint-disable max-depth */
2+
import {
3+
BinaryReader,
4+
FileDescriptorProto,
5+
MethodOptions,
6+
} from '@bufbuild/protobuf';
17
import {
28
Code,
39
ConnectError,
410
createClient,
511
type Transport,
612
} from '@connectrpc/connect';
13+
import { createAsyncIterable } from '@connectrpc/connect/protocol';
14+
import { safety_heartbeat_monitored as safteyHeartbeatMonitored } from '../gen/common/v1/common_pb';
15+
import { ServerReflection } from '../gen/grpc/reflection/v1/reflection_connect';
16+
import {
17+
FileDescriptorResponse,
18+
ListServiceResponse,
19+
ServerReflectionRequest,
20+
} from '../gen/grpc/reflection/v1/reflection_pb';
721
import { RobotService } from '../gen/robot/v1/robot_connect';
822
import {
923
SendSessionHeartbeatRequest,
@@ -22,11 +36,14 @@ const timeoutBlob = new Blob(
2236
);
2337

2438
export default class SessionManager {
39+
public static heartbeatMonitoredMethods: Record<string, boolean> = {};
40+
2541
public readonly transport: Transport;
2642

2743
private currentSessionID = '';
2844
private sessionsSupported: boolean | undefined;
2945
private heartbeatIntervalMs: number | undefined;
46+
private host = '';
3047

3148
private starting: Promise<void> | undefined;
3249

@@ -35,7 +52,11 @@ export default class SessionManager {
3552
return createClient(RobotService, transport);
3653
}
3754

38-
constructor(private deferredTransport: () => Transport) {
55+
constructor(
56+
host: string,
57+
private deferredTransport: () => Transport
58+
) {
59+
this.host = host;
3960
this.transport = new SessionTransport(this.deferredTransport, this);
4061
}
4162

@@ -167,6 +188,7 @@ export default class SessionManager {
167188
(Number(heartbeatWindow.seconds) * 1e3 +
168189
heartbeatWindow.nanos / 1e6) /
169190
5;
191+
await this.applyHeartbeatMonitoredMethods();
170192
resolve();
171193
this.heartbeat().catch(console.error); // eslint-disable-line no-console
172194
})()
@@ -180,4 +202,83 @@ export default class SessionManager {
180202

181203
return this.getSessionMetadataInner();
182204
}
205+
206+
private async applyHeartbeatMonitoredMethods(): Promise<void> {
207+
try {
208+
const client = createClient(ServerReflection, this.transport);
209+
const request = new ServerReflectionRequest({
210+
host: this.host,
211+
messageRequest: { case: 'listServices', value: '' },
212+
});
213+
const responseStream = client.serverReflectionInfo(
214+
createAsyncIterable([request]),
215+
{ timeoutMs: 10_000 }
216+
);
217+
for await (const serviceResponse of responseStream) {
218+
const fdpRequests = (
219+
serviceResponse.messageResponse.value as ListServiceResponse
220+
).service.map((service) => {
221+
return new ServerReflectionRequest({
222+
messageRequest: {
223+
case: 'fileContainingSymbol',
224+
value: service.name,
225+
},
226+
});
227+
});
228+
const fdpResponseStream = client.serverReflectionInfo(
229+
createAsyncIterable(fdpRequests),
230+
{ timeoutMs: 10_000 }
231+
);
232+
for await (const fdpResponse of fdpResponseStream) {
233+
for (const fdp of (
234+
fdpResponse.messageResponse.value as FileDescriptorResponse
235+
).fileDescriptorProto) {
236+
const protoFile = FileDescriptorProto.fromBinary(fdp);
237+
for (const service of protoFile.service) {
238+
for (const method of service.method) {
239+
SessionManager.heartbeatMonitoredMethods[
240+
`/${protoFile.package}.${service.name}/${method.name}`
241+
] = SessionManager.hasHeartbeatOption(method.options);
242+
}
243+
}
244+
}
245+
}
246+
}
247+
} catch {
248+
// If can't get heartbeat monitored methods via reflection, use defaults.
249+
SessionManager.heartbeatMonitoredMethods = {
250+
'/viam.component.arm.v1.ArmService/MoveToPosition': true,
251+
'/viam.component.arm.v1.ArmService/MoveToJointPositions': true,
252+
'/viam.component.arm.v1.ArmService/MoveThroughJointPositions': true,
253+
'/viam.component.base.v1.BaseService/MoveStraight': true,
254+
'/viam.component.base.v1.BaseService/Spin': true,
255+
'/viam.component.base.v1.BaseService/SetPower': true,
256+
'/viam.component.base.v1.BaseService/SetVelocity': true,
257+
'/viam.component.gantry.v1.GantryService/MoveToPosition': true,
258+
'/viam.component.gripper.v1.GripperService/Open': true,
259+
'/viam.component.gripper.v1.GripperService/Grab': true,
260+
'/viam.component.motor.v1.MotorService/SetPower': true,
261+
'/viam.component.motor.v1.MotorService/GoFor': true,
262+
'/viam.component.motor.v1.MotorService/GoTo': true,
263+
'/viam.component.motor.v1.MotorService/SetRPM': true,
264+
'/viam.component.servo.v1.ServoService/Move': true,
265+
};
266+
}
267+
}
268+
269+
private static hasHeartbeatOption(options?: MethodOptions): boolean {
270+
if (!options) {
271+
return false;
272+
}
273+
const reader = new BinaryReader(options.toBinary());
274+
while (reader.pos < reader.len) {
275+
const tag = reader.tag();
276+
const [fieldNumber] = tag;
277+
if (fieldNumber === safteyHeartbeatMonitored.field.no) {
278+
return true;
279+
}
280+
reader.string();
281+
}
282+
return false;
283+
}
183284
}

src/robot/session-transport.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
type UnaryResponse,
1515
} from '@connectrpc/connect';
1616
import { cloneHeaders } from '../rpc/dial';
17-
import type SessionManager from './session-manager';
17+
import SessionManager from './session-manager';
1818

1919
export default class SessionTransport implements Transport {
2020
constructor(
@@ -49,10 +49,13 @@ export default class SessionTransport implements Transport {
4949
message: PartialMessage<I>,
5050
contextValues?: ContextValues
5151
): Promise<UnaryResponse<I, O>> {
52-
const md = await this.getSessionMetadata();
5352
const newHeaders = cloneHeaders(header);
54-
for (const [key, value] of md) {
55-
newHeaders.set(key, value);
53+
const methodPath = `/${service.typeName}/${method.name}`;
54+
if (SessionManager.heartbeatMonitoredMethods[methodPath] ?? false) {
55+
const md = await this.getSessionMetadata();
56+
for (const [key, value] of md) {
57+
newHeaders.set(key, value);
58+
}
5659
}
5760
return this.deferredTransport().unary(
5861
service,
@@ -77,10 +80,13 @@ export default class SessionTransport implements Transport {
7780
input: AsyncIterable<PartialMessage<I>>,
7881
contextValues?: ContextValues
7982
): Promise<StreamResponse<I, O>> {
80-
const md = await this.getSessionMetadata();
8183
const newHeaders = cloneHeaders(header);
82-
for (const [key, value] of md) {
83-
newHeaders.set(key, value);
84+
const methodPath = `/${service.typeName}/${method.name}`;
85+
if (SessionManager.heartbeatMonitoredMethods[methodPath] ?? false) {
86+
const md = await this.getSessionMetadata();
87+
for (const [key, value] of md) {
88+
newHeaders.set(key, value);
89+
}
8490
}
8591
return this.deferredTransport().stream(
8692
service,

0 commit comments

Comments
 (0)