Class: Rworkflow::Flow
- Inherits:
-
Object
- Object
- Rworkflow::Flow
- Defined in:
- lib/rworkflow/flow.rb
Direct Known Subclasses
Constant Summary collapse
- STATE_SUCCESSFUL =
:successful
- STATE_FAILED =
:failed
- STATES_TERMINAL =
[STATE_FAILED, STATE_SUCCESSFUL].freeze
- STATES_FAILED =
[STATE_FAILED].freeze
- REDIS_NS =
'flow'.freeze
- WORKFLOW_REGISTRY =
"#{REDIS_NS}:__registry".freeze
Instance Attribute Summary collapse
-
#id ⇒ Object
Returns the value of attribute id.
-
#lifecycle ⇒ Object
Returns the value of attribute lifecycle.
Class Method Summary collapse
- .all(options = {}) ⇒ Object
- .cleanup(id) ⇒ Object
- .create(lifecycle, name = '', options = {}) ⇒ Object
- .failure?(state) ⇒ Boolean
- .generate_id(workflow_name) ⇒ Object
- .get_private_workflows(options = {}) ⇒ Object
- .get_public_workflows(options = {}) ⇒ Object
- .load(id, klass = nil) ⇒ Object
- .read_flow_class(id) ⇒ Object
- .register(workflow) ⇒ Object
- .registered?(workflow) ⇒ Boolean
- .registry ⇒ Object
- .serializer ⇒ Object
- .terminal?(state) ⇒ Boolean
- .unregister(workflow) ⇒ Object
Instance Method Summary collapse
- #cleaned_up? ⇒ Boolean
- #cleanup ⇒ Object
- #count(state) ⇒ Object
- #created_at ⇒ Object
- #expected_duration ⇒ Object
- #failed? ⇒ Boolean
- #fetch(fetcher_id, state_name) ⇒ Object
- #finish_time ⇒ Object
- #finished? ⇒ Boolean
- #get(key, default = nil) ⇒ Object
- #get_counters ⇒ Object
- #get_state_cardinality(state_name) ⇒ Object
- #incr(key, value = 1) ⇒ Object
-
#initialize(id) ⇒ Flow
constructor
A new instance of Flow.
- #list_objects(state_name, limit = -1) ⇒ Object
- #log(from_state, transition, num_objects) ⇒ Object
- #logger ⇒ Object
- #logging? ⇒ Boolean
- #logs ⇒ Object
- #metadata_string ⇒ Object
- #name ⇒ Object
- #name=(name) ⇒ Object
- #public? ⇒ Boolean
- #set(key, value) ⇒ Object
- #start(objects) ⇒ Object
- #start_time ⇒ Object
- #started? ⇒ Boolean
- #states_list ⇒ Object
- #status ⇒ Object
- #successful? ⇒ Boolean
- #terminate ⇒ Object
- #total_objects(counters = nil) ⇒ Object
- #total_objects_failed(counters = nil) ⇒ Object
- #total_objects_processed(counters = nil) ⇒ Object
- #transition(from_state, name, objects) ⇒ Object
- #valid? ⇒ Boolean
Constructor Details
#initialize(id) ⇒ Flow
Returns a new instance of Flow.
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rworkflow/flow.rb', line 14
# Binds the flow to its id, creates the three Redis hashes whose keys are
# derived from that id, and then loads the lifecycle.
#
# @param id [Object] flow identifier; interpolated into the Redis keys
def initialize(id)
  @id = id
  @redis_key = "#{REDIS_NS}:#{id}"
  data_key, processing_key = %w[data processing].map { |suffix| "#{@redis_key}__#{suffix}" }
  @storage = RedisRds::Hash.new(@redis_key)
  @flow_data = RedisRds::Hash.new(data_key)
  @processing = RedisRds::Hash.new(processing_key)
  load_lifecycle
end
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
11 12 13 |
# File 'lib/rworkflow/flow.rb', line 11
# @return [Object] the id stored by the constructor
def id
  return @id
end
#lifecycle ⇒ Object
Returns the value of attribute lifecycle.
12 13 14 |
# File 'lib/rworkflow/flow.rb', line 12
# @return [Object, nil] the current value of @lifecycle
def lifecycle
  return @lifecycle
end
Class Method Details
.all(options = {}) ⇒ Object
394 395 396 |
# File 'lib/rworkflow/flow.rb', line 394
# Loads every flow whose id the registry returns for this class.
#
# @param options [Hash] forwarded to the registry; :parent_class is filled in
#   with the receiving class unless the caller supplies it
# @return [Array] one entry per id, each the result of load(id)
def all(options = {})
  return registry.all(options.reverse_merge(parent_class: self)).map { |id| load(id) }
end
.cleanup(id) ⇒ Object
381 382 383 384 |
# File 'lib/rworkflow/flow.rb', line 381
# Instantiates the flow with the given id and runs its instance-level cleanup.
#
# @param id [Object] id of the flow to clean up
# @return [Object] whatever the instance #cleanup returns
def cleanup(id)
  new(id).cleanup
end
.create(lifecycle, name = '', options = {}) ⇒ Object
360 361 362 363 364 365 366 367 368 369 370 371 372 |
# File 'lib/rworkflow/flow.rb', line 360
# Creates a new flow, stores its metadata and adds it to the registry.
#
# @param lifecycle [Object] lifecycle assigned to the new flow
# @param name [String] flow name; also used to generate the id
# @param options [Hash] recognised keys:
#   :public (default false) and :logging (default true)
# @return [Flow] the newly created, registered flow
def create(lifecycle, name = '', options = {})
  id = generate_id(name)
  workflow = new(id)
  workflow.name = name
  workflow.lifecycle = lifecycle
  workflow.set(:created_at, Time.now.to_i)
  workflow.set(:public, options.fetch(:public, false))
  workflow.set(:logging, options.fetch(:logging, true))
  register(workflow)
  return workflow
end
.failure?(state) ⇒ Boolean
435 436 437 |
# File 'lib/rworkflow/flow.rb', line 435
# @param state [Object] state name to test
# @return [Boolean] true when the state is listed in the receiver's STATES_FAILED
def failure?(state)
  self::STATES_FAILED.include?(state)
end
.generate_id(workflow_name) ⇒ Object
374 375 376 377 378 |
# File 'lib/rworkflow/flow.rb', line 374
# Builds a flow id of the form
# "<class name>__<workflow name>__<epoch milliseconds>__<random integer>".
#
# @param workflow_name [String] name embedded in the id
# @return [String] the generated id
def generate_id(workflow_name)
  now = Time.now.to_f
  # Draw from the default generator. Seeding a fresh generator from the clock
  # made the "random" part a function of the timestamp, so ids created at the
  # same instant collided.
  suffix = Random.rand(now).to_i
  return "#{name}__#{workflow_name}__#{(now * 1000).to_i}__#{suffix}"
end
.get_private_workflows(options = {}) ⇒ Object
390 391 392 |
# File 'lib/rworkflow/flow.rb', line 390
# Loads every flow whose id the registry reports as private for this class.
#
# @param options [Hash] forwarded to the registry; :parent_class is filled in
#   with the receiving class unless the caller supplies it
# @return [Array] one entry per id, each the result of load(id)
def get_private_workflows(options = {})
  return registry.private_flows(options.reverse_merge(parent_class: self)).map { |id| load(id) }
end
.get_public_workflows(options = {}) ⇒ Object
386 387 388 |
# File 'lib/rworkflow/flow.rb', line 386
# Loads every flow whose id the registry reports as public for this class.
#
# @param options [Hash] forwarded to the registry; :parent_class is filled in
#   with the receiving class unless the caller supplies it
# @return [Array] one entry per id, each the result of load(id)
def get_public_workflows(options = {})
  return registry.public_flows(options.reverse_merge(parent_class: self)).map { |id| load(id) }
end
.load(id, klass = nil) ⇒ Object
398 399 400 401 402 403 404 |
# File 'lib/rworkflow/flow.rb', line 398
# Instantiates the flow with the given id.
#
# @param id [String] flow id
# @param klass [Class, nil] class to instantiate; when nil it is derived from
#   the id via read_flow_class
# @return [Object, nil] klass.new(id), or nil when the resolved class does not
#   respond to :new
def load(id, klass = nil)
  flow_class = klass.nil? ? read_flow_class(id) : klass
  flow_class.respond_to?(:new) ? flow_class.new(id) : nil
end
.read_flow_class(id) ⇒ Object
406 407 408 409 410 411 412 413 414 415 416 417 |
# File 'lib/rworkflow/flow.rb', line 406
# Resolves the class named by the part of the id before the first "__".
#
# @param id [String] flow id
# @return [Object, nil] the constant named by the prefix; nil when the id has
#   no prefix or the constant lookup raises NameError (a warning is logged)
def read_flow_class(id)
  class_name = id.split('__').first
  return nil if class_name.nil?

  # Only the constant lookup is rescued; errors from splitting the id propagate.
  begin
    class_name.constantize
  rescue NameError
    Rails.logger.warn("Unknown flow class for workflow id #{id}")
    nil
  end
end
.register(workflow) ⇒ Object
423 424 425 |
# File 'lib/rworkflow/flow.rb', line 423
# Adds the workflow to the registry.
#
# @param workflow [Flow] flow to add
# @return [Object] whatever the registry's #add returns
def register(workflow)
  return registry.add(workflow)
end
.registered?(workflow) ⇒ Boolean
419 420 421 |
# File 'lib/rworkflow/flow.rb', line 419
# @param workflow [Flow] flow to look up
# @return [Boolean] result of the registry's #include? for the workflow
def registered?(workflow)
  registry.include?(workflow)
end
.registry ⇒ Object
439 440 441 442 443 |
# File 'lib/rworkflow/flow.rb', line 439
# Returns the registry, creating it on first use from the Rworkflow version
# string and caching it in @registry.
#
# @return [FlowRegistry]
def registry
  @registry ||= FlowRegistry.new(Rworkflow::VERSION.to_s)
end
.serializer ⇒ Object
445 446 447 |
# File 'lib/rworkflow/flow.rb', line 445
# Returns the object used to dump and load stored values.
# NOTE(review): callers pass stored strings to YAML.load; confirm that the
# stored content is always trusted.
#
# @return [Module] YAML
def serializer
  return YAML
end
.terminal?(state) ⇒ Boolean
431 432 433 |
# File 'lib/rworkflow/flow.rb', line 431
# @param state [Object] state name to test
# @return [Boolean] true when the state is listed in the receiver's STATES_TERMINAL
def terminal?(state)
  self::STATES_TERMINAL.include?(state)
end
.unregister(workflow) ⇒ Object
427 428 429 |
# File 'lib/rworkflow/flow.rb', line 427
# Removes the workflow from the registry.
#
# @param workflow [Flow] flow to remove
# @return [Object] whatever the registry's #remove returns
def unregister(workflow)
  return registry.remove(workflow)
end
Instance Method Details
#cleaned_up? ⇒ Boolean
205 206 207 |
# File 'lib/rworkflow/flow.rb', line 205
# @return [Boolean] true when none of the lists named by states_list exists
def cleaned_up?
  states_list.none? { |state_name| get_list(state_name).exists? }
end
#cleanup ⇒ Object
297 298 299 300 301 302 303 304 305 |
# File 'lib/rworkflow/flow.rb', line 297
# Deletes everything stored for this flow and unregisters it.
# Does nothing when Rails.env.test? is true.
#
# @return [Object, nil] nil in the test environment, otherwise the result of
#   the final @storage.delete
def cleanup
  unless Rails.env.test?
    states_cleanup
    logger.delete
    @processing.delete
    self.class.unregister(self)
    @flow_data.delete
    @storage.delete
  end
end
#count(state) ⇒ Object
89 90 91 |
# File 'lib/rworkflow/flow.rb', line 89
# @param state [Object] state name
# @return [Object] size reported by the list for that state
def count(state)
  state_list = get_list(state)
  state_list.size
end
#created_at ⇒ Object
57 58 59 |
# File 'lib/rworkflow/flow.rb', line 57
# Creation time built from the stored :created_at value (0 when absent);
# computed once and cached in @created_at.
#
# @return [Time]
def created_at
  @created_at ||= Time.at(get(:created_at, 0))
end
#expected_duration ⇒ Object
81 82 83 |
# File 'lib/rworkflow/flow.rb', line 81
# @return [Float] always Float::INFINITY in this class
def expected_duration
  Float::INFINITY
end
#failed? ⇒ Boolean
350 351 352 353 |
# File 'lib/rworkflow/flow.rb', line 350
# @return [Boolean] true only when the flow is finished and at least one
#   object is counted in a failure state
def failed?
  finished? ? total_objects_failed > 0 : false
end
#fetch(fetcher_id, state_name) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/rworkflow/flow.rb', line 127
# Pops a batch of objects from the list of the given state, deserializes them
# and yields them to the caller's block.
#
# Entries that fail to deserialize are pushed to STATE_FAILED and reported in
# the error log. The fetcher is marked in @processing for the duration of the
# call. Whatever happens, the marker is removed afterwards and the flow is
# terminated if it has finished.
#
# @param fetcher_id [Object] key identifying the caller in @processing
# @param state_name [Object] state whose list is read
# @yield [objects] the deserialized objects (nil results are dropped)
# @return [Object, nil] the block's value; nil when there is no list, nothing
#   was popped, or no block was given
def fetch(fetcher_id, state_name)
  @processing.set(fetcher_id, 1)
  list = get_state_list(state_name)
  return if list.nil?

  state = @lifecycle.states[state_name]
  batch_size = state.cardinality
  batch_size = get(:start_count).to_i if batch_size == Lifecycle::CARDINALITY_ALL_STARTED
  wait_for_full_batch = state.policy == State::STATE_POLICY_WAIT

  raw_objects = list.lpop(batch_size, wait_for_full_batch)
  return if raw_objects.empty?

  unparsable = []
  objects = raw_objects.map do |raw_object|
    begin
      self.class.serializer.load(raw_object)
    rescue StandardError
      unparsable << raw_object
      nil
    end
  end.compact
  @processing.set(fetcher_id, objects.size)

  unless unparsable.empty?
    push(unparsable, STATE_FAILED)
    Rails.logger.error("Failed to parse #{unparsable.size} in workflow #{@id} for fetcher id #{fetcher_id} at state #{state_name}")
  end

  yield(objects) if block_given?
ensure
  # Runs on every exit path, including the early returns above.
  @processing.remove(fetcher_id)
  terminate if finished?
end
#finish_time ⇒ Object
77 78 79 |
# File 'lib/rworkflow/flow.rb', line 77
# @return [Time] time built from the stored :finish_time value (0 when absent)
def finish_time
  timestamp = get(:finish_time, 0)
  Time.at(timestamp)
end
#finished? ⇒ Boolean
41 42 43 44 45 46 47 48 |
# File 'lib/rworkflow/flow.rb', line 41
# A flow is finished once it has started and the counters of all
# non-terminal states add up to zero.
#
# @return [Boolean]
def finished?
  return false unless started?

  pending = 0
  get_counters.each do |state, amount|
    pending += amount.to_i unless self.class.terminal?(state)
  end
  pending == 0
end
#get(key, default = nil) ⇒ Object
269 270 271 272 273 274 |
# File 'lib/rworkflow/flow.rb', line 269
# Reads a value from the flow data hash and deserializes it.
#
# @param key [Object] field name
# @param default [Object] returned as-is when nothing is stored under key
# @return [Object] the deserialized value, or default
def get(key, default = nil)
  stored = @flow_data.get(key)
  return default if stored.nil?

  self.class.serializer.load(stored)
end
#get_counters ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/rworkflow/flow.rb', line 93
# Returns the counters stored under :counters. Falls back to get_counters!
# when nothing is stored, when deserialization raises (the error is logged),
# or when the deserialized value is falsy.
#
# @return [Object] the counters
def get_counters
  raw = @storage.get(:counters)
  return get_counters! if raw.nil?

  parsed = begin
    self.class.serializer.load(raw)
  rescue => e
    Rails.logger.error("Error loading stored flow counters: #{e.message}")
    nil
  end
  parsed || get_counters!
end
#get_state_cardinality(state_name) ⇒ Object
259 260 261 262 263 |
# File 'lib/rworkflow/flow.rb', line 259
# Cardinality declared for the state in the lifecycle. When it equals
# CARDINALITY_ALL_STARTED, the stored :start_count (as an integer) is used
# instead.
#
# @param state_name [Object] state to look up in the lifecycle
# @return [Object] the effective cardinality
def get_state_cardinality(state_name)
  declared = @lifecycle.states[state_name].cardinality
  return declared unless declared == Rworkflow::Lifecycle::CARDINALITY_ALL_STARTED

  get(:start_count).to_i
end
#incr(key, value = 1) ⇒ Object
276 277 278 |
# File 'lib/rworkflow/flow.rb', line 276
# Increments a field of the flow data hash.
#
# @param key [Object] field name
# @param value [Integer] amount passed to incrby (default 1)
# @return [Object] whatever incrby returns
def incr(key, value = 1)
  @flow_data.incrby(key, value)
end
#list_objects(state_name, limit = -1) ⇒ Object
160 161 162 163 |
# File 'lib/rworkflow/flow.rb', line 160
# Reads entries from the list of a state without removing them and
# deserializes each one.
#
# @param state_name [Object] state whose list is read
# @param limit [Integer] end index passed to the list's get(0, limit); default -1
# @return [Array] the deserialized entries
def list_objects(state_name, limit = -1)
  raw_entries = get_list(state_name).get(0, limit)
  raw_entries.map { |entry| self.class.serializer.load(entry) }
end
#log(from_state, transition, num_objects) ⇒ Object
235 236 237 |
# File 'lib/rworkflow/flow.rb', line 235
# Adds num_objects to the counter keyed "<from_state>__<transition>" in the
# logger hash. Does nothing when logging is disabled.
#
# @param from_state [Object] source state
# @param transition [Object] transition name
# @param num_objects [#to_i] number of objects moved
# @return [Object, nil] result of incrby, or nil when logging is off
def log(from_state, transition, num_objects)
  return unless logging?

  logger.incrby("#{from_state}__#{transition}", num_objects.to_i)
end
#logger ⇒ Object
239 240 241 242 243 |
# File 'lib/rworkflow/flow.rb', line 239
# Redis hash holding the transition counters; created on first use under the
# key "<redis key>__logger" and cached in @logger.
#
# @return [RedisRds::Hash]
def logger
  @logger ||= RedisRds::Hash.new("#{@redis_key}__logger")
end
#logging? ⇒ Boolean
231 232 233 |
# File 'lib/rworkflow/flow.rb', line 231
# @return [Object] the stored :logging value; false when none is stored
def logging?
  get(:logging, false)
end
#logs ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/rworkflow/flow.rb', line 245
# Transition counters grouped by state: { state => { transition => count } }.
# Empty when the flow is not valid or logging is disabled.
#
# @return [Hash]
def logs
  return {} unless valid? && logging?

  logger.getall.each_with_object({}) do |(state_transition, counter), grouped|
    # Keys were written as "<state>__<transition>".
    state, transition = state_transition.split('__')
    (grouped[state] ||= {})[transition] = counter.to_i
  end
end
#metadata_string ⇒ Object
201 202 203 |
# File 'lib/rworkflow/flow.rb', line 201
# @return [String] "Rworkflow: " followed by the flow name
def metadata_string
  return "Rworkflow: #{name}"
end
#name ⇒ Object
65 66 67 |
# File 'lib/rworkflow/flow.rb', line 65
# @return [Object] the stored :name value; the flow id when none is stored
def name
  get(:name, @id)
end
#name=(name) ⇒ Object
69 70 71 |
# File 'lib/rworkflow/flow.rb', line 69
# Stores the flow name under :name.
#
# @param name [Object] new name
def name=(name)
  set(:name, name)
end
#public? ⇒ Boolean
355 356 357 |
# File 'lib/rworkflow/flow.rb', line 355
# Stored :public flag (false when absent), cached in @public.
# Because the cache uses ||=, a false or nil result is not retained and the
# value is read again on the next call.
#
# @return [Object] the stored flag
def public?
  @public ||= get(:public, false)
end
#set(key, value) ⇒ Object
265 266 267 |
# File 'lib/rworkflow/flow.rb', line 265
# Serializes a value and stores it in the flow data hash.
#
# @param key [Object] field name
# @param value [Object] value to serialize and store
# @return [Object] whatever the hash's set returns
def set(key, value)
  serialized = self.class.serializer.dump(value)
  @flow_data.set(key, serialized)
end
#start(objects) ⇒ Object
313 314 315 316 317 318 319 |
# File 'lib/rworkflow/flow.rb', line 313
# Starts the flow: records the start time and the number of objects, pushes
# the objects into the lifecycle's initial state and logs the 'initial'
# transition.
#
# @param objects [Object, Array] one object or a collection; wrapped with Array.wrap
# @return [Object] result of the final log call
def start(objects)
  items = Array.wrap(objects)
  initial_state = lifecycle.initial
  set(:start_time, Time.now.to_i)
  set(:start_count, items.size)
  push(items, initial_state)
  log(initial_state, 'initial', items.size)
end
#start_time ⇒ Object
73 74 75 |
# File 'lib/rworkflow/flow.rb', line 73
# @return [Time] time built from the stored :start_time value (0 when absent)
def start_time
  timestamp = get(:start_time, 0)
  Time.at(timestamp)
end
#started? ⇒ Boolean
61 62 63 |
# File 'lib/rworkflow/flow.rb', line 61
# @return [Boolean] true once a :start_time value has been stored
def started?
  recorded_start = get(:start_time)
  !recorded_start.nil?
end
#states_list ⇒ Object
209 210 211 212 213 214 |
# File 'lib/rworkflow/flow.rb', line 209
# Terminal states followed, when the flow is valid, by the lifecycle's state
# names.
#
# @return [Array] for an invalid flow this is the STATES_TERMINAL constant itself
def states_list
  terminal_states = self.class::STATES_TERMINAL
  valid? ? terminal_states + @lifecycle.states.keys : terminal_states
end
#status ⇒ Object
50 51 52 53 54 55 |
# File 'lib/rworkflow/flow.rb', line 50
# @return [String] 'Running' until the flow is finished, then 'Finished' or
#   'Failed' depending on successful?
def status
  return 'Running' unless finished?

  successful? ? 'Finished' : 'Failed'
end
#successful? ⇒ Boolean
345 346 347 348 |
# File 'lib/rworkflow/flow.rb', line 345
# @return [Boolean] true only when the flow is finished and has not failed
def successful?
  finished? ? !failed? : false
end
#terminate ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/rworkflow/flow.rb', line 178
# Finalizes the flow under a Redis mutex keyed by the flow id.
#
# Nothing happens when the flow is already cleaned up. Otherwise the finish
# time is recorded and post_process runs. A public flow then stores a
# snapshot of its counters and has its state lists cleaned up; a non-public
# flow is cleaned up entirely.
#
# @return [Object] whatever the mutex's synchronize returns
def terminate
  lock = RedisRds::Mutex.new(id)
  lock.synchronize do
    unless cleaned_up?
      set(:finish_time, Time.now.to_i)
      post_process

      if public?
        snapshot = get_counters!
        # Some worker might have increased the processing flag at that time
        # even if there is no more jobs to be done
        snapshot[:processing] = 0
        @storage.setnx(:counters, self.class.serializer.dump(snapshot))
        states_cleanup
      else
        cleanup
      end
    end
  end
end
#total_objects(counters = nil) ⇒ Object
331 332 333 |
# File 'lib/rworkflow/flow.rb', line 331
# Sum of all counters.
#
# @param counters [Object, nil] counters to sum; read via get_counters when nil
# @return [Object] the sum, starting from 0
def total_objects(counters = nil)
  source = counters || get_counters
  source.reduce(0) { |sum, (_state, amount)| sum + amount }
end
#total_objects_failed(counters = nil) ⇒ Object
335 336 337 338 339 340 341 342 343 |
# File 'lib/rworkflow/flow.rb', line 335
# Sum of the counters whose state is a failure state.
#
# @param counters [Object, nil] counters to sum; read via get_counters when nil
# @return [Object] the sum, starting from 0
def total_objects_failed(counters = nil)
  source = counters || get_counters
  source.reduce(0) do |sum, (state, amount)|
    self.class.failure?(state) ? sum + amount : sum
  end
end
#total_objects_processed(counters = nil) ⇒ Object
321 322 323 324 325 326 327 328 329 |
# File 'lib/rworkflow/flow.rb', line 321
# Sum of the counters whose state is a terminal state.
#
# @param counters [Object, nil] counters to sum; read via get_counters when nil
# @return [Object] the sum, starting from 0
def total_objects_processed(counters = nil)
  source = counters || get_counters
  source.reduce(0) do |sum, (state, amount)|
    self.class.terminal?(state) ? sum + amount : sum
  end
end
#transition(from_state, name, objects) ⇒ Object
216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/rworkflow/flow.rb', line 216
# Moves objects along a named transition: asks the lifecycle for the target
# state, pushes the objects there and logs the transition.
#
# A Rworkflow::StateError raised by the lifecycle is logged and swallowed; in
# that case nothing is pushed or logged.
#
# @param from_state [Object] state the objects are leaving
# @param name [Object] transition name
# @param objects [Object, Array] one object or a collection; wrapped with Array.wrap
# @return [Object, nil] result of the log call, or nil when no target state
#   was resolved
def transition(from_state, name, objects)
  items = Array.wrap(objects)
  target_state = begin
    lifecycle.transition(from_state, name)
  rescue Rworkflow::StateError => e
    Rails.logger.error("Error transitioning: #{e}")
    nil
  end
  return if target_state.nil?

  push(items, target_state)
  log(from_state, name, items.size)
end
#valid? ⇒ Boolean
85 86 87 |
# File 'lib/rworkflow/flow.rb', line 85
# @return [Boolean] true when a lifecycle is set on this flow
def valid?
  !@lifecycle.nil?
end