Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultMessageBatchHandlerConfig = &BatchMessageHandlerConfig{
	DelayThreshold:    10 * time.Millisecond,
	CountThreshold:    100,
	ByteThreshold:     1e6,
	NumGoroutines:     10,
	BufferedByteLimit: 10 * pubsub.MaxPublishRequestBytes,
}
Functions ¶
This section is empty.
Types ¶
type BatchError ¶
BatchError is used to report an error for each message in a batch. The key is the message ID.
func (BatchError) Error ¶
func (b BatchError) Error() string
type BatchMessageHandlerConfig ¶
type BatchMessageHandlerConfig struct {
	// Process a non-empty batch after this delay has passed.
	// Defaults to DefaultMessageBatchHandlerConfig.DelayThreshold.
	DelayThreshold time.Duration
	// Process a batch when it has this many messages.
	// Defaults to DefaultMessageBatchHandlerConfig.CountThreshold.
	CountThreshold int
	// Process a batch when its size in bytes reaches this value.
	// Defaults to DefaultMessageBatchHandlerConfig.ByteThreshold.
	ByteThreshold int
	// The number of goroutines.
	// Defaults to DefaultMessageBatchHandlerConfig.NumGoroutines.
	NumGoroutines int
	// Defaults to DefaultMessageBatchHandlerConfig.BufferedByteLimit.
	BufferedByteLimit int
}
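The field comments above say that each field falls back to the matching value in DefaultMessageBatchHandlerConfig. The following is a minimal sketch of how such zero-value defaulting could work. It is not the package's implementation: `config`, `defaults`, `withDefaults`, and `maxPublishRequestBytes` are hypothetical names, and the value used for `maxPublishRequestBytes` is an assumption chosen so the example has no external imports.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors the fields of BatchMessageHandlerConfig for illustration.
type config struct {
	DelayThreshold    time.Duration
	CountThreshold    int
	ByteThreshold     int
	NumGoroutines     int
	BufferedByteLimit int
}

// maxPublishRequestBytes stands in for pubsub.MaxPublishRequestBytes.
// The value is an assumption made so the sketch needs only the standard library.
const maxPublishRequestBytes = 10_000_000

// defaults copies the values listed in DefaultMessageBatchHandlerConfig.
var defaults = config{
	DelayThreshold:    10 * time.Millisecond,
	CountThreshold:    100,
	ByteThreshold:     1e6,
	NumGoroutines:     10,
	BufferedByteLimit: 10 * maxPublishRequestBytes,
}

// withDefaults returns a copy of c in which every zero-valued field
// is replaced by the corresponding default.
func withDefaults(c config) config {
	if c.DelayThreshold == 0 {
		c.DelayThreshold = defaults.DelayThreshold
	}
	if c.CountThreshold == 0 {
		c.CountThreshold = defaults.CountThreshold
	}
	if c.ByteThreshold == 0 {
		c.ByteThreshold = defaults.ByteThreshold
	}
	if c.NumGoroutines == 0 {
		c.NumGoroutines = defaults.NumGoroutines
	}
	if c.BufferedByteLimit == 0 {
		c.BufferedByteLimit = defaults.BufferedByteLimit
	}
	return c
}

func main() {
	// Only CountThreshold is set explicitly; the rest fall back to defaults.
	c := withDefaults(config{CountThreshold: 5})
	fmt.Println(c.CountThreshold, c.DelayThreshold, c.NumGoroutines)
}
```

With this approach a caller only sets the fields that differ from the defaults, at the cost of not being able to request an explicit zero.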
type MessageBatchHandler ¶
MessageBatchHandler defines the batch message handler. By default, when a non-nil error is returned, every message in the batch is treated as failed in MessageHandler. To report errors for individual messages, return a BatchError.
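The per-message error reporting described above can be sketched with standard-library types only. This page does not show the definitions of BatchError or MessageBatchHandler, so everything here is an assumption: `batchError` is modeled as a map from message ID to error, `message` is a stand-in for *pubsub.Message, and `handleBatch` and `failedIDs` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// message is a stand-in for *pubsub.Message; only ID and Data matter here.
type message struct {
	ID   string
	Data []byte
}

// batchError is a hypothetical equivalent of BatchError: message ID -> error.
type batchError map[string]error

// Error joins the per-message errors in message ID order.
func (b batchError) Error() string {
	ids := make([]string, 0, len(b))
	for id := range b {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, id+": "+b[id].Error())
	}
	return strings.Join(parts, "; ")
}

// errUnavailable represents a failure that affects the whole batch.
var errUnavailable = errors.New("backend unavailable")

// handleBatch fails only the messages that carry an empty payload.
func handleBatch(msgs []message) error {
	failed := batchError{}
	for _, m := range msgs {
		if len(m.Data) == 0 {
			failed[m.ID] = errors.New("empty payload")
		}
	}
	if len(failed) == 0 {
		return nil // return a true nil, not an empty map wrapped in an interface
	}
	return failed
}

// failedIDs lists the messages a caller would treat as failed: the keys of a
// batchError, or every message when any other non-nil error is returned.
func failedIDs(msgs []message, err error) []string {
	if err == nil {
		return nil
	}
	var ids []string
	var be batchError
	if errors.As(err, &be) {
		for id := range be {
			ids = append(ids, id)
		}
	} else {
		for _, m := range msgs {
			ids = append(ids, m.ID)
		}
	}
	sort.Strings(ids)
	return ids
}

func main() {
	msgs := []message{{ID: "a", Data: []byte("x")}, {ID: "b"}}
	fmt.Println(failedIDs(msgs, handleBatch(msgs)))
	fmt.Println(failedIDs(msgs, errUnavailable))
}
```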
type MessageHandler ¶
MessageHandler defines the message handler invoked by SubscriptionInterceptor to complete the normal message handling.
func NewBatchMessageHandler ¶
func NewBatchMessageHandler(handler MessageBatchHandler, config BatchMessageHandlerConfig) MessageHandler
NewBatchMessageHandler returns a MessageHandler that processes messages in batches according to config.
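To illustrate how the count and byte thresholds in the config could decide when a batch is processed, here is a deliberately simplified, synchronous sketch. It is not how NewBatchMessageHandler is implemented: `batcher`, `Add`, and `Flush` are hypothetical, the DelayThreshold timer is replaced by an explicit `Flush` call, and the goroutine pool and BufferedByteLimit are left out.

```go
package main

import "fmt"

// batcher groups payloads and hands each completed batch to flush.
// It models only CountThreshold and ByteThreshold.
type batcher struct {
	countThreshold int
	byteThreshold  int
	flush          func(batch [][]byte)

	pending [][]byte // payloads waiting for the next batch
	size    int      // total bytes in pending
}

// Add buffers one payload and flushes as soon as either threshold is reached.
func (b *batcher) Add(payload []byte) {
	b.pending = append(b.pending, payload)
	b.size += len(payload)
	if len(b.pending) >= b.countThreshold || b.size >= b.byteThreshold {
		b.Flush()
	}
}

// Flush processes whatever is buffered; it stands in for the delay timer firing.
func (b *batcher) Flush() {
	if len(b.pending) == 0 {
		return
	}
	batch := b.pending
	b.pending, b.size = nil, 0
	b.flush(batch)
}

// batchSizes feeds payloads through a batcher and returns the size of each batch.
func batchSizes(countThreshold, byteThreshold int, payloads []string) []int {
	var sizes []int
	b := &batcher{
		countThreshold: countThreshold,
		byteThreshold:  byteThreshold,
		flush:          func(batch [][]byte) { sizes = append(sizes, len(batch)) },
	}
	for _, p := range payloads {
		b.Add([]byte(p))
	}
	b.Flush() // drain the remainder, as the delay threshold would
	return sizes
}

func main() {
	fmt.Println(batchSizes(3, 10, []string{"a", "b", "c", "0123456789", "d"}))
}
```

In the example run, the first batch closes on the count threshold, the second on the byte threshold, and the last one only when the remainder is drained.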
type MessagePublisher ¶
type MessagePublisher = func(ctx context.Context, topic *pubsub.Publisher, m *pubsub.Message) *pubsub.PublishResult
MessagePublisher defines the message publisher invoked by PublishInterceptor to complete the normal publishing of a message.
type PublishInterceptor ¶
type PublishInterceptor = func(next MessagePublisher) MessagePublisher
PublishInterceptor provides a hook to intercept the execution of a publish call.
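The `func(next) next` shape of PublishInterceptor is the usual middleware pattern. The sketch below shows how such interceptors can be composed, using standard-library stand-ins rather than the real pubsub types. The names `publishFunc`, `interceptor`, `chain`, `withAttribute`, and `trace` are hypothetical, and the ordering (the first interceptor runs outermost) is an assumption, not something this page states about WithPublishInterceptor.

```go
package main

import (
	"context"
	"fmt"
)

// message and result are stand-ins for *pubsub.Message and *pubsub.PublishResult.
type message struct {
	Data       []byte
	Attributes map[string]string
}

type result struct{ ID string }

// publishFunc and interceptor mirror the shapes of MessagePublisher and
// PublishInterceptor, with a plain string in place of the topic handle.
type publishFunc func(ctx context.Context, topic string, m *message) *result
type interceptor func(next publishFunc) publishFunc

// chain wraps base so that interceptors[0] is the outermost call.
func chain(base publishFunc, interceptors ...interceptor) publishFunc {
	for i := len(interceptors) - 1; i >= 0; i-- {
		base = interceptors[i](base)
	}
	return base
}

// withAttribute sets one attribute on the message before passing it on.
func withAttribute(key, value string) interceptor {
	return func(next publishFunc) publishFunc {
		return func(ctx context.Context, topic string, m *message) *result {
			if m.Attributes == nil {
				m.Attributes = map[string]string{}
			}
			m.Attributes[key] = value
			return next(ctx, topic, m)
		}
	}
}

// trace records its name in calls each time it is invoked.
func trace(name string, calls *[]string) interceptor {
	return func(next publishFunc) publishFunc {
		return func(ctx context.Context, topic string, m *message) *result {
			*calls = append(*calls, name)
			return next(ctx, topic, m)
		}
	}
}

// demo publishes one message through a three-interceptor chain and
// returns the call order, the message, and the result.
func demo() ([]string, *message, *result) {
	var calls []string
	base := func(ctx context.Context, topic string, m *message) *result {
		calls = append(calls, "publish")
		return &result{ID: "1"}
	}
	publish := chain(base,
		trace("first", &calls),
		withAttribute("source", "example"),
		trace("second", &calls),
	)
	m := &message{Data: []byte("hello")}
	res := publish(context.Background(), "orders", m)
	return calls, m, res
}

func main() {
	calls, m, res := demo()
	fmt.Println(calls, m.Attributes["source"], res.ID)
}
```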
type Publisher ¶
Publisher represents a wrapper around the Pub/Sub client focused on publishing.
func NewPublisher ¶
func NewPublisher(pubsubClient *pubsub.Client, opt ...PublisherOption) *Publisher
NewPublisher initializes a new Publisher.
type PublisherOption ¶
type PublisherOption interface {
	// contains filtered or unexported methods
}
PublisherOption is an option to change publisher configuration.
func WithPublishInterceptor ¶
func WithPublishInterceptor(interceptors ...PublishInterceptor) PublisherOption
WithPublishInterceptor sets publish interceptors.
type Subscriber ¶
type Subscriber struct {
	// contains filtered or unexported fields
}
Subscriber represents a wrapper around the Pub/Sub client, mainly focused on pull subscriptions.
func NewSubscriber ¶
func NewSubscriber(pubsubClient *pubsub.Client, opt ...SubscriberOption) *Subscriber
NewSubscriber initializes a new Subscriber.
func (*Subscriber) Close ¶
func (s *Subscriber) Close()
Close closes running subscriptions gracefully.
func (*Subscriber) HandleSubscriptionFunc ¶
func (s *Subscriber) HandleSubscriptionFunc(subscription *pubsub.Subscriber, f MessageHandler) error
HandleSubscriptionFunc registers a message handler for the given subscription.
func (*Subscriber) HandleSubscriptionFuncMap ¶
func (s *Subscriber) HandleSubscriptionFuncMap(funcMap map[*pubsub.Subscriber]MessageHandler) error
HandleSubscriptionFuncMap registers multiple subscription handlers at once.
func (*Subscriber) Run ¶
func (s *Subscriber) Run(ctx context.Context)
Run starts running registered pull subscriptions.
type SubscriberOption ¶
type SubscriberOption interface {
	// contains filtered or unexported methods
}
SubscriberOption is an option to change subscriber configuration.
func WithSubscriptionInterceptor ¶
func WithSubscriptionInterceptor(interceptors ...SubscriptionInterceptor) SubscriberOption
WithSubscriptionInterceptor sets subscription interceptors.
type SubscriptionInfo ¶
type SubscriptionInfo struct {
	SubscriptionID string
}
SubscriptionInfo contains information about the subscription being handled.
type SubscriptionInterceptor ¶
type SubscriptionInterceptor = func(info *SubscriptionInfo, next MessageHandler) MessageHandler
SubscriptionInterceptor provides a hook to intercept the execution of message handling.
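As an illustration of what a subscription interceptor might do, the sketch below turns a panic in the wrapped handler into an error tagged with the subscription ID. It uses standard-library stand-ins only. The handler signature `func(ctx, *message) error` is an assumption, since this page does not show the definition of MessageHandler, and `recoverInterceptor` and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// message stands in for *pubsub.Message.
type message struct{ ID string }

// subscriptionInfo mirrors SubscriptionInfo.
type subscriptionInfo struct{ SubscriptionID string }

// handler is an assumed shape for MessageHandler.
type handler func(ctx context.Context, m *message) error

// recoverInterceptor has the same shape as SubscriptionInterceptor. It converts
// a panic in next into an error that names the subscription.
func recoverInterceptor(info *subscriptionInfo, next handler) handler {
	return func(ctx context.Context, m *message) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("subscription %q: recovered panic: %v", info.SubscriptionID, r)
			}
		}()
		return next(ctx, m)
	}
}

// demo runs one message through the interceptor, with a handler that either
// panics or succeeds.
func demo(shouldPanic bool) error {
	info := &subscriptionInfo{SubscriptionID: "orders-sub"}
	h := recoverInterceptor(info, func(ctx context.Context, m *message) error {
		if shouldPanic {
			panic("boom")
		}
		return nil
	})
	return h(context.Background(), &message{ID: "m1"})
}

func main() {
	fmt.Println(demo(true))
	fmt.Println(demo(false))
}
```

The named return value `err` is what lets the deferred function replace the result after a panic has been recovered.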
type TestServer ¶
TestServer wraps pstest.Server with a pubsub.Client for testing.
func NewTestServer ¶
func NewTestServer(ctx context.Context, t *testing.T) *TestServer
NewTestServer creates a new test Pub/Sub server and client.
Directories ¶
| Path | Synopsis |
|---|---|
| middleware | |
| logging/pm_slog | Package pm_slog provides a slog-based logging interceptor for pm subscriptions. |