Class: Temporal::Workflow::TaskProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/temporal/workflow/task_processor.rb

Constant Summary collapse

MAX_FAILED_ATTEMPTS =
1

Instance Method Summary collapse

Constructor Details

#initialize(task, namespace, workflow_lookup, client, middleware_chain) ⇒ TaskProcessor

Returns a new instance of TaskProcessor.



12
13
14
15
16
17
18
19
20
21
# File 'lib/temporal/workflow/task_processor.rb', line 12

def initialize(task, namespace, workflow_lookup, client, middleware_chain)
  @task = task
  @namespace = namespace
  @metadata = Metadata.generate(Metadata::WORKFLOW_TASK_TYPE, task, namespace)
  @task_token = task.task_token
  @workflow_name = task.workflow_type.name
  @workflow_class = workflow_lookup.find(workflow_name)
  @client = client
  @middleware_chain = middleware_chain
end

Instance Method Details

#processObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/temporal/workflow/task_processor.rb', line 23

def process
  start_time = Time.now

  Temporal.logger.debug("Processing Workflow task", .to_h)
  Temporal.metrics.timing('workflow_task.queue_time', queue_time_ms, workflow: workflow_name)

  if !workflow_class
    raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker'
  end

  history = fetch_full_history
  # TODO: For sticky workflows we need to cache the Executor instance
  executor = Workflow::Executor.new(workflow_class, history)

  commands = middleware_chain.invoke() do
    executor.run
  end

  complete_task(commands)
rescue StandardError => error
  Temporal::ErrorHandler.handle(error, metadata: )

  fail_task(error)
ensure
  time_diff_ms = ((Time.now - start_time) * 1000).round
  Temporal.metrics.timing('workflow_task.latency', time_diff_ms, workflow: workflow_name)
  Temporal.logger.debug("Workflow task processed", .to_h.merge(execution_time: time_diff_ms))
end