Class: Sidekiq::Hierarchy::Workflow

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/sidekiq/hierarchy/workflow.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(root) ⇒ Workflow

Returns a new instance of Workflow.



8
9
10
# File 'lib/sidekiq/hierarchy/workflow.rb', line 8

def initialize(root)
  @root = root
end

Instance Attribute Details

#rootObject (readonly)

Returns the value of attribute root.



6
7
8
# File 'lib/sidekiq/hierarchy/workflow.rb', line 6

def root
  @root
end

Class Method Details

.find_by_jid(root_jid) ⇒ Object



15
16
17
# File 'lib/sidekiq/hierarchy/workflow.rb', line 15

def find_by_jid(root_jid)
  find(Job.find(root_jid))
end

Instance Method Details

#==(other_workflow) ⇒ Object



22
23
24
25
# File 'lib/sidekiq/hierarchy/workflow.rb', line 22

def ==(other_workflow)
  other_workflow.instance_of?(self.class) &&
    self.jid == other_workflow.jid
end

#complete?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/sidekiq/hierarchy/workflow.rb', line 79

def complete?
  status == :complete
end

#complete_atObject

Returns the time at which all jobs were complete; nil if any jobs are still incomplete



100
101
102
# File 'lib/sidekiq/hierarchy/workflow.rb', line 100

def complete_at
  Time.at(self[Job::WORKFLOW_FINISHED_AT_FIELD].to_f) if complete?
end

#deleteObject



31
32
33
34
35
# File 'lib/sidekiq/hierarchy/workflow.rb', line 31

def delete
  wset = workflow_set  # save it for later
  root.delete  # deleting nodes is more important than a dangling reference
  wset.remove(self) if wset  # now we can clear out from the set
end

#enqueued_atObject

Calculated metrics



90
91
92
# File 'lib/sidekiq/hierarchy/workflow.rb', line 90

def enqueued_at
  root.enqueued_at
end

#failed?Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/sidekiq/hierarchy/workflow.rb', line 83

def failed?
  status == :failed
end

#failed_atObject

Returns the earliest time at which a job failed; nil if none did



106
107
108
# File 'lib/sidekiq/hierarchy/workflow.rb', line 106

def failed_at
  Time.at(self[Job::WORKFLOW_FINISHED_AT_FIELD].to_f) if failed?
end

#finished_atObject



110
111
112
113
114
# File 'lib/sidekiq/hierarchy/workflow.rb', line 110

def finished_at
  if timestamp = self[Job::WORKFLOW_FINISHED_AT_FIELD]
    Time.at(timestamp.to_f)
  end
end

#run_atObject



94
95
96
# File 'lib/sidekiq/hierarchy/workflow.rb', line 94

def run_at
  root.run_at
end

#running?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/sidekiq/hierarchy/workflow.rb', line 75

def running?
  status == :running
end

#statusObject

Status



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/sidekiq/hierarchy/workflow.rb', line 43

def status
  case self[Job::WORKFLOW_STATUS_FIELD]
  when Job::STATUS_RUNNING
    :running
  when Job::STATUS_COMPLETE
    :complete
  when Job::STATUS_FAILED
    :failed
  else
    :unknown
  end
end

#to_sObject



121
122
123
# File 'lib/sidekiq/hierarchy/workflow.rb', line 121

def to_s
  Sidekiq.dump_json(self.as_json)
end

#update_status(from_job_status) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/sidekiq/hierarchy/workflow.rb', line 56

def update_status(from_job_status)
  old_status = status
  return if [:failed, :complete].include?(old_status)  # these states are final

  if [:enqueued, :running, :requeued].include?(from_job_status)
    new_status, s_val = :running, Job::STATUS_RUNNING
  elsif from_job_status == :failed
    new_status, s_val = :failed, Job::STATUS_FAILED
  elsif from_job_status == :complete && root.subtree_size == root.finished_subtree_size
    new_status, s_val = :complete, Job::STATUS_COMPLETE
  end
  return if !new_status || new_status == old_status  # don't publish null updates

  self[Job::WORKFLOW_STATUS_FIELD] = s_val
  self[Job::WORKFLOW_FINISHED_AT_FIELD] = Time.now.to_f.to_s if [:failed, :complete].include?(new_status)

  Sidekiq::Hierarchy.publish(Notifications::WORKFLOW_UPDATE, self, new_status, old_status)
end

#workflow_setObject



27
28
29
# File 'lib/sidekiq/hierarchy/workflow.rb', line 27

def workflow_set
  WorkflowSet.for_status(status)
end