Class: Dor::Workflow::Client::Queues

Inherits:
Object
  • Object
show all
Defined in:
lib/dor/workflow/client/queues.rb

Overview

Makes requests relating to the workflow queues

Instance Method Summary collapse

Constructor Details

#initialize(requestor:) ⇒ Queues



8
9
10
# File 'lib/dor/workflow/client/queues.rb', line 8

# Stores the collaborator used to talk to the workflow service.
#
# @param requestor [#request] object kept in @requestor; the instance methods
#   of this class call +request+ on it with a relative URI string
def initialize(requestor:)
  @requestor = requestor
end

Instance Method Details

#count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer

Returns the number of objects that have a status of 'error' in a particular workflow and step



147
148
149
# File 'lib/dor/workflow/client/queues.rb', line 147

# Counts the objects whose +step+ in +workflow+ currently has a status of 'error'.
# Delegates to #count_objects_in_step with the status fixed to 'error'.
#
# @param workflow [String] name of the workflow
# @param step [String] name of the step within that workflow
# @param repository [String] repository to query, 'dor' unless given
# @return [Integer] whatever count #count_objects_in_step reports
def count_errored_for_workstep(workflow, step, repository = 'dor')
  status = 'error'
  count_objects_in_step(workflow, step, status, repository)
end

#count_objects_in_step(workflow, step, type, repo) ⇒ Object

Used by preservation robots stats reporter



135
136
137
138
# File 'lib/dor/workflow/client/queues.rb', line 135

# Asks the workflow_queue endpoint about the objects sitting in +step+ of
# +workflow+ with status +type+, and returns the count extracted from the reply.
# Used by the preservation robots stats reporter.
#
# @param workflow [String] name of the workflow
# @param step [String] name of the step within that workflow
# @param type [String] status name; it becomes the query key whose value is +step+
#   (callers in this class pass 'error' or 'queued')
# @param repo [String] repository to query
# @return [Object] the result of #extract_object_count for the response
def count_objects_in_step(workflow, step, type, repo)
  query = ["repository=#{repo}", "workflow=#{workflow}", "#{type}=#{step}"].join('&')
  response = requestor.request("workflow_queue?#{query}")
  extract_object_count(response)
end

#count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer

Returns the number of objects that have a status of 'queued' in a particular workflow and step



158
159
160
# File 'lib/dor/workflow/client/queues.rb', line 158

# Counts the objects whose +step+ in +workflow+ currently has a status of 'queued'.
# Delegates to #count_objects_in_step with the status fixed to 'queued'.
#
# @param workflow [String] name of the workflow
# @param step [String] name of the step within that workflow
# @param repository [String] repository to query, 'dor' unless given
# @return [Integer] whatever count #count_objects_in_step reports
def count_queued_for_workstep(workflow, step, repository = 'dor')
  status = 'queued'
  count_objects_in_step(workflow, step, status, repository)
end

#count_stale_queued_workflows(repository, opts = {}) ⇒ Integer

Returns a count of the workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in

Options Hash (opts):

  • :hours_ago (Integer)

    steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours, meaning you will get all queued workflows



45
46
47
48
49
# File 'lib/dor/workflow/client/queues.rb', line 45

# Counts queued workflow steps by requesting the URI built by #build_queued_uri
# with '&count-only=true' appended, then reading the +count+ attribute of the
# root <objects> element in the XML reply.
#
# @param repository [String] repository name, passed through to #build_queued_uri
# @param opts [Hash] passed through to #build_queued_uri
# @option opts [Integer] :hours_ago steps older than this value are counted
# @return [Integer] the +count+ attribute converted with #to_i
# @raise [NoMethodError] when the reply has no /objects/@count attribute
def count_stale_queued_workflows(repository, opts = {})
  count_uri = build_queued_uri(repository, opts) + '&count-only=true'
  response = requestor.request(count_uri)
  Nokogiri::XML(response).at_xpath('/objects/@count').value.to_i
end

#errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash

Get a list of druids that have errored out in a particular workflow and step

Examples:

client.errored_objects_for_workstep('accessionWF','content-metadata')
=> {"druid:qd556jq0580"=>"druid:qd556jq0580 - Item error; caused by
   #<Rubydora::FedoraInvalidRequest: Error modifying datastream contentMetadata for druid:qd556jq0580. See logger for details>"}


125
126
127
128
129
130
131
132
# File 'lib/dor/workflow/client/queues.rb', line 125

# Lists the objects that have errored out in a particular workflow step.
#
# Requests the workflow_queue endpoint with +error=step+ and turns every
# <object> element of the XML reply into one hash entry.
#
# @param workflow [String] name of the workflow
# @param step [String] name of the step within that workflow
# @param repository [String] repository to query, 'dor' unless given
# @return [Hash] maps each element's +id+ attribute to its +errorMessage+
#   attribute; empty when the reply contains no <object> elements. When an id
#   occurs more than once the last occurrence wins.
def errored_objects_for_workstep(workflow, step, repository = 'dor')
  resp = requestor.request "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
  # Fill the hash directly rather than mapping to a throw-away array of
  # one-entry hashes that are merged in as a side effect.
  Nokogiri::XML(resp).xpath('//object').each_with_object({}) do |node, errors|
    errors[node['id']] = node['errorMessage']
  end
end

#lane_ids(repo, workflow, process) ⇒ Array<String>

Returns all the distinct laneIds for a given workflow step



18
19
20
21
22
23
# File 'lib/dor/workflow/client/queues.rb', line 18

# Fetches the lane ids reported for one workflow step.
#
# The step is sent as "repo:workflow:process" in the +step+ query parameter of
# workflow_queue/lane_ids; the +id+ attribute of every /lanes/lane element in
# the XML reply is returned.
#
# @param repo [String] repository name
# @param workflow [String] workflow name
# @param process [String] step (process) name
# @return [Array<String>] one id per <lane> element, in document order
def lane_ids(repo, workflow, process)
  qualified_step = "#{repo}:#{workflow}:#{process}"
  response = requestor.request("workflow_queue/lane_ids?step=#{qualified_step}")
  Nokogiri::XML(response).xpath('/lanes/lane').map { |lane| lane['id'] }
end

#objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>

Returns a list of druids from the workflow service that meet the criteria of the passed in completed and waiting params

Examples:

objects_for_workstep(...)
=> [
   "druid:py156ps0477",
   "druid:tt628cb6479",
   "druid:ct021wp7863"
 ]
objects_for_workstep(..., "lane1")
=> [
   "druid:py156ps0477",
   "druid:tt628cb6479"
 ]
objects_for_workstep(..., "lane1", limit: 1)
=> [
   "druid:py156ps0477"
 ]

Options Hash (options):

  • :limit (Integer)

    maximum number of druids to return (nil for no limit)



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/dor/workflow/client/queues.rb', line 86

# Lists the druids the workflow service reports for the given completed and
# waiting steps.
#
# Every step is passed through #qualify_step together with
# options[:default_repository] and options[:default_workflow] before it is put
# on the query string (presumably to expand short step names; confirm against
# #qualify_step).
#
# @param completed [Array<String>, String, nil] completed step(s); each adds one
#   +completed+ parameter, nil or false adds none
# @param waiting [String] the waiting step
# @param lane_id [String] sent as +lane-id+, 'default' unless given
# @param options [Hash]
# @option options [Integer] :limit maximum number of druids to return; only
#   sent when its integer value is positive
# @option options [String] :default_repository handed to #qualify_step
# @option options [String] :default_workflow handed to #qualify_step
# @return [Array<String>] the +id+ attribute of each <object> element that has one
def objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  repository = options[:default_repository]
  workflow = options[:default_workflow]

  params = ["waiting=#{qualify_step(repository, workflow, waiting)}"]
  steps = completed ? Array(completed) : []
  steps.each { |step| params << "completed=#{qualify_step(repository, workflow, step)}" }

  limit = options[:limit]&.to_i
  params << "limit=#{limit}" if limit&.positive?
  params << "lane-id=#{lane_id}"

  response = requestor.request("workflow_queue?#{params.join('&')}")

  # The service answers with
  #    <objects count="2">
  #      <object id="druid:ab123de4567"/>
  #      <object id="druid:ab123de9012"/>
  #    </objects>
  # which becomes
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  Nokogiri::XML(response).xpath('//object[@id]').map { |node| node[:id] }
end

#stale_queued_workflows(repository, opts = {}) ⇒ Array[Hash]

Gets all of the workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.

Options Hash (opts):

  • :hours_ago (Integer)

    steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours, meaning you will get all queued workflows

  • :limit (Integer)

    sets the maximum number of workflow steps that can be returned. Defaults to no limit



34
35
36
37
# File 'lib/dor/workflow/client/queues.rb', line 34

# Requests the URI built by #build_queued_uri and returns the reply after it
# has been run through #parse_queued_workflows_response.
#
# @param repository [String] repository name, passed through to #build_queued_uri
# @param opts [Hash] passed through to #build_queued_uri
# @option opts [Integer] :hours_ago steps older than this value are returned
# @option opts [Integer] :limit maximum number of workflow steps to return
# @return [Array<Hash>] whatever #parse_queued_workflows_response produces
def stale_queued_workflows(repository, opts = {})
  response = requestor.request(build_queued_uri(repository, opts))
  parse_queued_workflows_response(response)
end