Module: ZeebeBpmnRspec::Helpers
- Includes:
- Zeebe::Client::GatewayProtocol
- Defined in:
- lib/zeebe_bpmn_rspec/helpers.rb
Overview
rubocop:disable Metrics/ModuleLength
Instance Method Summary
- #activate_job(type, validate: true) ⇒ Object (also: #process_job)
- #activate_jobs(type, max_jobs: nil) ⇒ Object
- #deploy_workflow(path, name = nil) ⇒ Object
- #expect_job_of_type(type) ⇒ Object
- #job_with_type(type) ⇒ Object — TODO: deprecate process_job.
- #publish_message(name, correlation_key:, variables: nil) ⇒ Object
- #reset_zeebe! ⇒ Object
- #with_workflow_instance(name, variables = {}) ⇒ Object
- #workflow_complete! ⇒ Object
- #workflow_instance_key ⇒ Object
Instance Method Details
#activate_job(type, validate: true) ⇒ Object Also known as: process_job
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 67

# Requests a single job of the given type from the Zeebe client and wraps
# whatever was found in an ActivatedJob.
#
# @param type job type to activate; also used as the prefix of the randomly
#   suffixed worker name
# @param validate forwarded unchanged to ActivatedJob.new
# @return [ActivatedJob] wrapper around the first job found in the response
#   stream, or around nil when no response carried a job
def activate_job(type, validate: true)
  request = ActivateJobsRequest.new(
    type: type,
    worker: "#{type}-#{SecureRandom.hex}",
    maxJobsToActivate: 1,
    timeout: 1000,
    requestTimeout: ZeebeBpmnRspec.activate_request_timeout
  )

  job = nil
  # Walk the response stream and stop at the first response that has a job.
  _zeebe_client.activate_jobs(request).each do |response|
    job = response.jobs.first
    break if job
  end
  # puts job.inspect # support debug logging?

  ActivatedJob.new(
    job,
    type: type,
    workflow_instance_key: workflow_instance_key,
    client: _zeebe_client,
    context: self,
    validate: validate
  )
end
#activate_jobs(type, max_jobs: nil) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 98

# Requests jobs of the given type and exposes them as an Enumerator of
# ActivatedJob wrappers. Jobs are wrapped only as the enumerator is consumed.
#
# @param type job type to activate; also used as the prefix of the randomly
#   suffixed worker name
# @param max_jobs value for maxJobsToActivate; left out of the request when nil
# @return [Enumerator] yields one ActivatedJob (validate: true) per job in
#   every response of the stream
def activate_jobs(type, max_jobs: nil)
  # compact drops nil-valued entries, e.g. maxJobsToActivate when max_jobs is nil.
  request_fields = {
    type: type,
    worker: "#{type}-#{SecureRandom.hex}",
    maxJobsToActivate: max_jobs,
    timeout: 1000,
    requestTimeout: ZeebeBpmnRspec.activate_request_timeout,
  }.compact
  responses = _zeebe_client.activate_jobs(ActivateJobsRequest.new(request_fields))

  Enumerator.new do |yielder|
    responses.each do |response|
      response.jobs.each do |job|
        activated = ActivatedJob.new(
          job,
          type: type,
          workflow_instance_key: workflow_instance_key,
          client: _zeebe_client,
          context: self,
          validate: true
        )
        yielder.yield(activated)
      end
    end
  end
end
#deploy_workflow(path, name = nil) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 9

# Reads the file at +path+ and deploys it as a workflow through the Zeebe client.
#
# @param path path of the workflow definition file to read
# @param name optional resource name; ".bpmn" is appended to it. When omitted,
#   the basename of +path+ is used as the resource name.
# @return the result of the client's deploy_workflow call
# @raise [RuntimeError] wrapping the message of any StandardError raised while
#   reading the file or deploying it
def deploy_workflow(path, name = nil)
  resource_name = name ? "#{name}.bpmn" : File.basename(path)
  resource = WorkflowRequestObject.new(
    name: resource_name,
    type: WorkflowRequestObject::ResourceType::FILE,
    definition: File.read(path)
  )

  _zeebe_client.deploy_workflow(DeployWorkflowRequest.new(workflows: [resource]))
rescue StandardError => e
  raise "Failed to deploy workflow: #{e}"
end
#expect_job_of_type(type) ⇒ Object
94 95 96 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 94

# Activates a job of the given type (without validation, via #job_with_type)
# and passes it to +expect+ so a matcher can be chained onto the result.
#
# @param type job type to activate
# @return whatever +expect+ returns for the activated job
def expect_job_of_type(type)
  job = job_with_type(type)
  expect(job)
end
#job_with_type(type) ⇒ Object
TODO: deprecate process_job
90 91 92 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 90

# Activates a single job of the given type with validation switched off.
# TODO: deprecate process_job
#
# @param type job type to activate
# @return the result of activate_job(type, validate: false)
def job_with_type(type)
  activate_job(
    type,
    validate: false
  )
end
#publish_message(name, correlation_key:, variables: nil) ⇒ Object
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 121

# Publishes a message through the Zeebe client.
#
# @param name message name
# @param correlation_key value sent as the request's correlationKey
# @param variables optional payload; serialized with #to_json when present and
#   left out of the request entirely when nil
# @return the result of the client's publish_message call
def publish_message(name, correlation_key:, variables: nil)
  _zeebe_client.publish_message(PublishMessageRequest.new(
    {
      name: name,
      correlationKey: correlation_key,
      timeToLive: 5000,
      variables: variables&.to_json,
    }.compact # drop nil-valued entries (e.g. variables when none were given)
  ))
end
#reset_zeebe! ⇒ Object
132 133 134 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 132

# Forgets the remembered workflow instance key by setting
# @__workflow_instance_key back to nil.
#
# @return [nil]
def reset_zeebe!
  @__workflow_instance_key = nil
end
#with_workflow_instance(name, variables = {}) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 21

# Creates a workflow instance for the process +name+ (version -1, i.e. always
# the latest), remembers its key in @__workflow_instance_key, and yields the
# key to the block. The instance is cancelled afterwards whether or not the
# block raised.
#
# @param name BPMN process id of the workflow to instantiate
# @param variables initial variables; serialized with #to_json
# @yield [workflow_instance_key] key of the created instance
# @return the block's return value
# @raise re-raises any exception raised while creating the instance or inside
#   the block, after the cancellation attempt
def with_workflow_instance(name, variables = {})
  system_error = nil
  workflow = _zeebe_client.create_workflow_instance(CreateWorkflowInstanceRequest.new(
    bpmnProcessId: name,
    version: -1, # always latest
    variables: variables.to_json
  ))
  @__workflow_instance_key = workflow.workflowInstanceKey
  yield(workflow.workflowInstanceKey)
rescue Exception => e # rubocop:disable Lint/RescueException
  # exceptions are rescued to ensure that instances are cancelled
  # any error is re-raised below
  system_error = e
ensure
  if workflow&.workflowInstanceKey
    begin
      _zeebe_client.cancel_workflow_instance(CancelWorkflowInstanceRequest.new(
        workflowInstanceKey: workflow.workflowInstanceKey
      ))
    rescue GRPC::NotFound
      # expected
    rescue StandardError => cancel_error
      # Best-effort cleanup: report the failure without masking the block's
      # result or the original error. (Previously interpolated an undefined
      # local, which raised NameError from inside this ensure clause.)
      puts "Cancelled instance #{cancel_error.inspect}" # TODO
    end
  end
  raise system_error if system_error
end
#workflow_complete! ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 49

# Asserts that the remembered workflow instance has completed. It does so by
# trying to cancel the instance: GRPC::NotFound from the cancel call counts as
# "complete", while a successful cancel means the instance was still there.
#
# @return [nil] when the cancel call raised GRPC::NotFound
# @raise [RuntimeError] when the cancel call succeeded
def workflow_complete!
  sleep 0.25 # TODO: configurable?

  completed =
    begin
      _zeebe_client.cancel_workflow_instance(CancelWorkflowInstanceRequest.new(
        workflowInstanceKey: workflow_instance_key
      ))
      false
    rescue GRPC::NotFound
      true
    end

  raise "Expected workflow instance #{workflow_instance_key} to be complete" unless completed
end
#workflow_instance_key ⇒ Object
63 64 65 |
# File 'lib/zeebe_bpmn_rspec/helpers.rb', line 63

# Reader for the remembered workflow instance key.
#
# @return the current value of @__workflow_instance_key (nil when unset)
def workflow_instance_key
  @__workflow_instance_key
end