event

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2025 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRendezvousExists   = errors.New("rendezvous already exists")
	ErrRendezvousNotFound = errors.New("rendezvous not found")
)

Functions

func EventIDString

func EventIDString(id *eventpb.EventId) string

func MustGenerateEventID

func MustGenerateEventID() *eventpb.EventId

Types

type Bus

type Bus[Key, Event any] struct {
	// contains filtered or unexported fields
}

func NewBus

func NewBus[Key, Event any]() *Bus[Key, Event]

func (*Bus[Key, Event]) AddHandler

func (b *Bus[Key, Event]) AddHandler(h Handler[Key, Event])

func (*Bus[Key, Event]) OnEvent

func (b *Bus[Key, Event]) OnEvent(key Key, e Event)

type Forwarder

type Forwarder interface {
	ForwardUserEvents(ctx context.Context, events ...*eventpb.UserEvent) error
}

func NewForwardingClient

func NewForwardingClient(log *zap.Logger, events Store, currentRpcApiKey string) Forwarder

type ForwardingClient

type ForwardingClient struct {
	// contains filtered or unexported fields
}

func (*ForwardingClient) ForwardUserEvents

func (c *ForwardingClient) ForwardUserEvents(ctx context.Context, events ...*eventpb.UserEvent) error

todo: duplicated code with ForwardingClient

type Handler

type Handler[Key, Event any] interface {
	OnEvent(key Key, e Event)
}

type HandlerFunc

type HandlerFunc[Key, Event any] func(Key, Event)

HandlerFunc is an adapter to allow the use of ordinary functions as Handlers.

func (HandlerFunc[Key, Event]) OnEvent

func (f HandlerFunc[Key, Event]) OnEvent(key Key, e Event)

OnEvent calls f(key, e).

type KeyAndEvent

type KeyAndEvent[Key, Event any] struct {
	Key   Key
	Event Event
}

type ProtoEventStream

type ProtoEventStream[E any, P proto.Message] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewProtoEventStream

func NewProtoEventStream[E any, P proto.Message](
	id string,
	bufferSize int,
	selector func(event E) (P, bool),
) *ProtoEventStream[E, P]

func (*ProtoEventStream[E, P]) Channel

func (s *ProtoEventStream[E, P]) Channel() <-chan P

func (*ProtoEventStream[E, P]) Close

func (s *ProtoEventStream[E, P]) Close()

func (*ProtoEventStream[E, P]) ID

func (s *ProtoEventStream[E, P]) ID() string

func (*ProtoEventStream[E, P]) Notify

func (s *ProtoEventStream[E, P]) Notify(event E, timeout time.Duration) error

type Rendezvous

type Rendezvous struct {
	Key       string
	Address   string
	ExpiresAt time.Time
}

func (*Rendezvous) Clone

func (r *Rendezvous) Clone() *Rendezvous

type Server

type Server struct {
	eventpb.UnimplementedEventStreamingServer
	// contains filtered or unexported fields
}

func NewServer

func NewServer(
	log *zap.Logger,
	authz auth.Authorizer,
	accounts account.Store,
	events Store,
	eventBus *Bus[*commonpb.UserId, *eventpb.Event],
	staleEventDetectorCtors []StaleEventDetectorCtor[*eventpb.Event],
	broadcastAddress string,
	currentRpcApiKey string,
) *Server

func (*Server) ForwardEvents

func (*Server) ForwardUserEvents

func (s *Server) ForwardUserEvents(ctx context.Context, events ...*eventpb.UserEvent) error

todo: duplicated code with ForwardingClient todo: utilize batching by receiver to optimize internal forwarding RPC calls

func (*Server) OnEvent

func (s *Server) OnEvent(userID *commonpb.UserId, e *eventpb.Event)

type StaleEventDetector

type StaleEventDetector[Event any] interface {
	ShouldDrop(event Event) bool
}

type StaleEventDetectorCtor

type StaleEventDetectorCtor[Event any] func() StaleEventDetector[Event]

type Store

type Store interface {
	// CreateRendezvous creates a new rendezvous for an event stream
	CreateRendezvous(ctx context.Context, rendezvous *Rendezvous) error

	// GetRendezvous gets an event stream rendezvous for a given key
	GetRendezvous(ctx context.Context, key string) (*Rendezvous, error)

	// ExtendRendezvousxpiry extends a rendezvous' expiry for a given key and address
	ExtendRendezvousExpiry(ctx context.Context, key, address string, expiresAt time.Time) error

	// DeleteRendezvous deletes an event stream rendezvous for a given key and address
	DeleteRendezvous(ctx context.Context, key, address string) error
}

type Stream

type Stream[E any] interface {
	ID() string
	Notify(event E, timeout time.Duration) error
	Close()
}

type TestEventObserver

type TestEventObserver[Key, Event any] struct {
	// contains filtered or unexported fields
}

func NewTestEventObserver

func NewTestEventObserver[Key, Event any]() *TestEventObserver[Key, Event]

func (*TestEventObserver[Key, Event]) GetEvents

func (h *TestEventObserver[Key, Event]) GetEvents(filter func(Key) bool) []*KeyAndEvent[Key, Event]

func (*TestEventObserver[Key, Event]) OnEvent

func (h *TestEventObserver[Key, Event]) OnEvent(key Key, event Event)

func (*TestEventObserver[Key, Event]) Reset

func (h *TestEventObserver[Key, Event]) Reset()

func (*TestEventObserver[Key, Event]) WaitFor

func (h *TestEventObserver[Key, Event]) WaitFor(t *testing.T, condition func([]*KeyAndEvent[Key, Event]) bool)

func (*TestEventObserver[Key, Event]) WaitForWithTimeout

func (h *TestEventObserver[Key, Event]) WaitForWithTimeout(t *testing.T, timeout time.Duration, condition func([]*KeyAndEvent[Key, Event]) bool)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL