Module: Dor::WorkflowService

Defined in:
lib/dor/services/workflow_service.rb

Overview

Methods to create and update workflows

Constant Summary

VALID_STATUS = %w{waiting completed error queued skipped hold}

  From Workflow Service's admin/Process.java

@@resource = nil

@@dor_services_url = nil

Class Method Summary

Class Method Details

.archive_active_workflow(repo, druid) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 365

def archive_active_workflow(repo, druid)
  workflows = get_active_workflows(repo, druid)
  workflows.each do |wf|
    archive_workflow(repo, druid, wf)
  end
end

.archive_workflow(repo, druid, wf_name, version_num = nil) ⇒ Object



# File 'lib/dor/services/workflow_service.rb', line 372

def archive_workflow(repo, druid, wf_name, version_num=nil)
  raise "Please call Dor::WorkflowService.configure(workflow_service_url, :dor_services_url => DOR_SERVICES_URL) once before archiving workflow" if(@@dor_services_url.nil?)

  dor_services = RestClient::Resource.new(@@dor_services_url)
  url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
  url << "/#{version_num}" if(version_num)
  dor_services[url].post ''
end
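
The request path assembled above can be sketched in isolation. This is a minimal illustration, not the module's code: `archive_path` is a hypothetical helper and the druid is a placeholder. Note that the `repo` argument does not appear in the path built by the source shown above.

```ruby
# Hypothetical helper mirroring how archive_workflow assembles its request path.
# The version segment is appended only when a version number is given.
def archive_path(druid, wf_name, version_num = nil)
  path = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
  path += "/#{version_num}" if version_num
  path
end

puts archive_path('druid:ab123de4567', 'accessionWF')
puts archive_path('druid:ab123de4567', 'accessionWF', 2)
```

The resulting string is what gets POSTed, with an empty body, against the DOR services URL supplied to configure.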

.close_version(repo, druid, create_accession_wf = true) ⇒ Object

Calls the versionClose endpoint of the WorkflowService:

  • completes the versioningWF:submit-version and versioningWF:start-accession steps
  • initiates accessionWF

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object whose version is being closed

  • create_accession_wf (Boolean) (defaults to: true)

    Option to create accessionWF when closing a version. Defaults to true



# File 'lib/dor/services/workflow_service.rb', line 389

def close_version(repo, druid, create_accession_wf = true)
  uri = "#{repo}/objects/#{druid}/versionClose"
  uri << "?create-accession=false" if(!create_accession_wf)
  workflow_resource[uri].post ''
  return true
end

.configure(url, opts = {}) ⇒ RestClient::Resource

Configure the workflow service

Parameters:

  • url (String)

    points to the workflow service

  • opts (Hash) (defaults to: {})

    optional params

Options Hash (opts):

  • :dor_services_url (String)

    URL of the DOR REST service; must be set before calling archive_workflow

  • :timeout (Integer)

    number of seconds for RestClient timeout

  • :client_cert_file (String)

    path to an SSL client certificate (deprecated)

  • :client_key_file (String)

    path to an SSL key file (deprecated)

  • :client_key_pass (String)

    password for the key file (deprecated)

Returns:

  • (RestClient::Resource)

    the REST client resource



# File 'lib/dor/services/workflow_service.rb', line 425

def configure(url, opts={})
  params = {}
  params[:timeout] = opts[:timeout] if opts[:timeout]
  @@dor_services_url = opts[:dor_services_url] if opts[:dor_services_url]
  #params[:ssl_client_cert] = OpenSSL::X509::Certificate.new(File.read(opts[:client_cert_file])) if opts[:client_cert_file]
  #params[:ssl_client_key]  = OpenSSL::PKey::RSA.new(File.read(opts[:client_key_file]), opts[:client_key_pass]) if opts[:client_key_file]
  @@resource = RestClient::Resource.new(url, params)
end
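
To make the option handling concrete, here is a small stand-in that runs without RestClient. `split_configure_opts` is a hypothetical helper and the URL is a placeholder. It reflects the source shown above: only `:timeout` is forwarded to the REST client, the DOR services URL is read from the `:dor_services_url` key, and the client certificate options are ignored because their lines are commented out.

```ruby
# Hypothetical stand-in for the option handling inside configure.
# Returns [params, dor_services_url]:
#   params           - what would be passed to RestClient::Resource.new
#   dor_services_url - what would be remembered for archive_workflow
def split_configure_opts(opts = {})
  params = {}
  params[:timeout] = opts[:timeout] if opts[:timeout]
  [params, opts[:dor_services_url]]
end

params, dor_url = split_configure_opts(
  { :timeout => 60, :dor_services_url => 'https://dor.example.org', :client_cert_file => '/tmp/cert.pem' }
)
puts params.inspect
puts dor_url
```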

.count_stale_queued_workflows(repository, opts = {}) ⇒ Integer

Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in

Parameters:

  • repository (String)

    name of the repository you want to query, like 'dor' or 'sdr'

  • opts (Hash) (defaults to: {})

    optional values for query

Options Hash (opts):

  • :hours_ago (Integer)

    steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours, meaning you will get all queued workflows

Returns:

  • (Integer)

    number of stale, queued steps



# File 'lib/dor/services/workflow_service.rb', line 339

def count_stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts)
  uri_string << "&count-only=true"
  xml = workflow_resource[uri_string].get
  doc = Nokogiri::XML(xml)
  return doc.at_xpath('/objects/@count').value.to_i
end

.create_process_xml(params) ⇒ String

Returns:

  • (String)


# File 'lib/dor/services/workflow_service.rb', line 348

def create_process_xml(params)
  builder = Nokogiri::XML::Builder.new do |xml|
    attrs = params.reject { |k,v| v.nil? }
    attrs = Hash[ attrs.map {|k,v| [k.to_s.camelize(:lower), v]}]  # camelize all the keys in the attrs hash
    xml.process(attrs)
  end
  return builder.to_xml
end
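
The attribute preparation above (drop nil values, then camelize the keys) can be tried without ActiveSupport or Nokogiri. In this sketch `lower_camel` and `process_attrs` are hypothetical helpers; `lower_camel` only approximates `camelize(:lower)` for simple snake_case keys.

```ruby
# Hypothetical helper approximating ActiveSupport's camelize(:lower)
# for simple snake_case keys such as :lane_id.
def lower_camel(key)
  key.to_s.gsub(/_([a-z\d])/) { Regexp.last_match(1).upcase }
end

# Drop nil values, then camelize the remaining keys, which is what
# create_process_xml does before handing the attributes to the XML builder.
def process_attrs(params)
  params.reject { |_k, v| v.nil? }
        .map { |k, v| [lower_camel(k), v] }
        .to_h
end

puts process_attrs({ :name => 'convert', :status => 'completed', :lane_id => 'default', :note => nil }).inspect
```

Keys that are already camel-cased, such as :errorMessage, pass through unchanged.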

.create_workflow(repo, druid, workflow_name, wf_xml, opts = {:create_ds => true}) ⇒ Boolean

Creates a workflow for a given object in the repository. If this workflow already exists for the object, it is replaced with the wf_xml passed to this method. You have the option of creating a datastream or not. Returns true on success. The caller must handle any exceptions

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow_name (String)

    The name of the workflow you want to create

  • wf_xml (String)

    The xml that represents the workflow

  • opts (Hash) (defaults to: {:create_ds => true})

    optional params

Options Hash (opts):

  • :create_ds (Boolean)

    if true, a workflow datastream will be created in Fedora. Set to false if you do not want a datastream to be created. If you do not pass in an opts Hash, then :create_ds is set to true by default

  • :lane_id (String)

    adds laneId attribute to all process elements in the wf_xml workflow xml. Defaults to a value of 'default'

Returns:

  • (Boolean)

    always true



# File 'lib/dor/services/workflow_service.rb', line 31

def create_workflow(repo, druid, workflow_name, wf_xml, opts = {:create_ds => true})
  lane_id = opts.fetch(:lane_id, 'default')
  xml = add_lane_id_to_workflow_xml(lane_id, wf_xml)
  workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow_name}"].put(xml, :content_type => 'application/xml',
                                                                               :params => {'create-ds' => opts[:create_ds] })
  return true
end

.delete_workflow(repo, druid, workflow) ⇒ Boolean

Deletes a workflow from a particular repository and druid

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object to delete the workflow from

  • workflow (String)

    The name of the workflow to be deleted

Returns:

  • (Boolean)

    always true



# File 'lib/dor/services/workflow_service.rb', line 158

def delete_workflow(repo, druid, workflow)
  workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow}"].delete
  return true
end

.get_active_lifecycle(repo, druid, milestone) ⇒ Time

Returns the Time for a requested milestone ONLY FROM THE ACTIVE workflow table

Examples:

An example lifecycle xml from the workflow service.

<lifecycle objectId="druid:ct011cv6501">
  <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
  <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
  <milestone date="2010-06-15T16:08:58-0700">released</milestone>
</lifecycle>

Parameters:

  • repo (String)

    repository name

  • druid (String)

    object id

  • milestone (String)

    name of the milestone being queried for

Returns:

  • (Time)

    when the milestone was achieved. Returns nil if the milestone does not exist



# File 'lib/dor/services/workflow_service.rb', line 195

def get_active_lifecycle(repo, druid, milestone)
  doc = self.query_lifecycle(repo, druid, true)
  milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  if(milestone)
    return Time.parse(milestone['date'])
  end

  nil
end

.get_active_workflows(repo, pid) ⇒ Array<String>

Gets the names of the active workflows for a given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR

Examples:

Dor::WorkflowService.get_active_workflows('dor', 'druid:sr100hp0609')
=> ["accessionWF", "assemblyWF", "disseminationWF"]

Parameters:

  • repo (String)

    repository of the object

  • pid (String)

    id of object

Returns:

  • (Array<String>)

    list of active workflows. Returns an empty Array if none are found



# File 'lib/dor/services/workflow_service.rb', line 124

def get_active_workflows(repo, pid)
  doc = Nokogiri::XML(get_workflow_xml(repo,pid,''))
  doc.xpath( %(//workflow[not(process/@archived)]/@id ) ).map {|n| n.value}
end

.get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash

Get a list of druids that have errored out in a particular workflow and step

Examples:

Dor::WorkflowService.get_errored_objects_for_workstep('accessionWF','content-metadata')
=> {"druid:qd556jq0580"=>"druid:qd556jq0580 - Item error; caused by
   #<Rubydora::FedoraInvalidRequest: Error modifying datastream contentMetadata for druid:qd556jq0580. See logger for details>"}

Parameters:

  • workflow (String)

    name

  • step (String)

    name

  • repository (String) (defaults to: 'dor')

    optional; defaults to 'dor'

Returns:

  • (Hash)

    hash of results, with the druid as key and the error message as value



# File 'lib/dor/services/workflow_service.rb', line 308

def get_errored_objects_for_workstep workflow, step, repository='dor'
  result = {}
  uri_string = "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
  resp = workflow_resource[uri_string].get
  # Map each errored object's druid to its error message
  Nokogiri::XML(resp).xpath('//object').each do |node|
    result[node['id']] = node['errorMessage']
  end
  result
end
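
The response handling can be sketched offline. This example uses REXML from the Ruby distribution as a stand-in for Nokogiri so that it runs without extra gems. The XML is an assumed response shape inferred from the `//object`, `id` and `errorMessage` lookups in the source; `errors_by_druid` is a hypothetical helper.

```ruby
require 'rexml/document'

# Assumed response shape, based on the attributes the method reads.
ERRORS_XML = <<~XML
  <objects count="2">
    <object id="druid:qd556jq0580" errorMessage="Item error"/>
    <object id="druid:ab123de4567" errorMessage="Timeout"/>
  </objects>
XML

# Hypothetical helper: builds a druid => error message Hash from the response.
def errors_by_druid(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '//object').each_with_object({}) do |node, result|
    result[node.attributes['id']] = node.attributes['errorMessage']
  end
end

puts errors_by_druid(ERRORS_XML).inspect
```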

.get_lane_ids(repo, workflow, process) ⇒ Array<String>

Returns all the distinct laneIds for a given workflow step

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • workflow (String)

    name

  • process (String)

    name

Returns:

  • (Array<String>)

    all of the distinct laneIds. Array will be empty if no lane ids were found



# File 'lib/dor/services/workflow_service.rb', line 402

def get_lane_ids(repo, workflow, process)
  uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
  doc = Nokogiri::XML(workflow_resource[uri].get)
  nodes = doc.xpath('/lanes/lane')
  nodes.map {|n| n['id']}
end

.get_lifecycle(repo, druid, milestone) ⇒ Time

Returns the Time for a requested milestone from the workflow lifecycle

Examples:

An example lifecycle xml from the workflow service.

<lifecycle objectId="druid:ct011cv6501">
  <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
  <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
  <milestone date="2010-06-15T16:08:58-0700">released</milestone>
</lifecycle>

Parameters:

  • repo (String)

    repository name

  • druid (String)

    object id

  • milestone (String)

    name of the milestone being queried for

Returns:

  • (Time)

    when the milestone was achieved. Returns nil if the milestone does not exist



# File 'lib/dor/services/workflow_service.rb', line 174

def get_lifecycle(repo, druid, milestone)
  doc = self.query_lifecycle(repo, druid)
  milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  if(milestone)
    return Time.parse(milestone['date'])
  end

  nil
end
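
The milestone lookup can be exercised against the example lifecycle XML without a running workflow service. This sketch uses REXML as a stand-in for Nokogiri, and `milestone_time` is a hypothetical helper; it compares the milestone text in Ruby rather than in an XPath predicate.

```ruby
require 'rexml/document'
require 'time'

LIFECYCLE_XML = <<~XML
  <lifecycle objectId="druid:ct011cv6501">
    <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
    <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
    <milestone date="2010-06-15T16:08:58-0700">released</milestone>
  </lifecycle>
XML

# Hypothetical helper: returns the Time of the named milestone,
# or nil when the lifecycle has no such milestone.
def milestone_time(xml, name)
  doc = REXML::Document.new(xml)
  node = REXML::XPath.match(doc, '//lifecycle/milestone').find { |m| m.text == name }
  node && Time.parse(node.attributes['date'])
end

puts milestone_time(LIFECYCLE_XML, 'released')
puts milestone_time(LIFECYCLE_XML, 'shelved').inspect
```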

.get_milestones(repo, druid) ⇒ Array<Hash>

Returns:

  • (Array<Hash>)

    one Hash per milestone, with the keys :milestone, :at and :version


# File 'lib/dor/services/workflow_service.rb', line 206

def get_milestones(repo, druid)
  doc = self.query_lifecycle(repo, druid)
  doc.xpath("//lifecycle/milestone").collect do |node|
    { :milestone => node.text, :at => Time.parse(node['date']), :version => node['version'] }
  end
end

.get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>

Returns a list of druids from the WorkflowService that meet the criteria of the passed in completed and waiting params

Examples:

get_objects_for_workstep(...)
=> [
   "druid:py156ps0477",
   "druid:tt628cb6479",
   "druid:ct021wp7863"
 ]
get_objects_for_workstep(..., "lane1")
=> [
   "druid:py156ps0477",
   "druid:tt628cb6479"
 ]
get_objects_for_workstep(..., "lane1", limit: 1)
=> [
   "druid:py156ps0477"
 ]

Parameters:

  • completed (Array<String>, String)

    An array or single String of the completed steps. Each should use the qualified format: repository:workflow:step-name

  • waiting (String)

    name of the waiting step

  • lane_id (String) (defaults to: 'default')

    issue a query for a specific lane_id for the waiting step

  • options (Hash) (defaults to: {})

    optional values for the query

Options Hash (options):

  • :default_repository (String)

    repository to query for if not using the qualified format

  • :default_workflow (String)

    workflow to query for if not using the qualified format

  • :limit (Integer)

    maximum number of druids to return (nil for no limit)

Returns:

  • (Array<String>)

    Array of druids



# File 'lib/dor/services/workflow_service.rb', line 264

def get_objects_for_workstep completed, waiting, lane_id='default', options = {}
  result = nil
  waiting_param = qualify_step(options[:default_repository],options[:default_workflow],waiting)
  uri_string = "workflow_queue?waiting=#{waiting_param}"
  if(completed)
    Array(completed).each do |step|
      completed_param = qualify_step(options[:default_repository],options[:default_workflow],step)
      uri_string << "&completed=#{completed_param}"
    end
  end

  if options[:limit] and options[:limit].to_i > 0
    uri_string << "&limit=#{options[:limit].to_i}"
  end

  uri_string << "&lane-id=#{lane_id}"

  workflow_resource.options[:timeout] = 5 * 60 unless(workflow_resource.options.include?(:timeout))
  resp = workflow_resource[uri_string].get
  #
  # response looks like:
  #    <objects count="2">
  #      <object id="druid:ab123de4567"/>
  #      <object id="druid:ab123de9012"/>
  #    </objects>
  #
  # convert into:
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  #
  result = Nokogiri::XML(resp).xpath('//object[@id]')
  result.map { |n| n[:id] }
end
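
The query string this method sends can be sketched on its own. The helper below is hypothetical and assumes every step name is already fully qualified (repository:workflow:step), so it skips the qualify_step call; the step names are placeholders.

```ruby
# Hypothetical helper: builds the workflow_queue query for steps that are
# already fully qualified. One completed= parameter is added per completed
# step, limit= only for a positive limit, and lane-id= always comes last.
def workstep_query(completed, waiting, lane_id = 'default', limit: nil)
  query = "workflow_queue?waiting=#{waiting}"
  Array(completed).each { |step| query += "&completed=#{step}" }
  query += "&limit=#{limit.to_i}" if limit && limit.to_i > 0
  query + "&lane-id=#{lane_id}"
end

puts workstep_query('dor:assemblyWF:jp2-create', 'dor:assemblyWF:checksum-compute')
puts workstep_query(%w[dor:assemblyWF:a dor:assemblyWF:b], 'dor:assemblyWF:c', 'lane1', limit: 1)
```

Because `Array(completed)` is used, a single String, an Array of Strings, and nil are all accepted for the completed steps.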

.get_stale_queued_workflows(repository, opts = {}) ⇒ Array<Hash>

Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager

Parameters:

  • repository (String)

    name of the repository you want to query, like 'dor' or 'sdr'

  • opts (Hash) (defaults to: {})

    optional values for query

Options Hash (opts):

  • :hours_ago (Integer)

    steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours, meaning you will get all queued workflows

  • :limit (Integer)

    sets the maximum number of workflow steps that can be returned. Defaults to no limit

Returns:

  • (Array<Hash>)

    each Hash represents a workflow step. It will have the following keys: :workflow, :step, :druid, :lane_id



# File 'lib/dor/services/workflow_service.rb', line 327

def get_stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts)
  xml = workflow_resource[uri_string].get
  parse_queued_workflows_response xml
end

.get_workflow_status(repo, druid, workflow, process) ⇒ String

Retrieves the process status of the given workflow for the given object identifier

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

  • process (String)

    The name of the process step

Returns:

  • (String)

    status of the given process step, or nil if the step is not found in the workflow

Raises:

  • (Exception)


# File 'lib/dor/services/workflow_service.rb', line 79

def get_workflow_status(repo, druid, workflow, process)
  workflow_md = get_workflow_xml(repo, druid, workflow)
  doc = Nokogiri::XML(workflow_md)
  raise Exception.new("Unable to parse response:\n#{workflow_md}") if(doc.root.nil?)

  status = doc.root.at_xpath("//process[@name='#{process}']/@status")
  if status
    status=status.content
  end
  return status
end
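
The status lookup can be tried against a sample document. This sketch again uses REXML in place of Nokogiri. The XML is an assumed shape: the `process` elements with `name` and `status` attributes come from the XPath in the source, while the root element and its attributes are guesses. `process_status` is a hypothetical helper.

```ruby
require 'rexml/document'

# Assumed workflow XML; only the process name/status attributes are
# confirmed by the source above.
WORKFLOW_XML = <<~XML
  <workflow id="accessionWF" objectId="druid:ab123de4567">
    <process name="start-accession" status="completed"/>
    <process name="content-metadata" status="error"/>
  </workflow>
XML

# Hypothetical helper: returns the status of the named process,
# or nil when the workflow has no such process.
def process_status(xml, process)
  doc = REXML::Document.new(xml)
  raise "Unable to parse response:\n#{xml}" if doc.root.nil?

  node = REXML::XPath.match(doc, '//process').find { |el| el.attributes['name'] == process }
  node && node.attributes['status']
end

puts process_status(WORKFLOW_XML, 'content-metadata')
puts process_status(WORKFLOW_XML, 'publish').inspect
```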

.get_workflow_xml(repo, druid, workflow) ⇒ String

Retrieves the raw XML for the given workflow

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

Returns:

  • (String)

    XML of the workflow



# File 'lib/dor/services/workflow_service.rb', line 97

def get_workflow_xml(repo, druid, workflow)
  workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow}"].get
end

.get_workflows(pid, repo = 'dor') ⇒ Array<String>

Gets the workflow names for a given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR

Examples:

Dor::WorkflowService.get_workflows('druid:sr100hp0609')
=> ["accessionWF", "assemblyWF", "disseminationWF"]

Parameters:

  • pid (String)

    id of the object (druid)

  • repo (String) (defaults to: 'dor')

    repository for the object. Defaults to 'dor'

Returns:

  • (Array<String>)

    list of workflows



# File 'lib/dor/services/workflow_service.rb', line 110

def get_workflows(pid, repo='dor')
  xml_doc=Nokogiri::XML(get_workflow_xml(repo,pid,''))
  return xml_doc.xpath('//workflow').collect {|workflow| workflow['id']}
end

.qualify_step(default_repository, default_workflow, step) ⇒ String

Qualifies a step name into the repo:workflow:step form, filling in the defaults for any missing parts

Examples:

dor:assemblyWF:jp2-create

Parameters:

  • default_repository (String)
  • default_workflow (String)
  • step (String)

    if it contains colons, the embedded values are used for the workflow, or for the repository and workflow, instead of the defaults. For example: jp2-create, assemblyWF:jp2-create, or dor:assemblyWF:jp2-create

Returns:

  • (String)

    repo:workflow:step



# File 'lib/dor/services/workflow_service.rb', line 223

def qualify_step(default_repository, default_workflow, step)
  current = step.split(/:/,3)
  current.unshift(default_workflow) if current.length < 2  # a two-part step already names its workflow
  current.unshift(default_repository) if current.length < 3
  current.join(':')
end
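
The three documented input forms can be checked with a standalone sketch. `qualify` is a hypothetical helper written to match the documented intent, not a copy of the module's method: its first length check is `< 2`, so a two-part step (workflow:step) keeps its own workflow and only gains the default repository.

```ruby
# Hypothetical helper: a bare step gets both defaults, a workflow:step pair
# gets only the default repository, and a full name is returned unchanged.
def qualify(default_repository, default_workflow, step)
  parts = step.split(':', 3)
  parts.unshift(default_workflow) if parts.length < 2
  parts.unshift(default_repository) if parts.length < 3
  parts.join(':')
end

puts qualify('dor', 'assemblyWF', 'jp2-create')
puts qualify('dor', 'accessionWF', 'assemblyWF:jp2-create')
puts qualify('dor', 'accessionWF', 'sdr:assemblyWF:jp2-create')
```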

.query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document

Returns:

  • (Nokogiri::XML::Document)


# File 'lib/dor/services/workflow_service.rb', line 358

def query_lifecycle(repo, druid, active_only = false)
  req = "#{repo}/objects/#{druid}/lifecycle"
  req << '?active-only=true' if active_only
  lifecycle_xml = workflow_resource[req].get
  return Nokogiri::XML(lifecycle_xml)
end

.update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean

Updates the status of one step in a workflow to error. Returns true on success. Caller must handle any exceptions

Http Call

The method does an HTTP PUT to the URL defined in Dor::WF_URI. As an example:

PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
<process name="convert" status="error" />

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

  • process (String)

    The name of the process step

  • error_msg (String)

    The error message. Ideally, this is a brief message describing the error

  • opts (Hash) (defaults to: {})

    optional values for the workflow step

Options Hash (opts):

  • :error_text (String)

    A slot to hold more information about the error, like a full stacktrace

Returns:

  • (Boolean)

    always true



# File 'lib/dor/services/workflow_service.rb', line 146

def update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {})
  opts = {:error_text => nil}.merge!(opts)
  xml = create_process_xml({:name => process, :status => 'error', :errorMessage => error_msg}.merge!(opts))
  workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"].put(xml, :content_type => 'application/xml')
  return true
end

.update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean

Updates the status of one step in a workflow. Returns true on success. Caller must handle any exceptions

Http Call

The method does an HTTP PUT to the URL defined in Dor::WF_URI. As an example:

PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
<process name="convert" status="completed" />

Parameters:

  • repo (String)

    The repository the object resides in. The service recognizes "dor" and "sdr" at the moment

  • druid (String)

    The id of the object

  • workflow (String)

    The name of the workflow

  • process (String)

    The name of the process step

  • status (String)

    The status that you want to set -- using one of the values in VALID_STATUS

  • opts (Hash) (defaults to: {})

    optional values for the workflow step

Options Hash (opts):

  • :elapsed (Float)

    The number of seconds it took to complete this step. Can have a decimal. Is set to 0 if not passed in.

  • :lifecycle (String)

    Bookkeeping label for this particular workflow step. Examples are: 'registered', 'shelved'

  • :note (String)

    Any kind of string annotation that you want to attach to the workflow

  • :lane_id (String)

    Id of processing lane used by the job manager. Can convey priority or name of an application-specific processing lane (e.g. 'high', 'critical', 'hydrus')

  • :current_status (String)

    Setting this string tells the workflow service to compare the current status to this value. If the current value does not match this value, the update is not performed

Returns:

  • (Boolean)

    always true

Raises:

  • (ArgumentError)


# File 'lib/dor/services/workflow_service.rb', line 60

def update_workflow_status(repo, druid, workflow, process, status, opts = {})
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)
  opts = {:elapsed => 0, :lifecycle => nil, :note => nil}.merge!(opts)
  opts[:elapsed] = opts[:elapsed].to_s
  current_status = opts.delete(:current_status)
  xml = create_process_xml({:name => process, :status => status.downcase}.merge!(opts))
  uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  uri << "?current-status=#{current_status.downcase}" if current_status
  workflow_resource[uri].put(xml, :content_type => 'application/xml')
  return true
end
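
The validation and the optional current-status guard can be sketched without making a request. `status_update_path` is a hypothetical helper, and the druid and step name are placeholders; the status list is copied from VALID_STATUS above.

```ruby
VALID_STATUS = %w[waiting completed error queued skipped hold].freeze

# Hypothetical helper: rejects unknown statuses, then builds the PUT path,
# adding current-status only when a guard value is supplied.
def status_update_path(repo, druid, workflow, process, status, current_status = nil)
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

  path = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  path += "?current-status=#{current_status.downcase}" if current_status
  path
end

puts status_update_path('dor', 'druid:ab123de4567', 'accessionWF', 'publish', 'completed')
puts status_update_path('dor', 'druid:ab123de4567', 'accessionWF', 'publish', 'Completed', 'Queued')
```

Both the status and the guard value are downcased, so callers may pass them in any letter case.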

.workflow_resource ⇒ RestClient::Resource

Returns the REST client resource.

Returns:

  • (RestClient::Resource)

    the REST client resource



# File 'lib/dor/services/workflow_service.rb', line 410

def workflow_resource
  raise "Please call Dor::WorkflowService.configure(url) once before calling any WorkflowService methods" if(@@resource.nil?)
  @@resource
end