Class: Dor::WorkflowService

Inherits:
Object
Extended by:
Deprecation
Defined in:
lib/dor/services/workflow_service.rb

Overview

Create and update workflows

Constant Summary

VALID_STATUS =

From Workflow Service's admin/Process.java

%w[waiting completed error queued skipped hold].freeze
@@handler =
nil
@@logger =
nil
@@dor_services_url =
nil
@@http_conn =
nil


Class Method Details

.active_lifecycle(repo, druid, milestone) ⇒ Time

Returns the Time at which a requested milestone was reached, looking ONLY at the active workflow table

Examples:

An example lifecycle xml from the workflow service.

<lifecycle objectId="druid:ct011cv6501">
  <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
  <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
  <milestone date="2010-06-15T16:08:58-0700">released</milestone>
</lifecycle>

Parameters:

  • repo (String)

    repository name

  • druid (String)

    object id

  • milestone (String)

    name of the milestone being queried for

Returns:

  • (Time)

    when the milestone was achieved. Returns nil if the milestone does not exist



# File 'lib/dor/services/workflow_service.rb', line 253

def active_lifecycle(repo, druid, milestone)
  doc = query_lifecycle(repo, druid, true)
  milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  return Time.parse(milestone['date']) if milestone

  nil
end
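The milestone lookup can be sketched with Ruby's standard REXML library instead of Nokogiri. This is a minimal, hypothetical helper (`milestone_time` is not part of the gem) run against the sample lifecycle XML shown above; the HTTP request and the active-only flag are left out. It filters milestones in Ruby rather than interpolating the name into the XPath string, which avoids quoting problems if a name ever contains an apostrophe.

```ruby
require 'rexml/document'
require 'time'

# Sample lifecycle XML, copied from the example above.
LIFECYCLE_XML = <<~XML
  <lifecycle objectId="druid:ct011cv6501">
    <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
    <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
    <milestone date="2010-06-15T16:08:58-0700">released</milestone>
  </lifecycle>
XML

# Hypothetical helper: returns the Time for the named milestone, or nil.
def milestone_time(lifecycle_xml, milestone)
  doc = REXML::Document.new(lifecycle_xml)
  node = REXML::XPath.match(doc, '//lifecycle/milestone').find { |m| m.text == milestone }
  node && Time.parse(node.attributes['date'])
end

p milestone_time(LIFECYCLE_XML, 'released')
p milestone_time(LIFECYCLE_XML, 'accessioned') # nil: no such milestone
```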

.active_workflows(repo, pid) ⇒ Array<String> Also known as: get_active_workflows

Gets the names of the active workflows for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.

Examples:

Dor::WorkflowService.active_workflows('dor', 'druid:sr100hp0609')
=> ["accessionWF", "assemblyWF", "disseminationWF"]

Parameters:

  • repo (String)

    repository of the object

  • pid (String)

    id of object

Returns:

  • (Array<String>)

    list of active workflows. Returns an empty Array if none are found



# File 'lib/dor/services/workflow_service.rb', line 167

def active_workflows(repo, pid)
  Deprecation.warn(self, 'get_active_workflows will be removed without replacement because the workflow server no longer archives processes')
  doc = Nokogiri::XML(get_workflow_xml(repo, pid, ''))
  doc.xpath(%(//workflow[not(process/@archived)]/@id )).map(&:value)
end
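A rough sketch of the two XML queries behind active_workflows and workflows, again using REXML so it runs without Nokogiri. The sample response is hypothetical; only its element and attribute names (workflow/@id, process/@archived) are taken from the XPath expressions on this page, and `workflow_ids` / `active_workflow_ids` are illustrative names.

```ruby
require 'rexml/document'

# Hypothetical response listing all workflows for one object.
WORKFLOWS_XML = <<~XML
  <workflows objectId="druid:sr100hp0609">
    <workflow id="accessionWF">
      <process name="start-accession" status="completed"/>
    </workflow>
    <workflow id="assemblyWF">
      <process name="start-assembly" status="completed" archived="true"/>
    </workflow>
  </workflows>
XML

# Every workflow id, like workflows(pid).
def workflow_ids(xml)
  REXML::XPath.match(REXML::Document.new(xml), '//workflow').map { |wf| wf.attributes['id'] }
end

# Ids of workflows with no archived process, like //workflow[not(process/@archived)]/@id.
def active_workflow_ids(xml)
  REXML::XPath.match(REXML::Document.new(xml), '//workflow')
              .reject { |wf| wf.elements.to_a('process').any? { |pr| pr.attributes['archived'] } }
              .map { |wf| wf.attributes['id'] }
end

p workflow_ids(WORKFLOWS_XML)
p active_workflow_ids(WORKFLOWS_XML)
```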

.all_workflows_xml(druid) ⇒ String

Retrieves the raw XML for all the workflows for the given object

Parameters:

  • druid (String)

    The id of the object

Returns:

  • (String)

    XML of the workflow



# File 'lib/dor/services/workflow_service.rb', line 135

def all_workflows_xml(druid)
  workflow_resource_method "objects/#{druid}/workflows"
end

.archive_active_workflow(repo, druid) ⇒ Object

Archives every active workflow for the given object.

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object to archive the workflows from



# File 'lib/dor/services/workflow_service.rb', line 473

def archive_active_workflow(repo, druid)
  workflows = get_active_workflows(repo, druid)
  workflows.each do |wf|
    archive_workflow(repo, druid, wf)
  end
end

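.archive_workflow(_repo, druid, wf_name, version_num = nil) ⇒ Object

Archives a single workflow for an object by POSTing to the archive endpoint. Requires :dor_services_url to have been passed to configure.

Parameters:

  • _repo (String)

    Unused by this method; the archive URL does not include the repository

  • druid (String)

    The id of the object to archive the workflow from

  • wf_name (String)

    The name of the workflow to archive

  • version_num (String, Integer, nil) (defaults to: nil)

    Optional version number; when given, it is appended to the archive URL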



# File 'lib/dor/services/workflow_service.rb', line 482

def archive_workflow(_repo, druid, wf_name, version_num = nil)
  raise 'Please call Dor::WorkflowService.configure(workflow_service_url, :dor_services_url => DOR_SERVICES_URL) once before archiving workflow' if @@dor_services_url.nil?

  url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
  url += "/#{version_num}" if version_num
  workflow_resource_method(url, 'post', '')
end

.base_url ⇒ Object

Get the configured URL for the connection



# File 'lib/dor/services/workflow_service.rb', line 533

def base_url
  workflow_resource.url_prefix
end

.close_version(repo, druid, create_accession_wf = true) ⇒ Object

Calls the versionClose endpoint of the WorkflowService:

  • completes the versioningWF:submit-version and versioningWF:start-accession steps
  • initiates accessionWF

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object whose version is being closed

  • create_accession_wf (Boolean) (defaults to: true)

    Option to create accessionWF when closing a version. Defaults to true



# File 'lib/dor/services/workflow_service.rb', line 498

def close_version(repo, druid, create_accession_wf = true)
  uri = "#{repo}/objects/#{druid}/versionClose"
  uri += '?create-accession=false' unless create_accession_wf
  workflow_resource_method(uri, 'post', '')
  true
end

.configure(url_or_connection, opts = {}) ⇒ Faraday::Connection

Configures the workflow service. TODO: replace with initialize

Parameters:

  • url_or_connection (String, Faraday::Connection)

    the URL of the workflow service, or an already-built connection to use as-is

  • opts (Hash) (defaults to: {})

    optional params

Options Hash (opts):

  • :logger (Logger)

    defaults to writing to workflow_service.log with weekly rotation

  • :dor_services_url (String)

    uri to the DOR REST service

  • :timeout (Integer)

    number of seconds for HTTP timeout

  • :client_cert_file (String)

    path to an SSL client certificate (deprecated)

  • :client_key_file (String)

    path to an SSL key file (deprecated)

  • :client_key_pass (String)

    password for the key file (deprecated)

Returns:

  • (Faraday::Connection)

    the REST client resource



# File 'lib/dor/services/workflow_service.rb', line 560

def configure(url_or_connection, opts = {})
  @@logger           = opts[:logger] || default_logger
  @@dor_services_url = opts[:dor_services_url] if opts[:dor_services_url]
  # params[:ssl_client_cert] = OpenSSL::X509::Certificate.new(File.read(opts[:client_cert_file])) if opts[:client_cert_file]
  # params[:ssl_client_key]  = OpenSSL::PKey::RSA.new(File.read(opts[:client_key_file]), opts[:client_key_pass]) if opts[:client_key_file]
  @@handler = proc do |exception, attempt_number, total_delay|
    @@logger.warn "[Attempt #{attempt_number}] #{exception.class}: #{exception.message}; #{total_delay} seconds elapsed."
  end
  @@http_conn = case url_or_connection
                when String
                  build_connection(url_or_connection, opts)
                else
                  url_or_connection
                end
end

.count_archived_for_workflow(workflow, repository = 'dor') ⇒ Integer

Returns the number of objects that have completed a particular workflow

Parameters:

  • workflow (String)

    name

  • repository (String) (defaults to: 'dor')

    the repository to query

Returns:

  • (Integer)

    Number of objects with this repository:workflow that have been archived



# File 'lib/dor/services/workflow_service.rb', line 416

def count_archived_for_workflow(workflow, repository = 'dor')
  Deprecation.warn(self, 'count_archived_for_workflow will be removed without replacement because the workflow server no longer archives processes')
  resp = workflow_resource_method "workflow_archive?repository=#{repository}&workflow=#{workflow}&count-only=true"
  extract_object_count(resp)
end

.count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer

Returns the number of objects that have a status of 'error' in a particular workflow and step

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • repository (String) (defaults to: 'dor')

    the repository to query

Returns:

  • (Integer)

    Number of objects with this repository:workflow:step that have a status of 'error'



# File 'lib/dor/services/workflow_service.rb', line 392

def count_errored_for_workstep(workflow, step, repository = 'dor')
  Deprecation.warn(self, 'count_errored_for_workstep is deprecated and will be removed in dor-workflow-service version 3. Use count_objects_in_step instead.')

  count_objects_in_step(workflow, step, 'error', repository)
end

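.count_objects_in_step(workflow, step, type, repo) ⇒ Integer

Returns the number of objects in repo:workflow:step whose status is type (for example 'error' or 'queued').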



# File 'lib/dor/services/workflow_service.rb', line 576

def count_objects_in_step(workflow, step, type, repo)
  resp = workflow_resource_method "workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}"
  extract_object_count(resp)
end

.count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer

Returns the number of objects that have a status of 'queued' in a particular workflow and step

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • repository (String) (defaults to: 'dor')

    the repository to query

Returns:

  • (Integer)

    Number of objects with this repository:workflow:step that have a status of 'queued'



# File 'lib/dor/services/workflow_service.rb', line 405

def count_queued_for_workstep(workflow, step, repository = 'dor')
  Deprecation.warn(self, 'count_queued_for_workstep is deprecated and will be removed in dor-workflow-service version 3. Use count_objects_in_step instead.')

  count_objects_in_step(workflow, step, 'queued', repository)
end

.count_stale_queued_workflows(repository, opts = {}) ⇒ Integer

Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in

Parameters:

  • repository (String)

    name of the repository you want to query, like 'dor' or 'sdr'

  • opts (Hash) (defaults to: {})

    optional values for query

Options Hash (opts):

  • :hours_ago (Integer)

    steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours, meaning you will get all queued workflows

Returns:

  • (Integer)

    number of stale, queued steps



# File 'lib/dor/services/workflow_service.rb', line 447

def count_stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts) + '&count-only=true'
  doc = Nokogiri::XML(workflow_resource_method(uri_string))
  doc.at_xpath('/objects/@count').value.to_i
end

.create_process_xml(params) ⇒ String

Parameters:

  • params (Hash)

Returns:

  • (String)


# File 'lib/dor/services/workflow_service.rb', line 455

def create_process_xml(params)
  builder = Nokogiri::XML::Builder.new do |xml|
    attrs = params.reject { |_k, v| v.nil? }
    attrs = Hash[attrs.map { |k, v| [k.to_s.camelize(:lower), v] }] # camelize all the keys in the attrs hash
    xml.process(attrs)
  end
  builder.to_xml
end
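create_process_xml relies on Nokogiri's builder and ActiveSupport's `camelize(:lower)`. The dependency-free approximation below (hypothetical helpers `lower_camelize` and `process_xml`) shows the same two steps: drop nil-valued params, then lower-camelize each key into an attribute name. REXML serializes attributes with single quotes and omits the XML declaration, so the output text differs slightly from Nokogiri's.

```ruby
require 'rexml/document'

# Stand-in for ActiveSupport's camelize(:lower) on snake_case keys,
# e.g. :error_text -> "errorText"; already-camelized keys pass through.
def lower_camelize(key)
  first, *rest = key.to_s.split('_')
  first + rest.map(&:capitalize).join
end

# Builds a <process .../> element, skipping nil-valued params.
def process_xml(params)
  element = REXML::Element.new('process')
  params.each do |key, value|
    next if value.nil?

    element.add_attribute(lower_camelize(key), value.to_s)
  end
  element.to_s
end

puts process_xml(name: 'convert', status: 'error', errorMessage: 'boom', error_text: nil, lane_id: 'default')
```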

.create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true }) ⇒ Boolean

Creates a workflow for a given object in the repository. If this workflow already exists for the object, it is replaced with the wf_xml passed to this method. You can choose whether or not to create a datastream. Returns true on success; the caller must handle any exceptions.

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow_name (String)

    The name of the workflow you want to create

  • wf_xml (String)

    The xml that represents the workflow

  • opts (Hash) (defaults to: { create_ds: true })

    optional params

Options Hash (opts):

  • :create_ds (Boolean)

    if true, a workflow datastream will be created in Fedora. Set to false if you do not want a datastream to be created. If you do not pass in an opts Hash, :create_ds is set to true by default

  • :lane_id (String)

    adds laneId attribute to all process elements in the wf_xml workflow xml. Defaults to a value of 'default'

Returns:

  • (Boolean)

    always true



# File 'lib/dor/services/workflow_service.rb', line 47

def create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true })
  lane_id = opts.fetch(:lane_id, 'default')
  xml = add_lane_id_to_workflow_xml(lane_id, wf_xml)
  status = workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow_name}", 'put', xml,
                                    content_type: 'application/xml',
                                    params: { 'create-ds' => opts[:create_ds] }
  true
end
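The :lane_id option is applied by a helper, add_lane_id_to_workflow_xml, whose source is not shown on this page. Assuming it simply sets a laneId attribute on every process element (as the option description says), a REXML version might look like this; `with_lane_id` is a hypothetical name and the workflow XML is illustrative.

```ruby
require 'rexml/document'

# Hypothetical stand-in for add_lane_id_to_workflow_xml (source not shown
# on this page): sets laneId on every <process> element.
def with_lane_id(lane_id, wf_xml)
  doc = REXML::Document.new(wf_xml)
  REXML::XPath.each(doc, '//process') { |process| process.add_attribute('laneId', lane_id) }
  doc.to_s
end

puts with_lane_id('default', '<workflow id="exampleWF"><process name="step-one"/><process name="step-two"/></workflow>')
```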

.default_logger(logdev = 'workflow_service.log', shift_age = 'weekly') ⇒ Logger

Builds the default logger. Keeping this in its own method also makes it easy for tests to mock the default logger.

Parameters:

  • logdev (String, IO) (defaults to: 'workflow_service.log')

    The log device. This is a filename (String) or IO object (typically STDOUT, STDERR, or an open file).

  • shift_age (String, Integer) (defaults to: 'weekly')

    Number of old log files to keep, or frequency of rotation (daily, weekly or monthly).

Returns:

  • (Logger)

    default logger object



# File 'lib/dor/services/workflow_service.rb', line 541

def default_logger(logdev = 'workflow_service.log', shift_age = 'weekly')
  Logger.new(logdev, shift_age)
end

.delete_workflow(repo, druid, workflow) ⇒ Boolean

Deletes a workflow from a particular repository and druid

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object to delete the workflow from

  • workflow (String)

    The name of the workflow to be deleted

Returns:

  • (Boolean)

    always true



# File 'lib/dor/services/workflow_service.rb', line 213

def delete_workflow(repo, druid, workflow)
  workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}", 'delete'
  true
end

.errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash

Get a list of druids that have errored out in a particular workflow and step

Examples:

Dor::WorkflowService.errored_objects_for_workstep('accessionWF','content-metadata')
=> {"druid:qd556jq0580"=>"druid:qd556jq0580 - Item error; caused by
   #<Rubydora::FedoraInvalidRequest: Error modifying datastream contentMetadata for druid:qd556jq0580. See logger for details>"}

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • repository (String) (defaults to: 'dor')

    the repository to query

Returns:

  • (Hash)

    hash of results, with each key a druid and each value the error message



# File 'lib/dor/services/workflow_service.rb', line 371

def errored_objects_for_workstep(workflow, step, repository = 'dor')
  resp = workflow_resource_method "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
  result = {}
  Nokogiri::XML(resp).xpath('//object').collect do |node|
    result.merge!(node['id'] => node['errorMessage'])
  end
  result
end
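The same druid-to-error-message Hash can be built with `each_with_object`, which avoids using `collect` purely for its side effects. A REXML sketch; the sample workflow_queue response is hypothetical, with the id and errorMessage attribute names taken from the method above.

```ruby
require 'rexml/document'

# Hypothetical workflow_queue response for an errored step.
QUEUE_XML = <<~XML
  <objects count="2">
    <object id="druid:qd556jq0580" errorMessage="Item error"/>
    <object id="druid:ab123de4567" errorMessage="Timeout"/>
  </objects>
XML

# Maps each object's id to its errorMessage.
def errors_by_druid(xml)
  REXML::XPath.match(REXML::Document.new(xml), '//object').each_with_object({}) do |node, result|
    result[node.attributes['id']] = node.attributes['errorMessage']
  end
end

p errors_by_druid(QUEUE_XML)
```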

.get_active_lifecycle(repo, druid, milestone) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 261

def get_active_lifecycle(repo, druid, milestone)
  Deprecation.warn(self, 'use active_lifecycle instead. This will be removed in dor-workflow-service version 3')
  active_lifecycle(repo, druid, milestone)
end

.get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 380

def get_errored_objects_for_workstep(workflow, step, repository = 'dor')
  Deprecation.warn(self, 'use errored_objects_for_workstep instead. This will be removed in dor-workflow-service version 3')
  errored_objects_for_workstep(workflow, step, repository)
end

.get_lane_ids(repo, workflow, process) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 518

def get_lane_ids(repo, workflow, process)
  Deprecation.warn(self, 'use lane_ids instead. This will be removed in dor-workflow-service version 3')
  lane_ids(repo, workflow, process)
end

.get_lifecycle(repo, druid, milestone) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 237

def get_lifecycle(repo, druid, milestone)
  Deprecation.warn(self, 'use lifecycle instead. This will be removed in dor-workflow-service version 3')
  lifecycle(repo, druid, milestone)
end

.get_milestones(repo, druid) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 274

def get_milestones(repo, druid)
  Deprecation.warn(self, 'use milestones instead. This will be removed in dor-workflow-service version 3')
  milestones(repo, druid)
end

.get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 355

def get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  Deprecation.warn(self, 'use objects_for_workstep instead. This will be removed in dor-workflow-service version 3')
  objects_for_workstep(completed, waiting, lane_id, options)
end

.get_stale_queued_workflows(repository, opts = {}) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 436

def get_stale_queued_workflows(repository, opts = {})
  Deprecation.warn(self, 'use stale_queued_workflows instead. This will be removed in dor-workflow-service version 3')
  stale_queued_workflows(repository, opts)
end

.get_workflow_status(repo, druid, workflow, process) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 107

def get_workflow_status(repo, druid, workflow, process)
  Deprecation.warn(self, 'use workflow_status instead. This will be removed in dor-workflow-service version 3')
  workflow_status(repo, druid, workflow, process)
end

.get_workflow_xml(repo, druid, workflow) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 126

def get_workflow_xml(repo, druid, workflow)
  Deprecation.warn(self, 'use workflow_xml instead. This will be removed in dor-workflow-service version 3')
  workflow_xml(repo, druid, workflow)
end

.get_workflows(pid, repo = 'dor') ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 153

def get_workflows(pid, repo = 'dor')
  Deprecation.warn(self, 'use workflows instead. This will be removed in dor-workflow-service version 3')
  workflows(pid, repo)
end

.lane_ids(repo, workflow, process) ⇒ Array<String>

Returns all the distinct laneIds for a given workflow step

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • workflow (String)

    name

  • process (String)

    name

Returns:

  • (Array<String>)

    all of the distinct laneIds. Array will be empty if no lane ids were found



# File 'lib/dor/services/workflow_service.rb', line 511

def lane_ids(repo, workflow, process)
  uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
  doc = Nokogiri::XML(workflow_resource_method(uri))
  nodes = doc.xpath('/lanes/lane')
  nodes.map { |n| n['id'] }
end
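lane_ids does two things: it builds a step-qualified query path and then collects the id attribute of each lane element. Both are sketched below with hypothetical names (`lane_ids_path`, `lane_ids_from`); the sample lanes response is illustrative.

```ruby
require 'rexml/document'

# Request path, built the same way as in lane_ids above.
def lane_ids_path(repo, workflow, process)
  "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
end

# Collects the id of every /lanes/lane element; empty Array if there are none.
def lane_ids_from(xml)
  REXML::XPath.match(REXML::Document.new(xml), '/lanes/lane').map { |lane| lane.attributes['id'] }
end

puts lane_ids_path('dor', 'accessionWF', 'shelve')
p lane_ids_from('<lanes><lane id="default"/><lane id="fast"/></lanes>')
```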

.lifecycle(repo, druid, milestone) ⇒ Time

Returns the Time at which a requested milestone was reached, according to the workflow lifecycle

Examples:

An example lifecycle xml from the workflow service.

<lifecycle objectId="druid:ct011cv6501">
  <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
  <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
  <milestone date="2010-06-15T16:08:58-0700">released</milestone>
</lifecycle>

Parameters:

  • repo (String)

    repository name

  • druid (String)

    object id

  • milestone (String)

    name of the milestone being queried for

Returns:

  • (Time)

    when the milestone was achieved. Returns nil if the milestone does not exist



# File 'lib/dor/services/workflow_service.rb', line 229

def lifecycle(repo, druid, milestone)
  doc = query_lifecycle(repo, druid)
  milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  return Time.parse(milestone['date']) if milestone

  nil
end

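.milestones(repo, druid) ⇒ Array<Hash>

Returns every milestone in the object's lifecycle.

Returns:

  • (Array<Hash>)

    one Hash per milestone, with :milestone (name), :at (Time) and :version keys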


# File 'lib/dor/services/workflow_service.rb', line 267

def milestones(repo, druid)
  doc = query_lifecycle(repo, druid)
  doc.xpath('//lifecycle/milestone').collect do |node|
    { milestone: node.text, at: Time.parse(node['date']), version: node['version'] }
  end
end
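Note that milestones returns an Array of Hashes, one per milestone, in document order. A REXML sketch of the same mapping; the sample response and its version attribute values are assumptions, and `milestone_list` is a hypothetical name.

```ruby
require 'rexml/document'
require 'time'

# Hypothetical lifecycle response; the version attribute values are assumed.
SAMPLE_LIFECYCLE = <<~XML
  <lifecycle objectId="druid:ct011cv6501">
    <milestone date="2010-04-27T11:34:17-0700" version="1">registered</milestone>
    <milestone date="2010-06-15T16:08:58-0700" version="1">released</milestone>
  </lifecycle>
XML

# One Hash per milestone, like milestones(repo, druid).
def milestone_list(xml)
  REXML::XPath.match(REXML::Document.new(xml), '//lifecycle/milestone').map do |node|
    { milestone: node.text, at: Time.parse(node.attributes['date']), version: node.attributes['version'] }
  end
end

# getutc returns a UTC copy without mutating the stored Time.
milestone_list(SAMPLE_LIFECYCLE).each { |m| puts "#{m[:milestone]} at #{m[:at].getutc}" }
```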

.objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>

Returns a list of druids from the WorkflowService that meet the criteria of the passed in completed and waiting params

Examples:

objects_for_workstep(...)
=> [
   "druid:py156ps0477",
   "druid:tt628cb6479",
   "druid:ct021wp7863"
 ]
objects_for_workstep(..., "lane1")
=> [
   "druid:py156ps0477",
   "druid:tt628cb6479"
 ]
objects_for_workstep(..., "lane1", limit: 1)
=> [
   "druid:py156ps0477"
 ]

Parameters:

  • completed (Array<String>, String)

    An array or single String of the completed steps, should use the qualified format: repository:workflow:step-name

  • waiting (String)

    name of the waiting step

  • lane_id (String) (defaults to: 'default')

    issue a query for a specific lane_id for the waiting step

  • options (Hash) (defaults to: {})

    optional values for the query

Options Hash (options):

  • :default_repository (String)

    repository to use for any step not given in the qualified repository:workflow:step-name format

  • :default_workflow (String)

    workflow to use for any step not given in the qualified repository:workflow:step-name format

  • :limit (Integer)

    maximum number of druids to return (nil for no limit)

Returns:

  • (Array<String>)

    Array of druids



# File 'lib/dor/services/workflow_service.rb', line 327

def objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  waiting_param = qualify_step(options[:default_repository], options[:default_workflow], waiting)
  uri_string = "workflow_queue?waiting=#{waiting_param}"
  if completed
    Array(completed).each do |step|
      completed_param = qualify_step(options[:default_repository], options[:default_workflow], step)
      uri_string += "&completed=#{completed_param}"
    end
  end

  uri_string += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri_string += "&lane-id=#{lane_id}"

  resp = workflow_resource_method uri_string
  #
  # response looks like:
  #    <objects count="2">
  #      <object id="druid:ab123de4567"/>
  #      <object id="druid:ab123de9012"/>
  #    </objects>
  #
  # convert into:
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  #
  result = Nokogiri::XML(resp).xpath('//object[@id]')
  result.map { |n| n[:id] }
end
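The request objects_for_workstep sends is a plain query string. This hypothetical `workstep_query` reproduces it under one simplifying assumption: every step name is already fully qualified, so no default repository or workflow is applied; it also takes limit as a positional argument instead of an options Hash. `druids_from` parses a response shaped like the one in the code comment above. The step names are illustrative.

```ruby
require 'rexml/document'

# Hypothetical sketch of the workflow_queue query; assumes qualified
# step names (repo:workflow:step), so no defaults are filled in.
def workstep_query(completed, waiting, lane_id = 'default', limit = nil)
  uri = "workflow_queue?waiting=#{waiting}"
  Array(completed).each { |step| uri += "&completed=#{step}" }
  uri += "&limit=#{limit.to_i}" if limit&.to_i&.positive?
  uri + "&lane-id=#{lane_id}"
end

# Extracts druids from an <objects><object id="..."/></objects> response.
def druids_from(xml)
  REXML::XPath.match(REXML::Document.new(xml), '//object').map { |node| node.attributes['id'] }.compact
end

puts workstep_query(['dor:assemblyWF:jp2-create'], 'dor:assemblyWF:checksum-compute', 'lane1', 1)
p druids_from('<objects count="2"><object id="druid:ab123de4567"/><object id="druid:ab123de9012"/></objects>')
```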

.qualify_step(default_repository, default_workflow, step) ⇒ String

Qualifies a step name as repo:workflow:step, filling in the default repository and/or workflow for any segments the step omits

Examples:

dor:assemblyWF:jp2-create

Parameters:

  • default_repository (String)
  • default_workflow (String)
  • step (String)

    The step name. If it contains colons, the leading segments supply the workflow, or the repository and workflow. For example: 'jp2-create', 'assemblyWF:jp2-create' or 'dor:assemblyWF:jp2-create'

Returns:

  • (String)

    repo:workflow:step



# File 'lib/dor/services/workflow_service.rb', line 286

def qualify_step(default_repository, default_workflow, step)
  current = step.split(/:/, 3)
  # Prepend the default workflow only when the step has no workflow segment
  current.unshift(default_workflow)   if current.length < 2
  current.unshift(default_repository) if current.length < 3
  current.join(':')
end
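The documented qualification rules can be checked with a standalone function. Note that it prepends the default workflow only when the step has a single segment (`length < 2`); with a `length < 3` check on both lines, a two-part input such as 'assemblyWF:jp2-create' would get the default workflow prepended in the repository position. The workflow and step names in the calls are illustrative.

```ruby
# Standalone version of the documented qualification rules.
def qualify_step(default_repository, default_workflow, step)
  current = step.split(/:/, 3)
  current.unshift(default_workflow)   if current.length < 2 # only "step" was given
  current.unshift(default_repository) if current.length < 3 # repository still missing
  current.join(':')
end

puts qualify_step('dor', 'assemblyWF', 'jp2-create')
puts qualify_step('dor', 'assemblyWF', 'otherWF:some-step')
puts qualify_step('dor', 'assemblyWF', 'sdr:otherWF:some-step')
```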

.query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document

Returns:

  • (Nokogiri::XML::Document)


# File 'lib/dor/services/workflow_service.rb', line 465

def query_lifecycle(repo, druid, active_only = false)
  req = "#{repo}/objects/#{druid}/lifecycle"
  req += '?active-only=true' if active_only
  Nokogiri::XML(workflow_resource_method(req))
end

.stale_queued_workflows(repository, opts = {}) ⇒ Array[Hash]

Gets all workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that the job manager has lost.

Parameters:

  • repository (String)

    name of the repository you want to query, like 'dor' or 'sdr'

  • opts (Hash) (defaults to: {})

    optional values for query

Options Hash (opts):

  • :hours_ago (Integer)

    steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours, meaning you will get all queued workflows

  • :limit (Integer)

    sets the maximum number of workflow steps that can be returned. Defaults to no limit

Returns:

  • (Array[Hash])

    each Hash represents a workflow step. It will have the following keys: :workflow, :step, :druid, :lane_id



# File 'lib/dor/services/workflow_service.rb', line 431

def stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts)
  parse_queued_workflows_response workflow_resource_method(uri_string)
end

.update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean

Updates the status of one step in a workflow to error. Returns true on success; the caller must handle any exceptions.

Http Call

The method does an HTTP PUT to the workflow service connection set up by configure. For example:

PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
<process name="convert" status="error" />

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

  • process (String)

    The name of the process step

  • error_msg (String)

    The error message. Ideally, this is a brief message describing the error

  • opts (Hash) (defaults to: {})

    optional values for the workflow step

Options Hash (opts):

  • :error_text (String)

    A slot to hold more information about the error, like a full stacktrace

Returns:

  • (Boolean)

    always true



# File 'lib/dor/services/workflow_service.rb', line 201

def update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {})
  opts = { error_text: nil }.merge!(opts)
  xml = create_process_xml({ name: process, status: 'error', errorMessage: error_msg }.merge!(opts))
  workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}", 'put', xml, content_type: 'application/xml'
  true
end

.update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean

Updates the status of one step in a workflow. Returns true on success; the caller must handle any exceptions.

Http Call

The method does an HTTP PUT to the workflow service connection set up by configure. As an example:

PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
<process name="convert" status="completed" />

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

  • process (String)

    The name of the process step

  • status (String)

    The status that you want to set -- using one of the values in VALID_STATUS

  • opts (Hash) (defaults to: {})

    optional values for the workflow step

Options Hash (opts):

  • :elapsed (Float)

    The number of seconds it took to complete this step. Can have a decimal. Is set to 0 if not passed in.

  • :lifecycle (String)

    Bookkeeping label for this particular workflow step. Examples are: 'registered', 'shelved'

  • :note (String)

    Any kind of string annotation that you want to attach to the workflow

  • :lane_id (String)

    Id of processing lane used by the job manager. Can convey priority or name of an application-specific processing lane (e.g. 'high', 'critical', 'hydrus')

  • :current_status (String)

    Setting this string tells the workflow service to compare the current status to this value. If the current value does not match this value, the update is not performed

Returns:

  • (Boolean)

    always true

Raises:

  • (ArgumentError)

    if status is not one of the VALID_STATUS values


# File 'lib/dor/services/workflow_service.rb', line 77

def update_workflow_status(repo, druid, workflow, process, status, opts = {})
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

  opts = { elapsed: 0, lifecycle: nil, note: nil }.merge!(opts)
  opts[:elapsed] = opts[:elapsed].to_s
  current_status = opts.delete(:current_status)
  xml = create_process_xml({ name: process, status: status.downcase }.merge!(opts))
  uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  uri += "?current-status=#{current_status.downcase}" if current_status
  workflow_resource_method(uri, 'put', xml, content_type: 'application/xml')
  true
end
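Before any HTTP call, update_workflow_status validates the status and builds the request path. The hypothetical `status_update_path` below isolates just those two steps (no XML body, no request) to show the case handling: the status is matched case-insensitively against VALID_STATUS, and any :current_status is lowercased into the query string. The druid, workflow and step names are illustrative.

```ruby
VALID_STATUS = %w[waiting completed error queued skipped hold].freeze

# Hypothetical helper: validation and path building only, no HTTP PUT.
def status_update_path(repo, druid, workflow, process, status, current_status: nil)
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

  uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  uri += "?current-status=#{current_status.downcase}" if current_status
  uri
end

puts status_update_path('dor', 'druid:ab123de4567', 'accessionWF', 'shelve', 'COMPLETED', current_status: 'Queued')
```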

.workflow(repo: 'dor', pid:, workflow_name:) ⇒ Workflow::Response::Workflow

Parameters:

  • repo (String) (defaults to: 'dor')

    repository of the object

  • pid (String)

    id of object

  • workflow_name (String)

    The name of the workflow

Returns:

  • (Workflow::Response::Workflow)

    the workflow response built from the workflow XML



# File 'lib/dor/services/workflow_service.rb', line 179

def workflow(repo: 'dor', pid:, workflow_name:)
  xml = workflow_xml(repo, pid, workflow_name)
  Workflow::Response::Workflow.new(xml: xml)
end

.workflow_resource ⇒ Faraday::Connection

Mimics an attribute reader for the connection built by configure. Raises if configure has not been called.

Returns:

  • (Faraday::Connection)

    the REST client resource created during configure()



# File 'lib/dor/services/workflow_service.rb', line 525

def workflow_resource
  raise 'Please call Dor::WorkflowService.configure(url) once before calling any WorkflowService methods' if @@http_conn.nil?

  @@http_conn
end

.workflow_service_exceptions_to_catch ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 545

def workflow_service_exceptions_to_catch
  [Faraday::Error]
end

.workflow_status(repo, druid, workflow, process) ⇒ String

Retrieves the process status of the given workflow for the given object identifier

Parameters:

  • repo (String)

    The repository the object resides in. Currently recognizes "dor" and "sdr".

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

  • process (String)

    The name of the process step

Returns:

  • (String)

    status for repo-workflow-process-druid

Raises:

  • (Dor::WorkflowException)

    if the workflow XML response cannot be parsed



# File 'lib/dor/services/workflow_service.rb', line 97

def workflow_status(repo, druid, workflow, process)
  workflow_md = workflow_xml(repo, druid, workflow)
  doc = Nokogiri::XML(workflow_md)
  raise Dor::WorkflowException, "Unable to parse response:\n#{workflow_md}" if doc.root.nil?

  processes = doc.root.xpath("//process[@name='#{process}']")
  process = processes.max { |a, b| a.attr('version').to_i <=> b.attr('version').to_i }
  process&.attr('status')
end
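When a process appears under several versions, workflow_status returns the status of the entry with the highest version attribute. A REXML sketch of that selection, using a hypothetical two-version response; `process_status` is an illustrative name, and it raises a plain RuntimeError where the gem raises Dor::WorkflowException.

```ruby
require 'rexml/document'

# Hypothetical response: the same process recorded under two versions.
WORKFLOW_XML = <<~XML
  <workflow id="accessionWF" objectId="druid:ab123de4567">
    <process name="shelve" version="1" status="completed"/>
    <process name="shelve" version="2" status="queued"/>
  </workflow>
XML

# Status of the highest-version entry for the named process, or nil if absent.
def process_status(xml, process)
  doc = REXML::Document.new(xml)
  raise "Unable to parse response:\n#{xml}" if doc.root.nil?

  entries = REXML::XPath.match(doc, '//process').select { |pr| pr.attributes['name'] == process }
  latest = entries.max_by { |pr| pr.attributes['version'].to_i }
  latest && latest.attributes['status']
end

p process_status(WORKFLOW_XML, 'shelve')  # "queued"
p process_status(WORKFLOW_XML, 'publish') # nil
```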

.workflow_xml(repo, druid, workflow) ⇒ String

Retrieves the raw XML for the given workflow

Parameters:

  • repo (String)

    The repository the object resides in. Currently recognizes "dor" and "sdr".

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

Returns:

  • (String)

    XML of the workflow



# File 'lib/dor/services/workflow_service.rb', line 118

def workflow_xml(repo, druid, workflow)
  unless workflow
    Deprecation.warn(self, 'calling workflow_xml without a workflow is deprecated and will be removed in version 3. Use all_workflows_xml instead.')
    return all_workflows_xml(druid)
  end
  workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}"
end

.workflows(pid, repo = 'dor') ⇒ Array<String>

Gets the workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.

Examples:

Dor::WorkflowService.workflows('druid:sr100hp0609')
=> ["accessionWF", "assemblyWF", "disseminationWF"]

Parameters:

  • pid (String)

    the druid of the object

  • repo (String) (defaults to: 'dor')

    repository for the object

Returns:

  • (Array<String>)

    list of workflows



# File 'lib/dor/services/workflow_service.rb', line 148

def workflows(pid, repo = 'dor')
  xml_doc = Nokogiri::XML(workflow_xml(repo, pid, ''))
  xml_doc.xpath('//workflow').collect { |workflow| workflow['id'] }
end