Class: Temporal::Testing::LocalWorkflowContext

Inherits:
Object
  • Object
show all
Defined in:
lib/temporal/testing/local_workflow_context.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(execution, workflow_id, run_id, disabled_releases, metadata) ⇒ LocalWorkflowContext

Returns a new instance of LocalWorkflowContext.



14
15
16
17
18
19
20
21
22
# File 'lib/temporal/testing/local_workflow_context.rb', line 14

# Builds a workflow context used when running workflows under the testing
# harness.
#
# @param execution [Object] execution object; futures of async activities are
#   registered on it by #execute_activity
# @param workflow_id [Object] identifier passed to activities as +workflow_id+
# @param run_id [Object] identifier passed to activities as +workflow_run_id+
# @param disabled_releases [#include?] release names that #has_release?
#   reports as unavailable
# @param metadata [#headers] workflow metadata, exposed through #metadata and
#   read by #headers
def initialize(execution, workflow_id, run_id, disabled_releases, metadata)
  @last_event_id = 0 # event counter starts from zero for every new context
  @execution = execution
  @run_id = run_id
  @workflow_id = workflow_id
  @disabled_releases = disabled_releases
  @metadata = metadata
  @completed = false # a fresh context has not finished yet (see #completed?)
end

Instance Attribute Details

#metadata ⇒ Object (readonly)

Returns the value of attribute metadata.



12
13
14
# File 'lib/temporal/testing/local_workflow_context.rb', line 12

# Read-only accessor for the metadata object given to the constructor.
#
# @return [Object] the value stored in +@metadata+
def metadata
  @metadata
end

Instance Method Details

#cancel(target, cancelation_id) ⇒ Object

Raises:

  • (NotImplementedError)


184
185
186
# File 'lib/temporal/testing/local_workflow_context.rb', line 184

# Cancellation is not supported by the testing context.
#
# @param target [Object] ignored
# @param cancelation_id [Object] ignored
# @raise [NotImplementedError] always
def cancel(target, cancelation_id)
  raise(NotImplementedError, 'not yet available for testing')
end

#cancel_activity(activity_id) ⇒ Object

Raises:

  • (NotImplementedError)


180
181
182
# File 'lib/temporal/testing/local_workflow_context.rb', line 180

# Activity cancellation is not supported by the testing context.
#
# @param activity_id [Object] ignored
# @raise [NotImplementedError] always
def cancel_activity(activity_id)
  raise(NotImplementedError, 'not yet available for testing')
end

#cancel_timer(timer_id) ⇒ Object

Raises:

  • (NotImplementedError)


147
148
149
# File 'lib/temporal/testing/local_workflow_context.rb', line 147

# Timer cancellation is not supported by the testing context.
#
# @param timer_id [Object] ignored
# @raise [NotImplementedError] always
def cancel_timer(timer_id)
  raise(NotImplementedError, 'not yet available for testing')
end

#complete(result = nil) ⇒ Object



151
152
153
154
# File 'lib/temporal/testing/local_workflow_context.rb', line 151

# Marks the workflow as completed and hands the given result back.
#
# @param result [Object, nil] value the workflow finished with
# @return [Object, nil] +result+, unchanged
def complete(result = nil)
  result.tap { completed! }
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/temporal/testing/local_workflow_context.rb', line 24

# Reports whether this context has been marked as completed.
#
# @return [Boolean] current value of +@completed+ (the constructor sets it to
#   +false+)
def completed?
  @completed
end

#execute_activity(activity_class, *input, **args) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/temporal/testing/local_workflow_context.rb', line 40

# Runs an activity inline and returns a future describing its outcome.
#
# Keyword arguments other than +:options+ are appended to +input+ as one
# trailing hash before the activity is invoked.
#
# @param activity_class [#execute_in_context] activity to run
# @param input [Array] positional arguments forwarded to the activity
# @param args [Hash] keyword arguments; +:options+ is extracted and may carry
#   +:activity_id+ (defaults to the next event id)
# @return [Workflow::Future] failed with the raised error, fulfilled with the
#   result, or left pending and registered on the execution for async
#   activities
def execute_activity(activity_class, *input, **args)
  options = args.delete(:options) || {}
  input << args unless args.empty?

  event_id = next_event_id
  activity_id = options[:activity_id] || event_id

  target = Workflow::History::EventTarget.new(event_id, Workflow::History::EventTarget::ACTIVITY_TYPE)
  future = Workflow::Future.new(target, self, cancelation_id: activity_id)

  execution_options = ExecutionOptions.new(activity_class, options)
  metadata = Metadata::Activity.new(
    namespace: execution_options.namespace,
    id: activity_id,
    name: execution_options.name,
    task_token: nil,
    attempt: 1,
    workflow_run_id: run_id,
    workflow_id: workflow_id,
    workflow_name: nil, # not yet used, but will be in the future
    headers: execution_options.headers,
    heartbeat_details: nil
  )
  context = LocalActivityContext.new(metadata)

  begin
    result = activity_class.execute_in_context(context, input)
  rescue StandardError => e
    # Capture any failure from running the activity into the future
    # instead of raising immediately in order to match the behavior of
    # running against a Temporal server.
    future.fail(e)
  else
    if context.async?
      # The future stays pending; it is looked up later by the async token.
      execution.register_future(context.async_token, future)
    else
      # Fulfill the future straight away for non-async activities
      future.set(result)
    end
  end

  future
end

#execute_activity!(activity_class, *input, **args) ⇒ Object



84
85
86
87
88
89
90
91
# File 'lib/temporal/testing/local_workflow_context.rb', line 84

# Blocking variant of #execute_activity: resolves the future and either
# returns the activity result or re-raises the captured failure.
#
# @param activity_class [Object] forwarded to #execute_activity
# @param input [Array] forwarded to #execute_activity
# @param args [Hash] forwarded to #execute_activity
# @return [Object] the value obtained from the future
# @raise [Exception] the value obtained from the future when it has failed
def execute_activity!(activity_class, *input, **args)
  pending = execute_activity(activity_class, *input, **args)
  outcome = pending.get
  return outcome unless pending.failed?

  raise outcome
end

#execute_local_activity(activity_class, *input, **args) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/temporal/testing/local_workflow_context.rb', line 93

# Runs an activity inline and returns its result directly (no future).
# Errors raised by the activity propagate to the caller.
#
# Keyword arguments other than +:options+ are appended to +input+ as one
# trailing hash before the activity is invoked.
#
# @param activity_class [#execute_in_context] activity to run
# @param input [Array] positional arguments forwarded to the activity
# @param args [Hash] keyword arguments; +:options+ is extracted and may carry
#   +:activity_id+ (defaults to a random UUID)
# @return [Object] whatever +activity_class.execute_in_context+ returns
def execute_local_activity(activity_class, *input, **args)
  options = args.delete(:options) || {}
  input << args unless args.empty?

  execution_options = ExecutionOptions.new(activity_class, options)
  activity_id = options[:activity_id] || SecureRandom.uuid
  metadata = Metadata::Activity.new(
    namespace: execution_options.namespace,
    id: activity_id,
    name: execution_options.name,
    task_token: nil,
    attempt: 1,
    workflow_run_id: run_id,
    workflow_id: workflow_id,
    workflow_name: nil, # not yet used, but will be in the future
    headers: execution_options.headers,
    heartbeat_details: nil
  )
  context = LocalActivityContext.new(metadata)

  activity_class.execute_in_context(context, input)
end

#execute_workflow(workflow_class, *input, **args) ⇒ Object

Raises:

  • (NotImplementedError)


116
117
118
# File 'lib/temporal/testing/local_workflow_context.rb', line 116

# Non-blocking child workflow execution is not supported by the testing
# context; see #execute_workflow! for the blocking variant.
#
# @param workflow_class [Object] ignored
# @param input [Array] ignored
# @param args [Hash] ignored
# @raise [NotImplementedError] always
def execute_workflow(workflow_class, *input, **args)
  raise(NotImplementedError, 'not yet available for testing')
end

#execute_workflow!(workflow_class, *input, **args) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/temporal/testing/local_workflow_context.rb', line 120

# Runs a child workflow inline in a brand-new context and returns whatever
# the workflow's +execute_in_context+ returns.
#
# Keyword arguments other than +:options+ are appended to +input+ as one
# trailing hash. The child gets a fresh execution object and random UUIDs for
# its workflow and run ids.
#
# @param workflow_class [#execute_in_context, #disabled_releases] workflow to run
# @param input [Array] positional arguments forwarded to the workflow
# @param args [Hash] keyword arguments; +:options+ is extracted and handed to
#   ExecutionOptions
# @return [Object] result of +workflow_class.execute_in_context+
def execute_workflow!(workflow_class, *input, **args)
  options = args.delete(:options) || {}
  input << args unless args.empty?

  child_execution = WorkflowExecution.new
  child_workflow_id = SecureRandom.uuid
  child_run_id = SecureRandom.uuid
  child_options = ExecutionOptions.new(workflow_class, options)

  # NOTE(review): the last constructor argument is documented as `metadata`,
  # yet a headers value is passed here — confirm this is intended.
  child_context = Temporal::Testing::LocalWorkflowContext.new(
    child_execution,
    child_workflow_id,
    child_run_id,
    workflow_class.disabled_releases,
    child_options.headers
  )

  workflow_class.execute_in_context(child_context, input)
end

#fail(exception) ⇒ Object



156
157
158
159
# File 'lib/temporal/testing/local_workflow_context.rb', line 156

# Marks the workflow as completed, then raises the given exception to the
# caller.
#
# @param exception [Exception] error the workflow failed with
# @raise [Exception] always re-raises +exception+
def fail(exception)
  completed!
  raise(exception)
end

#has_release?(change_name) ⇒ Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/temporal/testing/local_workflow_context.rb', line 36

# Tells whether a release is available, i.e. not listed among the disabled
# releases. The name is compared in its string form.
#
# @param change_name [#to_s] release name
# @return [Boolean] +false+ when the name is disabled, +true+ otherwise
def has_release?(change_name)
  return false if disabled_releases.include?(change_name.to_s)

  true
end

#headers ⇒ Object



32
33
34
# File 'lib/temporal/testing/local_workflow_context.rb', line 32

# Convenience accessor for the headers carried by the workflow metadata.
#
# @return [Object] whatever +metadata.headers+ returns
def headers
  metadata.headers
end

#logger ⇒ Object



28
29
30
# File 'lib/temporal/testing/local_workflow_context.rb', line 28

# Logger for workflow code; simply hands back the library-wide logger.
#
# @return [Object] whatever +Temporal.logger+ returns
def logger
  Temporal.logger
end

#now ⇒ Object



172
173
174
# File 'lib/temporal/testing/local_workflow_context.rb', line 172

# Current time as seen by the workflow; in this testing context it is read
# directly from the system clock on every call.
#
# @return [Time] result of +Time.now+
def now
  Time.now
end

#on_signal(&block) ⇒ Object

Raises:

  • (NotImplementedError)


176
177
178
# File 'lib/temporal/testing/local_workflow_context.rb', line 176

# Signal handlers are not supported by the testing context.
#
# @param block [Proc] ignored
# @raise [NotImplementedError] always
def on_signal(&block)
  raise(NotImplementedError, 'not yet available for testing')
end

#side_effect(&block) ⇒ Object



135
136
137
# File 'lib/temporal/testing/local_workflow_context.rb', line 135

# Executes the given block immediately and returns its value; nothing is
# recorded or replayed by this context.
#
# @param block [Proc] code to run
# @return [Object] the block's return value
def side_effect(&block)
  block.()
end

#sleep(timeout) ⇒ Object



139
140
141
# File 'lib/temporal/testing/local_workflow_context.rb', line 139

# Pauses the current thread by delegating to the real +Kernel.sleep+.
#
# @param timeout [Numeric] duration handed to +Kernel.sleep+
# @return [Integer] whatever +Kernel.sleep+ returns
def sleep(timeout)
  ::Kernel.sleep(timeout)
end

#start_timer(timeout, timer_id = nil) ⇒ Object

Raises:

  • (NotImplementedError)


143
144
145
# File 'lib/temporal/testing/local_workflow_context.rb', line 143

# Timers are not supported by the testing context.
#
# @param timeout [Object] ignored
# @param timer_id [Object, nil] ignored
# @raise [NotImplementedError] always
def start_timer(timeout, timer_id = nil)
  raise(NotImplementedError, 'not yet available for testing')
end

#wait_for(future) ⇒ Object



167
168
169
170
# File 'lib/temporal/testing/local_workflow_context.rb', line 167

# Suspends the current fiber until the given future reports it is finished.
#
# @param future [#finished?] future to wait on
# @return [nil]
def wait_for(future)
  # Hand control back to whoever resumed this fiber, re-checking on each resume.
  Fiber.yield until future.finished?
end

#wait_for_all(*futures) ⇒ Object



161
162
163
164
165
# File 'lib/temporal/testing/local_workflow_context.rb', line 161

# Waits on every given future in the order supplied.
#
# @param futures [Array<#wait>] futures to wait on
# @return [nil]
def wait_for_all(*futures)
  futures.each { |pending| pending.wait }

  nil
end