Class: Conflow::Flow
- Inherits:
- Redis::Field
- Object → Redis::Field → Conflow::Flow
- Includes:
- JobHandler, Redis::Findable, Redis::Identifier, Redis::Model
- Defined in:
- lib/conflow/flow.rb,
lib/conflow/flow/job_builder.rb,
lib/conflow/flow/job_handler.rb
Overview
Defined Under Namespace
Modules: JobHandler
Classes: JobBuilder
Instance Attribute Summary
-
#indegree ⇒ Conflow::Redis::SortedSetField
Sorted set (Redis zset) of job ids.
-
#jobs ⇒ Array<Conflow::Job>
readonly
Read-only array of jobs added to the flow.
-
#queued_jobs ⇒ Conflow::Redis::SetField
readonly
Set of jobs that are currently queued (and not yet finished).
Attributes included from Redis::Identifier
Attributes inherited from Redis::Field
Class Method Summary
-
.create(*args) ⇒ Conflow::Job
Create new flow with given parameters.
Instance Method Summary
-
#configure(*args) ⇒ Object
abstract
Override this method in order to contain your flow definition inside the class.
-
#finished? ⇒ Boolean
Returns whether or not the flow is finished (all jobs were processed).
-
#queue(job) ⇒ Object
abstract
Queues job to be performed.
Methods included from JobHandler
Methods included from Redis::Findable
Methods included from Redis::Identifier
Methods included from Redis::Model
#==, #assign_attributes, #destroy!, included
Methods inherited from Redis::Field
Instance Attribute Details
#indegree ⇒ Conflow::Redis::SortedSetField
Sorted set (Redis zset) of job ids. Each job id carries a score equal to its indegree: the number of jobs on which the given job still depends. The score changes dynamically as the flow runs, and a score of 0 means that all dependencies are fulfilled.
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree, :sorted_set

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap { |flow| flow.configure(*args) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end
end
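The indegree bookkeeping described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not Conflow's implementation: a plain Hash stands in for the Redis sorted set, the class name `IndegreeSketch` and its methods are hypothetical, and it assumes that a job id is removed from the structure once its score reaches 0 (which would be consistent with `#finished?` requiring `indegree` to be empty).

```ruby
# In-memory stand-in for the indegree sorted set: job id => number of
# unfinished dependencies. Hypothetical names; no Redis involved.
class IndegreeSketch
  def initialize
    @scores = {}                                  # job id => indegree score
    @dependents = Hash.new { |h, k| h[k] = [] }   # job id => ids waiting on it
  end

  # Register a job and the ids of the jobs it depends on.
  # Returns true when the job has no dependencies and could be queued at once.
  def add(job_id, after: [])
    after.each { |dependency| @dependents[dependency] << job_id }
    @scores[job_id] = after.size unless after.empty?
    after.empty?
  end

  # Mark a job as finished: lower the score of each dependent by one and
  # return the ids whose score reached 0.
  # Assumption: ids with score 0 are removed once they become ready.
  def finish(job_id)
    ready = []
    @dependents.delete(job_id).to_a.each do |dependent|
      @scores[dependent] -= 1
      next unless @scores[dependent].zero?

      @scores.delete(dependent)
      ready << dependent
    end
    ready
  end

  # Current score of a job; 0 means all dependencies are fulfilled.
  def score(job_id)
    @scores.fetch(job_id, 0)
  end

  def empty?
    @scores.empty?
  end
end

graph = IndegreeSketch.new
graph.add(:upsert)                            # true: nothing to wait for
graph.add(:check, after: [:upsert])           # false: waits on :upsert
graph.add(:report, after: [:upsert, :check])  # false: waits on two jobs
graph.finish(:upsert)                         # [:check] becomes ready
graph.finish(:check)                          # [:report] becomes ready
```

Each call to `finish` lowers the scores of the dependents, so a job with two dependencies only becomes ready after both of them have finished.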
#jobs ⇒ Array<Conflow::Job> (readonly)
Read-only array of jobs added to the flow.
#queued_jobs ⇒ Conflow::Redis::SetField (readonly)
Set of jobs that are currently queued (and not yet finished).
Class Method Details
.create(*args) ⇒ Conflow::Job
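Creates a new flow with the given parameters, which are passed to #configure. As the source below shows, the method returns the new flow instance (new.tap yields the receiver and returns it), although the signature documents Conflow::Job.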
# File 'lib/conflow/flow.rb', line 58

def self.create(*args)
  new.tap { |flow| flow.configure(*args) }
end
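The `new.tap` pattern in `.create` can be tried out without Redis using a stand-in class. This is a minimal sketch, not Conflow itself: `SketchFlow`, `MySketchFlow`, `steps` and the recording `run` helper are hypothetical names introduced for illustration. The sketch also forwards `**kwargs`, which the original signature does not do; this is an addition so that keyword arguments reach `#configure` on Ruby 3 and later, where a bare `*args` splat does not forward them as keywords.

```ruby
# Stand-in base class mirroring only the .create / #configure pair.
# It has no persistence and is not Conflow::Flow.
class SketchFlow
  attr_reader :steps

  def initialize
    @steps = []
  end

  # Same shape as the listing above, plus **kwargs (an addition for this
  # sketch) so keyword arguments are passed on to #configure.
  def self.create(*args, **kwargs)
    new.tap { |flow| flow.configure(*args, **kwargs) }
  end

  # Default is a no-op, like the abstract #configure.
  def configure(*args, **kwargs); end

  # Hypothetical helper that only records what would be run.
  def run(job_name, params: {})
    @steps << [job_name, params]
  end
end

class MySketchFlow < SketchFlow
  def configure(id:, strict:)
    run :upsert, params: { id: id }
    run :check, params: { id: id } if strict
  end
end

flow = MySketchFlow.create(id: 15, strict: true)
flow.class       # MySketchFlow, because tap returns its receiver
flow.steps.size  # 2, both steps were recorded by #configure
```

Because `tap` returns the object it was called on, the caller receives the configured flow rather than the return value of `#configure`.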
Instance Method Details
#configure(*args) ⇒ Object
Override this method to keep your flow definition inside the class. It is called when the flow is created using the .create method.
# File 'lib/conflow/flow.rb', line 73

def configure(*args); end
#finished? ⇒ Boolean
Returns whether or not the flow is finished (all jobs were processed), that is, whether both queued_jobs and indegree are empty.
# File 'lib/conflow/flow.rb', line 64

def finished?
  queued_jobs.size.zero? && indegree.size.zero?
end
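The predicate only looks at two collections, so its behaviour can be checked with in-memory stand-ins. In this sketch the `FlowState` struct is hypothetical, a `Set` replaces the Redis set and a `Hash` replaces the sorted set; only the body of `finished?` is taken from the listing above.

```ruby
require "set"

# Hypothetical container holding the two collections #finished? inspects.
FlowState = Struct.new(:queued_jobs, :indegree) do
  # Same predicate as in the listing, applied to in-memory collections.
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end
end

waiting = FlowState.new(Set.new, { 2 => 1 })  # job 2 still waits on one dependency
running = FlowState.new(Set[1], {})           # job 1 is queued, nothing else waits
done    = FlowState.new(Set.new, {})          # nothing queued, nothing waiting

waiting.finished?  # false
running.finished?  # false
done.finished?     # true
```

Under this reading, a flow to which no jobs were ever added would also report itself as finished, since both collections would be empty.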
#queue(job) ⇒ Object
Queues a job to be performed. Both the id of the flow and the id of the job must be preserved so that the job can be recreated in the worker.
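Since `#queue` is abstract, an application is expected to supply its own version. The sketch below shows one possible shape of such an override, assuming only what the description states: that the flow id and the job id are what must be handed over. `QueueingSketchFlow`, `SketchJob` and the `enqueued` array are hypothetical stand-ins; a real application would push to its background-job backend instead, and how a worker turns the ids back into objects is outside this sketch.

```ruby
# Hypothetical job object; only its id matters here.
SketchJob = Struct.new(:id)

# Stand-in flow with an in-memory "queue". Not Conflow::Flow.
class QueueingSketchFlow
  attr_reader :id, :enqueued

  def initialize(id)
    @id = id
    @enqueued = []  # stands in for a real background-job backend
  end

  # Counterpart of the abstract #queue(job): record both ids and nothing
  # else, so that a worker has enough information to look both up again.
  def queue(job)
    @enqueued << { flow_id: id, job_id: job.id }
  end
end

flow = QueueingSketchFlow.new(7)
flow.queue(SketchJob.new(42))
flow.enqueued.last  # a hash holding flow id 7 and job id 42
```

Passing ids instead of whole objects keeps the queued payload small and leaves the stored flow and job as the single source of truth.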