Class: Dor::Workflow::Client::Queues
- Inherits: Object
- Defined in: lib/dor/workflow/client/queues.rb
Overview
Makes requests relating to the workflow queues.
Instance Method Summary
-
#count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'error' in a particular workflow and step.
-
#count_objects_in_step(workflow, step, type, repo) ⇒ Object
Used by the preservation robots stats reporter.
-
#count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'queued' in a particular workflow and step.
-
#count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
Returns a count of workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in.
-
#errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
Gets the druids that have errored out in a particular workflow and step, each mapped to its error message.
-
#initialize(requestor:) ⇒ Queues
constructor
A new instance of Queues.
-
#lane_ids(repo, workflow, process) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step.
-
#objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns a list of druids from the workflow service that meet the criteria of the passed in completed and waiting params.
-
#stale_queued_workflows(repository, opts = {}) ⇒ Array[Hash]
Gets all of the workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
Constructor Details
#initialize(requestor:) ⇒ Queues
Returns a new instance of Queues.
# File 'lib/dor/workflow/client/queues.rb', line 8
def initialize(requestor:)
  @requestor = requestor
end
Instance Method Details
#count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'error' in a particular workflow and step.
# File 'lib/dor/workflow/client/queues.rb', line 147
def count_errored_for_workstep(workflow, step, repository = 'dor')
  count_objects_in_step(workflow, step, 'error', repository)
end
#count_objects_in_step(workflow, step, type, repo) ⇒ Object
Used by the preservation robots stats reporter.
# File 'lib/dor/workflow/client/queues.rb', line 135
def count_objects_in_step(workflow, step, type, repo)
  resp = requestor.request "workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}"
  extract_object_count(resp)
end
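As a rough illustration of the query that count_objects_in_step sends, the sketch below reproduces only the string interpolation from the listing above. The helper name workflow_queue_uri and the workflow and step values are hypothetical and chosen for illustration; the real method also sends the request and extracts the count.

```ruby
# Hypothetical helper that mirrors the interpolation in count_objects_in_step.
# The status type ('error', 'queued', ...) becomes the parameter name and the
# step becomes its value.
def workflow_queue_uri(workflow, step, type, repo)
  "workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}"
end

# Illustrative values only.
puts workflow_queue_uri('accessionWF', 'publish', 'error', 'dor')
puts workflow_queue_uri('accessionWF', 'publish', 'queued', 'dor')
```

Note that the values are interpolated as given, without URL escaping, so callers presumably pass plain workflow and step names.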
#count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'queued' in a particular workflow and step.
# File 'lib/dor/workflow/client/queues.rb', line 158
def count_queued_for_workstep(workflow, step, repository = 'dor')
  count_objects_in_step(workflow, step, 'queued', repository)
end
#count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
Returns a count of workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in.
# File 'lib/dor/workflow/client/queues.rb', line 45
def count_stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts) + '&count-only=true'
  doc = Nokogiri::XML(requestor.request(uri_string))
  doc.at_xpath('/objects/@count').value.to_i
end
#errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
Gets the druids that have errored out in a particular workflow and step, each mapped to its error message.
# File 'lib/dor/workflow/client/queues.rb', line 125
def errored_objects_for_workstep(workflow, step, repository = 'dor')
  resp = requestor.request "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
  result = {}
  Nokogiri::XML(resp).xpath('//object').collect do |node|
    result.merge!(node['id'] => node['errorMessage'])
  end
  result
end
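The shape of the returned Hash can be sketched as follows. This is not the library's code: it uses REXML in place of Nokogiri so that it runs with a stock Ruby install, the method name errors_by_druid is hypothetical, and the sample XML is an assumption inferred from the attributes the listing reads (id and errorMessage on each object element).

```ruby
require 'rexml/document'

# Hypothetical equivalent of the parsing step in errored_objects_for_workstep.
# Builds { druid => error message } from every <object> element.
def errors_by_druid(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '//object').to_h do |node|
    [node.attributes['id'], node.attributes['errorMessage']]
  end
end

# Assumed response shape; the messages are made up for illustration.
sample = <<~XML
  <objects count="2">
    <object id="druid:ab123de4567" errorMessage="Timeout"/>
    <object id="druid:ab123de9012" errorMessage="File not found"/>
  </objects>
XML

errors_by_druid(sample).each do |druid, message|
  puts "#{druid}: #{message}"
end
```

Because the result is keyed by druid, a druid that appeared twice in a response would keep only its last error message.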
#lane_ids(repo, workflow, process) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step.
# File 'lib/dor/workflow/client/queues.rb', line 18
def lane_ids(repo, workflow, process)
  uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
  doc = Nokogiri::XML(requestor.request(uri))
  nodes = doc.xpath('/lanes/lane')
  nodes.map { |n| n['id'] }
end
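Since the requestor is injected through the constructor, the request and parsing logic can be exercised without a live workflow service. The sketch below assumes only that a requestor responds to request(uri) and returns the response body as a String, which is how the listings on this page use it. StubRequestor and lane_ids_sketch are hypothetical names, REXML stands in for Nokogiri, and the XML shape is inferred from the '/lanes/lane' XPath in the listing above.

```ruby
require 'rexml/document'

# Hypothetical stand-in for the real requestor: records each URI it is
# asked for and returns a canned XML body.
class StubRequestor
  attr_reader :requested

  def initialize(body)
    @body = body
    @requested = []
  end

  def request(uri)
    @requested << uri
    @body
  end
end

# Mirrors the logic of lane_ids, with REXML swapped in for Nokogiri.
def lane_ids_sketch(requestor, repo, workflow, process)
  uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
  doc = REXML::Document.new(requestor.request(uri))
  REXML::XPath.match(doc, '/lanes/lane').map { |lane| lane.attributes['id'] }
end

# Illustrative values only.
stub = StubRequestor.new('<lanes><lane id="default"/><lane id="fast"/></lanes>')
p lane_ids_sketch(stub, 'dor', 'accessionWF', 'publish')
p stub.requested
```

The same stub could be passed as the requestor: argument when constructing Queues in a test, assuming the real class needs nothing more from it than request.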
#objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns a list of druids from the workflow service that meet the criteria of the passed-in completed and waiting params.
# File 'lib/dor/workflow/client/queues.rb', line 86
def objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  waiting_param = qualify_step(options[:default_repository], options[:default_workflow], waiting)
  uri_string = "workflow_queue?waiting=#{waiting_param}"
  if completed
    Array(completed).each do |step|
      completed_param = qualify_step(options[:default_repository], options[:default_workflow], step)
      uri_string += "&completed=#{completed_param}"
    end
  end

  uri_string += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri_string += "&lane-id=#{lane_id}"

  resp = requestor.request uri_string
  #
  # response looks like:
  #    <objects count="2">
  #      <object id="druid:ab123de4567"/>
  #      <object id="druid:ab123de9012"/>
  #    </objects>
  #
  # convert into:
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  #
  result = Nokogiri::XML(resp).xpath('//object[@id]')
  result.map { |n| n[:id] }
end
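The query string assembled by objects_for_workstep can be sketched in isolation. The helper below is hypothetical and deliberately skips qualify_step, whose implementation is not shown on this page, so it assumes every step is already passed in fully qualified form. The step names are illustrative.

```ruby
# Hypothetical helper reproducing the URI assembly in objects_for_workstep.
# Assumes step names are already fully qualified (qualify_step is skipped).
def workstep_query(completed, waiting, lane_id = 'default', limit: nil)
  uri = "workflow_queue?waiting=#{waiting}"
  # Array() accepts nil, a single step, or a list of steps.
  Array(completed).each { |step| uri += "&completed=#{step}" }
  # The limit is only sent when it converts to a positive integer.
  uri += "&limit=#{limit.to_i}" if limit&.to_i&.positive?
  uri + "&lane-id=#{lane_id}"
end

# Illustrative values only.
puts workstep_query(
  ['dor:accessionWF:start', 'dor:accessionWF:shelve'],
  'dor:accessionWF:publish',
  'default',
  limit: 10
)
puts workstep_query(nil, 'dor:accessionWF:publish')
```

Each completed step adds its own completed parameter, and a missing, zero, or negative limit is dropped from the query rather than sent.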
#stale_queued_workflows(repository, opts = {}) ⇒ Array[Hash]
Gets all of the workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
# File 'lib/dor/workflow/client/queues.rb', line 34
def stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts)
  parse_queued_workflows_response requestor.request(uri_string)
end