Module: ZeebeBpmnRspec::Helpers

Includes:
Zeebe::Client::GatewayProtocol
Defined in:
lib/zeebe_bpmn_rspec/helpers.rb

Overview

rubocop:disable Metrics/ModuleLength

Instance Method Summary

Instance Method Details

#activate_job(type, validate: true) ⇒ Object Also known as: process_job



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 67

# Activates at most one job of the given type and wraps it in an ActivatedJob.
#
# The request asks for a single job (maxJobsToActivate: 1) under a worker name
# made of the job type plus a random hex suffix. The first job found in the
# response stream is used; when no response carries a job, nil is handed to
# ActivatedJob.
#
# @param type job type to request; also used as the prefix of the worker name
# @param validate forwarded unchanged to ActivatedJob.new
# @return [ActivatedJob] wrapper built around the activated job (or around nil)
def activate_job(type, validate: true)
  request = ActivateJobsRequest.new(
    type: type,
    worker: "#{type}-#{SecureRandom.hex}",
    maxJobsToActivate: 1,
    timeout: 1000, # NOTE(review): presumably milliseconds - confirm against the gateway protocol
    requestTimeout: ZeebeBpmnRspec.activate_request_timeout
  )

  # Walk the responses and stop at the first one that actually carries a job.
  first_job = nil
  _zeebe_client.activate_jobs(request).each do |response|
    first_job = response.jobs.first
    break if first_job
  end
  # TODO: support debug logging of the activated job?

  ActivatedJob.new(
    first_job,
    type: type,
    workflow_instance_key: workflow_instance_key,
    client: _zeebe_client,
    context: self,
    validate: validate
  )
end

#activate_jobs(type, max_jobs: nil) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 98

# Requests jobs of the given type and returns an Enumerator that yields each
# one wrapped in an ActivatedJob (always with validate: true).
#
# The activation request is sent when this method is called; the responses are
# only walked, and the wrappers only built, as the Enumerator is consumed.
#
# @param type job type to request; also used as the prefix of the worker name
# @param max_jobs value for maxJobsToActivate; the field is left out of the
#   request entirely when nil
# @return [Enumerator] yields one ActivatedJob per job in the response stream
def activate_jobs(type, max_jobs: nil)
  request_fields = {
    type: type,
    worker: "#{type}-#{SecureRandom.hex}",
    maxJobsToActivate: max_jobs,
    timeout: 1000, # NOTE(review): presumably milliseconds - confirm against the gateway protocol
    requestTimeout: ZeebeBpmnRspec.activate_request_timeout,
  }
  # Drop nil entries (e.g. an unset max_jobs) instead of sending them.
  request = ActivateJobsRequest.new(request_fields.compact)
  responses = _zeebe_client.activate_jobs(request)

  Enumerator.new do |yielder|
    responses.each do |response|
      response.jobs.each do |job|
        yielder.yield(
          ActivatedJob.new(
            job,
            type: type,
            workflow_instance_key: workflow_instance_key,
            client: _zeebe_client,
            context: self,
            validate: true
          )
        )
      end
    end
  end
end

#deploy_workflow(path, name = nil) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 9

# Deploys the workflow definition stored in the file at +path+.
#
# The resource is named "<name>.bpmn" when a name is given, otherwise the
# file's basename is used. The file content is sent as the definition.
#
# @param path path of the definition file to read
# @param name optional resource name (without the ".bpmn" extension)
# @return whatever the client's deploy_workflow call returns
# @raise [RuntimeError] "Failed to deploy workflow: ..." wrapping any
#   StandardError raised while reading the file or deploying it
def deploy_workflow(path, name = nil)
  resource_name = name ? "#{name}.bpmn" : File.basename(path)
  resource = WorkflowRequestObject.new(
    name: resource_name,
    type: WorkflowRequestObject::ResourceType::FILE,
    definition: File.read(path)
  )

  _zeebe_client.deploy_workflow(DeployWorkflowRequest.new(workflows: [resource]))
rescue StandardError => e
  raise "Failed to deploy workflow: #{e}"
end

#expect_job_of_type(type) ⇒ Object



94
95
96
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 94

# Fetches a job of the given type via #job_with_type and wraps it with
# +expect+, so it can be used as the subject of an expectation.
#
# @param type job type, passed through to #job_with_type
# @return whatever +expect+ returns for that job
def expect_job_of_type(type)
  job = job_with_type(type)
  expect(job)
end

#job_with_type(type) ⇒ Object

TODO: deprecate process_job



90
91
92
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 90

# Activates a job of the given type with validation switched off.
#
# Thin wrapper: delegates to #activate_job with validate: false and returns
# its result unchanged.
#
# @param type job type, passed through to #activate_job
# @return whatever #activate_job returns
def job_with_type(type)
  activate_job(type, validate: false)
end

#publish_message(name, correlation_key:, variables: nil) ⇒ Object



121
122
123
124
125
126
127
128
129
130
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 121

# Publishes a message through the Zeebe client.
#
# Variables are serialized with #to_json. Any field whose value is nil
# (including a nil correlation key or absent variables) is left out of the
# request instead of being sent.
#
# @param name message name
# @param correlation_key value sent as correlationKey
# @param variables optional object serialized to JSON as the message variables
# @return whatever the client's publish_message call returns
def publish_message(name, correlation_key:, variables: nil)
  fields = {
    name: name,
    correlationKey: correlation_key,
    timeToLive: 5000, # NOTE(review): presumably milliseconds - confirm against the gateway protocol
    variables: (variables.to_json unless variables.nil?),
  }
  request = PublishMessageRequest.new(fields.compact)

  _zeebe_client.publish_message(request)
end

#reset_zeebe! ⇒ Object



132
133
134
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 132

# Forgets the stored workflow instance key by setting
# @__workflow_instance_key to nil, so #workflow_instance_key returns nil
# afterwards.
def reset_zeebe!
  @__workflow_instance_key = nil
end

#with_workflow_instance(name, variables = {}) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 21

# Creates an instance of the latest version of the named workflow, yields its
# key to the block, and cancels the instance once the block is done.
#
# The instance key is also stored in @__workflow_instance_key before the block
# runs. Cancellation happens in +ensure+, so it runs whether the block returns
# normally or raises; an exception raised by the block keeps propagating after
# the cleanup.
#
# @param name BPMN process id of the workflow to instantiate
# @param variables object serialized with #to_json as the instance variables
# @yieldparam key the workflowInstanceKey of the created instance
# @return the value returned by the block
def with_workflow_instance(name, variables = {})
  workflow = _zeebe_client.create_workflow_instance(CreateWorkflowInstanceRequest.new(
                                                      bpmnProcessId: name,
                                                      version: -1, # always latest
                                                      variables: variables.to_json
                                                    ))
  @__workflow_instance_key = workflow.workflowInstanceKey
  yield(workflow.workflowInstanceKey)
ensure
  # +workflow+ is nil here when instance creation itself failed; nothing to cancel then.
  instance_key = workflow&.workflowInstanceKey
  if instance_key
    begin
      _zeebe_client.cancel_workflow_instance(CancelWorkflowInstanceRequest.new(
                                               workflowInstanceKey: instance_key
                                             ))
    rescue GRPC::NotFound
      # expected
    rescue StandardError => e
      # Best-effort cleanup: report the failure rather than raising, so it
      # cannot replace an exception coming from the block.
      puts "Cancelled instance #{e.inspect}" # TODO
    end
  end
end

#workflow_complete! ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 49

# Asserts that the current workflow instance has completed.
#
# After a short pause, tries to cancel the instance identified by
# #workflow_instance_key. A GRPC::NotFound from that call is taken to mean the
# instance is already gone, i.e. complete. If the cancel call succeeds instead,
# the instance was still there, so the check fails.
#
# @return [nil] when the instance is complete
# @raise [RuntimeError] when the cancel request did not raise GRPC::NotFound
def workflow_complete!
  sleep 0.25 # TODO: configurable?

  request = CancelWorkflowInstanceRequest.new(workflowInstanceKey: workflow_instance_key)
  still_running =
    begin
      _zeebe_client.cancel_workflow_instance(request)
      true
    rescue GRPC::NotFound
      false
    end

  raise "Expected workflow instance #{workflow_instance_key} to be complete" if still_running
end

#workflow_instance_key ⇒ Object



63
64
65
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 63

# Returns the value held in @__workflow_instance_key.
#
# Within this module that variable is assigned by #with_workflow_instance and
# set back to nil by #reset_zeebe!.
#
# @return the stored workflow instance key, or nil when none is stored
def workflow_instance_key
  @__workflow_instance_key
end