repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / pubsub
Eric Bower  ·  2026-02-03

dispatcher.go

 1package pubsub
 2
 3import (
 4	"slices"
 5	"strings"
 6	"sync"
 7)
 8
 9// MessageDispatcher defines how messages are dispatched to subscribers.
10type MessageDispatcher interface {
11	// Dispatch sends a message to the appropriate subscriber(s).
12	// It receives the message, all subscribers, and the channel's sync primitives.
13	Dispatch(msg ChannelMessage, subscribers []*Client, channelDone chan struct{}) error
14}
15
// MulticastDispatcher is the fan-out dispatcher: its Dispatch method offers
// each message to every subscriber it is handed. The type carries no state,
// so the zero value is ready to use.
type MulticastDispatcher struct{}
18
19func (d *MulticastDispatcher) Dispatch(msg ChannelMessage, subscribers []*Client, channelDone chan struct{}) error {
20	var wg sync.WaitGroup
21	for _, client := range subscribers {
22		wg.Add(1)
23		go func(cl *Client) {
24			defer wg.Done()
25			select {
26			case cl.Data <- msg:
27			case <-cl.Done:
28			case <-channelDone:
29			}
30		}(client)
31	}
32	wg.Wait()
33	return nil
34}
35
// RoundRobinDispatcher is a load-balancing dispatcher: each message is handed
// to exactly one subscriber, and successive messages rotate through the
// subscriber list. Contrast this with MulticastDispatcher, which offers every
// message to every subscriber.
//
// The rotation position is a single counter stored on the dispatcher value, so
// two channels/topics rotate independently only when each has its own
// dispatcher instance (presumably how callers construct it; confirm at the
// call site).
//
// The struct contains a sync.Mutex, so it must be shared by pointer and never
// copied after first use. The zero value is ready to use.
type RoundRobinDispatcher struct {
	// index is the rotation counter. It advances by one for every message
	// dispatched to a non-empty subscriber list and is reduced modulo the
	// subscriber count to choose the target.
	index uint32
	// mu guards index.
	mu sync.Mutex
}
51
52func (d *RoundRobinDispatcher) Dispatch(msg ChannelMessage, subscribers []*Client, channelDone chan struct{}) error {
53	// If no subscribers, nothing to dispatch
54	// BlockWrite behavior at publish time ensures subscribers are present when needed
55	if len(subscribers) == 0 {
56		return nil
57	}
58
59	slices.SortFunc(subscribers, func(a, b *Client) int {
60		return strings.Compare(a.ID, b.ID)
61	})
62
63	// Select the next subscriber in round-robin order
64	d.mu.Lock()
65	selectedIdx := int(d.index % uint32(len(subscribers)))
66	d.index++
67	d.mu.Unlock()
68
69	selectedClient := subscribers[selectedIdx]
70
71	select {
72	case selectedClient.Data <- msg:
73	case <-selectedClient.Done:
74	case <-channelDone:
75	}
76
77	return nil
78}