Eric Bower
·
2026-02-03
multicast.go
1package pubsub
2
3import (
4 "context"
5 "errors"
6 "io"
7 "iter"
8 "log/slog"
9
10 "github.com/antoniomika/syncmap"
11)
12
13/*
14Multicast is a flexible, bidirectional broker.
15
16It provides the most pure version of our PubSub interface which lets
17end-developers build one-to-many connections between publishers and
18subscribers and vice versa.
19
20It doesn't provide any topic filtering capabilities and is only
21concerned with sending data to and from an `io.ReadWriter` via our
22channels.
23*/
24type Multicast struct {
25 Broker
26 Logger *slog.Logger
27}
28
29func NewMulticast(logger *slog.Logger) *Multicast {
30 return &Multicast{
31 Logger: logger,
32 Broker: &BaseBroker{
33 Channels: syncmap.New[string, *Channel](),
34 Logger: logger.With(slog.Bool("broker", true)),
35 },
36 }
37}
38
39func (p *Multicast) getClients(direction ChannelDirection) iter.Seq2[string, *Client] {
40 return func(yield func(string, *Client) bool) {
41 for clientID, client := range p.GetClients() {
42 if client.Direction == direction {
43 yield(clientID, client)
44 }
45 }
46 }
47}
48
49func (p *Multicast) GetPipes() iter.Seq2[string, *Client] {
50 return p.getClients(ChannelDirectionInputOutput)
51}
52
53func (p *Multicast) GetPubs() iter.Seq2[string, *Client] {
54 return p.getClients(ChannelDirectionInput)
55}
56
57func (p *Multicast) GetSubs() iter.Seq2[string, *Client] {
58 return p.getClients(ChannelDirectionOutput)
59}
60
61func (p *Multicast) connect(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, direction ChannelDirection, blockWrite bool, replay, keepAlive bool, dispatcher MessageDispatcher) (error, error) {
62 client := NewClient(ID, rw, direction, blockWrite, replay, keepAlive)
63
64 // Set dispatcher on all channels (only if not already set)
65 for _, ch := range channels {
66 ch.SetDispatcher(dispatcher)
67 }
68
69 go func() {
70 <-ctx.Done()
71 client.Cleanup()
72 }()
73
74 return p.Connect(client, channels)
75}
76
77func (p *Multicast) Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error) {
78 return p.connect(ctx, ID, rw, channels, ChannelDirectionInputOutput, false, replay, false, &MulticastDispatcher{})
79}
80
81func (p *Multicast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error {
82 return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionInput, blockWrite, false, false, &MulticastDispatcher{}))
83}
84
85func (p *Multicast) Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error {
86 return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionOutput, false, false, keepAlive, &MulticastDispatcher{}))
87}
88
// Compile-time assertion that *Multicast implements the PubSub interface.
var _ PubSub = (*Multicast)(nil)