Class: Rworkflow::Flow

Inherits:
Object
  • Object
show all
Defined in:
lib/rworkflow/flow.rb

Direct Known Subclasses

SidekiqFlow

Constant Summary

# Terminal state for objects that went through the lifecycle without error.
STATE_SUCCESSFUL = :successful
# Terminal state for objects that failed (fetch also routes unparseable payloads here).
STATE_FAILED = :failed
# States that no longer count as pending work (see finished?).
STATES_TERMINAL = [STATE_FAILED, STATE_SUCCESSFUL].freeze
# States whose objects are counted as failures (see failure? / total_objects_failed).
STATES_FAILED = [STATE_FAILED].freeze
# Prefix of every Redis key owned by a flow ("flow:<id>...").
REDIS_NS = 'flow'.freeze
# Registry key name, built under REDIS_NS.
WORKFLOW_REGISTRY = "#{REDIS_NS}:__registry".freeze

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(id) ⇒ Flow

Returns a new instance of Flow.



14
15
16
17
18
19
20
21
22
23
# File 'lib/rworkflow/flow.rb', line 14

# Binds this object to the flow identified by +id+.
#
# Attaches three RedisRds hashes: the main storage at "flow:<id>", the flow
# data at "flow:<id>__data" and the processing registry at
# "flow:<id>__processing". Then loads the lifecycle definition.
#
# @param id [String] flow identifier (see .generate_id)
def initialize(id)
  @id = id
  key = @redis_key = "#{REDIS_NS}:#{id}"

  @storage = RedisRds::Hash.new(key)
  @flow_data, @processing = %w[__data __processing].map { |suffix| RedisRds::Hash.new(key + suffix) }

  load_lifecycle
end

Instance Attribute Details

#id ⇒ Object

Returns the value of attribute id.



11
12
13
# File 'lib/rworkflow/flow.rb', line 11

# Identifier of this flow; it is also the suffix of the flow's Redis keys.
def id; @id; end

#lifecycle ⇒ Object

Returns the value of attribute lifecycle.



12
13
14
# File 'lib/rworkflow/flow.rb', line 12

# Lifecycle definition (states and transitions) attached to this flow; nil when none is loaded.
def lifecycle; @lifecycle; end

Class Method Details

.all(options = {}) ⇒ Object



394
395
396
# File 'lib/rworkflow/flow.rb', line 394

# Loads every registered flow for this class.
#
# @param options [Hash] forwarded to the registry query; +parent_class+ defaults to this class
# @return [Array] flow instances (nil for ids whose class cannot be resolved)
def all(options = {})
  ids = registry.all(options.reverse_merge(parent_class: self))
  ids.map { |flow_id| load(flow_id) }
end

.cleanup(id) ⇒ Object



381
382
383
384
# File 'lib/rworkflow/flow.rb', line 381

# Instantiates the flow with +id+ and deletes all of its Redis data (see #cleanup).
#
# @param id [String] flow identifier
def cleanup(id)
  new(id).cleanup
end

.create(lifecycle, name = '', options = {}) ⇒ Object



360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/rworkflow/flow.rb', line 360

# Creates, initializes and registers a new flow.
#
# @param lifecycle [Object] lifecycle to attach
# @param name [String] human-readable name; also embedded in the generated id
# @param options [Hash] :public (default false) and :logging (default true)
# @return [Flow] the new, registered flow
def create(lifecycle, name = '', options = {})
  workflow = new(generate_id(name))
  workflow.name = name
  workflow.lifecycle = lifecycle

  initial_data = {
    created_at: Time.now.to_i,
    public: options.fetch(:public, false),
    logging: options.fetch(:logging, true),
  }
  initial_data.each { |key, value| workflow.set(key, value) }

  register(workflow)
  workflow
end

.failure?(state) ⇒ Boolean

Returns:

  • (Boolean)


435
436
437
# File 'lib/rworkflow/flow.rb', line 435

# Whether +state+ is one of the receiver class's failure states (STATES_FAILED).
#
# @param state [Symbol]
# @return [Boolean]
def failure?(state)
  self::STATES_FAILED.member?(state)
end

.generate_id(workflow_name) ⇒ Object



374
375
376
377
378
# File 'lib/rworkflow/flow.rb', line 374

# Builds a new flow id: "<ClassName>__<workflow_name>__<epoch ms>__<random>".
#
# read_flow_class later splits the id on "__" and constantizes the first
# segment, so the class name must come first.
#
# @param workflow_name [String]
# @return [String]
def generate_id(workflow_name)
  now = Time.now.to_f
  # Draw from the process-wide generator. The old code seeded a fresh Random
  # with the current time; the seed is converted to an Integer (whole seconds).
  # That made ids created within the same millisecond almost always collide.
  suffix = Random.rand(now).to_i
  return "#{name}__#{workflow_name}__#{(now * 1000).to_i}__#{suffix}"
end

.get_private_workflows(options = {}) ⇒ Object



390
391
392
# File 'lib/rworkflow/flow.rb', line 390

# Loads the private flows of this class from the registry.
#
# @param options [Hash] forwarded to the registry query; +parent_class+ defaults to this class
# @return [Array] flow instances (nil for ids whose class cannot be resolved)
def get_private_workflows(options = {})
  query = options.reverse_merge(parent_class: self)
  registry.private_flows(query).map { |flow_id| load(flow_id) }
end

.get_public_workflows(options = {}) ⇒ Object



386
387
388
# File 'lib/rworkflow/flow.rb', line 386

# Loads the public flows of this class from the registry.
#
# @param options [Hash] forwarded to the registry query; +parent_class+ defaults to this class
# @return [Array] flow instances (nil for ids whose class cannot be resolved)
def get_public_workflows(options = {})
  query = options.reverse_merge(parent_class: self)
  registry.public_flows(query).map { |flow_id| load(flow_id) }
end

.load(id, klass = nil) ⇒ Object



398
399
400
401
402
403
404
# File 'lib/rworkflow/flow.rb', line 398

# Instantiates an existing flow.
#
# @param id [String] flow identifier
# @param klass [Class, nil] class to instantiate; when nil it is derived from the id
# @return [Flow, nil] nil when no instantiable class could be determined
def load(id, klass = nil)
  flow_class = klass.nil? ? read_flow_class(id) : klass
  flow_class.respond_to?(:new) ? flow_class.new(id) : nil
end

.read_flow_class(id) ⇒ Object



406
407
408
409
410
411
412
413
414
415
416
417
# File 'lib/rworkflow/flow.rb', line 406

# Resolves the flow class encoded as the first "__"-separated segment of +id+.
#
# Logs a warning and returns nil when the segment does not name a constant.
# NOTE(review): constantize resolves *any* constant, and .load then calls
# .new on it; ids are assumed to come from the flow registry. Confirm they
# are never user-supplied.
#
# @param id [String]
# @return [Class, nil]
def read_flow_class(id)
  raw_class = id.split('__').first
  return nil if raw_class.nil?

  begin
    raw_class.constantize
  rescue NameError
    Rails.logger.warn("Unknown flow class for workflow id #{id}")
    nil
  end
end

.register(workflow) ⇒ Object



423
424
425
# File 'lib/rworkflow/flow.rb', line 423

# Adds +workflow+ to the flow registry.
#
# @param workflow [Flow]
# @return whatever the registry's add returns
def register(workflow)
  return registry.add(workflow)
end

.registered?(workflow) ⇒ Boolean

Returns:

  • (Boolean)


419
420
421
# File 'lib/rworkflow/flow.rb', line 419

# Whether +workflow+ is present in the flow registry.
#
# @param workflow [Flow]
# @return [Boolean]
def registered?(workflow)
  registry.include?(workflow)
end

.registry ⇒ Object



439
440
441
442
443
# File 'lib/rworkflow/flow.rb', line 439

# Memoized FlowRegistry for this class, keyed by the gem version string.
#
# @return [FlowRegistry]
def registry
  @registry ||= FlowRegistry.new(Rworkflow::VERSION.to_s)
end

.serializer ⇒ Object



445
446
447
# File 'lib/rworkflow/flow.rb', line 445

# Serializer (responding to dump/load) used for every value stored in Redis.
#
# NOTE(review): values read back from Redis go through YAML.load. Depending on
# the Psych version, that can instantiate arbitrary objects, so Redis contents
# must be trusted.
#
# @return [Module] YAML
def serializer
  return YAML
end

.terminal?(state) ⇒ Boolean

Returns:

  • (Boolean)


431
432
433
# File 'lib/rworkflow/flow.rb', line 431

# Whether +state+ is one of the receiver class's terminal states (STATES_TERMINAL).
#
# @param state [Symbol]
# @return [Boolean]
def terminal?(state)
  self::STATES_TERMINAL.member?(state)
end

.unregister(workflow) ⇒ Object



427
428
429
# File 'lib/rworkflow/flow.rb', line 427

# Removes +workflow+ from the flow registry.
#
# @param workflow [Flow]
# @return whatever the registry's remove returns
def unregister(workflow)
  return registry.remove(workflow)
end

Instance Method Details

#cleaned_up? ⇒ Boolean

Returns:

  • (Boolean)


205
206
207
# File 'lib/rworkflow/flow.rb', line 205

# True when none of the state lists (see states_list) still exist in Redis.
#
# @return [Boolean]
def cleaned_up?
  states_list.none? { |state| get_list(state).exists? }
end

#cleanup ⇒ Object



297
298
299
300
301
302
303
304
305
# File 'lib/rworkflow/flow.rb', line 297

# Deletes this flow's Redis data and removes it from the registry.
#
# In order, it removes:
# - the state data, via states_cleanup (presumably the per-state lists);
# - the transition log hash (logger);
# - the processing hash;
# - the registry entry;
# - the flow data hash;
# - the main storage hash.
#
# Does nothing when Rails.env.test? is true.
# NOTE(review): the test-environment skip looks deliberate (e.g. to inspect
# data after a run). Confirm before relying on cleanup in specs.
def cleanup
  return if Rails.env.test?
  states_cleanup
  logger.delete
  @processing.delete
  self.class.unregister(self)
  @flow_data.delete
  @storage.delete
end

#count(state) ⇒ Object



89
90
91
# File 'lib/rworkflow/flow.rb', line 89

# Number of objects currently queued in +state+.
#
# @param state [Symbol]
# @return [Integer]
def count(state)
  get_list(state).size
end

#created_at ⇒ Object



57
58
59
# File 'lib/rworkflow/flow.rb', line 57

# Creation time (from the stored :created_at epoch seconds; epoch 0 when absent).
# Memoized after the first read.
#
# @return [Time]
def created_at
  @created_at ||= Time.at(get(:created_at, 0))
end

#expected_duration ⇒ Object



81
82
83
# File 'lib/rworkflow/flow.rb', line 81

# Expected run time of the flow. The base class sets no bound; subclasses may override.
#
# @return [Float] Float::INFINITY
def expected_duration
  Float::INFINITY
end

#failed? ⇒ Boolean

Returns:

  • (Boolean)


350
351
352
353
# File 'lib/rworkflow/flow.rb', line 350

# True once the flow has finished with at least one object in a failure state.
#
# @return [Boolean]
def failed?
  return false unless finished?
  total_objects_failed.positive?
end

#fetch(fetcher_id, state_name) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/rworkflow/flow.rb', line 127

# Pops the next batch of objects queued in +state_name+ and yields them,
# deserialized, to the block.
#
# While the batch is held, +fetcher_id+ is recorded in the processing hash:
# first as 1, then as the number of successfully deserialized objects.
# The entry is always removed afterwards, and the flow is terminated if no
# non-terminal work remains (see finished?).
#
# Payloads that fail to deserialize are pushed to STATE_FAILED and logged
# instead of being yielded. Payloads that deserialize to nil are dropped by
# the compact.
#
# @param fetcher_id [Object] key identifying this consumer in the processing hash
# @param state_name [Symbol] lifecycle state to pop from
# @yieldparam objects [Array] the deserialized batch (only when the pop returned something)
def fetch(fetcher_id, state_name)
  @processing.set(fetcher_id, 1)
  list = get_state_list(state_name)
  unless list.nil?
    failed = []
    # Shared helper: resolves CARDINALITY_ALL_STARTED to the :start_count value
    # (this logic used to be duplicated inline here).
    cardinality = get_state_cardinality(state_name)
    # NOTE(review): for WAIT-policy states lpop is presumably asked to return
    # nothing until a full batch of +cardinality+ objects is queued. Confirm
    # against the list implementation.
    force_list_complete = @lifecycle.states[state_name].policy == State::STATE_POLICY_WAIT
    raw_objects = list.lpop(cardinality, force_list_complete)
    unless raw_objects.empty?
      objects = raw_objects.map do |raw_object|
        begin
          self.class.serializer.load(raw_object)
        rescue StandardError => _
          failed << raw_object
          nil
        end
      end.compact
      @processing.set(fetcher_id, objects.size)

      unless failed.empty?
        push(failed, STATE_FAILED)
        Rails.logger.error("Failed to parse #{failed.size} in workflow #{@id} for fetcher id #{fetcher_id} at state #{state_name}")
      end

      yield(objects) if block_given?
    end
  end
ensure
  # Runs even if the block raises: release this fetcher's slot, then finalize
  # the flow if this was the last pending work.
  @processing.remove(fetcher_id)
  terminate if finished?
end

#finish_time ⇒ Object



77
78
79
# File 'lib/rworkflow/flow.rb', line 77

# Time the flow was terminated (from :finish_time; epoch 0 when not finished).
#
# @return [Time]
def finish_time
  Time.at(get(:finish_time, 0))
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
44
45
46
47
48
# File 'lib/rworkflow/flow.rb', line 41

# True when the flow has started and every counter outside the terminal
# states is zero.
#
# @return [Boolean]
def finished?
  return false unless started?

  pending = get_counters.sum { |state, count| self.class.terminal?(state) ? 0 : count.to_i }
  pending.zero?
end

#get(key, default = nil) ⇒ Object



269
270
271
272
273
274
# File 'lib/rworkflow/flow.rb', line 269

# Reads and deserializes +key+ from the flow data hash.
#
# @param key [Symbol, String]
# @param default [Object] returned when the key is absent
# @return [Object]
def get(key, default = nil)
  raw = @flow_data.get(key)
  raw.nil? ? default : self.class.serializer.load(raw)
end

#get_counters ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/rworkflow/flow.rb', line 93

# Per-state object counters.
#
# Prefers the snapshot stored under :counters in the main storage hash. If it
# is absent, cannot be deserialized (the error is logged) or deserializes to
# nil/false, falls back to get_counters!.
#
# @return [Hash]
def get_counters
  stored = @storage.get(:counters)
  return get_counters! if stored.nil?

  decoded = begin
    self.class.serializer.load(stored)
  rescue StandardError => e
    Rails.logger.error("Error loading stored flow counters: #{e.message}")
    nil
  end
  decoded || get_counters!
end

#get_state_cardinality(state_name) ⇒ Object



259
260
261
262
263
# File 'lib/rworkflow/flow.rb', line 259

# Batch size for +state_name+. CARDINALITY_ALL_STARTED resolves to the number
# of objects the flow was started with (:start_count).
#
# @param state_name [Symbol]
# @return [Integer]
def get_state_cardinality(state_name)
  cardinality = @lifecycle.states[state_name].cardinality
  return cardinality unless cardinality == Rworkflow::Lifecycle::CARDINALITY_ALL_STARTED

  get(:start_count).to_i
end

#incr(key, value = 1) ⇒ Object



276
277
278
# File 'lib/rworkflow/flow.rb', line 276

# Increments the raw (unserialized) integer +key+ in the flow data hash.
#
# @param key [Symbol, String]
# @param value [Integer] increment, 1 by default
# @return the result of incrby
def incr(key, value = 1)
  @flow_data.incrby(key, value)
end

#list_objects(state_name, limit = -1) ⇒ Object



160
161
162
163
# File 'lib/rworkflow/flow.rb', line 160

# Deserialized objects queued in +state_name+, read without removing them.
#
# @param state_name [Symbol]
# @param limit [Integer] end index passed to the list read (-1 = through the end)
# @return [Array]
def list_objects(state_name, limit = -1)
  serializer = self.class.serializer
  get_list(state_name).get(0, limit).map { |raw| serializer.load(raw) }
end

#log(from_state, transition, num_objects) ⇒ Object



235
236
237
# File 'lib/rworkflow/flow.rb', line 235

# Adds +num_objects+ to the "<from_state>__<transition>" counter in the
# transition log, but only when logging is enabled for this flow.
#
# @return the incrby result, or nil when logging is disabled
def log(from_state, transition, num_objects)
  return unless logging?

  logger.incrby("#{from_state}__#{transition}", num_objects.to_i)
end

#logger ⇒ Object



239
240
241
242
243
# File 'lib/rworkflow/flow.rb', line 239

# Memoized RedisRds hash ("<redis_key>__logger") that holds the transition
# counters written by #log.
def logger
  @logger ||= RedisRds::Hash.new("#{@redis_key}__logger")
end

#logging? ⇒ Boolean

Returns:

  • (Boolean)


231
232
233
# File 'lib/rworkflow/flow.rb', line 231

# Whether transition logging is enabled (stored :logging flag; false when absent).
def logging?
  get(:logging, false)
end

#logs ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/rworkflow/flow.rb', line 245

# Transition counters grouped by state: { state => { transition => count } }.
#
# Empty when the flow has no lifecycle or logging is disabled.
#
# @return [Hash{String => Hash{String => Integer}}]
def logs
  return {} unless valid? && logging?

  logger.getall.each_with_object({}) do |(state_transition, counter), grouped|
    state, transition = state_transition.split('__')
    (grouped[state] ||= {})[transition] = counter.to_i
  end
end

#metadata_string ⇒ Object



201
202
203
# File 'lib/rworkflow/flow.rb', line 201

# Human-readable label for this flow: "Rworkflow: <name>".
# (The method name had been lost, leaving a bare `def`, which is a syntax error.)
#
# @return [String]
def metadata_string
  return "Rworkflow: #{self.name}"
end

#name ⇒ Object



65
66
67
# File 'lib/rworkflow/flow.rb', line 65

# Stored flow name, falling back to the flow id when none was stored.
def name
  get(:name, @id)
end

#name=(name) ⇒ Object



69
70
71
# File 'lib/rworkflow/flow.rb', line 69

# Stores the flow's display name under :name in the flow data hash.
def name=(name)
  set(:name, name)
end

#public? ⇒ Boolean

Returns:

  • (Boolean)


355
356
357
# File 'lib/rworkflow/flow.rb', line 355

# Whether this flow is public (stored :public flag; false when absent).
# On termination, public flows keep a counters snapshot instead of being fully
# cleaned up (see #terminate). Memoized after the first read.
#
# @return [Boolean]
def public?
  # `@public ||= ...` never caches a false result, so non-public flows hit
  # Redis on every call. Memoize on "already read" instead of on truthiness.
  return @public if defined?(@public)

  @public = get(:public, false)
end

#set(key, value) ⇒ Object



265
266
267
# File 'lib/rworkflow/flow.rb', line 265

# Serializes +value+ and stores it under +key+ in the flow data hash.
#
# @return the result of the underlying hash set
def set(key, value)
  serialized = self.class.serializer.dump(value)
  @flow_data.set(key, serialized)
end

#start(objects) ⇒ Object



313
314
315
316
317
318
319
# File 'lib/rworkflow/flow.rb', line 313

# Starts the flow with +objects+.
#
# Records the start time and the object count (:start_count), pushes the
# objects into the lifecycle's initial state and logs an "initial"
# transition for them.
#
# @param objects [Object, Array] a single object or a list of objects
def start(objects)
  batch = Array.wrap(objects)
  set(:start_time, Time.now.to_i)
  set(:start_count, batch.size)

  initial_state = lifecycle.initial
  push(batch, initial_state)
  log(initial_state, 'initial', batch.size)
end

#start_time ⇒ Object



73
74
75
# File 'lib/rworkflow/flow.rb', line 73

# Time the flow was started (from :start_time; epoch 0 when not started).
#
# @return [Time]
def start_time
  Time.at(get(:start_time, 0))
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/rworkflow/flow.rb', line 61

# True once a :start_time has been recorded (see #start).
#
# @return [Boolean]
def started?
  start_stamp = get(:start_time)
  !start_stamp.nil?
end

#states_list ⇒ Object



209
210
211
212
213
214
# File 'lib/rworkflow/flow.rb', line 209

# Every state this flow may hold objects in: the terminal states plus, when a
# lifecycle is loaded, the lifecycle's states.
#
# @return [Array<Symbol>]
def states_list
  return self.class::STATES_TERMINAL unless valid?

  self.class::STATES_TERMINAL + @lifecycle.states.keys
end

#status ⇒ Object



50
51
52
53
54
55
# File 'lib/rworkflow/flow.rb', line 50

# Human-readable status: 'Running' until finished, then 'Finished' or 'Failed'.
#
# @return [String]
def status
  return 'Running' unless finished?

  successful? ? 'Finished' : 'Failed'
end

#successful? ⇒ Boolean

Returns:

  • (Boolean)


345
346
347
348
# File 'lib/rworkflow/flow.rb', line 345

# True once the flow has finished without failures.
#
# @return [Boolean]
def successful?
  return false unless finished?

  !failed?
end

#terminate ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/rworkflow/flow.rb', line 178

# Finalizes the flow. #fetch calls this from its ensure clause whenever
# finished? is true.
#
# Runs under a RedisRds::Mutex keyed on the flow id, so concurrent callers
# finalize one at a time. The cleaned_up? check skips the work when the state
# lists are already gone.
#
# Steps:
# - record :finish_time and run post_process;
# - public flows: persist a counters snapshot (read back later by
#   get_counters) and drop only the state data via states_cleanup;
# - all other flows: fully removed via #cleanup.
def terminate
  mutex = RedisRds::Mutex.new(self.id)
  mutex.synchronize do
    if !self.cleaned_up?
      set(:finish_time, Time.now.to_i)
      post_process

      if self.public?
        counters = get_counters!
        counters[:processing] = 0 # Some worker might have increased the processing flag at that time even if there is no more jobs to be done
        # setnx: presumably only written when no snapshot exists yet (HSETNX-style) — confirm in RedisRds
        @storage.setnx(:counters, self.class.serializer.dump(counters))
        states_cleanup
      else
        self.cleanup
      end
    end
  end
end

#total_objects(counters = nil) ⇒ Object



331
332
333
# File 'lib/rworkflow/flow.rb', line 331

# Total number of objects across all states.
#
# Counts are coerced with to_i, matching finished? and logs. Counters read
# back from Redis may be strings (or nil), and plain `+` raised TypeError on
# those.
#
# @param counters [Hash, nil] per-state counts; defaults to get_counters
# @return [Integer]
def total_objects(counters = nil)
  (counters || get_counters).sum { |_state, count| count.to_i }
end

#total_objects_failed(counters = nil) ⇒ Object



335
336
337
338
339
340
341
342
343
# File 'lib/rworkflow/flow.rb', line 335

# Number of objects sitting in failure states (see .failure?).
#
# Counts are coerced with to_i, consistent with finished?, because counters
# read back from Redis may be strings.
#
# @param counters [Hash, nil] per-state counts; defaults to get_counters
# @return [Integer]
def total_objects_failed(counters = nil)
  (counters || get_counters).sum do |state, count|
    self.class.failure?(state) ? count.to_i : 0
  end
end

#total_objects_processed(counters = nil) ⇒ Object



321
322
323
324
325
326
327
328
329
# File 'lib/rworkflow/flow.rb', line 321

# Number of objects that reached a terminal state (see .terminal?).
#
# Counts are coerced with to_i, consistent with finished?, because counters
# read back from Redis may be strings.
#
# @param counters [Hash, nil] per-state counts; defaults to get_counters
# @return [Integer]
def total_objects_processed(counters = nil)
  (counters || get_counters).sum do |state, count|
    self.class.terminal?(state) ? count.to_i : 0
  end
end

#transition(from_state, name, objects) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/rworkflow/flow.rb', line 216

# Moves +objects+ from +from_state+ along the transition +name+.
#
# The target state comes from the lifecycle. If the lifecycle raises
# Rworkflow::StateError, the error is logged and nothing is pushed or logged.
#
# @param from_state [Symbol]
# @param name [Symbol, String] transition name
# @param objects [Object, Array] a single object or a list of objects
# @return the result of #log, or nil when the transition is unknown
def transition(from_state, name, objects)
  batch = Array.wrap(objects)
  to_state =
    begin
      lifecycle.transition(from_state, name)
    rescue Rworkflow::StateError => e
      Rails.logger.error("Error transitioning: #{e}")
      nil
    end
  return if to_state.nil?

  push(batch, to_state)
  log(from_state, name, batch.size)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/rworkflow/flow.rb', line 85

# True when a lifecycle is loaded for this flow.
#
# @return [Boolean]
def valid?
  return false if @lifecycle.nil?

  true
end