Class: Dor::WorkflowService
- Inherits: Object
- Extended by: Deprecation
- Defined in: lib/dor/services/workflow_service.rb
Overview
Create and update workflows
Constant Summary
- VALID_STATUS = %w[waiting completed error queued skipped hold].freeze
  From the Workflow Service's admin/Process.java.
- @@handler = nil
- @@logger = nil
- @@dor_services_url = nil
- @@http_conn = nil
Class Method Summary
- .archive_active_workflow(repo, druid) ⇒ Object
- .archive_workflow(_repo, druid, wf_name, version_num = nil) ⇒ Object
- .base_url ⇒ Object
  Gets the configured URL for the connection.
- .close_version(repo, druid, create_accession_wf = true) ⇒ Object
  Calls the versionClose endpoint of the WorkflowService.
- .configure(url_or_connection, opts = {}) ⇒ Faraday::Connection
  Configures the workflow service. TODO: replace with initialize.
- .count_archived_for_workflow(workflow, repository = 'dor') ⇒ Integer
  Returns the number of objects that have completed a particular workflow.
- .count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
  Returns the number of objects that have a status of 'error' in a particular workflow and step.
- .count_objects_in_step(workflow, step, type, repo) ⇒ Object
- .count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
  Returns the number of objects that have a status of 'queued' in a particular workflow and step.
- .count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
  Returns the number of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
- .create_process_xml(params) ⇒ String
- .create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true }) ⇒ Boolean
  Creates a workflow for a given object in the repository.
- .default_logger(logdev = 'workflow_service.log', shift_age = 'weekly') ⇒ Logger
  Builds the default logger; keeping it in a distinct method also helps tests mock it.
- .delete_workflow(repo, druid, workflow) ⇒ Boolean
  Deletes a workflow from a particular repository and druid.
- .get_active_lifecycle(repo, druid, milestone) ⇒ Time
  Returns the date of a requested milestone, read ONLY FROM THE ACTIVE workflow table.
- .get_active_workflows(repo, pid) ⇒ Array<String>
  Gets the names of the active workflows for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
- .get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
  Gets the druids that have errored out in a particular workflow and step, mapped to their error messages.
- .get_lane_ids(repo, workflow, process) ⇒ Array<String>
  Returns all the distinct lane IDs for a given workflow step.
- .get_lifecycle(repo, druid, milestone) ⇒ Time
  Returns the date of a requested milestone from the workflow lifecycle.
- .get_milestones(repo, druid) ⇒ Array<Hash>
- .get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
  Returns the druids from the WorkflowService that meet the criteria of the passed-in completed and waiting params.
- .get_stale_queued_workflows(repository, opts = {}) ⇒ Array<Hash>
  Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that the job manager has lost.
- .get_workflow_status(repo, druid, workflow, process) ⇒ String
  Retrieves the process status of the given workflow for the given object identifier.
- .get_workflow_xml(repo, druid, workflow) ⇒ String
  Retrieves the raw XML for the given workflow.
- .get_workflows(pid, repo = 'dor') ⇒ Array<String>
  Gets the workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
- .qualify_step(default_repository, default_workflow, step) ⇒ String
  Expands a step into a fully qualified repo:workflow:step string, filling in the default repository and workflow when they are omitted.
- .query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document
- .update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean
  Updates the status of one step in a workflow to error.
- .update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean
  Updates the status of one step in a workflow.
- .workflow(repo: 'dor', pid:, workflow_name:) ⇒ Workflow::Response::Workflow
- .workflow_resource ⇒ Faraday::Connection
  Returns the configured connection, mimicking an attribute reader.
- .workflow_service_exceptions_to_catch ⇒ Object
Class Method Details
.archive_active_workflow(repo, druid) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 409
    def archive_active_workflow(repo, druid)
      workflows = get_active_workflows(repo, druid)
      workflows.each do |wf|
        archive_workflow(repo, druid, wf)
      end
    end
.archive_workflow(_repo, druid, wf_name, version_num = nil) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 418
    def archive_workflow(_repo, druid, wf_name, version_num = nil)
      raise 'Please call Dor::WorkflowService.configure(workflow_service_url, :dor_services_url => DOR_SERVICES_URL) once before archiving workflow' if @@dor_services_url.nil?

      url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
      url += "/#{version_num}" if version_num
      workflow_resource_method(url, 'post', '')
    end
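The archive path built above ends with an optional version segment. As a minimal stdlib-only sketch of just that string construction (the helper name `archive_path` is hypothetical and not part of the gem; no HTTP call is made):

```ruby
# Hypothetical helper mirroring the path logic in archive_workflow.
def archive_path(druid, wf_name, version_num = nil)
  url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
  url += "/#{version_num}" if version_num # the version segment is optional
  url
end

puts archive_path('druid:ab123de4567', 'accessionWF')
puts archive_path('druid:ab123de4567', 'accessionWF', 3)
```

Note that `_repo` is unused by `archive_workflow`, so, unlike most other methods in this class, the path carries no repository segment.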
.base_url ⇒ Object
Gets the configured URL for the connection.
    # File 'lib/dor/services/workflow_service.rb', line 464
    def base_url
      workflow_resource.url_prefix
    end
.close_version(repo, druid, create_accession_wf = true) ⇒ Object
Calls the versionClose endpoint of the WorkflowService:
- completes the versioningWF:submit-version and versioningWF:start-accession steps
- initiates accessionWF
    # File 'lib/dor/services/workflow_service.rb', line 434
    def close_version(repo, druid, create_accession_wf = true)
      uri = "#{repo}/objects/#{druid}/versionClose"
      uri += '?create-accession=false' unless create_accession_wf
      workflow_resource_method(uri, 'post', '')
      true
    end
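Only the opt-out is ever sent to the versionClose endpoint: the default call has no query string. A small sketch of the URI logic alone, using a hypothetical `version_close_uri` helper:

```ruby
# Hypothetical helper reproducing close_version's URI logic (no HTTP call).
def version_close_uri(repo, druid, create_accession_wf = true)
  uri = "#{repo}/objects/#{druid}/versionClose"
  # The parameter is appended only when the caller opts out of accessionWF.
  uri += '?create-accession=false' unless create_accession_wf
  uri
end

puts version_close_uri('dor', 'druid:ab123de4567')
puts version_close_uri('dor', 'druid:ab123de4567', false)
```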
.configure(url_or_connection, opts = {}) ⇒ Faraday::Connection
Configures the workflow service. TODO: replace with initialize.
    # File 'lib/dor/services/workflow_service.rb', line 491
    def configure(url_or_connection, opts = {})
      @@logger = opts[:logger] || default_logger
      @@dor_services_url = opts[:dor_services_url] if opts[:dor_services_url]
      # params[:ssl_client_cert] = OpenSSL::X509::Certificate.new(File.read(opts[:client_cert_file])) if opts[:client_cert_file]
      # params[:ssl_client_key] = OpenSSL::PKey::RSA.new(File.read(opts[:client_key_file]), opts[:client_key_pass]) if opts[:client_key_file]
      @@handler = proc do |exception, attempt_number, total_delay|
        @@logger.warn "[Attempt #{attempt_number}] #{exception.class}: #{exception.message}; #{total_delay} seconds elapsed."
      end
      @@http_conn = case url_or_connection
                    when String
                      build_connection(url_or_connection, opts)
                    else
                      url_or_connection
                    end
    end
.count_archived_for_workflow(workflow, repository = 'dor') ⇒ Integer
Returns the number of objects that have completed a particular workflow
    # File 'lib/dor/services/workflow_service.rb', line 358
    def count_archived_for_workflow(workflow, repository = 'dor')
      resp = workflow_resource_method "workflow_archive?repository=#{repository}&workflow=#{workflow}&count-only=true"
      extract_object_count(resp)
    end
.count_errored_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'error' in a particular workflow and step
    # File 'lib/dor/services/workflow_service.rb', line 338
    def count_errored_for_workstep(workflow, step, repository = 'dor')
      count_objects_in_step(workflow, step, 'error', repository)
    end
.count_objects_in_step(workflow, step, type, repo) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 507
    def count_objects_in_step(workflow, step, type, repo)
      resp = workflow_resource_method "workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}"
      extract_object_count(resp)
    end
.count_queued_for_workstep(workflow, step, repository = 'dor') ⇒ Integer
Returns the number of objects that have a status of 'queued' in a particular workflow and step
    # File 'lib/dor/services/workflow_service.rb', line 349
    def count_queued_for_workstep(workflow, step, repository = 'dor')
      count_objects_in_step(workflow, step, 'queued', repository)
    end
.count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
Returns the number of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
    # File 'lib/dor/services/workflow_service.rb', line 383
    def count_stale_queued_workflows(repository, opts = {})
      uri_string = build_queued_uri(repository, opts) + '&count-only=true'
      doc = Nokogiri::XML(workflow_resource_method(uri_string))
      doc.at_xpath('/objects/@count').value.to_i
    end
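The count methods depend on the service answering with an `<objects count="N"/>` element; `count_stale_queued_workflows` reads it through the XPath `/objects/@count`. A stdlib sketch of that extraction using REXML instead of Nokogiri, assuming that response shape. Whether `extract_object_count` (not shown on this page) works the same way is an assumption.

```ruby
require 'rexml/document'

# Reads the count attribute from a response shaped like <objects count="N"/>.
def object_count(xml)
  root = REXML::Document.new(xml).root
  raise ArgumentError, 'expected an <objects> root element' unless root&.name == 'objects'

  root.attributes['count'].to_i
end

puts object_count('<objects count="42"/>')
```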
.create_process_xml(params) ⇒ String
    # File 'lib/dor/services/workflow_service.rb', line 391
    def create_process_xml(params)
      builder = Nokogiri::XML::Builder.new do |xml|
        attrs = params.reject { |_k, v| v.nil? }
        attrs = Hash[attrs.map { |k, v| [k.to_s.camelize(:lower), v] }] # camelize all the keys in the attrs hash
        xml.process(attrs)
      end
      builder.to_xml
    end
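`create_process_xml` drops nil-valued params, camelizes the keys and emits a single `<process>` element. Below is a stdlib-only sketch of the same transformation; it swaps Nokogiri for REXML and replaces ActiveSupport's `camelize(:lower)` with a hypothetical `lower_camelize` that handles plain snake_case keys only:

```ruby
require 'rexml/document'

# Hypothetical stand-in for ActiveSupport's camelize(:lower):
# "error_text" -> "errorText"; keys already in camelCase are left unchanged.
def lower_camelize(key)
  first, *rest = key.to_s.split('_')
  first + rest.map { |w| w.sub(/\A[a-z]/) { |c| c.upcase } }.join
end

# Mirrors create_process_xml: drop nil values, camelize keys, emit <process .../>.
def process_xml(params)
  attrs = params.reject { |_k, v| v.nil? }
                .to_h { |k, v| [lower_camelize(k), v.to_s] }
  doc = REXML::Document.new
  doc.add_element('process', attrs)
  doc.to_s
end

puts process_xml(name: 'convert', status: 'error', errorMessage: 'boom', error_text: 'trace', note: nil)
```

Dropping nils matters here because `update_workflow_status` pre-fills `lifecycle: nil` and `note: nil`, and those keys should not become empty attributes.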
.create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true }) ⇒ Boolean
Creates a workflow for a given object in the repository. If this workflow already exists for the object, it is replaced with the wf_xml passed to this method. The create_ds option controls whether a datastream is also created. Returns true on success; the caller must handle any exceptions.
    # File 'lib/dor/services/workflow_service.rb', line 47
    def create_workflow(repo, druid, workflow_name, wf_xml, opts = { create_ds: true })
      lane_id = opts.fetch(:lane_id, 'default')
      xml = add_lane_id_to_workflow_xml(lane_id, wf_xml)
      status = workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow_name}", 'put', xml,
                                        content_type: 'application/xml',
                                        params: { 'create-ds' => opts[:create_ds] }
      true
    end
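One consequence of the `opts = { create_ds: true }` default is easy to miss: Ruby uses the default hash only when no argument is passed, so any caller-supplied hash (for example one that sets only `:lane_id`) replaces it entirely and `opts[:create_ds]` becomes nil. The sketch below demonstrates this with a hypothetical method that shares the same default. How the service treats a nil `create-ds` param is not documented on this page.

```ruby
# Hypothetical method with the same optional-hash default as create_workflow.
def effective_options(opts = { create_ds: true })
  { lane_id: opts.fetch(:lane_id, 'default'), create_ds: opts[:create_ds] }
end

p effective_options                                   # default hash is used
p effective_options(lane_id: 'fast')                  # default hash is replaced
p effective_options(lane_id: 'fast', create_ds: true) # flag set explicitly
```

A caller that sets other options and still wants a datastream therefore has to pass `create_ds: true` explicitly.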
.default_logger(logdev = 'workflow_service.log', shift_age = 'weekly') ⇒ Logger
Builds the default logger; keeping it in a distinct method also helps tests mock it.
    # File 'lib/dor/services/workflow_service.rb', line 472
    def default_logger(logdev = 'workflow_service.log', shift_age = 'weekly')
      Logger.new(logdev, shift_age)
    end
.delete_workflow(repo, druid, workflow) ⇒ Boolean
Deletes a workflow from a particular repository and druid
    # File 'lib/dor/services/workflow_service.rb', line 184
    def delete_workflow(repo, druid, workflow)
      workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}", 'delete'
      true
    end
.get_active_lifecycle(repo, druid, milestone) ⇒ Time
Returns the date (as a Time) of a requested milestone, read ONLY FROM THE ACTIVE workflow table, or nil if the milestone is not found.
    # File 'lib/dor/services/workflow_service.rb', line 219
    def get_active_lifecycle(repo, druid, milestone)
      doc = query_lifecycle(repo, druid, true)
      milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
      return Time.parse(milestone['date']) if milestone

      nil
    end
.get_active_workflows(repo, pid) ⇒ Array<String>
Gets the names of the active workflows for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
    # File 'lib/dor/services/workflow_service.rb', line 140
    def get_active_workflows(repo, pid)
      Deprecation.warn(self, 'get_active_workflows will be removed without replacement because the workflow server no longer archives processes')
      doc = Nokogiri::XML(get_workflow_xml(repo, pid, ''))
      doc.xpath(%(//workflow[not(process/@archived)]/@id )).map(&:value)
    end
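Both `get_active_workflows` and `get_workflows` fetch the all-workflows XML (by passing `''` as the workflow name) and collect `workflow/@id` values; the active variant skips any workflow that contains a process with an `archived` attribute. A REXML sketch of both selections, assuming `<process>` elements nest inside `<workflow id="...">` elements as the XPath implies. The sample XML is invented for illustration.

```ruby
require 'rexml/document'

# Invented sample; the shape is inferred from the XPath expressions above.
SAMPLE = <<~XML
  <workflows>
    <workflow id="accessionWF"><process name="start-accession" status="completed"/></workflow>
    <workflow id="versioningWF"><process name="submit-version" status="completed" archived="true"/></workflow>
  </workflows>
XML

# Like get_workflows: every workflow id.
def workflow_ids(xml)
  REXML::Document.new(xml).get_elements('//workflow').map { |wf| wf.attributes['id'] }
end

# Like //workflow[not(process/@archived)]/@id
def active_workflow_ids(xml)
  REXML::Document.new(xml).get_elements('//workflow')
                          .reject { |wf| wf.get_elements('process').any? { |node| node.attributes['archived'] } }
                          .map { |wf| wf.attributes['id'] }
end

p workflow_ids(SAMPLE)        # => ["accessionWF", "versioningWF"]
p active_workflow_ids(SAMPLE) # => ["accessionWF"]
```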
.get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
Get a list of druids that have errored out in a particular workflow and step
    # File 'lib/dor/services/workflow_service.rb', line 322
    def get_errored_objects_for_workstep(workflow, step, repository = 'dor')
      resp = workflow_resource_method "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
      result = {}
      Nokogiri::XML(resp).xpath('//object').each do |node|
        result.merge!(node['id'] => node['errorMessage'])
      end
      result
    end
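The Hash returned above maps each errored druid to its error message. A stdlib sketch of that parsing step with REXML, assuming `<object id="..." errorMessage="..."/>` nodes as the code reads them (the sample response is invented):

```ruby
require 'rexml/document'

# Builds { druid => errorMessage } from the object nodes of a workflow_queue response.
def errored_objects(xml)
  REXML::Document.new(xml).get_elements('//object').each_with_object({}) do |node, result|
    result[node.attributes['id']] = node.attributes['errorMessage']
  end
end

sample = '<objects count="1"><object id="druid:ab123de4567" errorMessage="timeout"/></objects>'
p errored_objects(sample)
```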
.get_lane_ids(repo, workflow, process) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step
    # File 'lib/dor/services/workflow_service.rb', line 447
    def get_lane_ids(repo, workflow, process)
      uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
      doc = Nokogiri::XML(workflow_resource_method(uri))
      nodes = doc.xpath('/lanes/lane')
      nodes.map { |n| n['id'] }
    end
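`get_lane_ids` sends the step fully qualified as `repo:workflow:process` and reads the `id` of each `/lanes/lane` element. A stdlib sketch of both halves, with hypothetical helper names and an invented sample response:

```ruby
require 'rexml/document'

# Hypothetical helper: the query the method sends.
def lane_ids_uri(repo, workflow, process)
  "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
end

# Collects the id attribute of every /lanes/lane element.
def lane_ids(xml)
  REXML::Document.new(xml).get_elements('/lanes/lane').map { |n| n.attributes['id'] }
end

puts lane_ids_uri('dor', 'accessionWF', 'shelve')
p lane_ids('<lanes><lane id="default"/><lane id="fast"/></lanes>')
```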
.get_lifecycle(repo, druid, milestone) ⇒ Time
Returns the date (as a Time) of a requested milestone from the workflow lifecycle, or nil if the milestone is not found.
    # File 'lib/dor/services/workflow_service.rb', line 200
    def get_lifecycle(repo, druid, milestone)
      doc = query_lifecycle(repo, druid)
      milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
      return Time.parse(milestone['date']) if milestone

      nil
    end
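Both lifecycle lookups match a milestone by its element text and parse its `date` attribute. A REXML sketch of that lookup, assuming lifecycle XML of the shape the XPath and `milestone['date']` imply (the sample document is invented):

```ruby
require 'rexml/document'
require 'time'

# Returns the Time of the named milestone, or nil if it is absent.
def lifecycle_time(xml, milestone)
  node = REXML::Document.new(xml).get_elements('//lifecycle/milestone')
                                 .find { |m| m.text == milestone }
  node && Time.parse(node.attributes['date'])
end

xml = '<lifecycle>' \
      '<milestone date="2019-01-28T20:41:12Z">submitted</milestone>' \
      '</lifecycle>'
p lifecycle_time(xml, 'submitted')
p lifecycle_time(xml, 'published')
```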
.get_milestones(repo, druid) ⇒ Array<Hash>
    # File 'lib/dor/services/workflow_service.rb', line 228
    def get_milestones(repo, druid)
      doc = query_lifecycle(repo, druid)
      doc.xpath('//lifecycle/milestone').collect do |node|
        { milestone: node.text, at: Time.parse(node['date']), version: node['version'] }
      end
    end
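`get_milestones` returns one hash per milestone rather than a single Hash. A REXML sketch of the same mapping over an invented lifecycle document; as in the original, `:version` stays a String because it comes straight from an attribute:

```ruby
require 'rexml/document'
require 'time'

# Mirrors get_milestones: one { milestone:, at:, version: } hash per milestone element.
def milestones(xml)
  REXML::Document.new(xml).get_elements('//lifecycle/milestone').map do |node|
    { milestone: node.text, at: Time.parse(node.attributes['date']), version: node.attributes['version'] }
  end
end

xml = '<lifecycle>' \
      '<milestone date="2019-01-28T20:41:12Z" version="1">submitted</milestone>' \
      '<milestone date="2019-01-29T09:00:00Z" version="1">published</milestone>' \
      '</lifecycle>'
milestones(xml).each { |m| puts "#{m[:milestone]} v#{m[:version]} at #{m[:at].utc.iso8601}" }
```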
.get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns the druids from the WorkflowService that meet the criteria of the passed-in completed and waiting params.
    # File 'lib/dor/services/workflow_service.rb', line 283
    def get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
      waiting_param = qualify_step(options[:default_repository], options[:default_workflow], waiting)
      uri_string = "workflow_queue?waiting=#{waiting_param}"
      if completed
        Array(completed).each do |step|
          completed_param = qualify_step(options[:default_repository], options[:default_workflow], step)
          uri_string += "&completed=#{completed_param}"
        end
      end

      uri_string += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
      uri_string += "&lane-id=#{lane_id}"

      resp = workflow_resource_method uri_string
      #
      # response looks like:
      #   <objects count="2">
      #     <object id="druid:ab123de4567"/>
      #     <object id="druid:ab123de9012"/>
      #   </objects>
      #
      # convert into:
      #   ['druid:ab123de4567', 'druid:ab123de9012']
      #
      result = Nokogiri::XML(resp).xpath('//object[@id]')
      result.map { |n| n[:id] }
    end
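The query that `get_objects_for_workstep` sends can be reproduced without a connection. The sketch below copies `qualify_step` from this class and wraps the URI assembly in a hypothetical `workstep_query` helper; the step and workflow names are only examples:

```ruby
# Copy of qualify_step from this class.
def qualify_step(default_repository, default_workflow, step)
  current = step.split(/:/, 3)
  current.unshift(default_workflow) if current.length < 3
  current.unshift(default_repository) if current.length < 3
  current.join(':')
end

# Hypothetical helper: builds the same query string as get_objects_for_workstep.
def workstep_query(completed, waiting, lane_id = 'default', options = {})
  repo = options[:default_repository]
  wf = options[:default_workflow]
  uri = "workflow_queue?waiting=#{qualify_step(repo, wf, waiting)}"
  Array(completed).each { |step| uri += "&completed=#{qualify_step(repo, wf, step)}" }
  uri += "&limit=#{options[:limit].to_i}" if options[:limit]&.to_i&.positive?
  uri + "&lane-id=#{lane_id}"
end

puts workstep_query('shelve', 'publish', 'default',
                    default_repository: 'dor', default_workflow: 'accessionWF', limit: 10)
```

`completed` may be a single step or an array of steps, since both are passed through `Array(...)`; a zero or missing `:limit` leaves the limit parameter out.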
.get_stale_queued_workflows(repository, opts = {}) ⇒ Array[Hash]
Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that the job manager has lost.
    # File 'lib/dor/services/workflow_service.rb', line 372
    def get_stale_queued_workflows(repository, opts = {})
      uri_string = build_queued_uri(repository, opts)
      parse_queued_workflows_response workflow_resource_method(uri_string)
    end
.get_workflow_status(repo, druid, workflow, process) ⇒ String
Retrieves the process status of the given workflow for the given object identifier
    # File 'lib/dor/services/workflow_service.rb', line 97
    def get_workflow_status(repo, druid, workflow, process)
      workflow_md = get_workflow_xml(repo, druid, workflow)
      doc = Nokogiri::XML(workflow_md)
      raise Dor::WorkflowException, "Unable to parse response:\n#{workflow_md}" if doc.root.nil?

      processes = doc.root.xpath("//process[@name='#{process}']")
      process = processes.max { |a, b| a.attr('version').to_i <=> b.attr('version').to_i }
      process&.attr('status')
    end
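When a step has run under several object versions, `get_workflow_status` reports the status from the process with the highest `version` attribute. A REXML sketch of that selection over invented sample XML; it raises ArgumentError where the original raises Dor::WorkflowException:

```ruby
require 'rexml/document'

# Among processes with the given name, pick the highest version and return its status.
def latest_status(xml, process_name)
  doc = REXML::Document.new(xml)
  raise ArgumentError, "Unable to parse response:\n#{xml}" if doc.root.nil?

  candidates = doc.get_elements('//process').select { |node| node.attributes['name'] == process_name }
  latest = candidates.max_by { |node| node.attributes['version'].to_i }
  latest && latest.attributes['status']
end

xml = <<~XML
  <workflow id="accessionWF">
    <process name="shelve" version="1" status="completed"/>
    <process name="shelve" version="2" status="error"/>
  </workflow>
XML
p latest_status(xml, 'shelve')  # => "error"
p latest_status(xml, 'publish') # => nil
```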
.get_workflow_xml(repo, druid, workflow) ⇒ String
Retrieves the raw XML for the given workflow
    # File 'lib/dor/services/workflow_service.rb', line 113
    def get_workflow_xml(repo, druid, workflow)
      workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}"
    end
.get_workflows(pid, repo = 'dor') ⇒ Array<String>
Gets the workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
    # File 'lib/dor/services/workflow_service.rb', line 126
    def get_workflows(pid, repo = 'dor')
      xml_doc = Nokogiri::XML(get_workflow_xml(repo, pid, ''))
      xml_doc.xpath('//workflow').collect { |workflow| workflow['id'] }
    end
.qualify_step(default_repository, default_workflow, step) ⇒ String
Expands a step into a fully qualified repo:workflow:step string, filling in the default repository and workflow when they are omitted.
    # File 'lib/dor/services/workflow_service.rb', line 242
    def qualify_step(default_repository, default_workflow, step)
      current = step.split(/:/, 3)
      current.unshift(default_workflow) if current.length < 3
      current.unshift(default_repository) if current.length < 3
      current.join(':')
    end
.query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document
    # File 'lib/dor/services/workflow_service.rb', line 401
    def query_lifecycle(repo, druid, active_only = false)
      req = "#{repo}/objects/#{druid}/lifecycle"
      req += '?active-only=true' if active_only
      Nokogiri::XML(workflow_resource_method(req))
    end
.update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean
Updates the status of one step in a workflow to error. Returns true on success; the caller must handle any exceptions.
Http Call
The method does an HTTP PUT to the URL defined in Dor::WF_URI. As an example:
    PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
    <process name="convert" status="error" />
    # File 'lib/dor/services/workflow_service.rb', line 172
    def update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {})
      opts = { error_text: nil }.merge!(opts)
      xml = create_process_xml({ name: process, status: 'error', errorMessage: error_msg }.merge!(opts))
      workflow_resource_method "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}", 'put', xml, content_type: 'application/xml'
      true
    end
.update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean
Updates the status of one step in a workflow. Returns true on success; the caller must handle any exceptions.
Http Call
The method does an HTTP PUT to the URL defined in Dor::WF_URI. As an example:
    PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
    <process name="convert" status="completed" />
    # File 'lib/dor/services/workflow_service.rb', line 77
    def update_workflow_status(repo, druid, workflow, process, status, opts = {})
      raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

      opts = { elapsed: 0, lifecycle: nil, note: nil }.merge!(opts)
      opts[:elapsed] = opts[:elapsed].to_s
      current_status = opts.delete(:current_status)
      xml = create_process_xml({ name: process, status: status.downcase }.merge!(opts))
      uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
      uri += "?current-status=#{current_status.downcase}" if current_status
      workflow_resource_method(uri, 'put', xml, content_type: 'application/xml')
      true
    end
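Status values are validated case-insensitively against VALID_STATUS, and an optional current status is sent as a query parameter rather than in the XML body. A sketch of that validation and URI logic, using a hypothetical `status_update_uri` helper; for brevity it takes `current_status:` as a keyword, whereas the real method reads `:current_status` from `opts` and deletes it before building the process XML:

```ruby
VALID_STATUS = %w[waiting completed error queued skipped hold].freeze

# Hypothetical helper reproducing update_workflow_status's validation and URI (no HTTP call).
def status_update_uri(repo, druid, workflow, process, status, current_status: nil)
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

  uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  uri += "?current-status=#{current_status.downcase}" if current_status
  uri
end

puts status_update_uri('dor', 'druid:ab123de4567', 'accessionWF', 'shelve', 'Completed')
puts status_update_uri('dor', 'druid:ab123de4567', 'accessionWF', 'shelve', 'completed',
                       current_status: 'QUEUED')
begin
  status_update_uri('dor', 'druid:ab123de4567', 'accessionWF', 'shelve', 'done')
rescue ArgumentError => e
  puts e.message # Unknown status value done
end
```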
.workflow(repo: 'dor', pid:, workflow_name:) ⇒ Workflow::Response::Workflow
    # File 'lib/dor/services/workflow_service.rb', line 150
    def workflow(repo: 'dor', pid:, workflow_name:)
      xml = get_workflow_xml(repo, pid, workflow_name)
      Workflow::Response::Workflow.new(xml: xml)
    end
.workflow_resource ⇒ Faraday::Connection
Returns the configured connection, mimicking an attribute reader. Raises if configure has not been called.
    # File 'lib/dor/services/workflow_service.rb', line 456
    def workflow_resource
      raise 'Please call Dor::WorkflowService.configure(url) once before calling any WorkflowService methods' if @@http_conn.nil?

      @@http_conn
    end
.workflow_service_exceptions_to_catch ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 476
    def workflow_service_exceptions_to_catch
      [Faraday::Error]
    end
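`configure` installs `@@handler`, a proc taking `(exception, attempt_number, total_delay)`, and `workflow_service_exceptions_to_catch` lists the error classes worth catching. How the two are wired into requests is not shown on this page. The following is a stdlib-only sketch of a retry loop in that spirit, with a hypothetical `with_retries` helper and a stand-in `TransientError` in place of `Faraday::Error`:

```ruby
require 'logger'
require 'stringio'

TransientError = Class.new(StandardError) # stand-in for Faraday::Error

# Hypothetical retry loop: retries only the listed exception classes and
# reports each failure to a handler shaped like @@handler in configure.
def with_retries(max_tries:, on:, handler:, base_sleep: 0.0)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue *on => e
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    handler.call(e, attempt, elapsed)
    raise if attempt >= max_tries # give up: re-raise the last error

    sleep(base_sleep * attempt)
    retry # re-runs the begin block
  end
end

log = StringIO.new
logger = Logger.new(log)
handler = proc do |exception, attempt_number, total_delay|
  logger.warn "[Attempt #{attempt_number}] #{exception.class}: #{exception.message}; #{total_delay} seconds elapsed."
end

result = with_retries(max_tries: 3, on: [TransientError], handler: handler) do |attempt|
  raise TransientError, 'connection reset' if attempt < 3

  'ok'
end
puts result
puts log.string
```

Exceptions outside the `on:` list propagate immediately without calling the handler.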