diff --git a/packages/isar/pubspec.lock b/packages/isar/pubspec.lock index d26977b97..89c202684 100644 --- a/packages/isar/pubspec.lock +++ b/packages/isar/pubspec.lock @@ -356,9 +356,10 @@ packages: ndk: dependency: "direct main" description: - path: "../ndk" - relative: true - source: path + name: ndk + sha256: "88c371ca73014fa41b8f711036fb64d01c99045e7ce7e9560fa49a0976c36e18" + url: "https://pub.dev" + source: hosted version: "0.6.1-dev.1" node_preamble: dependency: transitive diff --git a/packages/ndk/lib/data_layer/data_sources/websocket_client.dart b/packages/ndk/lib/data_layer/data_sources/websocket_client.dart index d8648994b..c82a282b5 100644 --- a/packages/ndk/lib/data_layer/data_sources/websocket_client.dart +++ b/packages/ndk/lib/data_layer/data_sources/websocket_client.dart @@ -26,20 +26,18 @@ class WebsocketDSClient { } bool isOpen() { - return ws.connection.state == Connected() || - ws.connection.state == Reconnected(); + final state = ws.connection.state; + return state is Connected || state is Reconnected; } int? closeCode() { - return ws.connection.state == Disconnected() - ? (ws.connection.state as Disconnected).code - : null; + final state = ws.connection.state; + return state is Disconnected ? state.code : null; } String? closeReason() { - return ws.connection.state == Disconnected() - ? (ws.connection.state as Disconnected).reason - : null; + final state = ws.connection.state; + return state is Disconnected ? state.reason : null; } } // coverage:ignore-end diff --git a/packages/ndk/lib/domain_layer/entities/nip_01_event_raw.dart b/packages/ndk/lib/domain_layer/entities/nip_01_event_raw.dart new file mode 100644 index 000000000..a222d79bb --- /dev/null +++ b/packages/ndk/lib/domain_layer/entities/nip_01_event_raw.dart @@ -0,0 +1,50 @@ +enum NostrMessageRawType { + notice, + event, + eose, + ok, + closed, + auth, + unknown, +} + +//? needed until Nip01Event is refactored to be immutable +class Nip01EventRaw { + final String id; + + final String pubKey; + + final int createdAt; + + final int kind; + + final List> tags; + + final String content; + + final String sig; + + Nip01EventRaw({ + required this.id, + required this.pubKey, + required this.createdAt, + required this.kind, + required this.tags, + required this.content, + required this.sig, + }); +} + +class NostrMessageRaw { + final NostrMessageRawType type; + final Nip01EventRaw? nip01Event; + final String? requestId; + final dynamic otherData; + + NostrMessageRaw({ + required this.type, + this.nip01Event, + this.requestId, + this.otherData, + }); +} diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index b632d60a6..73accd62c 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -6,7 +6,9 @@ import 'package:rxdart/rxdart.dart'; import '../../config/bootstrap_relays.dart'; import '../../config/relay_defaults.dart'; +import '../../shared/decode_nostr_msg/decode_nostr_msg.dart'; import '../../shared/helpers/relay_helper.dart'; +import '../../shared/isolates/isolate_manager.dart'; import '../../shared/logger/logger.dart'; import '../../shared/nips/nip01/client_msg.dart'; import '../entities/broadcast_state.dart'; @@ -14,6 +16,7 @@ import '../entities/connection_source.dart'; import '../entities/filter.dart'; import '../entities/global_state.dart'; import '../entities/nip_01_event.dart'; +import '../entities/nip_01_event_raw.dart'; import '../entities/relay.dart'; import '../entities/relay_connectivity.dart'; import '../entities/relay_info.dart'; @@ -388,23 +391,23 @@ class RelayManager { }); } - void _handleIncomingMessage( - dynamic message, RelayConnectivity relayConnectivity) { - List eventJson; - try { - final decodedMessage = json.decode(message); - if (decodedMessage is! List) { - Logger.log.w("Received non-list JSON message from ${relayConnectivity.url}: $message"); - return; - } - eventJson = decodedMessage; - } on FormatException catch (e) { - Logger.log.e( - "FormatException in _handleIncomingMessage for relay ${relayConnectivity.url}: $e, message: $message"); + Future _handleIncomingMessage( + dynamic message, RelayConnectivity relayConnectivity) async { + /// decode in isolate + final nostrMsg = await IsolateManager.instance + .runInEncodingIsolate( + decodeNostrMsg, + message, + ); + + if (nostrMsg.type == NostrMessageRawType.unknown) { + Logger.log.w( + "Received non NostrMessageRaw message from ${relayConnectivity.url}: $nostrMsg"); return; } - if (eventJson[0] == 'OK') { + if (nostrMsg.type == NostrMessageRawType.ok) { + final eventJson = nostrMsg.otherData; //nip 20 used to notify clients if an EVENT was successful if (eventJson.length >= 2 && eventJson[2] == false) { Logger.log.e("NOT OK from ${relayConnectivity.url}: $eventJson"); @@ -426,22 +429,26 @@ class RelayManager { } return; } - if (eventJson[0] == 'NOTICE') { + if (nostrMsg.type == NostrMessageRawType.notice) { + final eventJson = nostrMsg.otherData; Logger.log.w("NOTICE from ${relayConnectivity.url}: ${eventJson[1]}"); _logActiveRequests(); - } else if (eventJson[0] == 'EVENT') { + } else if (nostrMsg.type == NostrMessageRawType.event) { _handleIncomingEvent( - eventJson, relayConnectivity, message.toString().codeUnits.length); - Logger.log.t("EVENT from ${relayConnectivity.url}: $eventJson"); - } else if (eventJson[0] == 'EOSE') { + nostrMsg, relayConnectivity, message.toString().codeUnits.length); + // Logger.log.t("EVENT from ${relayConnectivity.url}: $eventJson"); + } else if (nostrMsg.type == NostrMessageRawType.eose) { + final eventJson = nostrMsg.otherData; Logger.log.d("EOSE from ${relayConnectivity.url}: ${eventJson[1]}"); _handleEOSE(eventJson, relayConnectivity); - } else if (eventJson[0] == 'CLOSED') { + } else if (nostrMsg.type == NostrMessageRawType.closed) { + final eventJson = nostrMsg.otherData; Logger.log.w( " CLOSED subscription url: ${relayConnectivity.url} id: ${eventJson[1]} msg: ${eventJson.length > 2 ? eventJson[2] : ''}"); _handleClosed(eventJson, relayConnectivity); } - if (eventJson[0] == ClientMsgType.kAuth) { + if (nostrMsg.type == NostrMessageRawType.auth) { + final eventJson = nostrMsg.otherData; // nip 42 used to send authentication challenges final challenge = eventJson[1]; Logger.log.d("AUTH: $challenge"); @@ -468,23 +475,33 @@ class RelayManager { // } } - void _handleIncomingEvent(List eventJson, + void _handleIncomingEvent(NostrMessageRaw nostrMsgRaw, RelayConnectivity connectivity, int messageSize) { - var id = eventJson[1]; - if (globalState.inFlightRequests[id] == null) { + final requestId = nostrMsgRaw.requestId!; + final eventRaw = nostrMsgRaw.nip01Event!; + + if (globalState.inFlightRequests[requestId] == null) { Logger.log.w( - "RECEIVED EVENT from ${connectivity.url} for id $id, not in globalState inFlightRequests. Likely data after EOSE on a query"); + "RECEIVED EVENT from ${connectivity.url} for id $requestId, not in globalState inFlightRequests. Likely data after EOSE on a query"); return; } - Nip01Event event = Nip01Event.fromJson(eventJson[2]); + Nip01Event event = Nip01Event( + pubKey: eventRaw.pubKey, + createdAt: eventRaw.createdAt, + kind: eventRaw.kind, + tags: eventRaw.tags, + content: eventRaw.content, + ); + event.sig = eventRaw.sig; + event.id = eventRaw.id; connectivity.stats.incStatsByNewEvent(event, messageSize); - RequestState? state = globalState.inFlightRequests[id]; + RequestState? state = globalState.inFlightRequests[requestId]; if (state != null) { RelayRequestState? request = state.requests[connectivity.url]; if (request == null) { - Logger.log.w("No RelayRequestState found for id $id"); + Logger.log.w("No RelayRequestState found for id $requestId"); return; } event.sources.add(connectivity.url); @@ -650,3 +667,7 @@ class RelayManager { return globalState.relays[url]; } } + +dynamic decodeJson(String jsonString) { + return json.decode(jsonString); +} diff --git a/packages/ndk/lib/shared/decode_nostr_msg/decode_nostr_msg.dart b/packages/ndk/lib/shared/decode_nostr_msg/decode_nostr_msg.dart new file mode 100644 index 000000000..6fff3ffd7 --- /dev/null +++ b/packages/ndk/lib/shared/decode_nostr_msg/decode_nostr_msg.dart @@ -0,0 +1,62 @@ +import 'dart:convert'; + +import '../../domain_layer/entities/nip_01_event_raw.dart'; + +NostrMessageRaw decodeNostrMsg(String msgJsonStr) { + try { + final decoded = jsonDecode(msgJsonStr); + + if (decoded is! List || decoded.isEmpty) { + return NostrMessageRaw(type: NostrMessageRawType.unknown); + } + + final msgTypeStr = decoded[0]; + switch (msgTypeStr) { + case 'NOTICE': + return NostrMessageRaw( + type: NostrMessageRawType.notice, otherData: decoded); + case 'EVENT': + if (decoded.length < 3) { + return NostrMessageRaw(type: NostrMessageRawType.unknown); + } + final requestId = decoded[1]; + final eventData = decoded[2]; + final nip01Event = Nip01EventRaw( + id: eventData['id'], + pubKey: eventData['pubkey'], + createdAt: eventData['created_at'], + kind: eventData['kind'], + tags: (eventData['tags'] as List) + .map((tag) => List.from(tag)) + .toList(), + content: eventData['content'], + sig: eventData['sig'], + ); + return NostrMessageRaw( + type: NostrMessageRawType.event, + nip01Event: nip01Event, + requestId: requestId, + ); + case 'EOSE': + return NostrMessageRaw( + type: NostrMessageRawType.eose, + otherData: decoded, + ); + case 'OK': + return NostrMessageRaw( + type: NostrMessageRawType.ok, otherData: decoded); + case 'CLOSED': + return NostrMessageRaw( + type: NostrMessageRawType.closed, otherData: decoded); + case 'AUTH': + return NostrMessageRaw( + type: NostrMessageRawType.auth, otherData: decoded); + default: + return NostrMessageRaw( + type: NostrMessageRawType.unknown, otherData: decoded); + } + } catch (e) { + return NostrMessageRaw( + type: NostrMessageRawType.unknown, otherData: msgJsonStr); + } +} diff --git a/packages/ndk/lib/shared/isolates/isolate_manager.dart b/packages/ndk/lib/shared/isolates/isolate_manager.dart new file mode 100644 index 000000000..1b810eff3 --- /dev/null +++ b/packages/ndk/lib/shared/isolates/isolate_manager.dart @@ -0,0 +1,134 @@ +import 'dart:async'; +import 'dart:isolate'; + +class IsolateConfig { + Isolate isolate; + SendPort sendPort; + IsolateConfig(this.isolate, this.sendPort); +} + +class IsolateManager { + static IsolateManager? _instance; + static IsolateManager get instance { + _instance ??= IsolateManager._(); + return _instance!; + } + + Isolate? _encodeIsolate; + Isolate? _computeIsolate; + + SendPort? _encodeSendPort; + SendPort? _computeSendPort; + final Completer _readyCompleter = Completer(); + + IsolateManager._() { + _initialize(); + } + + Future _initialize() async { + try { + _encodeSendPort = await _createIsolate((sendPort) { + _encodeIsolate = sendPort.isolate; + return sendPort.sendPort; + }); + _computeSendPort = await _createIsolate((sendPort) { + _computeIsolate = sendPort.isolate; + return sendPort.sendPort; + }); + + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + } catch (e) { + if (!_readyCompleter.isCompleted) { + _readyCompleter.completeError(e); + } + } + } + + Future _createIsolate(Function(IsolateConfig) isolateConfig) async { + final receivePort = ReceivePort(); + final isolate = await Isolate.spawn(_isolateEntry, receivePort.sendPort); + final sendPort = await receivePort.first as SendPort; + isolateConfig(IsolateConfig(isolate, sendPort)); + return sendPort; + } + + Future _runTask( + R Function(Q) task, + Q argument, + SendPort? sendPort, + ) async { + if (sendPort == null) { + throw StateError('Isolate not initialized'); + } + + final completer = Completer(); + final port = ReceivePort(); + sendPort.send([task, argument, port.sendPort]); + port.listen((message) { + port.close(); + if (message is Map && message['error'] != null) { + completer.completeError(message['error']); + } else { + completer.complete(message as R); + } + }); + return completer.future; + } + + Future get ready => _readyCompleter.future; + + /// dedicated for decoding/encoding json + Future runInEncodingIsolate( + R Function(Q) task, + Q argument, + ) async { + await ready; + return _runTask(task, argument, _encodeSendPort); + } + + /// dedicated for compute operations (like crypto, hashing, etc) + Future runInComputeIsolate( + R Function(Q) task, + Q argument, + ) async { + await ready; + return _runTask(task, argument, _computeSendPort); + } + + Future dispose() async { + _encodeIsolate?.kill(priority: Isolate.immediate); + _computeIsolate?.kill(priority: Isolate.immediate); + + _encodeIsolate = null; + _computeIsolate = null; + + _encodeSendPort = null; + _computeSendPort = null; + _instance = null; + } +} + +void _isolateEntry(SendPort sendPort) { + final port = ReceivePort(); + sendPort.send(port.sendPort); + port.listen((message) { + if (message is! List || message.length != 3) { + return; + } + + final task = message[0] as Function; + final argument = message[1]; + final replyPort = message[2] as SendPort; + + try { + final result = task(argument); + replyPort.send(result); + } catch (e, stackTrace) { + // ignore: avoid_print + print('_isolateEntry Error: $e\n$stackTrace'); + replyPort.send({'error': 'Error: $e'}); + } + }); +}