Temporal Workflows
pkg/workflows/temporal connects to a Temporal server and manages a worker process for your workflows and activities.
Register workflows and activities inline via WithRegistrar:
lakta.NewRuntime( config.NewModule( config.WithConfigDirs(".", "./config"), config.WithArgs(os.Args[1:]), ), tint.NewModule(), slog.NewModule(), otel.NewModule(), health.NewModule(), grpcclient.NewModule( grpcclient.WithName("data"), grpcclient.WithClient(v1.NewDataServiceClient), ), temporal.NewModule( temporal.WithRegistrar(func(ctx context.Context, w worker.Worker) error { w.RegisterWorkflow(OrderWorkflow) w.RegisterActivity(UpdateOrderStatusActivity) return nil }), ), grpcserver.NewModule( grpcserver.WithService(&v1.WorkflowService_ServiceDesc, NewServer()), ),)Workflows
Section titled “Workflows”Workflows are plain Go functions. They may not call external services directly — use activities for that:
const taskQueue = "MY_TASK_QUEUE"
func OrderWorkflow(ctx workflow.Context, orderID string) error { ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second} ctx = workflow.WithActivityOptions(ctx, ao) return workflow.ExecuteActivity(ctx, UpdateOrderStatusActivity, orderID, "processing").Get(ctx, nil)}Activities
Section titled “Activities”Activities can access DI via lakta.Invoke[T](ctx):
func UpdateOrderStatusActivity(ctx context.Context, orderID string, status string) error { client, err := lakta.Invoke[v1.DataServiceClient](ctx) if err != nil { return err } _, err = client.UpdateOrderStatus(ctx, &v1.UpdateOrderStatusRequest{ OrderId: orderID, NewStatus: v1.OrderStatus(v1.OrderStatus_value["ORDER_STATUS_"+strings.ToUpper(status)]), }) return err}Starting a workflow from a gRPC handler
Section titled “Starting a workflow from a gRPC handler”The Temporal client.Client is available in DI:
func (s *WorkflowServer) StartOrderWorkflow(ctx context.Context, req *v1.StartOrderWorkflowRequest) (*v1.StartOrderWorkflowResponse, error) { c, err := lakta.Invoke[client.Client](ctx) if err != nil { return nil, status.Error(codes.Internal, err.Error()) }
_, err = c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ ID: fmt.Sprintf("order-workflow-%s", req.GetOrderId()), TaskQueue: taskQueue, }, OrderWorkflow, req.GetOrderId()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) }
return &v1.StartOrderWorkflowResponse{}, nil}Configuration Reference
Section titled “Configuration Reference”Config path: modules.workflows.temporal.<name>
target target specifies the Temporal server's target address for client connections
LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_TARGET task_queue required taskQueue specifies the Temporal task queue name for workflow and activity execution
LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_TASK_QUEUE namespace namespace defines the Temporal namespace to be used for client and worker operations
LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_NAMESPACE insecure insecure indicates whether transport credentials should be bypassed, enabling an insecure connection
LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_INSECURE Code-only options
Section titled “Code-only options”These options can only be set in Go code via With*() functions, not via config files or environment variables.
| Option | Type | Description |
|---|---|---|
Credentials(...) | credentials.TransportCredentials | |
WithRegistrar(...) | []temporal.Registrar | adds a workflow/activity registrar (code-only) |