Class: Conflow::Flow
- Inherits: Redis::Field (ancestry: Object, Redis::Field, Conflow::Flow)
- Includes: JobHandler, Redis::Findable, Redis::Identifier, Redis::Model
- Defined in: lib/conflow/flow.rb, lib/conflow/flow/job_builder.rb, lib/conflow/flow/job_handler.rb
Overview
Defined Under Namespace
Modules: JobHandler
Classes: JobBuilder
Instance Attribute Summary
- #indegree ⇒ Conflow::Redis::SortedSetField
  Sorted set (Redis zset) of job ids.
- #jobs ⇒ Array<Conflow::Job> (readonly)
  Read-only array of jobs added to the flow.
- #queued_jobs ⇒ Conflow::Redis::SetField (readonly)
  Set of jobs that are currently queued (and not yet finished).
Attributes included from Redis::Identifier
Attributes inherited from Redis::Field
Class Method Summary
- .create(*args) ⇒ Conflow::Flow
  Creates a new flow with the given parameters.
Instance Method Summary
- #configure(*args) ⇒ Object (abstract)
  Override this method to keep your flow definition inside the class.
- #finished? ⇒ Boolean
  Returns whether the flow is finished (all jobs have been processed).
- #queue(job) ⇒ Object (abstract)
  Queues a job to be performed.
- #with_lock ⇒ Object (private)
  The lock prevents the flow from enqueuing jobs during configuration. Without it, the first job could finish before the second is enqueued, and the flow would be treated as finished prematurely.
Methods included from JobHandler
Methods included from Redis::Findable
Methods included from Redis::Identifier
Methods included from Redis::Model
#==, #assign_attributes, #destroy!, included
Methods inherited from Redis::Field
Instance Attribute Details
#indegree ⇒ Conflow::Redis::SortedSetField
Sorted set (Redis zset) of job ids. Each job id has a score attached, which is the job's indegree: the number of nodes (jobs) on which the given job depends. The score changes dynamically, and a score of 0 means that all of the job's dependencies are fulfilled.
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree, :sorted_set
  field :lock, :value

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap do |flow|
      flow.with_lock do
        flow.configure(*args)
      end
    end
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    lock.value != 1 && queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end

  # Lock prevents flow from enqueuing jobs during configuration - it could happen that first job is finished
  # before second is enqueued, therefore "finishing" the flow.
  # @api private
  def with_lock
    self.lock = 1
    yield
    self.lock = 0
    queue_available_jobs
  end
end
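The indegree bookkeeping described for this attribute can be illustrated without Redis. The following is a minimal sketch, assuming a plain Ruby Hash stands in for the sorted set; `IndegreeTable` and all of its methods are hypothetical names and not part of Conflow. How Conflow itself moves ids between `indegree` and `queued_jobs` is not shown on this page, so the sketch simply keeps a job in the table until it is finished.

```ruby
# Hypothetical in-memory stand-in for the sorted set: job id => number of
# unfinished dependencies. Not part of Conflow's API.
class IndegreeTable
  def initialize
    @scores = {}                                 # job id => pending dependency count
    @dependents = Hash.new { |h, k| h[k] = [] }  # job id => ids of jobs waiting on it
  end

  # Register a job together with the ids of the jobs it depends on.
  def add(job_id, after: [])
    @scores[job_id] = after.size
    after.each { |dep| @dependents[dep] << job_id }
  end

  # Jobs whose score is 0 have all dependencies fulfilled.
  def ready
    @scores.select { |_, score| score.zero? }.keys
  end

  # Finishing a job removes it and lowers the score of every job that waited on it.
  def finish(job_id)
    @scores.delete(job_id)
    @dependents.delete(job_id).to_a.each { |id| @scores[id] -= 1 }
  end

  def empty?
    @scores.empty?
  end
end

table = IndegreeTable.new
table.add(1)
table.add(2)
table.add(3, after: [1, 2])

ready_at_start = table.ready    # jobs 1 and 2 have no dependencies
table.finish(1)
ready_after_one = table.ready   # job 3 still waits on job 2
table.finish(2)
ready_after_two = table.ready   # job 3 is now ready
```

In a Redis-backed version the decrement would presumably be a score update on the sorted set, but that mapping is an assumption rather than something this page documents.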
#jobs ⇒ Array<Conflow::Job> (readonly)
Read-only array of jobs added to the flow.
#queued_jobs ⇒ Conflow::Redis::SetField (readonly)
Set of jobs that are currently queued (and not yet finished).
Class Method Details
.create(*args) ⇒ Conflow::Flow
Creates a new flow with the given parameters. The arguments are passed on to #configure, and the newly created flow is returned.
# File 'lib/conflow/flow.rb', line 59

def self.create(*args)
  new.tap do |flow|
    flow.with_lock do
      flow.configure(*args)
    end
  end
end
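The create/configure pattern in the listing above can be tried out in isolation. This is a minimal sketch under stated assumptions: `SketchFlow`, `MySketchFlow`, and the recording `run` stub are hypothetical stand-ins, with no Redis, no lock, and no worker involved. The stand-in forwards keyword arguments explicitly (`**opts`) so that it runs on Ruby 3; the listing above forwards only `*args`.

```ruby
# Hypothetical stand-in that mimics the create/configure pattern.
# It is not Conflow::Flow: there is no Redis and no worker involved.
class SketchFlow
  attr_reader :planned

  def initialize
    @planned = []
  end

  # Same shape as Flow.create: build the object, configure it, return it.
  # Keyword arguments are forwarded explicitly so the sketch runs on Ruby 3.
  def self.create(*args, **opts)
    new.tap { |flow| flow.configure(*args, **opts) }
  end

  # Subclasses override this hook to describe their jobs.
  def configure(*args, **opts); end

  # Stub for the `run` helper used in the @example; it only records the call.
  def run(job_name, params: {})
    @planned << [job_name, params]
  end
end

class MySketchFlow < SketchFlow
  def configure(id:, strict:)
    run :upsert, params: { id: id }
    run :checker, params: { id: id } if strict
  end
end

relaxed = MySketchFlow.create(id: 320, strict: false)  # plans only :upsert
strict  = MySketchFlow.create(id: 15, strict: true)    # plans :upsert and :checker
```

Because `tap` returns its receiver, `create` hands back the flow object itself, which is why `relaxed` and `strict` respond to `planned` here.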
Instance Method Details
#configure(*args) ⇒ Object
Override this method to keep your flow definition inside the class. It is called with the arguments given to .create when the flow is created that way.
# File 'lib/conflow/flow.rb', line 78

def configure(*args); end
#finished? ⇒ Boolean
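Returns whether the flow is finished, that is, whether all jobs have been processed. The result is true only when the configuration lock is not held and both queued_jobs and indegree are empty.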
# File 'lib/conflow/flow.rb', line 69

def finished?
  lock.value != 1 && queued_jobs.size.zero? && indegree.size.zero?
end
#queue(job) ⇒ Object
Queues a job to be performed. Both the id of the flow and the id of the job must be preserved so that the job can be recreated in the worker.
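To make the requirement on #queue concrete, here is a minimal sketch of an override that stores only the two ids and a worker that rebuilds the job from them. Everything in it is an assumption made for illustration: `QueueSketchFlow`, `SketchJob`, the in-memory `QUEUE` and `REGISTRY`, and `perform_next` are hypothetical and stand in for a real background-job backend and for Conflow's own lookup by id.

```ruby
# Hypothetical stand-ins; none of these classes belong to Conflow.
SketchJob = Struct.new(:id, :name)

class QueueSketchFlow
  REGISTRY = {}   # flow id => flow, standing in for a lookup by id
  QUEUE = []      # in-memory stand-in for a background job backend

  attr_reader :id, :jobs

  def initialize(id)
    @id = id
    @jobs = {}
    REGISTRY[id] = self
  end

  def add(job)
    @jobs[job.id] = job
  end

  # The override only stores the two ids, not the job object itself.
  def queue(job)
    QUEUE << { flow_id: id, job_id: job.id }
  end
end

# A worker rebuilds the job from the ids found in the payload.
def perform_next
  payload = QueueSketchFlow::QUEUE.shift
  flow = QueueSketchFlow::REGISTRY.fetch(payload[:flow_id])
  flow.jobs.fetch(payload[:job_id])
end

flow = QueueSketchFlow.new(7)
job = SketchJob.new(1, "upsert")
flow.add(job)
flow.queue(job)
recreated = perform_next   # looked up again through flow id 7 and job id 1
```

Passing ids rather than objects keeps the payload small and serializable, which is the usual reason a worker-facing queue is designed this way.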
#with_lock ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The lock prevents the flow from enqueuing jobs during configuration. Without it, the first job could finish before the second is enqueued, and the flow would be treated as finished prematurely.
# File 'lib/conflow/flow.rb', line 83

def with_lock
  self.lock = 1
  yield
  self.lock = 0
  queue_available_jobs
end
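The race that the lock guards against can be reproduced in a small in-memory model. This is a sketch under assumptions, not Conflow's implementation: `LockSketchFlow`, its `add` and `finish` methods, and the `pending` list are hypothetical, and a Ruby `Set` stands in for the Redis-backed fields.

```ruby
require "set"

# Hypothetical in-memory model of the lock; not Conflow's implementation.
class LockSketchFlow
  attr_reader :queued, :pending

  def initialize
    @lock = 0
    @queued = Set.new   # jobs handed to the worker, not yet finished
    @pending = []       # jobs added during configuration, not yet queued
  end

  def finished?
    @lock != 1 && @queued.empty? && @pending.empty?
  end

  # Adding a job queues it immediately unless the lock is held.
  def add(job)
    @pending << job
    queue_available_jobs unless @lock == 1
  end

  def finish(job)
    @queued.delete(job)
  end

  def with_lock
    @lock = 1
    yield
    @lock = 0
    queue_available_jobs
  end

  private

  def queue_available_jobs
    @queued.merge(@pending)
    @pending.clear
  end
end

# Without the lock, a fast worker makes the flow look finished too early.
unlocked = LockSketchFlow.new
unlocked.add(:first)
unlocked.finish(:first)
premature = unlocked.finished?   # true, although :second was never added

# With the lock, nothing is queued until configuration is complete.
locked = LockSketchFlow.new
inside = nil
locked.with_lock do
  locked.add(:first)
  locked.add(:second)
  inside = locked.finished?      # false while the lock is held
end
```

Like the listing above, the sketch resets the lock without an `ensure` clause, so an exception raised inside the block would leave the lock set.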