Class: Temporalio::Client

Inherits:
Object
Defined in:
lib/temporalio/client.rb,
lib/temporalio/client/schedule.rb,
lib/temporalio/client/connection.rb,
lib/temporalio/client/interceptor.rb,
lib/temporalio/client/schedule_handle.rb,
lib/temporalio/client/workflow_handle.rb,
lib/temporalio/client/connection/service.rb,
lib/temporalio/client/workflow_execution.rb,
lib/temporalio/client/activity_id_reference.rb,
lib/temporalio/client/async_activity_handle.rb,
lib/temporalio/client/workflow_update_handle.rb,
lib/temporalio/client/connection/test_service.rb,
lib/temporalio/client/connection/cloud_service.rb,
lib/temporalio/client/workflow_execution_count.rb,
lib/temporalio/client/workflow_execution_status.rb,
lib/temporalio/client/workflow_update_wait_stage.rb,
lib/temporalio/client/connection/operator_service.rb,
lib/temporalio/client/connection/workflow_service.rb,
lib/temporalio/client/workflow_query_reject_condition.rb

Overview

Client for accessing Temporal.

Most users will use Client.connect to connect a client. The #workflow_service method provides access to a raw gRPC client. To create another client on the same connection, like for a different namespace, #options may be used to get the options as a struct which can then be dup’d, altered, and splatted as kwargs to the constructor (e.g. Client.new(**my_options.to_h)).

Clients are thread-safe and are meant to be reused for the life of the application. They are built to work in both synchronous and asynchronous contexts. Internally they use callbacks based on Queue which means they are Fiber-compatible.
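The dup-alter-splat pattern described above can be sketched as follows. This is a minimal illustration, not SDK code: `client_for_namespace` is a hypothetical helper name, and `client.class.new` is used in place of `Temporalio::Client.new` only so the snippet does not depend on the gem being loaded.

```ruby
# In a real application: require 'temporalio/client'

# Hypothetical helper: builds a second client that shares the first client's
# connection but targets a different namespace.
def client_for_namespace(client, namespace)
  # #options is frozen, so copy it into a plain hash and override one key.
  overrides = client.options.to_h.merge(namespace: namespace)
  # Splat the altered options as keyword arguments to the constructor.
  client.class.new(**overrides)
end
```

Because the connection object is carried over unchanged in the options, the new client reuses it rather than opening another one.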

Defined Under Namespace

Modules: Interceptor, WorkflowExecutionStatus, WorkflowQueryRejectCondition, WorkflowUpdateWaitStage

Classes: ActivityIDReference, AsyncActivityHandle, Connection, Options, RPCOptions, Schedule, ScheduleHandle, WorkflowExecution, WorkflowExecutionCount, WorkflowHandle, WorkflowUpdateHandle

Constructor Details

#initialize(connection:, namespace:, data_converter: DataConverter.default, interceptors: [], logger: Logger.new($stdout, level: Logger::WARN), default_workflow_query_reject_condition: nil) ⇒ Client

Create a client from an existing connection. Most users will prefer .connect instead. The parameters here intentionally match the Options returned from #options, so options can be dup’d, altered, and splatted to create a new client.

Parameters:

  • connection (Connection)

    Existing connection to create a client from.

  • namespace (String)

    Namespace to use for client calls.

  • data_converter (Converters::DataConverter) (defaults to: DataConverter.default)

    Data converter to use for all data conversions to/from payloads.

  • interceptors (Array<Interceptor>) (defaults to: [])

    Set of interceptors that are chained together to allow intercepting of client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker interceptor will be used as worker interceptors too so they should not be given separately when creating a worker.

  • logger (Logger) (defaults to: Logger.new($stdout, level: Logger::WARN))

    Logger to use for this client and any workers made from this client. Defaults to stdout with warn level. Callers setting this logger are responsible for closing it.

  • default_workflow_query_reject_condition (WorkflowQueryRejectCondition, nil) (defaults to: nil)

    Default rejection condition for workflow queries if not set during query. See Temporalio::Client::WorkflowHandle#query for details on the rejection condition.

# File 'lib/temporalio/client.rb', line 141

def initialize(
  connection:,
  namespace:,
  data_converter: DataConverter.default,
  interceptors: [],
  logger: Logger.new($stdout, level: Logger::WARN),
  default_workflow_query_reject_condition: nil
)
  @options = Options.new(
    connection:,
    namespace:,
    data_converter:,
    interceptors:,
    logger:,
    default_workflow_query_reject_condition:
  ).freeze
  # Initialize interceptors
  @impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int|
    int.intercept_client(acc)
  end
end
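The fold in the constructor is what makes "the earlier interceptors wrap the later ones" true. The sketch below reproduces that fold with stand-in classes so the call order can be observed; `RecordingImpl`, `NamedInterceptor`, and `build_chain` are hypothetical names for illustration and are not part of the SDK.

```ruby
# Stand-in for the innermost client implementation (hypothetical, not SDK code).
class RecordingImpl
  def call(trace)
    trace + ['impl']
  end
end

# Stand-in interceptor: intercept_client receives the next handler and
# returns a wrapper around it, mirroring the shape used in the constructor.
class NamedInterceptor
  def initialize(name)
    @name = name
  end

  def intercept_client(next_handler)
    name = @name
    wrapper = Object.new
    # The wrapper records its own name, then delegates to the next handler.
    wrapper.define_singleton_method(:call) do |trace|
      next_handler.call(trace + [name])
    end
    wrapper
  end
end

# Same fold as the constructor: start from the implementation and wrap it
# with each interceptor, last to first, so the first interceptor is outermost.
def build_chain(interceptors, impl)
  interceptors.reverse_each.reduce(impl) do |acc, int|
    int.intercept_client(acc)
  end
end
```

With interceptors named 'first' and 'second', `build_chain(...).call([])` returns `["first", "second", "impl"]`: the first interceptor in the array sees the call before the second, and the implementation sees it last.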

Instance Attribute Details

#options ⇒ Options (readonly)

Returns frozen options for this client, which have the same attributes as the #initialize parameters.

Returns:

  • (Options)

    Frozen options for this client, which have the same attributes as the #initialize parameters.



# File 'lib/temporalio/client.rb', line 122

def options
  @options
end

Class Method Details

.connect(target_host, namespace, api_key: nil, tls: false, data_converter: Converters::DataConverter.default, interceptors: [], logger: Logger.new($stdout, level: Logger::WARN), default_workflow_query_reject_condition: nil, rpc_metadata: {}, rpc_retry: Connection::RPCRetryOptions.new, identity: "#{Process.pid}@#{Socket.gethostname}", keep_alive: Connection::KeepAliveOptions.new, http_connect_proxy: nil, runtime: Runtime.default, lazy_connect: false) ⇒ Client

Connect to Temporal server. This is a shortcut for Connection.new followed by Client.new.

Parameters:

  • target_host (String)

    host:port for the Temporal server. For local development, this is often localhost:7233.

  • namespace (String)

    Namespace to use for client calls.

  • api_key (String, nil) (defaults to: nil)

    API key for Temporal. This becomes the Authorization HTTP header with “Bearer ” prepended. This is only set if RPC metadata doesn’t already have an authorization key.

  • tls (Boolean, Connection::TLSOptions) (defaults to: false)

    If false, do not use TLS. If true, use system default TLS options. If TLS options are present, those TLS options will be used.

  • data_converter (Converters::DataConverter) (defaults to: Converters::DataConverter.default)

    Data converter to use for all data conversions to/from payloads.

  • interceptors (Array<Interceptor>) (defaults to: [])

    Set of interceptors that are chained together to allow intercepting of client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker interceptor will be used as worker interceptors too so they should not be given separately when creating a worker.

  • logger (Logger) (defaults to: Logger.new($stdout, level: Logger::WARN))

    Logger to use for this client and any workers made from this client. Defaults to stdout with warn level. Callers setting this logger are responsible for closing it.

  • default_workflow_query_reject_condition (WorkflowQueryRejectCondition, nil) (defaults to: nil)

    Default rejection condition for workflow queries if not set during query. See Temporalio::Client::WorkflowHandle#query for details on the rejection condition.

  • rpc_metadata (Hash<String, String>) (defaults to: {})

    Headers to use for all calls to the server. Keys here can be overridden by per-call RPC metadata keys.

  • rpc_retry (Connection::RPCRetryOptions) (defaults to: Connection::RPCRetryOptions.new)

    Retry options for direct service calls (when opted in) or all high-level calls made by this client (which all opt-in to retries by default).

  • identity (String) (defaults to: "#{Process.pid}@#{Socket.gethostname}")

    Identity for this client.

  • keep_alive (Connection::KeepAliveOptions) (defaults to: Connection::KeepAliveOptions.new)

    Keep-alive options for the client connection. Can be set to nil to disable.

  • http_connect_proxy (Connection::HTTPConnectProxyOptions, nil) (defaults to: nil)

    Options for HTTP CONNECT proxy.

  • runtime (Runtime) (defaults to: Runtime.default)

    Runtime for this client.

  • lazy_connect (Boolean) (defaults to: false)

    If true, the client will not connect until the first call is attempted or a worker is created with it. Lazy clients cannot be used for workers if they have not performed a connection.

Returns:

  • (Client)

    Connected client.

# File 'lib/temporalio/client.rb', line 83

def self.connect(
  target_host,
  namespace,
  api_key: nil,
  tls: false,
  data_converter: Converters::DataConverter.default,
  interceptors: [],
  logger: Logger.new($stdout, level: Logger::WARN),
  default_workflow_query_reject_condition: nil,
  rpc_metadata: {},
  rpc_retry: Connection::RPCRetryOptions.new,
  identity: "#{Process.pid}@#{Socket.gethostname}",
  keep_alive: Connection::KeepAliveOptions.new, # Set to nil to disable
  http_connect_proxy: nil,
  runtime: Runtime.default,
  lazy_connect: false
)
  Client.new(
    connection: Connection.new(
      target_host:,
      api_key:,
      tls:,
      rpc_metadata:,
      rpc_retry:,
      identity:,
      keep_alive:,
      http_connect_proxy:,
      runtime:,
      lazy_connect:
    ),
    namespace:,
    data_converter:,
    interceptors:,
    logger:,
    default_workflow_query_reject_condition:
  )
end
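A small sketch of how the positional and keyword arguments to .connect might be assembled from configuration. The helper name `temporal_connect_args` and the environment variable names are assumptions made for this example, as is the choice to enable TLS whenever an API key is present; none of these are SDK conventions.

```ruby
# Hypothetical helper: derives .connect arguments from an environment-like hash.
# The variable names below are assumptions for this sketch, not SDK conventions.
def temporal_connect_args(env)
  api_key = env['TEMPORAL_API_KEY']
  positional = [
    env.fetch('TEMPORAL_ADDRESS', 'localhost:7233'), # target_host as host:port
    env.fetch('TEMPORAL_NAMESPACE', 'default')       # namespace
  ]
  # Assumption: an API key implies a TLS endpoint, so use system default TLS.
  keywords = { api_key: api_key, tls: !api_key.nil? }
  [positional, keywords]
end

# In a real application (requires the temporalio gem and a reachable server):
#   positional, keywords = temporal_connect_args(ENV)
#   client = Temporalio::Client.connect(*positional, **keywords)
```

Keeping argument assembly separate from the connect call makes the defaults easy to check without a running server.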

Instance Method Details

#async_activity_handle(task_token_or_id_reference) ⇒ AsyncActivityHandle

Get an async activity handle.

Parameters:

  • task_token_or_id_reference (String, ActivityIDReference)

    Task token string or activity ID reference.

Returns:

  • (AsyncActivityHandle)

# File 'lib/temporalio/client.rb', line 430

def async_activity_handle(task_token_or_id_reference)
  if task_token_or_id_reference.is_a?(ActivityIDReference)
    AsyncActivityHandle.new(client: self, task_token: nil, id_reference: task_token_or_id_reference)
  elsif task_token_or_id_reference.is_a?(String)
    AsyncActivityHandle.new(client: self, task_token: task_token_or_id_reference, id_reference: nil)
  else
    raise ArgumentError, 'Must be a string task token or an ActivityIDReference'
  end
end

#connection ⇒ Connection

Returns Underlying connection for this client.

Returns:

  • (Connection)

    Underlying connection for this client.



# File 'lib/temporalio/client.rb', line 164

def connection
  @options.connection
end

#count_workflows(query = nil, rpc_options: nil) ⇒ WorkflowExecutionCount

Count workflows.

Parameters:

  • query (String, nil) (defaults to: nil)

    A Temporal visibility list filter.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (WorkflowExecutionCount)

Raises:

# File 'lib/temporalio/client.rb', line 361

def count_workflows(query = nil, rpc_options: nil)
  @impl.count_workflows(Interceptor::CountWorkflowsInput.new(query:, rpc_options:))
end

#create_schedule(id, schedule, trigger_immediately: false, backfills: [], memo: nil, search_attributes: nil, rpc_options: nil) ⇒ ScheduleHandle

Create a schedule and return its handle.

Parameters:

  • id (String)

    Unique identifier of the schedule.

  • schedule (Schedule)

    Schedule to create.

  • trigger_immediately (Boolean) (defaults to: false)

    If true, trigger one action immediately when creating the schedule.

  • backfills (Array<Schedule::Backfill>) (defaults to: [])

    Set of time periods to take actions on as if that time passed right now.

  • memo (Hash<String, Object>, nil) (defaults to: nil)

    Memo for the schedule. Memo for a scheduled workflow is part of the schedule action.

  • search_attributes (SearchAttributes, nil) (defaults to: nil)

    Search attributes for the schedule. Search attributes for a scheduled workflow are part of the scheduled action.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (ScheduleHandle)

Raises:

# File 'lib/temporalio/client.rb', line 381

def create_schedule(
  id,
  schedule,
  trigger_immediately: false,
  backfills: [],
  memo: nil,
  search_attributes: nil,
  rpc_options: nil
)
  @impl.create_schedule(Interceptor::CreateScheduleInput.new(
                          id:,
                          schedule:,
                          trigger_immediately:,
                          backfills:,
                          memo:,
                          search_attributes:,
                          rpc_options:
                        ))
end

#data_converter ⇒ DataConverter

Returns Data converter used by this client.

Returns:

  • (DataConverter)

    Data converter used by this client.



# File 'lib/temporalio/client.rb', line 174

def data_converter
  @options.data_converter
end

#execute_workflow(workflow, *args, id:, task_queue:, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil, start_delay: nil, request_eager_start: false, rpc_options: nil) ⇒ Object

Start a workflow and wait for its result. This is a shortcut for #start_workflow + Temporalio::Client::WorkflowHandle#result.

Parameters:

  • workflow (Class<Workflow::Definition>, Symbol, String)

    Workflow definition class or workflow name.

  • args (Array<Object>)

    Arguments to the workflow.

  • id (String)

    Unique identifier for the workflow execution.

  • task_queue (String)

    Task queue to run the workflow on.

  • execution_timeout (Float, nil) (defaults to: nil)

    Total workflow execution timeout in seconds including retries and continue as new.

  • run_timeout (Float, nil) (defaults to: nil)

    Timeout of a single workflow run in seconds.

  • task_timeout (Float, nil) (defaults to: nil)

    Timeout of a single workflow task in seconds.

  • id_reuse_policy (WorkflowIDReusePolicy) (defaults to: WorkflowIDReusePolicy::ALLOW_DUPLICATE)

    How already-existing IDs are treated.

  • id_conflict_policy (WorkflowIDConflictPolicy) (defaults to: WorkflowIDConflictPolicy::UNSPECIFIED)

    How already-running workflows of the same ID are treated. The default is unspecified, which effectively means the start attempt fails. This cannot be set if `id_reuse_policy` is set to terminate if running.

  • retry_policy (RetryPolicy, nil) (defaults to: nil)

    Retry policy for the workflow.

  • cron_schedule (String, nil) (defaults to: nil)

    Cron schedule. Users should use schedules instead of this.

  • memo (Hash{String, Symbol => Object}, nil) (defaults to: nil)

    Memo for the workflow.

  • search_attributes (SearchAttributes, nil) (defaults to: nil)

    Search attributes for the workflow.

  • start_delay (Float, nil) (defaults to: nil)

    Amount of time in seconds to wait before starting the workflow. This does not work with `cron_schedule`.

  • request_eager_start (Boolean) (defaults to: false)

    Potentially reduce the latency to start this workflow by encouraging the server to start it on a local worker running with this same client. This is currently experimental.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Object)

    Successful result of the workflow.

Raises:



# File 'lib/temporalio/client.rb', line 282

def execute_workflow(
  workflow,
  *args,
  id:,
  task_queue:,
  execution_timeout: nil,
  run_timeout: nil,
  task_timeout: nil,
  id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
  id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED,
  retry_policy: nil,
  cron_schedule: nil,
  memo: nil,
  search_attributes: nil,
  start_delay: nil,
  request_eager_start: false,
  rpc_options: nil
)
  start_workflow(
    workflow,
    *args,
    id:,
    task_queue:,
    execution_timeout:,
    run_timeout:,
    task_timeout:,
    id_reuse_policy:,
    id_conflict_policy:,
    retry_policy:,
    cron_schedule:,
    memo:,
    search_attributes:,
    start_delay:,
    request_eager_start:,
    rpc_options:
  ).result
end
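The shortcut can be spelled out as two steps. The sketch below is a hypothetical helper (`start_and_wait` is not an SDK method) that takes any client object responding to #start_workflow, and it passes only the two required keyword arguments.

```ruby
# In a real application: require 'temporalio/client'

# Hypothetical helper spelling out what #execute_workflow does in one call.
def start_and_wait(client, workflow, *args, id:, task_queue:)
  # Step 1: start the workflow and get a handle to it.
  handle = client.start_workflow(workflow, *args, id: id, task_queue: task_queue)
  # Step 2: wait on the handle and return the workflow's result.
  handle.result
end
```

Prefer #start_workflow on its own when the handle is needed for anything besides the result, since #execute_workflow returns only the result.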

#list_schedules(query = nil, rpc_options: nil) ⇒ Enumerator<Schedule::List::Description>

List schedules.

Note, this list is eventually consistent. Therefore if a schedule is added or deleted, it may not be available in the list immediately.

Parameters:

  • query (String, nil) (defaults to: nil)

    A Temporal visibility list filter.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Enumerator<Schedule::List::Description>)

Raises:

# File 'lib/temporalio/client.rb', line 422

def list_schedules(query = nil, rpc_options: nil)
  @impl.list_schedules(Interceptor::ListSchedulesInput.new(query:, rpc_options:))
end

#list_workflows(query = nil, rpc_options: nil) ⇒ Enumerator<WorkflowExecution>

List workflows.

Parameters:

  • query (String, nil) (defaults to: nil)

    A Temporal visibility list filter.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Enumerator<WorkflowExecution>)

Raises:

# File 'lib/temporalio/client.rb', line 347

def list_workflows(query = nil, rpc_options: nil)
  @impl.list_workflows(Interceptor::ListWorkflowsInput.new(query:, rpc_options:))
end
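Since the return value is an Enumerator, callers can take only as many executions as they need. The sketch below is a hypothetical helper; it assumes each WorkflowExecution exposes an `id` reader (not shown on this page), and the workflow type and limit are illustrative.

```ruby
# In a real application: require 'temporalio/client'

# Hypothetical helper: returns the IDs of up to `limit` running workflows
# of the given type. Assumes WorkflowExecution responds to #id.
def running_workflow_ids(client, workflow_type, limit: 10)
  # A Temporal visibility list filter, passed as the optional query argument.
  query = "WorkflowType = '#{workflow_type}' AND ExecutionStatus = 'Running'"
  # Enumerator#first stops iterating once `limit` items have been collected.
  client.list_workflows(query).first(limit).map(&:id)
end
```

The workflow type is interpolated directly into the filter here, so it should come from trusted code rather than user input.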

#namespace ⇒ String

Returns Namespace used in calls by this client.

Returns:

  • (String)

    Namespace used in calls by this client.



# File 'lib/temporalio/client.rb', line 169

def namespace
  @options.namespace
end

#operator_service ⇒ Connection::OperatorService

Returns Raw gRPC operator service.

Returns:

  • (Connection::OperatorService)

    Raw gRPC operator service.

# File 'lib/temporalio/client.rb', line 184

def operator_service
  connection.operator_service
end

#schedule_handle(id) ⇒ ScheduleHandle

Get a schedule handle to an existing schedule for the given ID.

Parameters:

  • id (String)

    Schedule ID to get a handle to.

Returns:

  • (ScheduleHandle)

# File 'lib/temporalio/client.rb', line 405

def schedule_handle(id)
  ScheduleHandle.new(client: self, id:)
end

#start_workflow(workflow, *args, id:, task_queue:, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil, start_delay: nil, request_eager_start: false, rpc_options: nil) ⇒ WorkflowHandle

Start a workflow and return its handle.

Parameters:

  • workflow (Class<Workflow::Definition>, String, Symbol)

    Workflow definition class or workflow name.

  • args (Array<Object>)

    Arguments to the workflow.

  • id (String)

    Unique identifier for the workflow execution.

  • task_queue (String)

    Task queue to run the workflow on.

  • execution_timeout (Float, nil) (defaults to: nil)

    Total workflow execution timeout in seconds including retries and continue as new.

  • run_timeout (Float, nil) (defaults to: nil)

    Timeout of a single workflow run in seconds.

  • task_timeout (Float, nil) (defaults to: nil)

    Timeout of a single workflow task in seconds.

  • id_reuse_policy (WorkflowIDReusePolicy) (defaults to: WorkflowIDReusePolicy::ALLOW_DUPLICATE)

    How already-existing IDs are treated.

  • id_conflict_policy (WorkflowIDConflictPolicy) (defaults to: WorkflowIDConflictPolicy::UNSPECIFIED)

    How already-running workflows of the same ID are treated. The default is unspecified, which effectively means the start attempt fails. This cannot be set if `id_reuse_policy` is set to terminate if running.

  • retry_policy (RetryPolicy, nil) (defaults to: nil)

    Retry policy for the workflow.

  • cron_schedule (String, nil) (defaults to: nil)

    Cron schedule. Users should use schedules instead of this.

  • memo (Hash{String, Symbol => Object}, nil) (defaults to: nil)

    Memo for the workflow.

  • search_attributes (SearchAttributes, nil) (defaults to: nil)

    Search attributes for the workflow.

  • start_delay (Float, nil) (defaults to: nil)

    Amount of time in seconds to wait before starting the workflow. This does not work with `cron_schedule`.

  • request_eager_start (Boolean) (defaults to: false)

    Potentially reduce the latency to start this workflow by encouraging the server to start it on a local worker running with this same client. This is currently experimental.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (WorkflowHandle)

Raises:

# File 'lib/temporalio/client.rb', line 215

def start_workflow(
  workflow,
  *args,
  id:,
  task_queue:,
  execution_timeout: nil,
  run_timeout: nil,
  task_timeout: nil,
  id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
  id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED,
  retry_policy: nil,
  cron_schedule: nil,
  memo: nil,
  search_attributes: nil,
  start_delay: nil,
  request_eager_start: false,
  rpc_options: nil
)
  @impl.start_workflow(Interceptor::StartWorkflowInput.new(
                         workflow:,
                         args:,
                         workflow_id: id,
                         task_queue:,
                         execution_timeout:,
                         run_timeout:,
                         task_timeout:,
                         id_reuse_policy:,
                         id_conflict_policy:,
                         retry_policy:,
                         cron_schedule:,
                         memo:,
                         search_attributes:,
                         start_delay:,
                         request_eager_start:,
                         headers: {},
                         rpc_options:
                       ))
end

#workflow_handle(workflow_id, run_id: nil, first_execution_run_id: nil) ⇒ WorkflowHandle

Get a workflow handle to an existing workflow by its ID.

Parameters:

  • workflow_id (String)

    Workflow ID to get a handle to.

  • run_id (String, nil) (defaults to: nil)

    Run ID that will be used for all calls. Many choose to leave this unset, which ensures interactions occur on the latest run for the workflow ID.

  • first_execution_run_id (String, nil) (defaults to: nil)

    First execution run ID used for some calls like cancellation and termination to ensure the affected workflow is only within the same chain as this given run ID.

Returns:

  • (WorkflowHandle)

# File 'lib/temporalio/client.rb', line 329

def workflow_handle(
  workflow_id,
  run_id: nil,
  first_execution_run_id: nil
)
  WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
end
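A handle obtained this way can be used to wait on a workflow that was started elsewhere. The sketch below is a hypothetical helper (`result_of` is not an SDK method) built from #workflow_handle and WorkflowHandle#result.

```ruby
# In a real application: require 'temporalio/client'

# Hypothetical helper: fetches the result of a workflow started elsewhere.
def result_of(client, workflow_id, run_id: nil)
  # Leaving run_id as nil targets the latest run for the workflow ID;
  # passing one pins every call on the handle to that specific run.
  client.workflow_handle(workflow_id, run_id: run_id).result
end
```

Creating the handle is a local operation in the code shown above; no server call happens until a method such as #result is invoked on it.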

#workflow_service ⇒ Connection::WorkflowService

Returns Raw gRPC workflow service.

Returns:

  • (Connection::WorkflowService)

    Raw gRPC workflow service.

# File 'lib/temporalio/client.rb', line 179

def workflow_service
  connection.workflow_service
end