Class: Dor::Workflow::Client::Queues

Inherits:
Object
  • Object
show all
Defined in:
lib/dor/workflow/client/queues.rb

Overview

Makes requests relating to the workflow queues

Instance Method Summary collapse

Constructor Details

#initialize(requestor:) ⇒ Queues

Returns a new instance of Queues.



8
9
10
# File 'lib/dor/workflow/client/queues.rb', line 8

def initialize(requestor:)
  @requestor = requestor
end

Instance Method Details

#count_errored_for_workstep(workflow, step, repository = nil) ⇒ Integer

Returns the number of objects that have a status of 'error' in a particular workflow and step

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • repository (String) (defaults to: nil)

    -- deprecated, ignored by workflow service

Returns:

  • (Integer)

    Number of objects with this repository:workflow:step that have a status of 'error'



172
173
174
175
# File 'lib/dor/workflow/client/queues.rb', line 172

def count_errored_for_workstep(workflow, step, repository = nil)
  Deprecation.warn(self, 'the third argument to `#count_errored_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repository.nil?
  count_objects_in_step(workflow, step, 'error')
end

#count_objects_in_step(workflow, step, type, repo = nil) ⇒ Hash

Used by preservation robots stats reporter

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • type (String)
  • repo (String) (defaults to: nil)

    -- deprecated, ignored by workflow service

Returns:

  • (Hash)

    hash of results, with key has a druid, and value as the error message



159
160
161
162
163
# File 'lib/dor/workflow/client/queues.rb', line 159

def count_objects_in_step(workflow, step, type, repo = nil)
  Deprecation.warn(self, 'the fourth argument to `#count_objects_in_step` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repo.nil?
  resp = requestor.request "workflow_queue?workflow=#{workflow}&#{type}=#{step}"
  extract_object_count(resp)
end

#count_queued_for_workstep(workflow, step, repository = nil) ⇒ Integer

Returns the number of objects that have a status of 'queued' in a particular workflow and step

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • repository (String) (defaults to: nil)

    -- deprecated, ignored by workflow service

Returns:

  • (Integer)

    Number of objects with this repository:workflow:step that have a status of 'queued'



184
185
186
187
# File 'lib/dor/workflow/client/queues.rb', line 184

def count_queued_for_workstep(workflow, step, repository = nil)
  Deprecation.warn(self, 'the third argument to `#count_queued_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repository.nil?
  count_objects_in_step(workflow, step, 'queued')
end

#count_stale_queued_workflows(*args) ⇒ Integer

Returns a count of workflow steps that have a status of 'queued' that have a last-updated timestamp older than the number of hours passed in

Parameters:

  • repository (String)

    -- deprecated, ignored by workflow service

  • opts (Hash)

    optional values for query

Returns:

  • (Integer)

    number of stale, queued steps if the :count_only option was set to true



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/dor/workflow/client/queues.rb', line 58

def count_stale_queued_workflows(*args)
  if args.count == 2
    Deprecation.warn(
      self,
      '`#count_stale_queued_workflows` only takes one arg: a hash. This will raise an exception in Dor::Workflow::Client 4.0.0'
    )
    args.shift # ditch the `repo` argument
  end
  uri_string = build_queued_uri(args.first) + '&count-only=true'
  doc = Nokogiri::XML(requestor.request(uri_string))
  doc.at_xpath('/objects/@count').value.to_i
end

#errored_objects_for_workstep(workflow, step, repository = nil) ⇒ Hash

Get a list of druids that have errored out in a particular workflow and step

Examples:

client.errored_objects_for_workstep('accessionWF','content-metadata')
=> {"druid:qd556jq0580"=>"druid:qd556jq0580 - Item error; caused by
   #<Rubydora::FedoraInvalidRequest: Error modifying datastream contentMetadata for druid:qd556jq0580. See logger for details>"}

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • repository (String) (defaults to: nil)

    -- deprecated, ignored by workflow service

Returns:

  • (Hash)

    hash of results, with key has a druid, and value as the error message



143
144
145
146
147
148
149
# File 'lib/dor/workflow/client/queues.rb', line 143

def errored_objects_for_workstep(workflow, step, repository = nil)
  Deprecation.warn(self, 'the third argument to `#errored_objects_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') unless repository.nil?
  resp = requestor.request "workflow_queue?workflow=#{workflow}&error=#{step}"
  Nokogiri::XML(resp).xpath('//object').map do |node|
    [node['id'], node['errorMessage']]
  end.to_h
end

#lane_ids(*args) ⇒ Array<String>

Returns all the distinct laneIds for a given workflow step

Parameters:

  • repo (String)

    -- deprecated, ignored by workflow service

  • workflow (String)

    name

  • process (String)

    name

Returns:

  • (Array<String>)

    all of the distinct laneIds. Array will be empty if no lane ids were found



18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/dor/workflow/client/queues.rb', line 18

def lane_ids(*args)
  if args.count == 3
    Deprecation.warn(
      self,
      '`#lane_ids` only takes two args: workflow name, & process/step name. This will raise an exception in Dor::Workflow::Client 4.0.0'
    )
    args.shift # ditch the `repo` argument
  end
  uri = "workflow_queue/lane_ids?step=#{args.first}:#{args.second}"
  doc = Nokogiri::XML(requestor.request(uri))
  doc.xpath('/lanes/lane').map { |n| n['id'] }
end

#objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>

Returns a list of druids from the workflow service that meet the criteria of the passed in completed and waiting params

Examples:

objects_for_workstep(...)
=> [
   "druid:py156ps0477",
   "druid:tt628cb6479",
   "druid:ct021wp7863"
 ]
objects_for_workstep(..., "lane1")
=> {
 "druid:py156ps0477",
 "druid:tt628cb6479",
}
objects_for_workstep(..., "lane1", limit: 1)
=> {
 "druid:py156ps0477",
}

Parameters:

  • completed (Array<String>, String)

    An array or single String of the completed steps, should use the qualified format: workflow:step-name

  • waiting (String)

    name of the waiting step

  • workflow (String)

    default workflow to use if it isn't passed in the qualified-step-name

  • lane_id (String) (defaults to: 'default')

    issue a query for a specific lane_id for the waiting step

  • options (Hash) (defaults to: {})
  • options (String) (defaults to: {})

    :default_workflow workflow to query for if not using the qualified format

Options Hash (options):

  • :limit (Integer)

    maximum number of druids to return (nil for no limit)

Returns:

  • (Array<String>)

    Array of druids



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/dor/workflow/client/queues.rb', line 104

def objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  Deprecation.warn(self, 'the `:default_repository` option in `#objects_for_workstep` is unused and will go away in Dor::Workflow::Client 4.0.0. omit argument to silence.') if options[:default_repository]
  waiting_param = qualify_step(options[:default_workflow], waiting)
  uri_string = "workflow_queue?waiting=#{waiting_param}"
  if completed
    Array(completed).each do |step|
      completed_param = qualify_step(options[:default_workflow], step)
      uri_string += "&completed=#{completed_param}"
    end
  end

  uri_string += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri_string += "&lane-id=#{lane_id}"

  resp = requestor.request uri_string
  #
  # response looks like:
  #    <objects count="2">
  #      <object id="druid:ab123de4567"/>
  #      <object id="druid:ab123de9012"/>
  #    </objects>
  #
  # convert into:
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  #
  Nokogiri::XML(resp).xpath('//object[@id]').map { |n| n[:id] }
end

#stale_queued_workflows(*args) ⇒ Array[Hash]

Gets all of the workflow steps that have a status of 'queued' that have a last-updated timestamp older than the number of hours passed in This will enable re-queueing of jobs that have been lost by the job manager

Parameters:

  • repository (String)

    -- deprecated, ignored by workflow service

  • opts (Hash)

    optional values for query

Returns:

  • (Array[Hash])

    each Hash represents a workflow step. It will have the following keys: :workflow, :step, :druid, :lane_id



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/dor/workflow/client/queues.rb', line 40

def stale_queued_workflows(*args)
  if args.count == 2
    Deprecation.warn(
      self,
      '`#stale_queued_workflows` only takes one arg: a hash. This will raise an exception in Dor::Workflow::Client 4.0.0'
    )
    args.shift # ditch the `repo` argument
  end
  uri_string = build_queued_uri(args.first)
  parse_queued_workflows_response requestor.request(uri_string)
end