@@ -489,9 +489,63 @@ public void setUseEmbedded(boolean useEmbedded) {
489489 this .useEmbedded = useEmbedded ;
490490 }
491491
492+ public void publish (String channel , String message ) {
493+ if (useMockFallback ) {
494+ mockRedis .publish (channel , message );
495+ return ;
496+ }
497+
498+ if (!isConnected ()) {
499+ logger .error ("Not connected to Redis server. Cannot publish message." );
500+ return ;
501+ }
502+
503+ try (Jedis jedis = jedisPool .getResource ()) {
504+ jedis .publish (channel , message );
505+ } catch (JedisConnectionException e ) {
506+ logger .error ("Jedis connection error during PUBLISH operation" , e );
507+ connected = false ;
508+ } catch (Exception e ) {
509+ logger .error ("Error publishing message to Redis" , e );
510+ }
511+ }
512+
513+ public void subscribe (String channel , MessageListener messageListener ) {
514+ if (useMockFallback ) {
515+ mockRedis .subscribe (channel , messageListener );
516+ return ;
517+ }
518+
519+ if (!isConnected ()) {
520+ logger .error ("Not connected to Redis server. Cannot subscribe to channel." );
521+ return ;
522+ }
523+
524+ new Thread (() -> {
525+ try (Jedis jedis = jedisPool .getResource ()) {
526+ jedis .subscribe (new JedisPubSub () {
527+ @ Override
528+ public void onMessage (String channel , String message ) {
529+ messageListener .onMessage (channel , message );
530+ }
531+ }, channel );
532+ } catch (JedisConnectionException e ) {
533+ logger .error ("Jedis connection error during SUBSCRIBE operation" , e );
534+ connected = false ;
535+ } catch (Exception e ) {
536+ logger .error ("Error subscribing to Redis channel" , e );
537+ }
538+ }, "Redis-Subscriber-" + channel ).start ();
539+ }
540+
541+ public interface MessageListener {
542+ void onMessage (String channel , String message );
543+ }
544+
492545 public static class MockRedis extends Redis {
493546 private final Map <String , String > dataStore = new HashMap <>();
494547 private boolean running = false ;
548+ private final Map <String , List <MessageListener >> subscribers = new HashMap <>();
495549
496550 public MockRedis () {
497551 super ("mock" , 0 );
@@ -581,5 +635,29 @@ public void connect() {
581635 public void disconnect () {
582636 stopServer ();
583637 }
638+
639+ public void publish (String channel , String message ) {
640+ if (!isRunning ()) {
641+ getLogger ().error ("Mock Redis server is not running. Cannot publish message." );
642+ return ;
643+ }
644+
645+ List <MessageListener > channelSubscribers = subscribers .get (channel );
646+ if (channelSubscribers != null ) {
647+ for (MessageListener listener : channelSubscribers ) {
648+ listener .onMessage (channel , message );
649+ }
650+ }
651+ }
652+
653+ public void subscribe (String channel , MessageListener messageListener ) {
654+ if (!isRunning ()) {
655+ getLogger ().error ("Mock Redis server is not running. Cannot subscribe to channel." );
656+ return ;
657+ }
658+
659+ subscribers .computeIfAbsent (channel , k -> new CopyOnWriteArrayList <>())
660+ .add (messageListener );
661+ }
584662 }
585663}
0 commit comments