Temporal Workflows

pkg/workflows/temporal connects to a Temporal server and manages a worker process for your workflows and activities.

Register workflows and activities inline via WithRegistrar:

lakta.NewRuntime(
	config.NewModule(
		config.WithConfigDirs(".", "./config"),
		config.WithArgs(os.Args[1:]),
	),
	tint.NewModule(),
	slog.NewModule(),
	otel.NewModule(),
	health.NewModule(),
	grpcclient.NewModule(
		grpcclient.WithName("data"),
		grpcclient.WithClient(v1.NewDataServiceClient),
	),
	temporal.NewModule(
		temporal.WithRegistrar(func(ctx context.Context, w worker.Worker) error {
			w.RegisterWorkflow(OrderWorkflow)
			w.RegisterActivity(UpdateOrderStatusActivity)
			return nil
		}),
	),
	grpcserver.NewModule(
		grpcserver.WithService(&v1.WorkflowService_ServiceDesc, NewServer()),
	),
)

Workflows are plain Go functions. They must not call external services directly; use activities for that:

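// taskQueue should match the task_queue configured for the temporal module,
// so that the worker picks up workflows started on it.
const taskQueue = "MY_TASK_QUEUE"

func OrderWorkflow(ctx workflow.Context, orderID string) error {
	// Every activity call made with this context fails if it runs longer than 10s.
	ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
	ctx = workflow.WithActivityOptions(ctx, ao)
	// Get blocks until the activity completes; nil discards its result value.
	return workflow.ExecuteActivity(ctx, UpdateOrderStatusActivity, orderID, "processing").Get(ctx, nil)
}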

Activities can access DI via lakta.Invoke[T](ctx):

func UpdateOrderStatusActivity(ctx context.Context, orderID string, status string) error {
	// Resolve the gRPC client registered by grpcclient.NewModule.
	client, err := lakta.Invoke[v1.DataServiceClient](ctx)
	if err != nil {
		return err
	}
	_, err = client.UpdateOrderStatus(ctx, &v1.UpdateOrderStatusRequest{
		OrderId: orderID,
		// Map a short status such as "processing" to ORDER_STATUS_PROCESSING.
		NewStatus: v1.OrderStatus(v1.OrderStatus_value["ORDER_STATUS_"+strings.ToUpper(status)]),
	})
	return err
}
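The enum lookup in the activity indexes a Go map, so an unrecognized status string resolves to 0 without any error. The sketch below shows one way to make that lookup explicit. The `orderStatusValue` map is a hypothetical stand-in for the generated `v1.OrderStatus_value` map, its entries are invented for illustration, and `parseOrderStatus` is not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// orderStatusValue is a hypothetical stand-in for the generated
// v1.OrderStatus_value map (enum name -> enum number).
var orderStatusValue = map[string]int32{
	"ORDER_STATUS_UNSPECIFIED": 0,
	"ORDER_STATUS_PROCESSING":  1,
	"ORDER_STATUS_COMPLETED":   2,
}

// parseOrderStatus resolves a short status such as "processing" to its enum
// number and returns an error for unknown names instead of silently using 0.
func parseOrderStatus(status string) (int32, error) {
	name := "ORDER_STATUS_" + strings.ToUpper(status)
	v, ok := orderStatusValue[name]
	if !ok {
		return 0, fmt.Errorf("unknown order status %q", status)
	}
	return v, nil
}

func main() {
	v, err := parseOrderStatus("processing")
	fmt.Println(v, err)

	_, err = parseOrderStatus("shipped")
	fmt.Println(err)
}
```

Returning an error from the activity in the unknown case lets Temporal record the failure rather than writing an unspecified status to the data service.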

The Temporal client.Client is available in DI:

func (s *WorkflowServer) StartOrderWorkflow(ctx context.Context, req *v1.StartOrderWorkflowRequest) (*v1.StartOrderWorkflowResponse, error) {
	c, err := lakta.Invoke[client.Client](ctx)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	_, err = c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
		ID:        fmt.Sprintf("order-workflow-%s", req.GetOrderId()),
		TaskQueue: taskQueue,
	}, OrderWorkflow, req.GetOrderId())
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	return &v1.StartOrderWorkflowResponse{}, nil
}

Config path: modules.workflows.temporal.<name>

target (string, default: localhost:7233)
	Address of the Temporal server that the client connects to.
	env: LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_TARGET

task_queue (string, required)
	Name of the Temporal task queue used for workflow and activity execution.
	env: LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_TASK_QUEUE

namespace (string, default: default)
	Temporal namespace used for client and worker operations.
	env: LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_NAMESPACE

insecure (bool)
	Whether to bypass transport credentials and connect over an insecure connection.
	env: LAKTA_MODULES_WORKFLOWS_TEMPORAL_<NAME>_INSECURE
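The environment variable names listed above appear to follow one pattern: the config path, upper-cased, with dots replaced by underscores and a `LAKTA_` prefix. The sketch below derives the names for one instance under that assumption. `envVarName` is an illustrative helper, not part of the package, and `orders` is a hypothetical instance name. How instance names containing characters such as hyphens are mapped is not covered here.

```go
package main

import (
	"fmt"
	"strings"
)

// envVarName builds the environment variable name for one setting of a named
// temporal module instance, assuming the pattern seen in the reference:
// "LAKTA_" + config path, upper-cased, with "." replaced by "_".
func envVarName(instance, key string) string {
	path := "modules.workflows.temporal." + instance + "." + key
	return "LAKTA_" + strings.ToUpper(strings.ReplaceAll(path, ".", "_"))
}

func main() {
	// "orders" is a hypothetical instance name used only for illustration.
	for _, key := range []string{"target", "task_queue", "namespace", "insecure"} {
		fmt.Println(envVarName("orders", key))
	}
}
```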

These options can only be set in Go code via With*() functions, not via config files or environment variables.

Option              Type                              Description
Credentials(...)    credentials.TransportCredentials
WithRegistrar(...)  []temporal.Registrar              adds a workflow/activity registrar (code-only)