Class: Dor::Workflow::Client::Queues
- Inherits: Object
- Inheritance chain: Object < Dor::Workflow::Client::Queues
- Defined in: lib/dor/workflow/client/queues.rb
Overview
Makes requests relating to the workflow queues.
Instance Method Summary
- #count_errored_for_workstep(workflow, step, repository = nil) ⇒ Integer
  Returns the number of objects that have a status of 'error' in a particular workflow and step.
- #count_objects_in_step(workflow, step, type, repo = nil) ⇒ Hash
  Used by the preservation robots stats reporter.
- #count_queued_for_workstep(workflow, step, repository = nil) ⇒ Integer
  Returns the number of objects that have a status of 'queued' in a particular workflow and step.
- #count_stale_queued_workflows(*args) ⇒ Integer
  Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
- #errored_objects_for_workstep(workflow, step, repository = nil) ⇒ Hash
  Gets a list of druids that have errored out in a particular workflow and step.
- #initialize(requestor:) ⇒ Queues (constructor)
  A new instance of Queues.
- #lane_ids(*args) ⇒ Array<String>
  Returns all the distinct laneIds for a given workflow step.
- #objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
  Returns a list of druids from the workflow service that meet the criteria of the passed-in completed and waiting params.
- #stale_queued_workflows(*args) ⇒ Array[Hash]
  Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
Constructor Details
#initialize(requestor:) ⇒ Queues
Returns a new instance of Queues.
# File 'lib/dor/workflow/client/queues.rb', line 8
def initialize(requestor:)
  @requestor = requestor
end
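The constructor only stores its `requestor`, and every method on this page calls `requestor.request` with a URI string. The following is a minimal sketch of a stand-in requestor for experimenting with these methods, assuming `request` is the only method Queues needs from it. `RecordingRequestor` is a hypothetical name and is not part of the gem.

```ruby
# Hypothetical stand-in for the real requestor (not part of Dor::Workflow::Client).
# It records every URI it is asked for and returns one canned response body.
class RecordingRequestor
  attr_reader :requests

  def initialize(canned_response)
    @canned_response = canned_response
    @requests = []
  end

  # Mirrors the single call made throughout this page: requestor.request(uri_string)
  def request(uri_string)
    @requests << uri_string
    @canned_response
  end
end

requestor = RecordingRequestor.new('<objects count="0"/>')
requestor.request('workflow_queue?workflow=accessionWF&error=publish')
p requestor.requests
```

An instance like this could presumably be passed as `Dor::Workflow::Client::Queues.new(requestor: requestor)` to inspect which URIs a call produces without contacting the workflow service.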
Instance Method Details
#count_errored_for_workstep(workflow, step, repository = nil) ⇒ Integer
Returns the number of objects that have a status of 'error' in a particular workflow and step.
# File 'lib/dor/workflow/client/queues.rb', line 172
def count_errored_for_workstep(workflow, step, repository = nil)
  Deprecation.warn(self, 'the third argument to `#count_errored_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repository.nil?
  count_objects_in_step(workflow, step, 'error')
end
#count_objects_in_step(workflow, step, type, repo = nil) ⇒ Hash
Used by the preservation robots stats reporter.
# File 'lib/dor/workflow/client/queues.rb', line 159
def count_objects_in_step(workflow, step, type, repo = nil)
  Deprecation.warn(self, 'the fourth argument to `#count_objects_in_step` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repo.nil?
  resp = requestor.request "workflow_queue?workflow=#{workflow}&#{type}=#{step}"
  extract_object_count(resp)
end
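In the method above, `type` becomes the name of a query parameter and `step` becomes its value, which is how the errored and queued counters share one code path. A small sketch of just that interpolation follows. `step_count_uri` is a hypothetical helper, and the workflow and step names are illustrative only.

```ruby
# Hypothetical helper that mirrors the string interpolation in
# #count_objects_in_step; it makes no HTTP request.
def step_count_uri(workflow, step, type)
  "workflow_queue?workflow=#{workflow}&#{type}=#{step}"
end

puts step_count_uri('accessionWF', 'publish', 'error')
# prints: workflow_queue?workflow=accessionWF&error=publish
puts step_count_uri('accessionWF', 'publish', 'queued')
# prints: workflow_queue?workflow=accessionWF&queued=publish
```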
#count_queued_for_workstep(workflow, step, repository = nil) ⇒ Integer
Returns the number of objects that have a status of 'queued' in a particular workflow and step.
# File 'lib/dor/workflow/client/queues.rb', line 184
def count_queued_for_workstep(workflow, step, repository = nil)
  Deprecation.warn(self, 'the third argument to `#count_queued_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repository.nil?
  count_objects_in_step(workflow, step, 'queued')
end
#count_stale_queued_workflows(*args) ⇒ Integer
Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
# File 'lib/dor/workflow/client/queues.rb', line 58
def count_stale_queued_workflows(*args)
  if args.count == 2
    Deprecation.warn(
      self,
      '`#count_stale_queued_workflows` only takes one arg: a hash. This will raise an exception in Dor::Workflow::Client 4.0.0'
    )
    args.shift # ditch the `repo` argument
  end
  uri_string = build_queued_uri(args.first) + '&count-only=true'
  doc = Nokogiri::XML(requestor.request(uri_string))
  doc.at_xpath('/objects/@count').value.to_i
end
#errored_objects_for_workstep(workflow, step, repository = nil) ⇒ Hash
Gets a list of druids that have errored out in a particular workflow and step.
# File 'lib/dor/workflow/client/queues.rb', line 143
def errored_objects_for_workstep(workflow, step, repository = nil)
  Deprecation.warn(self, 'the third argument to `#errored_objects_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repository.nil?
  resp = requestor.request "workflow_queue?workflow=#{workflow}&error=#{step}"
  Nokogiri::XML(resp).xpath('//object').map do |node|
    [node['id'], node['errorMessage']]
  end.to_h
end
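The returned Hash maps each druid to its error message. The sketch below shows that transformation on a sample document. It uses REXML in place of Nokogiri only so that it runs without third-party gems. The sample XML is an assumption: the `object` element and its `id` and `errorMessage` attributes come from the code above, while the root element, druids, and messages are invented for illustration.

```ruby
require 'rexml/document'

# Assumed response shape; only <object>, id, and errorMessage are taken
# from the method source. Druids and messages are made up.
SAMPLE_ERROR_XML = <<~XML
  <objects count="2">
    <object id="druid:ab123de4567" errorMessage="timeout"/>
    <object id="druid:ab123de9012" errorMessage="missing file"/>
  </objects>
XML

# Hypothetical helper: builds [id, errorMessage] pairs and converts them to a Hash,
# the same pairs-then-to_h approach as #errored_objects_for_workstep.
def errored_objects(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '//object').map do |node|
    [node.attributes['id'], node.attributes['errorMessage']]
  end.to_h
end

p errored_objects(SAMPLE_ERROR_XML)
```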
#lane_ids(*args) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step.
# File 'lib/dor/workflow/client/queues.rb', line 18
def lane_ids(*args)
  if args.count == 3
    Deprecation.warn(
      self,
      '`#lane_ids` only takes two args: workflow name, & process/step name. This will raise an exception in Dor::Workflow::Client 4.0.0'
    )
    args.shift # ditch the `repo` argument
  end
  uri = "workflow_queue/lane_ids?step=#{args.first}:#{args.second}"
  doc = Nokogiri::XML(requestor.request(uri))
  doc.xpath('/lanes/lane').map { |n| n['id'] }
end
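The `*args` signature exists so that callers still passing a legacy leading `repo` argument keep working until 4.0.0. The sketch below isolates that argument handling and the resulting URI. `lane_ids_uri` is a hypothetical helper, and the workflow and step names are illustrative. Note that `Array#second` in the source is provided by ActiveSupport rather than core Ruby, so the sketch uses destructuring instead.

```ruby
# Hypothetical helper: accepts (workflow, step) or the legacy (repo, workflow, step)
# form and returns the URI that #lane_ids would request.
def lane_ids_uri(*args)
  args.shift if args.count == 3 # ditch the legacy `repo` argument
  workflow, step = args
  "workflow_queue/lane_ids?step=#{workflow}:#{step}"
end

puts lane_ids_uri('accessionWF', 'publish')
puts lane_ids_uri('dor', 'accessionWF', 'publish')
# both print: workflow_queue/lane_ids?step=accessionWF:publish
```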
#objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns a list of druids from the workflow service that meet the criteria of the passed-in completed and waiting params.
# File 'lib/dor/workflow/client/queues.rb', line 104
def objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  Deprecation.warn(self, 'the `:default_repository` option in `#objects_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') if options[:default_repository]
  waiting_param = qualify_step(options[:default_workflow], waiting)
  uri_string = "workflow_queue?waiting=#{waiting_param}"
  if completed
    Array(completed).each do |step|
      completed_param = qualify_step(options[:default_workflow], step)
      uri_string += "&completed=#{completed_param}"
    end
  end

  uri_string += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri_string += "&lane-id=#{lane_id}"

  resp = requestor.request uri_string
  #
  # response looks like:
  #    <objects count="2">
  #      <object id="druid:ab123de4567"/>
  #      <object id="druid:ab123de9012"/>
  #    </objects>
  #
  # convert into:
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  #
  Nokogiri::XML(resp).xpath('//object[@id]').map { |n| n[:id] }
end
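The query string is assembled in a fixed order: one `waiting` parameter, one `completed` parameter per prerequisite step, an optional `limit`, and finally `lane-id`. The sketch below reproduces only that assembly. `workstep_uri` is a hypothetical helper, and it assumes every step name is already fully qualified, because the private `qualify_step` helper is not shown on this page. The step names are illustrative.

```ruby
# Hypothetical helper; skips qualify_step and assumes step names such as
# 'accessionWF:publish' are already fully qualified.
def workstep_uri(completed, waiting, lane_id = 'default', options = {})
  uri = "workflow_queue?waiting=#{waiting}"
  if completed
    # A single step or an array of steps is accepted, as in the source.
    Array(completed).each { |step| uri += "&completed=#{step}" }
  end
  # The limit is appended only when it converts to a positive integer.
  uri += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri + "&lane-id=#{lane_id}"
end

puts workstep_uri(['accessionWF:start', 'accessionWF:shelve'], 'accessionWF:publish', 'default', { limit: 10 })
# prints: workflow_queue?waiting=accessionWF:publish&completed=accessionWF:start&completed=accessionWF:shelve&limit=10&lane-id=default
puts workstep_uri(nil, 'accessionWF:publish')
# prints: workflow_queue?waiting=accessionWF:publish&lane-id=default
```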
#stale_queued_workflows(*args) ⇒ Array[Hash]
Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
# File 'lib/dor/workflow/client/queues.rb', line 40
def stale_queued_workflows(*args)
  if args.count == 2
    Deprecation.warn(
      self,
      '`#stale_queued_workflows` only takes one arg: a hash. This will raise an exception in Dor::Workflow::Client 4.0.0'
    )
    args.shift # ditch the `repo` argument
  end
  uri_string = build_queued_uri(args.first)
  parse_queued_workflows_response requestor.request(uri_string)
end