repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / pubsub
Eric Bower  ·  2026-02-03

channel.go

  1package pubsub
  2
  3import (
  4	"iter"
  5	"sync"
  6
  7	"github.com/antoniomika/syncmap"
  8)
  9
 10type ChannelDirection int
 11
 12func (d ChannelDirection) String() string {
 13	return [...]string{"input", "output", "inputoutput"}[d]
 14}
 15
 16const (
 17	ChannelDirectionInput ChannelDirection = iota
 18	ChannelDirectionOutput
 19	ChannelDirectionInputOutput
 20)
 21
 22type ChannelAction int
 23
 24func (d ChannelAction) String() string {
 25	return [...]string{"data", "close"}[d]
 26}
 27
 28const (
 29	ChannelActionData = iota
 30	ChannelActionClose
 31)
 32
 33type ChannelMessage struct {
 34	Data      []byte
 35	ClientID  string
 36	Direction ChannelDirection
 37	Action    ChannelAction
 38}
 39
 40func NewChannel(topic string) *Channel {
 41	return &Channel{
 42		Topic:   topic,
 43		Done:    make(chan struct{}),
 44		Data:    make(chan ChannelMessage),
 45		Clients: syncmap.New[string, *Client](),
 46	}
 47}
 48
 49/*
 50Channel is a container for a topic.  It holds the list of clients and
 51a data channel to receive a message.
 52*/
 53type Channel struct {
 54	Topic       string
 55	Done        chan struct{}
 56	Data        chan ChannelMessage
 57	Clients     *syncmap.Map[string, *Client]
 58	handleOnce  sync.Once
 59	cleanupOnce sync.Once
 60	mu          sync.Mutex
 61	Dispatcher  MessageDispatcher
 62}
 63
 64func (c *Channel) GetClients() iter.Seq2[string, *Client] {
 65	return c.Clients.Range
 66}
 67
 68func (c *Channel) SetDispatcher(d MessageDispatcher) {
 69	c.mu.Lock()
 70	defer c.mu.Unlock()
 71	if c.Dispatcher == nil {
 72		c.Dispatcher = d
 73	}
 74}
 75
 76func (c *Channel) GetDispatcher() MessageDispatcher {
 77	c.mu.Lock()
 78	defer c.mu.Unlock()
 79	return c.Dispatcher
 80}
 81
 82func (c *Channel) Cleanup() {
 83	c.cleanupOnce.Do(func() {
 84		close(c.Done)
 85	})
 86}
 87
 88func (c *Channel) Handle() {
 89	c.handleOnce.Do(func() {
 90		go func() {
 91			defer func() {
 92				for _, client := range c.GetClients() {
 93					client.Cleanup()
 94				}
 95			}()
 96
 97			for {
 98				select {
 99				case <-c.Done:
100					return
101				case data, ok := <-c.Data:
102					if !ok {
103						// Channel is closing, close all client data channels
104						for _, client := range c.GetClients() {
105							client.onceData.Do(func() {
106								close(client.Data)
107							})
108						}
109						return
110					}
111
112					// Collect eligible subscribers
113					subscribers := make([]*Client, 0)
114					for _, client := range c.GetClients() {
115						// Skip input-only clients and senders (unless replay is enabled)
116						if client.Direction == ChannelDirectionInput || (client.ID == data.ClientID && !client.Replay) {
117							continue
118						}
119						subscribers = append(subscribers, client)
120					}
121
122					if len(data.Data) > 0 {
123						// Dispatch message using the configured dispatcher
124						dispatcher := c.GetDispatcher()
125						if dispatcher != nil {
126							_ = dispatcher.Dispatch(data, subscribers, c.Done)
127						}
128					}
129				}
130			}
131		}()
132	})
133}