Module: Dor::WorkflowService
- Defined in: lib/dor/services/workflow_service.rb
Overview
Methods to create and update workflows.
Constant Summary
- VALID_STATUS =
  From Workflow Service's admin/Process.java
  %w{waiting completed error queued skipped hold}
- @@resource =
  nil
- @@dor_services_url =
  nil
Class Method Summary
- .archive_active_workflow(repo, druid) ⇒ Object
- .archive_workflow(repo, druid, wf_name, version_num = nil) ⇒ Object
- .close_version(repo, druid, create_accession_wf = true) ⇒ Object
  Calls the versionClose endpoint of the WorkflowService.
- .configure(url, opts = {}) ⇒ RestClient::Resource
  Configure the workflow service.
- .count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
  Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
- .create_process_xml(params) ⇒ String
- .create_workflow(repo, druid, workflow_name, wf_xml, opts = {:create_ds => true}) ⇒ Boolean
  Creates a workflow for a given object in the repository.
- .delete_workflow(repo, druid, workflow) ⇒ Boolean
  Deletes a workflow from a particular repository and druid.
- .get_active_lifecycle(repo, druid, milestone) ⇒ Time
  Returns the Time of a requested milestone, taken ONLY FROM THE ACTIVE workflow table.
- .get_active_workflows(repo, pid) ⇒ Array<String>
  Gets the names of the active workflows for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
- .get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
  Get a list of druids that have errored out in a particular workflow and step.
- .get_lane_ids(repo, workflow, process) ⇒ Array<String>
  Returns all the distinct laneIds for a given workflow step.
- .get_lifecycle(repo, druid, milestone) ⇒ Time
  Returns the Time of a requested milestone from the workflow lifecycle.
- .get_milestones(repo, druid) ⇒ Hash
- .get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
  Returns a list of druids from the WorkflowService that meet the criteria of the passed-in completed and waiting params.
- .get_stale_queued_workflows(repository, opts = {}) ⇒ Array<Hash>
  Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
- .get_workflow_status(repo, druid, workflow, process) ⇒ String
  Retrieves the process status of the given workflow for the given object identifier.
- .get_workflow_xml(repo, druid, workflow) ⇒ String
  Retrieves the raw XML for the given workflow.
- .get_workflows(pid, repo = 'dor') ⇒ Array<String>
  Gets the workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
- .qualify_step(default_repository, default_workflow, step) ⇒ String
  Qualifies a step name as repo:workflow:step, prepending the default workflow and repository to a bare step name.
- .query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document
- .update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean
  Updates the status of one step in a workflow to error.
- .update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean
  Updates the status of one step in a workflow.
- .workflow_resource ⇒ RestClient::Resource
  The REST client resource.
Class Method Details
.archive_active_workflow(repo, druid) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 365
    def archive_active_workflow(repo, druid)
      workflows = get_active_workflows(repo, druid)
      workflows.each do |wf|
        archive_workflow(repo, druid, wf)
      end
    end
.archive_workflow(repo, druid, wf_name, version_num = nil) ⇒ Object
    # File 'lib/dor/services/workflow_service.rb', line 372
    def archive_workflow(repo, druid, wf_name, version_num=nil)
      raise "Please call Dor::WorkflowService.configure(workflow_service_url, :dor_services_url => DOR_SERVIES_URL) once before archiving workflow" if(@@dor_services_url.nil?)

      dor_services = RestClient::Resource.new(@@dor_services_url)
      url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
      url << "/#{version_num}" if(version_num)
      dor_services[url].post ''
    end
.close_version(repo, druid, create_accession_wf = true) ⇒ Object
Calls the versionClose endpoint of the WorkflowService:
- completes the versioningWF:submit-version and versioningWF:start-accession steps
- initiates accessionWF
    # File 'lib/dor/services/workflow_service.rb', line 389
    def close_version(repo, druid, create_accession_wf = true)
      uri = "#{repo}/objects/#{druid}/versionClose"
      uri << "?create-accession=false" if(!create_accession_wf)
      workflow_resource[uri].post ''
      return true
    end
.configure(url, opts = {}) ⇒ RestClient::Resource
Configure the workflow service
    # File 'lib/dor/services/workflow_service.rb', line 425
    def configure(url, opts={})
      params = {}
      params[:timeout] = opts[:timeout] if opts[:timeout]
      @@dor_services_url = opts[:dor_services_url] if opts[:dor_services_url]
      #params[:ssl_client_cert] = OpenSSL::X509::Certificate.new(File.read(opts[:client_cert_file])) if opts[:client_cert_file]
      #params[:ssl_client_key] = OpenSSL::PKey::RSA.new(File.read(opts[:client_key_file]), opts[:client_key_pass]) if opts[:client_key_file]
      @@resource = RestClient::Resource.new(url, params)
    end
.count_stale_queued_workflows(repository, opts = {}) ⇒ Integer
Returns a count of workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in.
    # File 'lib/dor/services/workflow_service.rb', line 339
    def count_stale_queued_workflows(repository, opts = {})
      uri_string = build_queued_uri(repository, opts)
      uri_string << "&count-only=true"
      xml = workflow_resource[uri_string].get
      doc = Nokogiri::XML(xml)
      return doc.at_xpath('/objects/@count').value.to_i
    end
.create_process_xml(params) ⇒ String
    # File 'lib/dor/services/workflow_service.rb', line 348
    def create_process_xml(params)
      builder = Nokogiri::XML::Builder.new do |xml|
        attrs = params.reject { |k,v| v.nil? }
        attrs = Hash[ attrs.map {|k,v| [k.to_s.camelize(:lower), v]}]  # camelize all the keys in the attrs hash
        xml.process(attrs)
      end
      return builder.to_xml
    end
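To make the attribute handling in the listing above concrete, here is a minimal sketch of the same two steps (drop nil values, then lower-camelize the keys) in plain Ruby. `lower_camelize` and `process_attributes` are hypothetical stand-ins written for this illustration; the listing itself relies on ActiveSupport's `camelize(:lower)` and on Nokogiri's builder, neither of which is used here. The attribute values are made up.

```ruby
# Hypothetical stand-in for ActiveSupport's String#camelize(:lower),
# covering only simple snake_case keys.
def lower_camelize(key)
  key.to_s.gsub(/_([a-z\d])/) { Regexp.last_match(1).upcase }
end

# Mirrors the attribute preparation in create_process_xml:
# nil values are dropped, then every key is lower-camelized.
def process_attributes(params)
  params.reject { |_key, value| value.nil? }
        .map { |key, value| [lower_camelize(key), value] }
        .to_h
end

# Example input shaped like the hash update_workflow_error_status builds.
attrs = process_attributes(
  :name => 'convert', :status => 'error',
  :errorMessage => 'disk full', :error_text => 'stack trace', :note => nil
)
p attrs
```

In this run `:error_text` ends up as the `errorText` attribute, `:errorMessage` is left unchanged, and `:note` is omitted because its value is nil.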
.create_workflow(repo, druid, workflow_name, wf_xml, opts = {:create_ds => true}) ⇒ Boolean
Creates a workflow for a given object in the repository. If this particular workflow already exists for the object, it is replaced with the wf_xml passed to this method. You have the option of creating a datastream or not. Returns true on success. The caller must handle any exceptions.
    # File 'lib/dor/services/workflow_service.rb', line 31
    def create_workflow(repo, druid, workflow_name, wf_xml, opts = {:create_ds => true})
      lane_id = opts.fetch(:lane_id, 'default')
      xml = add_lane_id_to_workflow_xml(lane_id, wf_xml)
      workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow_name}"].put(xml, :content_type => 'application/xml',
                                                                                    :params => {'create-ds' => opts[:create_ds] })
      return true
    end
.delete_workflow(repo, druid, workflow) ⇒ Boolean
Deletes a workflow from a particular repository and druid
    # File 'lib/dor/services/workflow_service.rb', line 158
    def delete_workflow(repo, druid, workflow)
      workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow}"].delete
      return true
    end
.get_active_lifecycle(repo, druid, milestone) ⇒ Time
Returns the Time of a requested milestone, taken ONLY FROM THE ACTIVE workflow table. Returns nil if the milestone is not found.
    # File 'lib/dor/services/workflow_service.rb', line 195
    def get_active_lifecycle(repo, druid, milestone)
      doc = self.query_lifecycle(repo, druid, true)
      milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
      if(milestone)
        return Time.parse(milestone['date'])
      end

      nil
    end
.get_active_workflows(repo, pid) ⇒ Array<String>
Gets the names of the active workflows for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
    # File 'lib/dor/services/workflow_service.rb', line 124
    def get_active_workflows(repo, pid)
      doc = Nokogiri::XML(get_workflow_xml(repo,pid,''))
      doc.xpath( %(//workflow[not(process/@archived)]/@id ) ).map {|n| n.value}
    end
.get_errored_objects_for_workstep(workflow, step, repository = 'dor') ⇒ Hash
Get a list of druids that have errored out in a particular workflow and step
    # File 'lib/dor/services/workflow_service.rb', line 308
    def get_errored_objects_for_workstep workflow, step, repository='dor'
      result = {}
      uri_string = "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
      resp = workflow_resource[uri_string].get
      objs = Nokogiri::XML(resp).xpath('//object').collect do |node|
        result.merge!(node['id'] => node['errorMessage'])
      end
      result
    end
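The shape of the returned Hash is easier to see with a concrete response. The sketch below parses a hypothetical response body into a druid-to-error-message Hash. It swaps Nokogiri for Ruby's bundled REXML so that it runs without extra gems; the sample XML is an assumption whose element and attribute names are inferred from the XPath (`//object`) and the attribute reads (`id`, `errorMessage`) in the listing above.

```ruby
require 'rexml/document'

# Hypothetical response body; only the names 'object', 'id' and
# 'errorMessage' come from the listing, the values are invented.
response = <<~XML
  <objects count="2">
    <object id="druid:ab123de4567" errorMessage="NullPointerException"/>
    <object id="druid:ab123de9012" errorMessage="Checksum mismatch"/>
  </objects>
XML

doc = REXML::Document.new(response)

# Build { druid => error message }, one entry per <object> element.
errors = REXML::XPath.match(doc, '//object').each_with_object({}) do |node, result|
  result[node.attributes['id']] = node.attributes['errorMessage']
end

errors.each { |druid, message| puts "#{druid}: #{message}" }
```

Using `each_with_object` avoids the unused intermediate array that `collect` produces in the listing, while yielding the same Hash.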
.get_lane_ids(repo, workflow, process) ⇒ Array<String>
Returns all the distinct laneIds for a given workflow step
    # File 'lib/dor/services/workflow_service.rb', line 402
    def get_lane_ids(repo, workflow, process)
      uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
      doc = Nokogiri::XML(workflow_resource[uri].get)
      nodes = doc.xpath('/lanes/lane')
      nodes.map {|n| n['id']}
    end
.get_lifecycle(repo, druid, milestone) ⇒ Time
Returns the Time of a requested milestone from the workflow lifecycle. Returns nil if the milestone is not found.
    # File 'lib/dor/services/workflow_service.rb', line 174
    def get_lifecycle(repo, druid, milestone)
      doc = self.query_lifecycle(repo, druid)
      milestone = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
      if(milestone)
        return Time.parse(milestone['date'])
      end

      nil
    end
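As an illustration of the lookup above, the following sketch finds a milestone by name in a hypothetical lifecycle document and converts its `date` attribute to a Time. It uses Ruby's bundled REXML instead of Nokogiri so it runs standalone. `milestone_time` is a hypothetical helper, and the sample XML is an assumption based on the XPath and the `date` and `version` attributes read in this module's listings.

```ruby
require 'rexml/document'
require 'time'

# Hypothetical lifecycle response; dates and milestone names are invented.
LIFECYCLE_XML = <<~XML
  <lifecycle>
    <milestone date="2012-01-26T21:06:54Z" version="1">registered</milestone>
    <milestone date="2012-01-27T10:15:00Z" version="1">submitted</milestone>
  </lifecycle>
XML

# Hypothetical helper: returns the Time of the named milestone, or nil
# when the document has no such milestone.
def milestone_time(xml, name)
  doc = REXML::Document.new(xml)
  node = REXML::XPath.match(doc, '//lifecycle/milestone')
                     .find { |milestone| milestone.text == name }
  node && Time.parse(node.attributes['date'])
end

puts milestone_time(LIFECYCLE_XML, 'registered').inspect
puts milestone_time(LIFECYCLE_XML, 'published').inspect
```

Callers should be prepared for the nil case, since a milestone that has not been reached yet is simply absent from the document.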
.get_milestones(repo, druid) ⇒ Hash
    # File 'lib/dor/services/workflow_service.rb', line 206
    def get_milestones(repo, druid)
      doc = self.query_lifecycle(repo, druid)
      doc.xpath("//lifecycle/milestone").collect do |node|
        { :milestone => node.text, :at => Time.parse(node['date']), :version => node['version'] }
      end
    end
.get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {}) ⇒ Array<String>
Returns a list of druids from the WorkflowService that meet the criteria of the passed-in completed and waiting params.
    # File 'lib/dor/services/workflow_service.rb', line 264
    def get_objects_for_workstep completed, waiting, lane_id='default', options = {}
      result = nil
      waiting_param = qualify_step(options[:default_repository],options[:default_workflow],waiting)
      uri_string = "workflow_queue?waiting=#{waiting_param}"
      if(completed)
        Array(completed).each do |step|
          completed_param = qualify_step(options[:default_repository],options[:default_workflow],step)
          uri_string << "&completed=#{completed_param}"
        end
      end

      if options[:limit] and options[:limit].to_i > 0
        uri_string << "&limit=#{options[:limit].to_i}"
      end
      uri_string << "&lane-id=#{lane_id}"

      workflow_resource.options[:timeout] = 5 * 60 unless(workflow_resource.options.include?(:timeout))
      resp = workflow_resource[uri_string].get
      #
      # response looks like:
      #    <objects count="2">
      #      <object id="druid:ab123de4567"/>
      #      <object id="druid:ab123de9012"/>
      #    </objects>
      #
      # convert into:
      #   ['druid:ab123de4567', 'druid:ab123de9012']
      #
      result = Nokogiri::XML(resp).xpath('//object[@id]')
      result.map { |n| n[:id] }
    end
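The query string that this method sends can be previewed without a running service. The sketch below isolates the URI-building half of the listing above in a hypothetical helper, `workstep_query`; `qualify_step` is repeated from this module so the block runs standalone. The step and workflow names in the usage are invented for illustration.

```ruby
# Same expansion as qualify_step in this module, repeated here so the
# sketch is self-contained.
def qualify_step(default_repository, default_workflow, step)
  current = step.split(/:/, 3)
  current.unshift(default_workflow) if current.length < 3
  current.unshift(default_repository) if current.length < 3
  current.join(':')
end

# Hypothetical helper: builds the workflow_queue query the way
# get_objects_for_workstep does, but returns it instead of sending it.
def workstep_query(completed, waiting, lane_id = 'default', options = {})
  repo = options[:default_repository]
  workflow = options[:default_workflow]
  uri = "workflow_queue?waiting=#{qualify_step(repo, workflow, waiting)}"
  # One &completed= parameter per prerequisite step (nil yields none).
  Array(completed).each do |step|
    uri << "&completed=#{qualify_step(repo, workflow, step)}"
  end
  uri << "&limit=#{options[:limit].to_i}" if options[:limit].to_i > 0
  uri << "&lane-id=#{lane_id}"
end

query = workstep_query(
  ['dor:assemblyWF:jp2-create', 'checksum-compute'], 'exif-collect', 'default',
  :default_repository => 'dor', :default_workflow => 'assemblyWF', :limit => 10
)
puts query
```

Bare step names pick up the defaults from `options`, so each step can be passed either fully qualified or as a bare name.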
.get_stale_queued_workflows(repository, opts = {}) ⇒ Array<Hash>
Gets all of the workflow steps with a status of 'queued' whose last-updated timestamp is older than the number of hours passed in. This enables re-queueing of jobs that have been lost by the job manager.
    # File 'lib/dor/services/workflow_service.rb', line 327
    def get_stale_queued_workflows(repository, opts = {})
      uri_string = build_queued_uri(repository, opts)
      xml = workflow_resource[uri_string].get
      parse_queued_workflows_response xml
    end
.get_workflow_status(repo, druid, workflow, process) ⇒ String
Retrieves the process status of the given workflow for the given object identifier
    # File 'lib/dor/services/workflow_service.rb', line 79
    def get_workflow_status(repo, druid, workflow, process)
      workflow_md = get_workflow_xml(repo, druid, workflow)
      doc = Nokogiri::XML(workflow_md)
      raise Exception.new("Unable to parse response:\n#{workflow_md}") if(doc.root.nil?)

      status = doc.root.at_xpath("//process[@name='#{process}']/@status")
      if status
        status=status.content
      end
      return status
    end
.get_workflow_xml(repo, druid, workflow) ⇒ String
Retrieves the raw XML for the given workflow
    # File 'lib/dor/services/workflow_service.rb', line 97
    def get_workflow_xml(repo, druid, workflow)
      workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow}"].get
    end
.get_workflows(pid, repo = 'dor') ⇒ Array<String>
Gets the workflow names for the given PID as an array. This method only works when this gem is used in a project that is configured to connect to DOR.
    # File 'lib/dor/services/workflow_service.rb', line 110
    def get_workflows(pid, repo='dor')
      xml_doc=Nokogiri::XML(get_workflow_xml(repo,pid,''))
      return xml_doc.xpath('//workflow').collect {|workflow| workflow['id']}
    end
.qualify_step(default_repository, default_workflow, step) ⇒ String
Qualifies a step name as repo:workflow:step, prepending the default workflow and repository to a bare step name. A step that already has all three parts is returned unchanged.
    # File 'lib/dor/services/workflow_service.rb', line 223
    def qualify_step(default_repository, default_workflow, step)
      current = step.split(/:/,3)
      current.unshift(default_workflow) if current.length < 3
      current.unshift(default_repository) if current.length < 3
      current.join(':')
    end
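A few sample calls show how the expansion behaves. The method body below is copied from the listing above so the block runs on its own; the repository, workflow, and step names are made up for illustration.

```ruby
# Copied from the listing so the example is self-contained.
def qualify_step(default_repository, default_workflow, step)
  current = step.split(/:/, 3)
  current.unshift(default_workflow) if current.length < 3
  current.unshift(default_repository) if current.length < 3
  current.join(':')
end

# A bare step name receives both defaults.
puts qualify_step('dor', 'accessionWF', 'publish')
# → dor:accessionWF:publish

# A fully qualified step is returned unchanged.
puts qualify_step('dor', 'accessionWF', 'sdr:preservationIngestWF:transfer-object')
# → sdr:preservationIngestWF:transfer-object

# A two-part name gets the default workflow prepended, not the repository.
puts qualify_step('dor', 'accessionWF', 'assemblyWF:jp2-create')
# → accessionWF:assemblyWF:jp2-create
```

The last call follows directly from the two `unshift` lines: once the default workflow has been prepended, the array already has three parts, so the repository is never added. As written, the method therefore seems best suited to bare step names and fully qualified ones.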
.query_lifecycle(repo, druid, active_only = false) ⇒ Nokogiri::XML::Document
    # File 'lib/dor/services/workflow_service.rb', line 358
    def query_lifecycle(repo, druid, active_only = false)
      req = "#{repo}/objects/#{druid}/lifecycle"
      req << '?active-only=true' if active_only
      lifecycle_xml = workflow_resource[req].get
      return Nokogiri::XML(lifecycle_xml)
    end
.update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {}) ⇒ Boolean
Updates the status of one step in a workflow to error. Returns true on success. The caller must handle any exceptions.
HTTP Call
The method does an HTTP PUT to the URL defined in Dor::WF_URI. As an example:
    PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
    <process name="convert" status="error" />
    # File 'lib/dor/services/workflow_service.rb', line 146
    def update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {})
      opts = {:error_text => nil}.merge!(opts)
      xml = create_process_xml({:name => process, :status => 'error', :errorMessage => error_msg}.merge!(opts))
      workflow_resource["#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"].put(xml, :content_type => 'application/xml')
      return true
    end
.update_workflow_status(repo, druid, workflow, process, status, opts = {}) ⇒ Boolean
Updates the status of one step in a workflow. Returns true on success. The caller must handle any exceptions.
HTTP Call
The method does an HTTP PUT to the URL defined in Dor::WF_URI. As an example:
    PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
    <process name="convert" status="completed" />
    # File 'lib/dor/services/workflow_service.rb', line 60
    def update_workflow_status(repo, druid, workflow, process, status, opts = {})
      raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)
      opts = {:elapsed => 0, :lifecycle => nil, :note => nil}.merge!(opts)
      opts[:elapsed] = opts[:elapsed].to_s
      current_status = opts.delete(:current_status)
      xml = create_process_xml({:name => process, :status => status.downcase}.merge!(opts))
      uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
      uri << "?current-status=#{current_status.downcase}" if current_status
      workflow_resource[uri].put(xml, :content_type => 'application/xml')
      return true
    end
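Two details of the listing above are easy to miss: the status is validated case-insensitively before anything is sent, and a `:current_status` option is moved out of the XML attributes and into a `current-status` query parameter. The sketch below reproduces just those two steps in a hypothetical helper, `status_update_path`, which builds the request path without sending a request. What the server does with `current-status` is not shown in the listing, so no behavior is assumed here. The druid and workflow names are invented.

```ruby
VALID_STATUS = %w{waiting completed error queued skipped hold}

# Hypothetical helper: validates the status and builds the request path
# the same way update_workflow_status does, without performing the PUT.
def status_update_path(repo, druid, workflow, process, status, current_status = nil)
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(status.downcase)

  uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  uri << "?current-status=#{current_status.downcase}" if current_status
  uri
end

path = status_update_path('dor', 'druid:ab123de4567', 'GoogleScannedWF',
                          'convert', 'Completed', 'queued')
puts path

begin
  status_update_path('dor', 'druid:ab123de4567', 'GoogleScannedWF', 'convert', 'done')
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Because an unknown status raises ArgumentError before any request is made, a typo in a status string fails fast on the client side.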
.workflow_resource ⇒ RestClient::Resource
Returns the REST client resource.
    # File 'lib/dor/services/workflow_service.rb', line 410
    def workflow_resource
      raise "Please call Dor::WorkflowService.configure(url) once before calling any WorkflowService methods" if(@@resource.nil?)
      @@resource
    end
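The guard above means every other method in the module fails until `configure` has been called once. The miniature below sketches that configure-once pattern in isolation. `SketchService` is a hypothetical module, a plain Hash stands in for `RestClient::Resource` so no gems are needed, and the URL is a placeholder.

```ruby
# Hypothetical miniature of the configure-once pattern used by this module.
module SketchService
  @@resource = nil

  # Stores the connection settings; only a :timeout option is forwarded,
  # as in the configure listing.
  def self.configure(url, opts = {})
    params = {}
    params[:timeout] = opts[:timeout] if opts[:timeout]
    @@resource = { :url => url, :params => params }
  end

  # Raises until configure has been called, then returns the stored resource.
  def self.resource
    raise 'Please call SketchService.configure(url) first' if @@resource.nil?

    @@resource
  end
end

# Using the service before configuring it raises.
begin
  SketchService.resource
rescue RuntimeError => e
  first_error = e.message
end
puts first_error

# After a single configure call the resource is available everywhere.
SketchService.configure('https://workflow.example.org/workflow', :timeout => 60)
puts SketchService.resource[:url]
```

In an application, the single `configure` call typically belongs in start-up code so that later calls never hit the guard.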