Class: Dor::Workflow::Client::Queues
- Inherits:
- Object
- Defined in:
- lib/dor/workflow/client/queues.rb
Overview
Makes requests relating to the workflow queues
Instance Method Summary
-
#count_errored_for_workstep(workflow, step) ⇒ Integer
Returns the number of objects that have a status of 'error' in a particular workflow and step.
-
#count_objects_in_step(workflow, step, type) ⇒ Hash
Used by preservation robots stats reporter.
-
#count_queued_for_workstep(workflow, step) ⇒ Integer
Returns the number of objects that have a status of 'queued' in a particular workflow and step.
-
#count_stale_queued_workflows(opts) ⇒ Integer
Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
-
#errored_objects_for_workstep(workflow, step) ⇒ Hash
Get a list of druids that have errored out in a particular workflow and step.
-
#initialize(requestor:) ⇒ Queues
constructor
A new instance of Queues.
-
#lane_ids(workflow, process) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step.
-
#objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns a list of druids from the workflow service that meet the criteria of the passed in completed and waiting params.
-
#stale_queued_workflows(opts) ⇒ Array[Hash]
Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
Constructor Details
#initialize(requestor:) ⇒ Queues
Returns a new instance of Queues.
# File 'lib/dor/workflow/client/queues.rb', line 8
def initialize(requestor:)
  @requestor = requestor
end
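Every method on this page talks to the workflow service only through `requestor.request(uri_string)`, so a test double with that one method is enough to exercise the class offline. The sketch below is a hypothetical stand-in, not part of the gem: the class name `FakeRequestor`, the `requested` reader, and the sample URI and XML are all assumptions made for illustration.

```ruby
# Hypothetical test double (not part of the gem). It answers #request with
# canned response bodies and records every URI it was asked for.
class FakeRequestor
  attr_reader :requested

  # responses: Hash mapping a URI string to the canned response body
  def initialize(responses)
    @responses = responses
    @requested = []
  end

  # Raises KeyError for a URI that has no canned response.
  def request(uri)
    @requested << uri
    @responses.fetch(uri)
  end
end

# Illustrative URI and body; real values depend on the workflow service.
fake = FakeRequestor.new(
  'workflow_queue?workflow=accessionWF&error=publish' => '<objects count="0"/>'
)
puts fake.request('workflow_queue?workflow=accessionWF&error=publish')
```

With the gem loaded, such an object could presumably be passed as `Dor::Workflow::Client::Queues.new(requestor: fake)`, since the constructor only stores whatever it is given.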
Instance Method Details
#count_errored_for_workstep(workflow, step) ⇒ Integer
Returns the number of objects that have a status of 'error' in a particular workflow and step
# File 'lib/dor/workflow/client/queues.rb', line 142
def count_errored_for_workstep(workflow, step)
  count_objects_in_step(workflow, step, 'error')
end
#count_objects_in_step(workflow, step, type) ⇒ Hash
Used by preservation robots stats reporter
# File 'lib/dor/workflow/client/queues.rb', line 131
def count_objects_in_step(workflow, step, type)
  resp = requestor.request "workflow_queue?workflow=#{workflow}&#{type}=#{step}"
  extract_object_count(resp)
end
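Note that `type` becomes the name of a query parameter and `step` becomes its value. The following sketch reproduces only the string interpolation from the method above so the resulting queries can be inspected; the helper name `step_count_query` and the workflow and step names are illustrative assumptions.

```ruby
# Hypothetical helper mirroring the query string built in count_objects_in_step.
def step_count_query(workflow, step, type)
  "workflow_queue?workflow=#{workflow}&#{type}=#{step}"
end

# 'accessionWF' and 'publish' are example names, not taken from this page.
puts step_count_query('accessionWF', 'publish', 'error')
puts step_count_query('accessionWF', 'publish', 'queued')
```

Because the values are interpolated without escaping, this sketch (like the original line) assumes workflow and step names contain only URL-safe characters.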
#count_queued_for_workstep(workflow, step) ⇒ Integer
Returns the number of objects that have a status of 'queued' in a particular workflow and step
# File 'lib/dor/workflow/client/queues.rb', line 152
def count_queued_for_workstep(workflow, step)
  count_objects_in_step(workflow, step, 'queued')
end
#count_stale_queued_workflows(opts) ⇒ Integer
Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
# File 'lib/dor/workflow/client/queues.rb', line 41
def count_stale_queued_workflows(opts)
  uri_string = "#{build_queued_uri(opts)}&count-only=true"
  doc = Nokogiri::XML(requestor.request(uri_string))
  doc.at_xpath('/objects/@count').value.to_i
end
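The XPath `/objects/@count` implies that a count-only response has an `objects` root element carrying a `count` attribute. A minimal sketch of that extraction follows. It uses REXML rather than Nokogiri so it runs without extra gems, and the sample XML and the helper name `extract_count` are assumptions.

```ruby
require 'rexml/document'

# Hypothetical helper: reads the count attribute from the root <objects>
# element. REXML is swapped in for Nokogiri here.
def extract_count(xml)
  REXML::Document.new(xml).root.attributes['count'].to_i
end

# Assumed response shape, inferred from the XPath in the method above.
puts extract_count('<objects count="3"/>')
```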
#errored_objects_for_workstep(workflow, step) ⇒ Hash
Get a list of druids that have errored out in a particular workflow and step
# File 'lib/dor/workflow/client/queues.rb', line 117
def errored_objects_for_workstep(workflow, step)
  resp = requestor.request "workflow_queue?workflow=#{workflow}&error=#{step}"
  Nokogiri::XML(resp).xpath('//object').to_h do |node|
    [node['id'], node['errorMessage']]
  end
end
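The returned Hash maps each druid to its error message. The sketch below shows that transformation on a sample response. It uses REXML instead of Nokogiri to stay dependency-free, and the error messages and the helper name `errors_by_druid` are invented for illustration.

```ruby
require 'rexml/document'

# Assumed response shape: one <object> per errored druid, with id and
# errorMessage attributes (the attribute names come from the method above).
SAMPLE_ERRORS = <<~XML
  <objects count="2">
    <object id="druid:ab123de4567" errorMessage="timeout"/>
    <object id="druid:ab123de9012" errorMessage="not found"/>
  </objects>
XML

# Hypothetical helper: builds { druid => error message } from the XML.
def errors_by_druid(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '//object').to_h do |node|
    [node.attributes['id'], node.attributes['errorMessage']]
  end
end

errors_by_druid(SAMPLE_ERRORS).each do |druid, message|
  puts "#{druid}: #{message}"
end
```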
#lane_ids(workflow, process) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step
# File 'lib/dor/workflow/client/queues.rb', line 17
def lane_ids(workflow, process)
  uri = "workflow_queue/lane_ids?step=#{workflow}:#{process}"
  doc = Nokogiri::XML(requestor.request(uri))
  doc.xpath('/lanes/lane').map { |n| n['id'] }
end
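The XPath `/lanes/lane` suggests a response with a `lanes` root and one `lane` child per lane id. Here is a small sketch of the mapping under that assumption, again using REXML in place of Nokogiri. The lane names other than 'default' and the helper name `parse_lane_ids` are hypothetical.

```ruby
require 'rexml/document'

# Hypothetical helper: collects the id attribute of every /lanes/lane element.
def parse_lane_ids(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '/lanes/lane').map { |node| node.attributes['id'] }
end

# Assumed response shape, inferred from the XPath in the method above.
sample = '<lanes><lane id="default"/><lane id="fast"/></lanes>'
puts parse_lane_ids(sample).inspect
```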
#objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns a list of druids from the workflow service that meet the criteria of the passed in completed and waiting params
# File 'lib/dor/workflow/client/queues.rb', line 80
def objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  waiting_param = qualify_step(options[:default_workflow], waiting)
  uri_string = "workflow_queue?waiting=#{waiting_param}"
  if completed
    Array(completed).each do |step|
      completed_param = qualify_step(options[:default_workflow], step)
      uri_string += "&completed=#{completed_param}"
    end
  end

  uri_string += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri_string += "&lane-id=#{lane_id}"

  resp = requestor.request uri_string

  #
  # response looks like:
  #    <objects count="2">
  #      <object id="druid:ab123de4567"/>
  #      <object id="druid:ab123de9012"/>
  #    </objects>
  #
  # convert into:
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  #
  Nokogiri::XML(resp).xpath('//object[@id]').map { |n| n[:id] }
end
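To see how the parameters combine into a query, the sketch below rebuilds just the URI string. It assumes every step name is already fully qualified (for example `accessionWF:publish`), so the private `qualify_step` helper, whose behavior is not shown on this page, is left out. The helper name `workstep_query` and the step names are illustrative.

```ruby
# Hypothetical helper mirroring the URI construction in objects_for_workstep.
# ASSUMPTION: step names are already qualified, so qualify_step is skipped.
def workstep_query(completed, waiting, lane_id = 'default', options = {})
  uri = "workflow_queue?waiting=#{waiting}"
  # Array(nil) is [], so a nil completed argument adds no parameters.
  Array(completed).each { |step| uri += "&completed=#{step}" }
  # The limit is appended only when it converts to a positive integer.
  uri += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri + "&lane-id=#{lane_id}"
end

puts workstep_query(['accessionWF:start'], 'accessionWF:publish', 'default', { limit: 10 })
puts workstep_query(nil, 'accessionWF:publish')
```

Each completed step adds its own `completed` parameter, and a missing, zero, or negative `:limit` is omitted from the query.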
#stale_queued_workflows(opts) ⇒ Array[Hash]
Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
# File 'lib/dor/workflow/client/queues.rb', line 31
def stale_queued_workflows(opts)
  uri_string = build_queued_uri(opts)
  parse_queued_workflows_response requestor.request(uri_string)
end