Class: Rworkflow::SidekiqFlow

Inherits:
Flow
  • Object
show all
Defined in:
lib/rworkflow/sidekiq_flow.rb

Constant Summary collapse

STATE_POLICY_GATED =
:gated
MAX_EXPECTED_DURATION =
4.hours
PRIORITIES =
[:critical, :high, nil, :low]

Constants inherited from Flow

Flow::REDIS_NS, Flow::STATES_FAILED, Flow::STATES_TERMINAL, Flow::STATE_FAILED, Flow::STATE_SUCCESSFUL, Flow::WORKFLOW_REGISTRY

Instance Attribute Summary

Attributes inherited from Flow

#id, #lifecycle

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Flow

all, #cleaned_up?, cleanup, #count, #created_at, #failed?, failure?, #fetch, #finish_time, #finished?, generate_id, #get, #get_counters, get_private_workflows, get_public_workflows, #get_state_cardinality, #incr, #list_objects, load, #log, #logger, #logging?, #logs, #metadata_string, #name, #name=, #public?, read_flow_class, register, registered?, registry, serializer, #set, #start, #start_time, #started?, #states_list, #successful?, terminal?, #terminate, #total_objects, #total_objects_failed, #total_objects_processed, #transition, unregister, #valid?

Constructor Details

#initialize(id) ⇒ SidekiqFlow

Returns a new instance of SidekiqFlow.



8
9
10
11
# File 'lib/rworkflow/sidekiq_flow.rb', line 8

# Initializes the flow through the parent constructor, then creates the
# RedisRds::Set used to track open gates.
#
# @param id the flow identifier, forwarded unchanged to the parent constructor
def initialize(id)
  super(id)
  # The set key is derived from @redis_key with a fixed "__open_gates" suffix.
  # NOTE(review): @redis_key is presumably assigned by the parent constructor - confirm.
  gates_key = "#{@redis_key}__open_gates"
  @open_gates = RedisRds::Set.new(gates_key)
end

Class Method Details

.build_flow_mapObject



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/rworkflow/sidekiq_flow.rb', line 148

# Counts the queued Sidekiq jobs per flow and per state.
#
# Every queue reported by SidekiqHelper.get_queue_sizes is scanned. Only jobs
# whose class resolves to Rworkflow::Worker (or a subclass) are counted; the
# flow id is read from the first job argument and the state name from the second.
#
# @return [Hash] flow id => { state name => number of queued jobs }
def build_flow_map
  queue_names = SidekiqHelper.get_queue_sizes.keys
  queue_names.each_with_object({}) do |queue_name, flow_map|
    Sidekiq::Queue.new(queue_name).each do |job|
      # A job whose class name cannot be resolved is ignored.
      worker = begin
        job.klass.constantize
      rescue NameError
        nil
      end
      next if worker.nil? || !(worker <= Rworkflow::Worker)

      flow_id = job.args.first
      state_name = job.args.second
      per_state = (flow_map[flow_id] ||= {})
      per_state[state_name] = per_state.fetch(state_name, 0) + 1
    end
  end
end

.cleanup_broken_flowsObject



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/rworkflow/sidekiq_flow.rb', line 117

# Cleans up every flow returned by +all+ that is considered broken and prints
# one line per cleaned flow plus a final summary line.
#
# A flow is collected when it is:
# * not valid ("invalid"),
# * valid, finished and not public ("finished"),
# * valid, never started and created more than one day ago ("never started").
def cleanup_broken_flows
  broken = self.all.each_with_object([]) do |flow, found|
    reason =
      if !flow.valid?
        'invalid'
      elsif flow.finished? && !flow.public?
        'finished'
      elsif !flow.started? && flow.created_at < 1.day.ago
        'never started'
      end
    found << [flow, reason] unless reason.nil?
  end

  # Cleanup happens only after the full scan, in the order the flows were found.
  broken.each do |flow, reason|
    flow.cleanup
    puts "Cleaned up #{reason} flow #{flow.id}"
  end
  puts ">>> Cleaned up #{broken.size} broken flows <<<"
end

.create(lifecycle, name = '', options) ⇒ Object



106
107
108
109
110
111
# File 'lib/rworkflow/sidekiq_flow.rb', line 106

# Creates a workflow through the parent implementation and, when
# options[:priority] is not nil, stores it on the workflow under :priority.
#
# @param lifecycle forwarded unchanged to the parent implementation
# @param name forwarded unchanged to the parent implementation (defaults to '')
# @param options forwarded to the parent implementation; its :priority entry is read here
# @return the workflow returned by the parent implementation
def create(lifecycle, name = '', options)
  super(lifecycle, name, options).tap do |workflow|
    priority = options[:priority]
    workflow.set(:priority, priority) unless priority.nil?
  end
end

.create_missing_jobs(flow, state_map) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/rworkflow/sidekiq_flow.rb', line 172

# Creates jobs for objects of +flow+ that are not covered by a queued job.
#
# For each counter of the flow (terminal states and :processing are skipped),
# the number of covered objects is the queued job count for that state times the
# state's cardinality. When the counter exceeds it, jobs are created for the
# difference and a line is printed.
#
# @param flow the flow to inspect; must respond to get_counters,
#   get_state_cardinality, create_jobs and id
# @param state_map [Hash] state name => number of jobs currently queued
def create_missing_jobs(flow, state_map)
  flow.get_counters.each do |state, num_objects|
    next if flow.class.terminal?(state) || state == :processing

    covered = state_map.fetch(state, 0) * flow.get_state_cardinality(state)
    shortfall = num_objects - covered
    next unless shortfall > 0

    flow.create_jobs(state, shortfall)
    puts "Created #{shortfall} missing jobs for state #{state} in flow #{flow.id}"
  end
end

.enqueue_missing_jobsObject



139
140
141
142
143
144
145
146
# File 'lib/rworkflow/sidekiq_flow.rb', line 139

# Re-creates missing jobs for every flow that is valid, not finished and not paused.
#
# The map of currently queued jobs is built once and each running flow is
# checked against its own entry (an empty map when the flow has no queued jobs).
def enqueue_missing_jobs
  queued = build_flow_map
  running = self.all.reject { |flow| !flow.valid? || flow.finished? || flow.paused? }
  running.each { |flow| create_missing_jobs(flow, queued.fetch(flow.id, {})) }
end

.get_manual_priorityObject



113
114
115
# File 'lib/rworkflow/sidekiq_flow.rb', line 113

# @return [Symbol] always :high
def get_manual_priority
  :high
end

Instance Method Details

#cleanupObject



13
14
15
16
# File 'lib/rworkflow/sidekiq_flow.rb', line 13

# Runs the parent cleanup, then deletes the open-gates set held in @open_gates.
def cleanup
  super()
  @open_gates.delete
end

#close_gate(state_name) ⇒ Object



101
102
103
# File 'lib/rworkflow/sidekiq_flow.rb', line 101

# Removes +state_name+ from the set of open gates held in @open_gates.
#
# @param state_name name of the state whose gate is being closed
def close_gate(state_name)
  @open_gates.remove(state_name)
end

#continueObject

for now assumes



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rworkflow/sidekiq_flow.rb', line 46

# Decrements the flow's :paused counter and, once it reaches 0, creates jobs for
# every counter that is neither a terminal state nor :processing.
#
# Does nothing when the flow is finished, invalid or not paused. Any
# StandardError is logged through Rails.logger and not re-raised.
def continue
  return if finished? || !valid? || !paused?
  # Jobs are only re-created when this call removed the last pause.
  return unless @flow_data.decr(:paused) == 0

  pending = get_counters.reject { |name, _| self.class.terminal?(name) || name == :processing }

  # enqueue jobs
  Hash[pending].each { |worker, num_objects| create_jobs(worker, num_objects) }
rescue StandardError => e
  Rails.logger.error("Error continuing flow #{self.id}: #{e.message}")
end

#create_jobs(state_name, num_objects) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/rworkflow/sidekiq_flow.rb', line 58

def create_jobs(state_name, num_objects)
  return if paused? || num_objects < 1 || self.class.terminal?(state_name) || gated?(state_name)
  state = @lifecycle.states[state_name]
  worker_class = if state.respond_to?(:worker_class)
    state.worker_class
  else
    begin
      state_name.constantize
    rescue NameError => _
      Rails.logger.error("Trying to push to a non existent worker class #{state_name} in workflow #{@id}")
      nil
    end
  end

  if !worker_class.nil?
    cardinality = get_state_cardinality(state_name)

    if state.policy == State::STATE_POLICY_WAIT
      amount = ((num_objects + get_state_list(state_name).size) / cardinality.to_f).floor
    else
      amount = (num_objects / cardinality.to_f).ceil
    end

    state_priority = self.priority || state.priority
    amount.times { worker_class.enqueue_job_with_priority(state_priority, @id, state_name) }
  end
end

#expected_durationObject



26
27
28
# File 'lib/rworkflow/sidekiq_flow.rb', line 26

# @return the value of the MAX_EXPECTED_DURATION constant
def expected_duration
  MAX_EXPECTED_DURATION
end

#gated?(state_name) ⇒ Boolean

Returns:

  • (Boolean)


90
91
92
93
# File 'lib/rworkflow/sidekiq_flow.rb', line 90

def gated?(state_name)
  state = @lifecycle.states[state_name]
  return state.policy == STATE_POLICY_GATED && !@open_gates.include?(state_name)
end

#open_gate(state_name) ⇒ Object



95
96
97
98
99
# File 'lib/rworkflow/sidekiq_flow.rb', line 95

# Marks the gate of +state_name+ as open and creates jobs for the objects
# currently counted in that state.
#
# @param state_name name of the state whose gate is opened
# @return whatever create_jobs returns
def open_gate(state_name)
  @open_gates.add(state_name)
  create_jobs(state_name, count(state_name))
end

#pauseObject



38
39
40
41
42
43
# File 'lib/rworkflow/sidekiq_flow.rb', line 38

# Increments the flow's :paused counter unless the flow is already finished.
# Any StandardError is logged through Rails.logger and not re-raised.
def pause
  @flow_data.incr(:paused) unless finished?
rescue StandardError => e
  Rails.logger.error("Error pausing flow #{self.id}: #{e.message}")
end

#paused?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/rworkflow/sidekiq_flow.rb', line 30

# @return [Boolean] true when the flow's :paused value, converted with to_i,
#   is greater than zero
def paused?
  @flow_data.get(:paused).to_i.positive?
end

#priorityObject



86
87
88
# File 'lib/rworkflow/sidekiq_flow.rb', line 86

# Returns the flow's :priority value, caching it in @priority.
#
# The cache uses ||=, so a nil (or false) lookup result is not cached and is
# fetched again on the next call.
def priority
  @priority ||= get(:priority)
end

#push(objects, name) ⇒ Object



18
19
20
21
22
23
24
# File 'lib/rworkflow/sidekiq_flow.rb', line 18

# Pushes +objects+ into state +name+ through the parent implementation and
# creates jobs for the objects that were pushed.
#
# A StandardError raised by the parent push is logged and treated as "nothing
# pushed", so the method still returns 0 in that case. Earlier versions did this
# with a `return` inside `ensure`, which also discarded non-StandardError
# exceptions without leaving a trace; those now propagate to the caller.
#
# @param objects the objects to push, forwarded unchanged to the parent
# @param name name of the state the objects are pushed to
# @return the number of objects pushed, as reported by the parent (0 on failure)
def push(objects, name)
  pushed = 0
  begin
    pushed = super(objects, name)
  rescue StandardError => e
    Rails.logger.error("Error pushing objects to #{name} in flow #{self.id}: #{e.message}")
  end

  create_jobs(name, pushed) if pushed > 0
  pushed
end

#statusObject



34
35
36
# File 'lib/rworkflow/sidekiq_flow.rb', line 34

# @return 'Paused' while the flow is paused, otherwise the parent's status
def status
  return 'Paused' if paused?

  super()
end