Class: Dor::WorkflowService
- Inherits: Object
  - Object
    - Dor::WorkflowService
- Extended by: Deprecation
- Defined in: lib/dor/services/workflow_service.rb
Overview
Create and update workflows
Constant Summary
- VALID_STATUS = %w[waiting completed error queued skipped hold].freeze
  From the Workflow Service's admin/Process.java.
- @@handler = nil
- @@logger = nil
- @@resource = nil
- @@dor_services_url = nil
- @@http_conn = nil
Class Method Summary
- .archive_active_workflow(repo, druid) ⇒ Object
- .archive_workflow(_repo, druid, wf_name, version_num = nil) ⇒ Object
- .base_url ⇒ Object
  Get the configured URL for the connection.
- .close_version(repo, druid, create_accession_wf = true) ⇒ Object
  Calls the versionClose endpoint of the WorkflowService.
- .configure(url_or_connection, opts = {}) ⇒ Faraday::Connection
  Configures the workflow service. TODO: replace with initialize.
- .count_archived_for_workflow(workflow, repository = 'dor') ⇒ Integer
  Returns the number of objects that have completed a particular workflow.
- .count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
  Returns the number of objects that have a status of 'error' in a particular workflow and step.
- .count_objects_in_step(workflow, step, type, repo) ⇒ Object
- .count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
  Returns the number of objects that have a status of 'queued' in a particular workflow and step.
- .count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
  Returns a count of workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in.
- .create_process_xml(params) ⇒ String
- .create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true }) ⇒ Boolean
  Creates a workflow for a given object in the repository.
- .default_logger(logdev = 'workflow_service.log', shift_age = 'weekly') ⇒ Logger
  Returns the default logger; it is a separate method partly so that tests can mock it.
- .delete_workflow(repo, druid, workflow) ⇒ Boolean
  Deletes a workflow from a particular repository and druid.
- .get_active_lifecycle(repo, druid, milestone) ⇒ Time
  Returns the time of a requested milestone, ONLY FROM THE ACTIVE workflow table.
- .get_active_workflows(repo, pid) ⇒ Array<String>
  Gets the active workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
- .get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
  Gets the druids that have errored out in a particular workflow and step.
- .get_lane_ids(repo, workflow, process) ⇒ Array<String>
  Returns all the distinct laneIds for a given workflow step.
- .get_lifecycle(repo, druid, milestone) ⇒ Time
  Returns the time of a requested milestone from the workflow lifecycle.
- .get_milestones(repo, druid) ⇒ Array<Hash>
- .get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
  Returns a list of druids from the WorkflowService that meet the criteria of the completed and waiting params passed in.
- .get_stale_queued_workflows(repository, opts = {}) ⇒ Array<Hash>
  Gets all workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
- .get_workflow_status(repo, druid, workflow, process) ⇒ String
  Retrieves the process status of the given workflow for the given object identifier.
- .get_workflow_xml(repo, druid, workflow) ⇒ String
  Retrieves the raw XML for the given workflow.
- .get_workflows(pid, repo = 'dor') ⇒ Array<String>
  Gets the workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
- .qualify_step(default_repository, default_workflow, step) ⇒ String
  Qualifies a step name as repo:workflow:step, filling in the default repository and workflow for an unqualified step.
- .query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document
- .update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean
  Updates the status of one step in a workflow to error.
- .update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean
  Updates the status of one step in a workflow.
- .workflow(repo: 'dor', pid:, workflow_name:) ⇒ Workflow::Response::Workflow
- .workflow_resource ⇒ Faraday::Connection
  Returns the configured connection, mimicking an attribute reader.
- .workflow_service_exceptions_to_catch ⇒ Object
Class Method Details
.archive_active_workflow(repo, druid) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 410
    def archive_active_workflow(repo, druid)
      workflows = get_active_workflows(repo, druid)
      workflows.each do |wf|
        archive_workflow(repo, druid, wf)
      end
    end
.archive_workflow(_repo, druid, wf_name, version_num = nil) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 419
    def archive_workflow(_repo, druid, wf_name, version_num = nil)
      raise 'Please call Dor::WorkflowService.configure(workflow_service_url, :dor_services_url => DOR_SERVICES_URL) once before archiving workflow' if @@dor_services_url.nil?

      url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
      url += "/#{version_num}" if version_num
      workflow_resource_method(url, 'post', '')
    end
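A minimal sketch of the path archive_workflow posts to, runnable without a server. archive_path is a hypothetical helper, not part of the gem; it only reproduces the string building shown above, and the druid is illustrative.

```ruby
# Hypothetical helper (not part of the gem) that mirrors how
# archive_workflow builds its request path.
def archive_path(druid, wf_name, version_num = nil)
  path = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
  path += "/#{version_num}" if version_num # the version segment is optional
  path
end

puts archive_path('druid:ab123de4567', 'accessionWF')
# /v1/objects/druid:ab123de4567/workflows/accessionWF/archive
puts archive_path('druid:ab123de4567', 'accessionWF', 2)
# /v1/objects/druid:ab123de4567/workflows/accessionWF/archive/2
```

Note that the repository argument never appears in the path: the method names it _repo and ignores it.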
.base_url ⇒ Object
Get the configured URL for the connection.
    # File 'lib/dor/services/workflow_service.rb', line 465
    def base_url
      workflow_resource.url_prefix
    end
.close_version(repo, druid, create_accession_wf = true) ⇒ Object
Calls the versionClose endpoint of the WorkflowService, which:
- completes the versioningWF:submit-version and versioningWF:start-accession steps
- initiates accessionWF (unless create_accession_wf is false, in which case ?create-accession=false is sent)
    # File 'lib/dor/services/workflow_service.rb', line 435
    def close_version(repo, druid, create_accession_wf = true)
      uri = "#{repo}/objects/#{druid}/versionClose"
      uri += '?create-accession=false' unless create_accession_wf
      workflow_resource_method(uri, 'post', '')
      true
    end
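The effect of create_accession_wf is easiest to see in the URI it produces. This is a sketch only: version_close_uri is a hypothetical helper that copies the string logic above, with an illustrative druid.

```ruby
# Hypothetical helper (not part of the gem) reproducing the URI that
# close_version posts to.
def version_close_uri(repo, druid, create_accession_wf = true)
  uri = "#{repo}/objects/#{druid}/versionClose"
  # Only the opt-out is sent; by default no query string is added.
  uri += '?create-accession=false' unless create_accession_wf
  uri
end

puts version_close_uri('dor', 'druid:ab123de4567')
# dor/objects/druid:ab123de4567/versionClose
puts version_close_uri('dor', 'druid:ab123de4567', false)
# dor/objects/druid:ab123de4567/versionClose?create-accession=false
```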
.configure(url_or_connection, opts = {}) ⇒ Faraday::Connection
Configures the workflow service. TODO: replace with initialize.
    # File 'lib/dor/services/workflow_service.rb', line 492
    def configure(url_or_connection, opts = {})
      @@logger = opts[:logger] || default_logger
      @@dor_services_url = opts[:dor_services_url] if opts[:dor_services_url]
      # params[:ssl_client_cert] = OpenSSL::X509::Certificate.new(File.read(opts[:client_cert_file])) if opts[:client_cert_file]
      # params[:ssl_client_key] = OpenSSL::PKey::RSA.new(File.read(opts[:client_key_file]), opts[:client_key_pass]) if opts[:client_key_file]
      @@handler = proc do |exception, attempt_number, total_delay|
        @@logger.warn "[Attempt #{attempt_number}] #{exception.class}: #{exception.message}; #{total_delay} seconds elapsed."
      end
      @@http_conn = case url_or_connection
                    when String
                      build_connection(url_or_connection, opts)
                    else
                      url_or_connection
                    end
    end
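The warning that the @@handler proc logs can be previewed without a server. This sketch rebuilds a proc of the same shape around a Logger writing to a StringIO. What actually invokes @@handler (presumably retry logic elsewhere in the class) is not shown in this section, so the call below is simulated with illustrative values.

```ruby
require 'logger'
require 'stringio'

# Log to memory instead of workflow_service.log so the output is easy to inspect.
log_io = StringIO.new
logger = Logger.new(log_io)

# Same shape as the @@handler proc that configure builds: it receives the
# exception, the attempt number, and the total delay so far, and logs a warning.
handler = proc do |exception, attempt_number, total_delay|
  logger.warn "[Attempt #{attempt_number}] #{exception.class}: #{exception.message}; #{total_delay} seconds elapsed."
end

# Simulate a second failed attempt.
handler.call(RuntimeError.new('connection reset'), 2, 1.5)
puts log_io.string
```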
.count_archived_for_workflow(workflow, repository = 'dor') ⇒ Integer
Returns the number of objects that have completed a particular workflow.
    # File 'lib/dor/services/workflow_service.rb', line 359
    def count_archived_for_workflow(workflow, repository = 'dor')
      resp = workflow_resource_method "workflow_archive?repository=#{repository}&workflow=#{workflow}&count-only=true"
      extract_object_count(resp)
    end
.count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'error' in a particular workflow and step.
    # File 'lib/dor/services/workflow_service.rb', line 339
    def count_errored_for_workstep(workflow, step, repository = 'dor')
      count_objects_in_step(workflow, step, 'error', repository)
    end
.count_objects_in_step(workflow, step, type, repo) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 508
    def count_objects_in_step(workflow, step, type, repo)
      resp = workflow_resource_method "workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}"
      extract_object_count(resp)
    end
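The counting methods differ only in the type parameter ('error' or 'queued') that becomes the query key. A minimal sketch of the request and the count extraction, under one loud assumption: extract_object_count is not shown here, so this sketch assumes it reads the count attribute of the root objects element, as count_stale_queued_workflows does. Both helpers are hypothetical and use stdlib REXML instead of Nokogiri.

```ruby
require 'rexml/document'

# Hypothetical helper: the query string count_objects_in_step requests.
def step_count_query(workflow, step, type, repo)
  "workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}"
end

# ASSUMPTION: the response is <objects count="N"/> and the count is read from
# the root element's count attribute (extract_object_count is not shown).
def object_count(xml)
  REXML::Document.new(xml).root.attributes['count'].to_i
end

puts step_count_query('accessionWF', 'shelve', 'error', 'dor')
# workflow_queue?repository=dor&workflow=accessionWF&error=shelve
puts object_count('<objects count="3"/>')
# 3
```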
.count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'queued' in a particular workflow and step.
    # File 'lib/dor/services/workflow_service.rb', line 350
    def count_queued_for_workstep(workflow, step, repository = 'dor')
      count_objects_in_step(workflow, step, 'queued', repository)
    end
.count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
Returns a count of workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in.
    # File 'lib/dor/services/workflow_service.rb', line 384
    def count_stale_queued_workflows(repository, opts = {})
      uri_string = build_queued_uri(repository, opts) + '&count-only=true'
      doc = Nokogiri::XML(workflow_resource_method(uri_string))
      doc.at_xpath('/objects/@count').value.to_i
    end
.create_process_xml(params) ⇒ String
    # File 'lib/dor/services/workflow_service.rb', line 392
    def create_process_xml(params)
      builder = Nokogiri::XML::Builder.new do |xml|
        attrs = params.reject { |_k, v| v.nil? }
        attrs = Hash[attrs.map { |k, v| [k.to_s.camelize(:lower), v] }] # camelize all the keys in the attrs hash
        xml.process(attrs)
      end
      builder.to_xml
    end
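A stdlib-only sketch of what create_process_xml produces: nil values are dropped and keys are lower-camelized into attributes of a single process element. This swaps in REXML for Nokogiri and a small hand-written lower_camelize for ActiveSupport's camelize(:lower); both helpers are hypothetical, and REXML omits the XML declaration that Nokogiri's to_xml emits.

```ruby
require 'rexml/document'

# Stand-in for ActiveSupport's camelize(:lower) on snake_case keys.
def lower_camelize(key)
  first, *rest = key.to_s.split('_')
  ([first] + rest.map { |part| part.sub(/\A\w/, &:upcase) }).join
end

# Builds a <process/> element from params, skipping nil values and
# camelizing keys, in the spirit of create_process_xml.
def process_xml(params)
  element = REXML::Element.new('process')
  params.each do |key, value|
    next if value.nil?

    element.add_attribute(lower_camelize(key), value.to_s)
  end
  element.to_s
end

# Mirrors the params update_workflow_error_status passes (error_text defaults to nil).
xml = process_xml(name: 'convert', status: 'error', errorMessage: 'Timed out', error_text: nil)
puts xml
```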
.create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true }) ⇒ Boolean
Creates a workflow for a given object in the repository. If this workflow already exists for the object, it is replaced with the wf_xml passed to this method. You can choose whether to create a datastream (opts[:create_ds]) and which lane to use (opts[:lane_id], default 'default'). Returns true on success; the caller must handle any exceptions.
    # File 'lib/dor/services/workflow_service.rb', line 48
    def create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true })
      lane_id = opts.fetch(:lane_id, 'default')
      xml = add_lane_id_to_workflow_xml(lane_id, wf_xml)
      status = workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow_name}",
                                        'put', xml,
                                        content_type: 'application/xml', params: { 'create-ds' => opts[:create_ds] }
      true
    end
.default_logger(logdev = 'workflow_service.log', shift_age = 'weekly') ⇒ Logger
Returns the default logger (a Logger writing to workflow_service.log, rotated weekly). It is a separate method partly so that tests can mock it.
    # File 'lib/dor/services/workflow_service.rb', line 473
    def default_logger(logdev = 'workflow_service.log', shift_age = 'weekly')
      Logger.new(logdev, shift_age)
    end
.delete_workflow(repo, druid, workflow) ⇒ Boolean
Deletes a workflow from a particular repository and druid.
    # File 'lib/dor/services/workflow_service.rb', line 185
    def delete_workflow(repo, druid, workflow)
      workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}", 'delete'
      true
    end
.get_active_lifecycle(repo, druid, milestone) ⇒ Time
Returns the time (a Time) of a requested milestone, looked up ONLY IN THE ACTIVE workflow table, or nil if the milestone is absent.
    # File 'lib/dor/services/workflow_service.rb', line 220
    def get_active_lifecycle(repo, druid, milestone)
      doc = query_lifecycle(repo, druid, true)
      milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
      return Time.parse(milestone['date']) if milestone

      nil
    end
.get_active_workflows(repo, pid) ⇒ Array<String>
Gets the active workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR. It is deprecated and will be removed without replacement, because the workflow server no longer archives processes.
    # File 'lib/dor/services/workflow_service.rb', line 141
    def get_active_workflows(repo, pid)
      Deprecation.warn(self, 'get_active_workflows will be removed without replacement because the workflow server no longer archives processes')
      doc = Nokogiri::XML(get_workflow_xml(repo, pid, ''))
      doc.xpath(%(//workflow[not(process/@archived)]/@id )).map(&:value)
    end
.get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
Gets the druids that have errored out in a particular workflow and step, as a Hash mapping each druid to its error message.
    # File 'lib/dor/services/workflow_service.rb', line 323
    def get_errored_objects_for_workstep(workflow, step, repository = 'dor')
      resp = workflow_resource_method "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
      result = {}
      Nokogiri::XML(resp).xpath('//object').collect do |node|
        result.merge!(node['id'] => node['errorMessage'])
      end
      result
    end
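The druid-to-message mapping can be reproduced on a sample response. This sketch uses stdlib REXML instead of Nokogiri; errored_objects is a hypothetical helper and the sample XML is made up, shaped after the id and errorMessage attributes the method reads.

```ruby
require 'rexml/document'

# Builds { druid => error message } from a workflow_queue response, the same
# mapping get_errored_objects_for_workstep produces.
def errored_objects(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '//object').to_h do |node|
    [node.attributes['id'], node.attributes['errorMessage']]
  end
end

# Illustrative response; real error messages will differ.
sample = <<~XML
  <objects count="2">
    <object id="druid:ab123de4567" errorMessage="Timed out"/>
    <object id="druid:ab123de9012" errorMessage="Missing file"/>
  </objects>
XML

p errored_objects(sample)
```

Using to_h avoids calling collect purely for its side effects; as with merge!, a druid that appears twice keeps its last message.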
.get_lane_ids(repo, workflow, process) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step.
    # File 'lib/dor/services/workflow_service.rb', line 448
    def get_lane_ids(repo, workflow, process)
      uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
      doc = Nokogiri::XML(workflow_resource_method(uri))
      nodes = doc.xpath('/lanes/lane')
      nodes.map { |n| n['id'] }
    end
.get_lifecycle(repo, druid, milestone) ⇒ Time
Returns the time (a Time) of a requested milestone from the workflow lifecycle, or nil if the milestone is absent.
    # File 'lib/dor/services/workflow_service.rb', line 201
    def get_lifecycle(repo, druid, milestone)
      doc = query_lifecycle(repo, druid)
      milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
      return Time.parse(milestone['date']) if milestone

      nil
    end
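A sketch of the milestone lookup against a made-up lifecycle document, using stdlib REXML in place of Nokogiri. milestone_time is a hypothetical helper; the element and attribute names follow the XPath and date attribute used above.

```ruby
require 'rexml/document'
require 'time'

# Returns the Time of the named milestone, or nil when it is absent.
# The name is compared in Ruby rather than interpolated into the XPath.
def milestone_time(xml, milestone)
  nodes = REXML::XPath.match(REXML::Document.new(xml), '//lifecycle/milestone')
  node = nodes.find { |m| m.text == milestone }
  node && Time.parse(node.attributes['date'])
end

# Illustrative lifecycle document.
sample = <<~XML
  <lifecycle>
    <milestone date="2012-01-26T21:06:54-0800" version="1">published</milestone>
  </lifecycle>
XML

p milestone_time(sample, 'published')
p milestone_time(sample, 'accessioned') # nil
```

Comparing the text in Ruby sidesteps a weakness of building the XPath by interpolation: a milestone name containing a single quote would break the original expression.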
.get_milestones(repo, druid) ⇒ Array<Hash>
    # File 'lib/dor/services/workflow_service.rb', line 229
    def get_milestones(repo, druid)
      doc = query_lifecycle(repo, druid)
      doc.xpath('//lifecycle/milestone').collect do |node|
        { milestone: node.text, at: Time.parse(node['date']), version: node['version'] }
      end
    end
.get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns a list of druids from the WorkflowService that meet the criteria of the completed and waiting params passed in.
    # File 'lib/dor/services/workflow_service.rb', line 284
    def get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
      waiting_param = qualify_step(options[:default_repository], options[:default_workflow], waiting)
      uri_string = "workflow_queue?waiting=#{waiting_param}"
      if completed
        Array(completed).each do |step|
          completed_param = qualify_step(options[:default_repository], options[:default_workflow], step)
          uri_string += "&completed=#{completed_param}"
        end
      end

      uri_string += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
      uri_string += "&lane-id=#{lane_id}"
      resp = workflow_resource_method uri_string
      #
      # response looks like:
      #    <objects count="2">
      #      <object id="druid:ab123de4567"/>
      #      <object id="druid:ab123de9012"/>
      #    </objects>
      #
      # convert into:
      #    ['druid:ab123de4567', 'druid:ab123de9012']
      #
      result = Nokogiri::XML(resp).xpath('//object[@id]')
      result.map { |n| n[:id] }
    end
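The query string is the interesting part of this method. The sketch below is a hypothetical workstep_query helper that takes already-qualified steps (the real method first passes each one through qualify_step) and a limit: keyword in place of the options hash; the step names are illustrative.

```ruby
# Sketch of the query string get_objects_for_workstep builds. Steps are
# passed already qualified (repo:workflow:step).
def workstep_query(completed, waiting, lane_id = 'default', limit: nil)
  query = "workflow_queue?waiting=#{waiting}"
  # Array() accepts a single step, an array of steps, or nil.
  Array(completed).each { |step| query += "&completed=#{step}" }
  # Missing or non-positive limits are left out entirely.
  query += "&limit=#{limit.to_i}" if limit&.to_i&.positive?
  query + "&lane-id=#{lane_id}"
end

puts workstep_query('dor:accessionWF:publish', 'dor:accessionWF:shelve', limit: 50)
# workflow_queue?waiting=dor:accessionWF:shelve&completed=dor:accessionWF:publish&limit=50&lane-id=default
```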
.get_stale_queued_workflows(repository, opts = {}) ⇒ Array<Hash>
Gets all workflow steps that have a status of 'queued' and a last-updated timestamp older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
    # File 'lib/dor/services/workflow_service.rb', line 373
    def get_stale_queued_workflows(repository, opts = {})
      uri_string = build_queued_uri(repository, opts)
      parse_queued_workflows_response workflow_resource_method(uri_string)
    end
.get_workflow_status(repo, druid, workflow, process) ⇒ String
Retrieves the process status of the given workflow for the given object identifier.
    # File 'lib/dor/services/workflow_service.rb', line 98
    def get_workflow_status(repo, druid, workflow, process)
      workflow_md = get_workflow_xml(repo, druid, workflow)
      doc = Nokogiri::XML(workflow_md)
      raise Dor::WorkflowException, "Unable to parse response:\n#{workflow_md}" if doc.root.nil?

      processes = doc.root.xpath("//process[@name='#{process}']")
      process = processes.max { |a, b| a.attr('version').to_i <=> b.attr('version').to_i }
      process&.attr('status')
    end
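When a process appears in several versions, the method reports the status of the highest version. A stdlib sketch of that selection, with REXML instead of Nokogiri and ArgumentError as a stand-in for Dor::WorkflowException; latest_status is hypothetical and the workflow XML is made up.

```ruby
require 'rexml/document'

# Returns the status of the highest-version <process> with the given name,
# or nil if there is none, following get_workflow_status.
def latest_status(xml, process_name)
  doc = REXML::Document.new(xml)
  raise ArgumentError, "Unable to parse response:\n#{xml}" if doc.root.nil?

  matches = REXML::XPath.match(doc, '//process').select { |node| node.attributes['name'] == process_name }
  latest = matches.max_by { |node| node.attributes['version'].to_i } # a missing version counts as 0
  latest && latest.attributes['status']
end

sample = <<~XML
  <workflow id="accessionWF">
    <process name="shelve" version="1" status="completed"/>
    <process name="shelve" version="2" status="waiting"/>
  </workflow>
XML

puts latest_status(sample, 'shelve') # waiting
```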
.get_workflow_xml(repo, druid, workflow) ⇒ String
Retrieves the raw XML for the given workflow.
    # File 'lib/dor/services/workflow_service.rb', line 114
    def get_workflow_xml(repo, druid, workflow)
      workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}"
    end
.get_workflows(pid, repo = 'dor') ⇒ Array<String>
Gets the workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
    # File 'lib/dor/services/workflow_service.rb', line 127
    def get_workflows(pid, repo = 'dor')
      xml_doc = Nokogiri::XML(get_workflow_xml(repo, pid, ''))
      xml_doc.xpath('//workflow').collect { |workflow| workflow['id'] }
    end
.qualify_step(default_repository, default_workflow, step) ⇒ String
Qualifies a step name as repo:workflow:step, filling in the default repository and workflow for an unqualified step.
    # File 'lib/dor/services/workflow_service.rb', line 243
    def qualify_step(default_repository, default_workflow, step)
      current = step.split(/:/, 3)
      current.unshift(default_workflow) if current.length < 3
      current.unshift(default_repository) if current.length < 3
      current.join(':')
    end
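Because qualify_step is pure string logic, it can be copied out and exercised directly. The sketch below is a standalone copy of the method body above; the step names are illustrative.

```ruby
# Standalone copy of qualify_step's logic for experimentation.
def qualify_step(default_repository, default_workflow, step)
  current = step.split(/:/, 3)
  current.unshift(default_workflow) if current.length < 3
  current.unshift(default_repository) if current.length < 3
  current.join(':')
end

puts qualify_step('dor', 'accessionWF', 'shelve')                 # dor:accessionWF:shelve
puts qualify_step('dor', 'accessionWF', 'dor:accessionWF:shelve') # dor:accessionWF:shelve
puts qualify_step('dor', 'accessionWF', 'assemblyWF:shelve')      # accessionWF:assemblyWF:shelve
```

As written, a two-part step is not read as workflow:step: the default workflow is prepended and the repository is never filled in, so callers should pass either a bare step name or a fully qualified one.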
.query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document
    # File 'lib/dor/services/workflow_service.rb', line 402
    def query_lifecycle(repo, druid, active_only = false)
      req = "#{repo}/objects/#{druid}/lifecycle"
      req += '?active-only=true' if active_only
      Nokogiri::XML(workflow_resource_method(req))
    end
.update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean
Updates the status of one step in a workflow to error. Returns true on success; the caller must handle any exceptions.
Http Call
The method does an HTTP PUT to the URL defined in Dor::WF_URI. For example:
    PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
    <process name="convert" status="error" />
    # File 'lib/dor/services/workflow_service.rb', line 173
    def update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {})
      opts = { error_text: nil }.merge!(opts)
      xml = create_process_xml({ name: process, status: 'error', errorMessage: error_msg }.merge!(opts))
      workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}", 'put', xml, content_type: 'application/xml'
      true
    end
.update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean
Updates the status of one step in a workflow. Returns true on success; the caller must handle any exceptions.
Http Call
The method does an HTTP PUT to the URL defined in Dor::WF_URI. For example:
    PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
    <process name="convert" status="completed" />
    # File 'lib/dor/services/workflow_service.rb', line 78
    def update_workflow_status(repo, druid, workflow, process, status, opts = {})
      raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

      opts = { elapsed: 0, lifecycle: nil, note: nil }.merge!(opts)
      opts[:elapsed] = opts[:elapsed].to_s
      current_status = opts.delete(:current_status)
      xml = create_process_xml({ name: process, status: status.downcase }.merge!(opts))
      uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
      uri += "?current-status=#{current_status.downcase}" if current_status
      workflow_resource_method(uri, 'put', xml, content_type: 'application/xml')
      true
    end
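The status check and the optional current-status parameter can be tried in isolation. status_update_uri is a hypothetical helper that copies the validation and URI logic above (the XML body is built separately by create_process_xml); the druid and step names are illustrative. The current-status name suggests a conditional update, but its server-side semantics are not shown in this section.

```ruby
VALID_STATUS = %w[waiting completed error queued skipped hold].freeze

# Hypothetical helper mirroring the status check and URI construction in
# update_workflow_status. Status values are validated case-insensitively.
def status_update_uri(repo, druid, workflow, process, status, current_status: nil)
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

  uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  uri += "?current-status=#{current_status.downcase}" if current_status
  uri
end

puts status_update_uri('dor', 'druid:ab123de4567', 'accessionWF', 'shelve', 'Completed', current_status: 'Queued')
# dor/objects/druid:ab123de4567/workflows/accessionWF/shelve?current-status=queued
```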
.workflow(repo: 'dor', pid:, workflow_name:) ⇒ Workflow::Response::Workflow
    # File 'lib/dor/services/workflow_service.rb', line 151
    def workflow(repo: 'dor', pid:, workflow_name:)
      xml = get_workflow_xml(repo, pid, workflow_name)
      Workflow::Response::Workflow.new(xml: xml)
    end
.workflow_resource ⇒ Faraday::Connection
Returns the configured Faraday connection, mimicking an attribute reader. Raises if configure has not been called.
    # File 'lib/dor/services/workflow_service.rb', line 457
    def workflow_resource
      raise 'Please call Dor::WorkflowService.configure(url) once before calling any WorkflowService methods' if @@http_conn.nil?

      @@http_conn
    end
.workflow_service_exceptions_to_catch ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 477
    def workflow_service_exceptions_to_catch
      [Faraday::Error]
    end