PubSub is a modular, in-memory publish/subscribe (pubsub) system for the Tinh Tinh framework. It enables decoupled communication between different parts of your application using topics and message subscribers.
go get -u github.com/tinh-tinh/pubsub/v2

- Central Broker: Manages topics and subscribers.
- Dynamic Subscribers: Subscribe to one or many topics dynamically.
- Asynchronous Messaging: Message delivery is non-blocking.
- Topic Patterns: Supports wildcards and topic delimiters for pattern-based subscriptions.
- Broadcast Support: Broadcast a message to all subscribers.
- Integration with Tinh Tinh Modules: Use dependency injection for broker and subscribers.
- Subscriber Limits: Optionally limit the max number of subscribers.
- Handler Utility: Simplifies listening to topics with concise functions and is the recommended way to consume messages.
import "github.com/tinh-tinh/pubsub/v2"
pubsubModule := pubsub.ForRoot(pubsub.BrokerOptions{
// Optional: MaxSubscribers, Wildcard, Delimiter, etc.
})

Subscribe to specific topics:
priceSubModule := pubsub.ForFeature("BTC", "ETH") // subscribe to multiple topics

Instead of using subscriber.GetMessages() or Listener, use the Handler utility to subscribe to topics and consume messages.
import (
	"sync"

	"github.com/tinh-tinh/pubsub/v2"
	"github.com/tinh-tinh/tinhtinh/v2/core"
)
// PriceService holds the content of the most recent message received on the
// subscribed topics.
//
// Message is written by the pubsub handler callback and read by the GET
// route. The write is triggered from a goroutine started by the POST route
// (go broker.Publish), so it can overlap with a read; every access to Message
// must therefore hold mu.
type PriceService struct {
	mu      sync.RWMutex // guards Message
	Message interface{}
}

// priceService registers one shared *PriceService under the name
// "PriceService" so it can be looked up with module.Ref.
priceService := func(module core.Module) core.Provider {
	return module.NewProvider(core.ProviderOptions{
		Name: "PriceService",
		// Registered as a pointer: the struct holds a mutex and must not be copied.
		Value: &PriceService{},
	})
}

// priceHandler stores the content of every message received on "BTC", "ETH"
// or "SOL" in the shared PriceService.
priceHandler := func(module core.Module) core.Provider {
	handler := pubsub.NewHandler(module)
	priceService := module.Ref("PriceService").(*PriceService)
	handler.Listen(func(msg *pubsub.Message) {
		// Fetch the content first so the lock is held only for the assignment.
		content := msg.GetContent()
		priceService.mu.Lock()
		priceService.Message = content
		priceService.mu.Unlock()
	}, "BTC", "ETH", "SOL")
	return handler
}

// controller exposes two routes under "prices": POST publishes a sample
// message on the "BTC" topic, GET returns the last message stored in
// PriceService.
controller := func(module core.Module) core.Controller {
	ctrl := module.NewController("prices")
	ctrl.Post("", func(ctx core.Ctx) error {
		broker := pubsub.InjectBroker(module)
		// Publish from its own goroutine so the response does not wait on it.
		go broker.Publish("BTC", "hihi")
		return ctx.JSON(core.Map{"data": "ok"})
	})
	ctrl.Get("", func(ctx core.Ctx) error {
		service := module.Ref("PriceService").(*PriceService)
		// Copy the value under the read lock, then release it before encoding
		// the response.
		service.mu.RLock()
		data := service.Message
		service.mu.RUnlock()
		return ctx.JSON(core.Map{"data": data})
	})
	return ctrl
}

We welcome contributions! Please feel free to submit a Pull Request.
If you encounter any issues or need help, you can:
- Open an issue in the GitHub repository
- Check our documentation
- Join our community discussions