client

package
v0.73.98 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2025 License: MIT Imports: 37 Imported by: 5

Documentation

Index

Constants

View Source
// Default retry settings. The Workflow.Result documentation on this page names both
// constants as the retry count and retry interval used at each layer of its retry strategy.
const (
	// DefaultActionListenerRetryInterval is the default wait between retry attempts: 5 seconds.
	DefaultActionListenerRetryInterval = 5 * time.Second
	// DefaultActionListenerRetryCount is the default maximum number of retry attempts.
	DefaultActionListenerRetryCount    = 5
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

// Action describes a single action for a worker. Pointers to Action values are delivered
// on the channel returned by WorkerActionListener.Actions, and an *Action is embedded in
// every ActionEvent.
//
// AdditionalMetadata, ChildIndex, ChildKey and ParentWorkflowRunId carry no json tag, so
// encoding/json encodes them under their Go field names (e.g. "AdditionalMetadata").
type Action struct {
	// the worker id
	WorkerId string `json:"workerId"`

	// the tenant id
	TenantId string `json:"tenantId"`

	// the workflow run id
	WorkflowRunId string `json:"workflowRunId"`

	// the get group key run id
	GetGroupKeyRunId string `json:"getGroupKeyRunId"`

	// the job id
	JobId string `json:"jobId"`

	// the job name
	JobName string `json:"jobName"`

	// the job run id
	JobRunId string `json:"jobRunId"`

	// the step id
	StepId string `json:"stepId"`

	// the step name
	StepName string `json:"stepName"`

	// the step run id
	StepRunId string `json:"stepRunId"`

	// the action id
	ActionId string `json:"actionId"`

	// the raw action payload bytes; encoding/json encodes a []byte as a base64 string
	ActionPayload []byte `json:"actionPayload"`

	// the action type (see the ActionType constants)
	ActionType ActionType `json:"actionType"`

	// the count of the retry attempt
	RetryCount int32 `json:"retryCount"`

	// the additional metadata for the workflow run (no json tag)
	AdditionalMetadata map[string]string

	// the child index for the workflow run; optional, nil when unset (no json tag)
	ChildIndex *int32

	// the child key for the workflow run; optional, nil when unset (no json tag)
	ChildKey *string

	// the parent workflow run id; optional, nil when unset (no json tag)
	ParentWorkflowRunId *string

	// the priority; omitted from JSON when zero
	Priority int32 `json:"priority,omitempty"`

	// the workflow id; optional, omitted from JSON when nil
	WorkflowId *string `json:"workflowId,omitempty"`

	// the workflow version id; optional, omitted from JSON when nil
	WorkflowVersionId *string `json:"workflowVersionId,omitempty"`
}

type ActionEvent

// ActionEvent is an event reported about an Action. It is the input to
// DispatcherClient.SendStepActionEvent and DispatcherClient.SendGroupKeyActionEvent.
type ActionEvent struct {
	// the action this event refers to. The pointer is embedded, so Action's fields are
	// promoted onto ActionEvent; reading a promoted field while this pointer is nil panics.
	*Action

	// the event timestamp
	EventTimestamp *time.Time

	// the step event type (see the ActionEventType constants)
	EventType ActionEventType

	// The event payload. This must be JSON-compatible as it gets marshalled to a JSON string.
	EventPayload interface{}

	// If this is an error, whether retrying on failure should be skipped.
	// NOTE(review): polarity inferred from the field name (true = do not retry); the
	// earlier wording ("whether to retry on failure") read as the opposite — confirm.
	ShouldNotRetry *bool
}

type ActionEventResponse

// ActionEventResponse is the result returned by DispatcherClient.SendStepActionEvent and
// DispatcherClient.SendGroupKeyActionEvent.
type ActionEventResponse struct {
	// the tenant id
	TenantId string

	// the id of the worker
	WorkerId string
}

type ActionEventType

// ActionEventType is the string-valued kind of an ActionEvent (see ActionEvent.EventType).
type ActionEventType string
// Known ActionEventType values. Each string form carries the "STEP_EVENT_TYPE_" prefix.
const (
	ActionEventTypeUnknown   ActionEventType = "STEP_EVENT_TYPE_UNKNOWN"
	ActionEventTypeStarted   ActionEventType = "STEP_EVENT_TYPE_STARTED"
	ActionEventTypeCompleted ActionEventType = "STEP_EVENT_TYPE_COMPLETED"
	ActionEventTypeFailed    ActionEventType = "STEP_EVENT_TYPE_FAILED"
)

type ActionPayload

// ActionPayload is a function type that unmarshals the action payload into target and
// validates the result, returning a non-nil error on failure (per the package documentation).
type ActionPayload func(target interface{}) error

ActionPayload unmarshals the action payload into the target. It also validates the resulting target.

type ActionType

// ActionType is the string-valued kind of an Action (see Action.ActionType).
type ActionType string
// Known ActionType values: start a step run, cancel a step run, start a get-group-key run.
const (
	ActionTypeStartStepRun     ActionType = "START_STEP_RUN"
	ActionTypeCancelStepRun    ActionType = "CANCEL_STEP_RUN"
	ActionTypeStartGetGroupKey ActionType = "START_GET_GROUP_KEY"
)

type AdminClient

// AdminClient is the administrative sub-client returned by Client.Admin. It covers
// registering workflows, scheduling and triggering runs, and rate limits.
type AdminClient interface {
	// PutWorkflow submits a workflow definition; opts adjust the request.
	// NOTE(review): presumably create-or-update semantics — confirm against the implementation.
	PutWorkflow(workflow *types.Workflow, opts ...PutOptFunc) error
	// PutWorkflowV1 is the variant of PutWorkflow that takes a v1 CreateWorkflowVersionRequest.
	PutWorkflowV1(workflow *v1contracts.CreateWorkflowVersionRequest, opts ...PutOptFunc) error

	// ScheduleWorkflow schedules the named workflow; schedule times and input are supplied
	// through ScheduleOptFunc values such as WithSchedules and WithInput.
	ScheduleWorkflow(workflowName string, opts ...ScheduleOptFunc) error

	// RunWorkflow triggers a workflow run and returns a *Workflow handle for it; the run id
	// is available through Workflow.RunId.
	RunWorkflow(workflowName string, input interface{}, opts ...RunOptFunc) (*Workflow, error)

	// BulkRunWorkflow triggers several workflow runs in one call.
	// NOTE(review): the returned strings are presumably the workflow run ids — confirm.
	BulkRunWorkflow(workflows []*WorkflowRun) ([]string, error)

	// RunChildWorkflow triggers a child workflow run described by opts.
	// NOTE(review): the returned string is presumably the child workflow run id — confirm.
	RunChildWorkflow(workflowName string, input interface{}, opts *ChildWorkflowOpts) (string, error)
	// RunChildWorkflows is the bulk form of RunChildWorkflow.
	RunChildWorkflows(workflows []*RunChildWorkflowsOpts) ([]string, error)

	// PutRateLimit submits the rate limit identified by key with the given options.
	PutRateLimit(key string, opts *types.RateLimitOpts) error
}

type BulkPushOpFunc

// BulkPushOpFunc is an option accepted by EventClient.BulkPush. It receives the
// BulkPushEventRequest by pointer and may return an error.
type BulkPushOpFunc func(*eventcontracts.BulkPushEventRequest) error

type ChildWorkflowOpts

// ChildWorkflowOpts carries the options for AdminClient.RunChildWorkflow and, through
// RunChildWorkflowsOpts.Opts, AdminClient.RunChildWorkflows. Pointer-typed fields are
// optional and may be nil.
// NOTE(review): ParentId is presumably the parent workflow run id — confirm.
type ChildWorkflowOpts struct {
	ParentId           string
	ParentStepRunId    string
	ChildIndex         int
	ChildKey           *string
	DesiredWorkerId    *string
	AdditionalMetadata *map[string]string
	Priority           *int32
}

type Client

// Client is the top-level client returned by New and NewFromConfigFile. It exposes the
// individual sub-clients and the configuration values the client was built with.
type Client interface {
	// Admin returns the AdminClient.
	Admin() AdminClient
	// Cron returns the CronClient for managing cron triggers.
	Cron() CronClient
	// Schedule returns the ScheduleClient for managing scheduled workflow runs.
	Schedule() ScheduleClient
	// Dispatcher returns the DispatcherClient.
	Dispatcher() DispatcherClient
	// Event returns the EventClient.
	Event() EventClient
	// Subscribe returns the SubscribeClient.
	Subscribe() SubscribeClient
	// API returns the REST API client from the rest package.
	API() *rest.ClientWithResponses
	// CloudAPI returns the REST API client from the cloudrest package.
	CloudAPI() *cloudrest.ClientWithResponses
	// Logger returns the zerolog logger used by the client.
	Logger() *zerolog.Logger
	// TenantId returns the tenant id.
	TenantId() string
	// Namespace returns the namespace.
	Namespace() string
	// CloudRegisterID returns the cloud register id; the pointer result may be nil.
	CloudRegisterID() *string
	// RunnableActions returns a list of action names.
	// NOTE(review): exact meaning is not visible in this listing — confirm.
	RunnableActions() []string
}

func New

func New(fs ...ClientOpt) (Client, error)

New creates a new client instance.

func NewFromConfigFile

func NewFromConfigFile(cf *client.ClientConfigFile, fs ...ClientOpt) (Client, error)

type ClientEventListener

// ClientEventListener is implemented by types that want to receive workflow events.
type ClientEventListener interface {
	// OnWorkflowEvent is called with a workflow event; it returns an error on failure.
	OnWorkflowEvent(ctx context.Context, event *WorkflowEvent) error
}

type ClientOpt

// ClientOpt is a functional option for New and NewFromConfigFile; it modifies the
// ClientOpts it is given. Values are produced by InitWorkflows and the With* constructors
// (WithHostPort, WithLogger, WithNamespace, WithSharedMeta, WithTenantId, WithToken, ...).
type ClientOpt func(*ClientOpts)

func InitWorkflows

func InitWorkflows() ClientOpt

func WithHostPort

func WithHostPort(host string, port int) ClientOpt

func WithLogLevel deprecated

func WithLogLevel(lvl string) ClientOpt

Deprecated: use WithLogger instead

func WithLogger

func WithLogger(l *zerolog.Logger) ClientOpt

func WithNamespace

func WithNamespace(namespace string) ClientOpt

func WithSharedMeta

func WithSharedMeta(meta map[string]string) ClientOpt

func WithTenantId

func WithTenantId(tenantId string) ClientOpt

func WithToken

func WithToken(token string) ClientOpt

type ClientOpts

// ClientOpts holds the client configuration that ClientOpt functions modify. It has no
// exported fields, so it can only be configured through ClientOpt values.
type ClientOpts struct {
	// contains filtered or unexported fields
}

type CronClient

// CronClient manages cron triggers. It is returned by Client.Cron and can also be
// constructed with NewCronClient.
type CronClient interface {
	// Create creates a new cron trigger for the given workflow using opts.
	Create(ctx context.Context, workflow string, opts *CronOpts) (*gen.CronWorkflows, error)

	// Delete deletes the cron trigger with the given id.
	Delete(ctx context.Context, id string) error

	// List lists all cron triggers
	List(ctx context.Context) (*gen.CronWorkflowsList, error)
}

func NewCronClient

func NewCronClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (CronClient, error)

type CronOpts

// CronOpts describes the cron trigger to create with CronClient.Create.
type CronOpts struct {
	// Name is the user-friendly name for the cron trigger
	Name string

	// Expression is the cron expression for the trigger
	Expression string

	// Input is the input to the workflow
	Input map[string]interface{}

	// AdditionalMetadata is additional metadata to be stored with the cron trigger
	AdditionalMetadata map[string]string

	// Priority is the priority of the run triggered by the cron; optional, may be nil
	Priority *int32
}

type DedupeViolationErr

// DedupeViolationErr is an error type: *DedupeViolationErr implements the error interface
// through its Error method. Callers can detect it with errors.As.
// NOTE(review): presumably returned when a deduplication constraint rejects a run — confirm.
type DedupeViolationErr struct {
	// contains filtered or unexported fields
}

func (*DedupeViolationErr) Error

func (d *DedupeViolationErr) Error() string

type DispatcherClient

// DispatcherClient is the sub-client returned by Client.Dispatcher. Workers use it to
// obtain an action listener and to report events about the actions they receive.
type DispatcherClient interface {
	// GetActionListener returns a WorkerActionListener for the worker described by req.
	// NOTE(review): the *string result is presumably the registered worker id — confirm.
	GetActionListener(ctx context.Context, req *GetActionListenerRequest) (WorkerActionListener, *string, error)

	// SendStepActionEvent sends the given ActionEvent and returns the ActionEventResponse.
	// NOTE(review): presumably for step-run actions, going by the name — confirm.
	SendStepActionEvent(ctx context.Context, in *ActionEvent) (*ActionEventResponse, error)

	// SendGroupKeyActionEvent sends the given ActionEvent and returns the ActionEventResponse.
	// NOTE(review): presumably for get-group-key actions, going by the name — confirm.
	SendGroupKeyActionEvent(ctx context.Context, in *ActionEvent) (*ActionEventResponse, error)

	// ReleaseSlot releases the slot associated with the given step run id.
	ReleaseSlot(ctx context.Context, stepRunId string) error

	// RefreshTimeout extends the timeout of the given step run by incrementTimeoutBy.
	// NOTE(review): incrementTimeoutBy is presumably a duration string — confirm the format.
	RefreshTimeout(ctx context.Context, stepRunId string, incrementTimeoutBy string) error

	// UpsertWorkerLabels sets the given labels on the worker identified by workerId.
	UpsertWorkerLabels(ctx context.Context, workerId string, labels map[string]interface{}) error

	// RegisterDurableEvent sends req and returns the RegisterDurableEventResponse.
	RegisterDurableEvent(ctx context.Context, req *sharedcontracts.RegisterDurableEventRequest) (*sharedcontracts.RegisterDurableEventResponse, error)
}

type DurableEvent

// DurableEvent is a defined type whose underlying type is *contracts.DurableEvent; it is
// the argument passed to a DurableEventHandler. Because it is a defined pointer type (not
// an alias), its method set is empty: struct fields remain reachable through selectors,
// but methods of *contracts.DurableEvent require converting back first, e.g.
// (*contracts.DurableEvent)(e).
type DurableEvent *contracts.DurableEvent

type DurableEventHandler

// DurableEventHandler is the callback registered with DurableEventsListener.AddSignal.
// It receives a DurableEvent and returns an error on failure.
type DurableEventHandler func(e DurableEvent) error

type DurableEventsListener

// DurableEventsListener is returned by SubscribeClient.ListenForDurableEvents. Handlers
// are registered per task id and signal key with AddSignal; Listen and Close control the
// listener's lifetime.
type DurableEventsListener struct {
	// contains filtered or unexported fields
}

func (*DurableEventsListener) AddSignal

func (l *DurableEventsListener) AddSignal(
	taskId string,
	signalKey string,
	handler DurableEventHandler,
) error

func (*DurableEventsListener) Close

func (l *DurableEventsListener) Close() error

func (*DurableEventsListener) Listen

func (l *DurableEventsListener) Listen(ctx context.Context) error

type EventClient

// EventClient is the sub-client returned by Client.Event. It pushes events, logs and
// stream events.
type EventClient interface {
	// Push sends a single event under eventKey with the given payload. Options such as
	// WithEventMetadata, WithEventPriority and WithFilterScope adjust the request.
	Push(ctx context.Context, eventKey string, payload interface{}, options ...PushOpFunc) error

	// BulkPush sends several events in one call; each entry carries its own key, metadata,
	// priority and scope.
	BulkPush(ctx context.Context, payloads []EventWithAdditionalMetadata, options ...BulkPushOpFunc) error

	// PutLog records the log message msg for the given step run.
	PutLog(ctx context.Context, stepRunId, msg string) error

	// PutStreamEvent sends message as a stream event for the given step run. Options such
	// as WithStreamEventIndex adjust the request.
	PutStreamEvent(ctx context.Context, stepRunId string, message []byte, options ...StreamEventOption) error
}

type EventWithAdditionalMetadata

// EventWithAdditionalMetadata is one entry of an EventClient.BulkPush call: the event
// payload together with its key and optional metadata, priority and scope.
//
// AdditionalMetadata is encoded under the JSON key "metadata". Priority and Scope have no
// omitempty option, so nil pointers are encoded as JSON null.
type EventWithAdditionalMetadata struct {
	Event              interface{}       `json:"event"`
	AdditionalMetadata map[string]string `json:"metadata"`
	Key                string            `json:"key"`
	Priority           *int32            `json:"priority"`
	Scope              *string           `json:"scope"`
}

type GetActionListenerRequest

// GetActionListenerRequest describes the worker for which
// DispatcherClient.GetActionListener obtains a listener. MaxRuns and WebhookId are
// optional and may be nil.
// NOTE(review): MaxRuns is presumably the worker's concurrent run limit — confirm.
type GetActionListenerRequest struct {
	WorkerName string
	Services   []string
	Actions    []string
	MaxRuns    *int
	Labels     map[string]interface{}
	WebhookId  *string
}

type ListenerStrategy

// ListenerStrategy is a string-valued selector with two known values, "v1" and "v2".
// NOTE(review): where the strategy is consumed is not visible in this listing.
type ListenerStrategy string
// Known ListenerStrategy values.
const (
	ListenerStrategyV1 ListenerStrategy = "v1"
	ListenerStrategyV2 ListenerStrategy = "v2"
)

type PushOpFunc

// PushOpFunc is an option accepted by EventClient.Push. Its parameter type is unexported,
// so callers outside this package obtain values from the exported constructors
// (WithEventMetadata, WithEventPriority, WithFilterScope).
type PushOpFunc func(*pushOpt) error

func WithEventMetadata

func WithEventMetadata(metadata map[string]string) PushOpFunc

func WithEventPriority

func WithEventPriority(priority *int32) PushOpFunc

func WithFilterScope

func WithFilterScope(scope *string) PushOpFunc

type PutOptFunc

// PutOptFunc is an option accepted by AdminClient.PutWorkflow and
// AdminClient.PutWorkflowV1. Its parameter type is unexported, and no exported constructor
// returning PutOptFunc appears in this listing.
type PutOptFunc func(*putOpts)

type RunChildWorkflowsOpts

// RunChildWorkflowsOpts is one entry of an AdminClient.RunChildWorkflows call. Its fields
// mirror the three parameters of AdminClient.RunChildWorkflow: workflow name, input and
// child workflow options.
type RunChildWorkflowsOpts struct {
	WorkflowName string
	Input        interface{}
	Opts         *ChildWorkflowOpts
}

type RunHandler

// RunHandler is the callback passed to SubscribeClient.On. It receives a WorkflowEvent by
// value and returns an error on failure.
type RunHandler func(event WorkflowEvent) error

type RunOptFunc

func WithPriority

func WithPriority(priority int32) RunOptFunc

func WithRunMetadata

func WithRunMetadata(metadata interface{}) RunOptFunc

type ScheduleClient

// ScheduleClient manages scheduled workflow runs. It is returned by Client.Schedule and
// can also be constructed with NewScheduleClient.
type ScheduleClient interface {
	// Create creates a new scheduled workflow run for the given workflow using opts.
	Create(ctx context.Context, workflow string, opts *ScheduleOpts) (*gen.ScheduledWorkflows, error)

	// Delete deletes the scheduled workflow run with the given id.
	Delete(ctx context.Context, id string) error

	// List lists all scheduled workflow runs
	List(ctx context.Context) (*gen.ScheduledWorkflowsList, error)
}

func NewScheduleClient

func NewScheduleClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (ScheduleClient, error)

type ScheduleOptFunc

// ScheduleOptFunc is an option accepted by AdminClient.ScheduleWorkflow. Its parameter
// type is unexported, so callers outside this package obtain values from the exported
// constructors WithInput and WithSchedules.
type ScheduleOptFunc func(*scheduleOpts)

func WithInput

func WithInput(input any) ScheduleOptFunc

func WithSchedules

func WithSchedules(schedules ...time.Time) ScheduleOptFunc

type ScheduleOpts

// ScheduleOpts describes the scheduled workflow run to create with ScheduleClient.Create.
type ScheduleOpts struct {
	// TriggerAt is the time at which the scheduled run should be triggered
	TriggerAt time.Time

	// Input is the input to the workflow
	Input map[string]interface{}

	// AdditionalMetadata is additional metadata to be stored with the scheduled run
	AdditionalMetadata map[string]string

	// Priority is the priority of the scheduled run; optional, omitted from JSON when nil.
	// Priority is the only field of this struct that carries a json tag.
	Priority *int32 `json:"priority,omitempty"`
}

type StreamEvent

// StreamEvent is the value passed to a StreamHandler.
type StreamEvent struct {
	// Message is the raw message bytes of the stream event.
	Message []byte
}

type StreamEventOption

// StreamEventOption is an option accepted by EventClient.PutStreamEvent. Its parameter
// type is unexported; WithStreamEventIndex is the exported constructor shown in this listing.
type StreamEventOption func(*streamEventOpts)

func WithStreamEventIndex

func WithStreamEventIndex(index int64) StreamEventOption

type StreamHandler

// StreamHandler is the callback passed to SubscribeClient.Stream and
// SubscribeClient.StreamByAdditionalMetadata. It receives a StreamEvent by value and
// returns an error on failure.
type StreamHandler func(event StreamEvent) error

type SubscribeClient

// SubscribeClient is the sub-client returned by Client.Subscribe. It delivers workflow
// events and stream events to caller-supplied handlers and creates the listener types.
type SubscribeClient interface {
	// On passes events for the given workflow run to handler.
	On(ctx context.Context, workflowRunId string, handler RunHandler) error

	// Stream passes stream events for the given workflow run to handler.
	Stream(ctx context.Context, workflowRunId string, handler StreamHandler) error

	// StreamByAdditionalMetadata passes stream events to handler, selected by an additional
	// metadata key/value pair rather than by workflow run id.
	StreamByAdditionalMetadata(ctx context.Context, key string, value string, handler StreamHandler) error

	// SubscribeToWorkflowRunEvents returns a WorkflowRunsListener.
	SubscribeToWorkflowRunEvents(ctx context.Context) (*WorkflowRunsListener, error)

	// ListenForDurableEvents returns a DurableEventsListener.
	ListenForDurableEvents(ctx context.Context) (*DurableEventsListener, error)
}

type WorkerActionListener

// WorkerActionListener is the listener returned by DispatcherClient.GetActionListener.
type WorkerActionListener interface {
	// Actions returns a receive-only channel of actions and a receive-only channel of
	// errors, plus an error.
	// NOTE(review): the error result presumably reports a failure to start listening — confirm.
	Actions(ctx context.Context) (<-chan *Action, <-chan error, error)

	// Unregister unregisters the listener.
	// NOTE(review): exact server-side effect is not visible in this listing.
	Unregister() error
}

type Workflow

// Workflow is a handle to a workflow run. It is returned by AdminClient.RunWorkflow and
// NewWorkflow; RunId returns the run id and Result waits for the run's results.
type Workflow struct {
	// contains filtered or unexported fields
}

Workflow represents a running workflow instance and provides methods to retrieve its results.

The workflow listener uses a multi-layer, best-effort retry strategy to handle transient failures. It recovers from temporary connection issues, such as brief DB downtime or network interruptions, without requiring manual intervention.

func NewWorkflow

func NewWorkflow(
	workflowRunId string,
	listener *WorkflowRunsListener,
) *Workflow

func (*Workflow) Result

func (c *Workflow) Result() (*WorkflowResult, error)

Result waits for the workflow run to complete and returns the results.

Retry strategy (best-effort):

1. This function retries AddWorkflowRun up to DefaultActionListenerRetryCount times, waiting DefaultActionListenerRetryInterval between attempts.
2. AddWorkflowRun calls retrySend, which retries up to DefaultActionListenerRetryCount times, waiting DefaultActionListenerRetryInterval between attempts.
3. Each retrySend attempt calls retrySubscribe, which itself retries up to DefaultActionListenerRetryCount times, waiting DefaultActionListenerRetryInterval between attempts.

func (*Workflow) RunId

func (r *Workflow) RunId() string

func (*Workflow) WorkflowRunId deprecated

func (r *Workflow) WorkflowRunId() string

Deprecated: Use RunId instead

type WorkflowResult

// WorkflowResult holds the outcome of a workflow run as returned by Workflow.Result.
// Results returns all outputs; StepOutput reads the output stored under a single key into v.
type WorkflowResult struct {
	// contains filtered or unexported fields
}

func (*WorkflowResult) Results

func (r *WorkflowResult) Results() (interface{}, error)

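Results returns all step outputs from the workflow run. The return type is interface{}; this documentation describes the value as a map of step outputs, so callers need a type assertion to use it as one.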

Note: This method operates on an already-fetched WorkflowResult. The retry logic is handled by Workflow.Result() which obtains the WorkflowResult.

func (*WorkflowResult) StepOutput

func (r *WorkflowResult) StepOutput(key string, v interface{}) error

type WorkflowRun

// WorkflowRun is one entry of an AdminClient.BulkRunWorkflow call. Its fields mirror the
// three parameters of AdminClient.RunWorkflow: workflow name, input and run options.
type WorkflowRun struct {
	Name    string
	Input   interface{}
	Options []RunOptFunc
}

type WorkflowRunEventHandler

// WorkflowRunEventHandler is the callback registered with
// WorkflowRunsListener.AddWorkflowRun. It receives a WorkflowRunEvent by value and returns
// an error on failure.
type WorkflowRunEventHandler func(event WorkflowRunEvent) error

type WorkflowRunsListener

// WorkflowRunsListener is returned by SubscribeClient.SubscribeToWorkflowRunEvents and is
// the listener passed to NewWorkflow. Handlers are registered and removed per workflow run
// id and session id with AddWorkflowRun and RemoveWorkflowRun; Listen and Close control
// the listener's lifetime.
type WorkflowRunsListener struct {
	// contains filtered or unexported fields
}

func (*WorkflowRunsListener) AddWorkflowRun

func (l *WorkflowRunsListener) AddWorkflowRun(
	workflowRunId, sessionId string,
	handler WorkflowRunEventHandler,
) error

func (*WorkflowRunsListener) Close

func (l *WorkflowRunsListener) Close() error

func (*WorkflowRunsListener) Listen

func (l *WorkflowRunsListener) Listen(ctx context.Context) error

func (*WorkflowRunsListener) RemoveWorkflowRun

func (l *WorkflowRunsListener) RemoveWorkflowRun(
	workflowRunId, sessionId string,
)

Directories

Path Synopsis
cloud
rest
Package rest provides primitives to interact with the openapi HTTP API.
Package rest provides primitives to interact with the openapi HTTP API.
Package rest provides primitives to interact with the openapi HTTP API.
Package rest provides primitives to interact with the openapi HTTP API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL