55import java .util .Map ;
66import java .util .ArrayList ;
77import java .util .List ;
8+ import org .zeromq .ZMQ ;
89
910/**
1011 * Java implementation of concore Docker communication.
@@ -24,6 +25,8 @@ public class concoredocker {
2425 private static String inpath = "/in" ;
2526 private static String outpath = "/out" ;
2627 private static Map <String , Object > params = new HashMap <>();
28+ private static Map <String , ZeroMQPort > zmqPorts = new HashMap <>();
29+ private static ZMQ .Context zmqContext = null ;
2730 // simtime as double to preserve fractional values (e.g. "[0.0, ...]")
2831 private static double simtime = 0 ;
2932 private static double maxtime ;
@@ -49,6 +52,7 @@ public class concoredocker {
4952 params = new HashMap <>();
5053 }
5154 defaultMaxTime (100 );
55+ Runtime .getRuntime ().addShutdownHook (new Thread (concoredocker ::terminateZmq ));
5256 }
5357
5458 /**
@@ -292,7 +296,7 @@ public static void write(int port, String name, Object val, int delta) {
292296 }
293297 content .append ("]" );
294298 // simtime must not be mutated here.
295- // Mutation breaks cross-language determinism (see issue #385) .
299+ // Mutation breaks cross-language determinism.
296300 } else if (val instanceof Object []) {
297301 // Legacy support for Object[] arguments
298302 Object [] arrayVal = (Object []) val ;
@@ -304,7 +308,7 @@ public static void write(int port, String name, Object val, int delta) {
304308 }
305309 content .append ("]" );
306310 // simtime must not be mutated here.
307- // Mutation breaks cross-language determinism (see issue #385) .
311+ // Mutation breaks cross-language determinism.
308312 } else {
309313 System .out .println ("write must have list or str" );
310314 return ;
@@ -336,6 +340,113 @@ public static List<Object> initVal(String simtimeVal) {
336340 return val ;
337341 }
338342
343+ private static ZMQ .Context getZmqContext () {
344+ if (zmqContext == null ) {
345+ zmqContext = ZMQ .context (1 );
346+ }
347+ return zmqContext ;
348+ }
349+
350+ public static void initZmqPort (String portName , String portType , String address , String socketTypeStr ) {
351+ if (zmqPorts .containsKey (portName )) return ;
352+ int sockType = zmqSocketTypeFromString (socketTypeStr );
353+ if (sockType == -1 ) {
354+ System .err .println ("initZmqPort: unknown socket type '" + socketTypeStr + "'" );
355+ return ;
356+ }
357+ zmqPorts .put (portName , new ZeroMQPort (portType , address , sockType ));
358+ }
359+
360+ public static void terminateZmq () {
361+ for (Map .Entry <String , ZeroMQPort > entry : zmqPorts .entrySet ()) {
362+ entry .getValue ().socket .close ();
363+ }
364+ zmqPorts .clear ();
365+ if (zmqContext != null ) {
366+ zmqContext .term ();
367+ zmqContext = null ;
368+ }
369+ }
370+
371+ private static int zmqSocketTypeFromString (String s ) {
372+ switch (s .toUpperCase ()) {
373+ case "REQ" : return ZMQ .REQ ;
374+ case "REP" : return ZMQ .REP ;
375+ case "PUB" : return ZMQ .PUB ;
376+ case "SUB" : return ZMQ .SUB ;
377+ case "PUSH" : return ZMQ .PUSH ;
378+ case "PULL" : return ZMQ .PULL ;
379+ case "PAIR" : return ZMQ .PAIR ;
380+ default : return -1 ;
381+ }
382+ }
383+
384+ /**
385+ * Reads data from a ZMQ port. Same wire format as file-based read:
386+ * expects [simtime, val1, val2, ...], strips simtime, returns the rest.
387+ */
388+ public static List <Object > read (String portName , String name , String initstr ) {
389+ List <Object > defaultVal = new ArrayList <>();
390+ try {
391+ List <?> parsed = (List <?>) literalEval (initstr );
392+ if (parsed .size () > 1 ) {
393+ defaultVal = new ArrayList <>(parsed .subList (1 , parsed .size ()));
394+ }
395+ } catch (Exception e ) {
396+ }
397+ ZeroMQPort port = zmqPorts .get (portName );
398+ if (port == null ) {
399+ System .err .println ("read: ZMQ port '" + portName + "' not initialized" );
400+ return defaultVal ;
401+ }
402+ String msg = port .recvWithRetry ();
403+ if (msg == null ) {
404+ System .err .println ("read: ZMQ recv timeout on port '" + portName + "'" );
405+ return defaultVal ;
406+ }
407+ s += msg ;
408+ try {
409+ List <?> inval = (List <?>) literalEval (msg );
410+ if (!inval .isEmpty ()) {
411+ simtime = Math .max (simtime , ((Number ) inval .get (0 )).doubleValue ());
412+ return new ArrayList <>(inval .subList (1 , inval .size ()));
413+ }
414+ } catch (Exception e ) {
415+ System .out .println ("Error parsing ZMQ message '" + msg + "': " + e .getMessage ());
416+ }
417+ return defaultVal ;
418+ }
419+
420+ /**
421+ * Writes data to a ZMQ port. Prepends [simtime+delta] to match file-based write behavior.
422+ */
423+ public static void write (String portName , String name , Object val , int delta ) {
424+ ZeroMQPort port = zmqPorts .get (portName );
425+ if (port == null ) {
426+ System .err .println ("write: ZMQ port '" + portName + "' not initialized" );
427+ return ;
428+ }
429+ String payload ;
430+ if (val instanceof List ) {
431+ List <?> listVal = (List <?>) val ;
432+ StringBuilder sb = new StringBuilder ("[" );
433+ sb .append (toPythonLiteral (simtime + delta ));
434+ for (Object o : listVal ) {
435+ sb .append (", " );
436+ sb .append (toPythonLiteral (o ));
437+ }
438+ sb .append ("]" );
439+ payload = sb .toString ();
440+ // simtime must not be mutated here
441+ } else if (val instanceof String ) {
442+ payload = (String ) val ;
443+ } else {
444+ System .out .println ("write must have list or str" );
445+ return ;
446+ }
447+ port .sendWithRetry (payload );
448+ }
449+
339450 /**
340451 * Parses a Python-literal string into Java objects using a recursive descent parser.
341452 * Supports: dict, list, int, float, string (single/double quoted), bool, None, nested structures.
@@ -354,6 +465,44 @@ static Object literalEval(String s) {
354465 return result ;
355466 }
356467
468+ /**
469+ * ZMQ socket wrapper with bind/connect, timeouts, and retry.
470+ */
471+ private static class ZeroMQPort {
472+ final ZMQ .Socket socket ;
473+ final String address ;
474+
475+ ZeroMQPort (String portType , String address , int socketType ) {
476+ this .address = address ;
477+ ZMQ .Context ctx = getZmqContext ();
478+ this .socket = ctx .socket (socketType );
479+ this .socket .setReceiveTimeOut (2000 );
480+ this .socket .setSendTimeOut (2000 );
481+ this .socket .setLinger (0 );
482+ if (portType .equals ("bind" )) {
483+ this .socket .bind (address );
484+ } else {
485+ this .socket .connect (address );
486+ }
487+ }
488+
489+ String recvWithRetry () {
490+ for (int attempt = 0 ; attempt < 5 ; attempt ++) {
491+ String msg = socket .recvStr ();
492+ if (msg != null ) return msg ;
493+ try { Thread .sleep (500 ); } catch (InterruptedException e ) { Thread .currentThread ().interrupt (); break ; }
494+ }
495+ return null ;
496+ }
497+
498+ void sendWithRetry (String message ) {
499+ for (int attempt = 0 ; attempt < 5 ; attempt ++) {
500+ if (socket .send (message )) return ;
501+ try { Thread .sleep (500 ); } catch (InterruptedException e ) { Thread .currentThread ().interrupt (); break ; }
502+ }
503+ }
504+ }
505+
357506 /**
358507 * Recursive descent parser for Python literal expressions.
359508 * Handles: dicts, lists, tuples, strings, numbers, booleans, None.
0 commit comments