Class: Sidekiq::Hierarchy::Workflow
- Inherits:
-
Object
- Object
- Sidekiq::Hierarchy::Workflow
- Extended by:
- Forwardable
- Defined in:
- lib/sidekiq/hierarchy/workflow.rb
Instance Attribute Summary collapse
-
#root ⇒ Object
readonly
Returns the value of attribute root.
Class Method Summary collapse
Instance Method Summary collapse
- #==(other_workflow) ⇒ Object
- #complete? ⇒ Boolean
-
#complete_at ⇒ Object
Returns the time at which all jobs were complete; nil if any jobs are still incomplete.
- #delete ⇒ Object
-
#enqueued_at ⇒ Object
Calculated metrics.
- #failed? ⇒ Boolean
-
#failed_at ⇒ Object
Returns the earliest time at which a job failed; nil if none did.
- #finished_at ⇒ Object
-
#initialize(root) ⇒ Workflow
constructor
A new instance of Workflow.
- #run_at ⇒ Object
- #running? ⇒ Boolean
-
#status ⇒ Object
Status.
- #to_s ⇒ Object
- #update_status(from_job_status) ⇒ Object
- #workflow_set ⇒ Object
Constructor Details
Instance Attribute Details
#root ⇒ Object (readonly)
Returns the value of attribute root.
6 7 8 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 6 def root @root end |
Class Method Details
.find_by_jid(root_jid) ⇒ Object
15 16 17 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 15 def find_by_jid(root_jid) find(Job.find(root_jid)) end |
Instance Method Details
#==(other_workflow) ⇒ Object
22 23 24 25 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 22 def ==(other_workflow) other_workflow.instance_of?(self.class) && self.jid == other_workflow.jid end |
#complete? ⇒ Boolean
79 80 81 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 79 def complete? status == :complete end |
#complete_at ⇒ Object
Returns the time at which all jobs were complete; nil if any jobs are still incomplete
100 101 102 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 100 def complete_at Time.at(self[Job::WORKFLOW_FINISHED_AT_FIELD].to_f) if complete? end |
#delete ⇒ Object
31 32 33 34 35 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 31 def delete wset = workflow_set # save it for later root.delete # deleting nodes is more important than a dangling reference wset.remove(self) if wset # now we can clear out from the set end |
#enqueued_at ⇒ Object
Calculated metrics
90 91 92 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 90 def enqueued_at root.enqueued_at end |
#failed? ⇒ Boolean
83 84 85 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 83 def failed? status == :failed end |
#failed_at ⇒ Object
Returns the earliest time at which a job failed; nil if none did
106 107 108 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 106 def failed_at Time.at(self[Job::WORKFLOW_FINISHED_AT_FIELD].to_f) if failed? end |
#finished_at ⇒ Object
110 111 112 113 114 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 110 def finished_at if = self[Job::WORKFLOW_FINISHED_AT_FIELD] Time.at(.to_f) end end |
#run_at ⇒ Object
94 95 96 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 94 def run_at root.run_at end |
#running? ⇒ Boolean
75 76 77 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 75 def running? status == :running end |
#status ⇒ Object
Status
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 43 def status case self[Job::WORKFLOW_STATUS_FIELD] when Job::STATUS_RUNNING :running when Job::STATUS_COMPLETE :complete when Job::STATUS_FAILED :failed else :unknown end end |
#to_s ⇒ Object
121 122 123 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 121 def to_s Sidekiq.dump_json(self.as_json) end |
#update_status(from_job_status) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 56 def update_status(from_job_status) old_status = status return if [:failed, :complete].include?(old_status) # these states are final if [:enqueued, :running, :requeued].include?(from_job_status) new_status, s_val = :running, Job::STATUS_RUNNING elsif from_job_status == :failed new_status, s_val = :failed, Job::STATUS_FAILED elsif from_job_status == :complete && root.subtree_size == root.finished_subtree_size new_status, s_val = :complete, Job::STATUS_COMPLETE end return if !new_status || new_status == old_status # don't publish null updates self[Job::WORKFLOW_STATUS_FIELD] = s_val self[Job::WORKFLOW_FINISHED_AT_FIELD] = Time.now.to_f.to_s if [:failed, :complete].include?(new_status) Sidekiq::Hierarchy.publish(Notifications::WORKFLOW_UPDATE, self, new_status, old_status) end |
#workflow_set ⇒ Object
27 28 29 |
# File 'lib/sidekiq/hierarchy/workflow.rb', line 27 def workflow_set WorkflowSet.for_status(status) end |