repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / pubsub
Eric Bower  ·  2026-02-03

multicast.go

 1package pubsub
 2
 3import (
 4	"context"
 5	"errors"
 6	"io"
 7	"iter"
 8	"log/slog"
 9
10	"github.com/antoniomika/syncmap"
11)
12
13/*
14Multicast is a flexible, bidirectional broker.
15
16It provides the most pure version of our PubSub interface which lets
17end-developers build one-to-many connections between publishers and
18subscribers and vice versa.
19
20It doesn't provide any topic filtering capabilities and is only
21concerned with sending data to and from an `io.ReadWriter` via our
22channels.
23*/
24type Multicast struct {
25	Broker
26	Logger *slog.Logger
27}
28
29func NewMulticast(logger *slog.Logger) *Multicast {
30	return &Multicast{
31		Logger: logger,
32		Broker: &BaseBroker{
33			Channels: syncmap.New[string, *Channel](),
34			Logger:   logger.With(slog.Bool("broker", true)),
35		},
36	}
37}
38
39func (p *Multicast) getClients(direction ChannelDirection) iter.Seq2[string, *Client] {
40	return func(yield func(string, *Client) bool) {
41		for clientID, client := range p.GetClients() {
42			if client.Direction == direction {
43				yield(clientID, client)
44			}
45		}
46	}
47}
48
49func (p *Multicast) GetPipes() iter.Seq2[string, *Client] {
50	return p.getClients(ChannelDirectionInputOutput)
51}
52
53func (p *Multicast) GetPubs() iter.Seq2[string, *Client] {
54	return p.getClients(ChannelDirectionInput)
55}
56
57func (p *Multicast) GetSubs() iter.Seq2[string, *Client] {
58	return p.getClients(ChannelDirectionOutput)
59}
60
61func (p *Multicast) connect(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, direction ChannelDirection, blockWrite bool, replay, keepAlive bool, dispatcher MessageDispatcher) (error, error) {
62	client := NewClient(ID, rw, direction, blockWrite, replay, keepAlive)
63
64	// Set dispatcher on all channels (only if not already set)
65	for _, ch := range channels {
66		ch.SetDispatcher(dispatcher)
67	}
68
69	go func() {
70		<-ctx.Done()
71		client.Cleanup()
72	}()
73
74	return p.Connect(client, channels)
75}
76
77func (p *Multicast) Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error) {
78	return p.connect(ctx, ID, rw, channels, ChannelDirectionInputOutput, false, replay, false, &MulticastDispatcher{})
79}
80
81func (p *Multicast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error {
82	return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionInput, blockWrite, false, false, &MulticastDispatcher{}))
83}
84
85func (p *Multicast) Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error {
86	return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionOutput, false, false, keepAlive, &MulticastDispatcher{}))
87}
88
89var _ PubSub = (*Multicast)(nil)