Class: Conflow::Flow
- Inherits:
-
Redis::Field
- Object
- Redis::Field
- Conflow::Flow
- Includes:
- JobHandler, Redis::Findable, Redis::Identifier, Redis::Model
- Defined in:
- lib/conflow/flow.rb,
lib/conflow/flow/job_handler.rb
Overview
Defined Under Namespace
Modules: JobHandler
Instance Attribute Summary collapse
-
#indegree ⇒ Conflow::Redis::SortedSetField
Sorted set (Redis zset) of job ids.
-
#jobs ⇒ Array<Conflow::Job>
readonly
Read-only array of jobs added to the flow.
-
#queued_jobs ⇒ Conflow::Redis::SetField
readonly
Set of jobs that are currently queued (and not yet finished).
Attributes included from Redis::Identifier
Attributes inherited from Redis::Field
Class Method Summary collapse
-
.create(*args) ⇒ Object
Create new flow with given parameters.
Instance Method Summary collapse
-
#configure(*args) ⇒ Object
abstract
Override this method in order to contain your flow definition inside the class.
-
#finished? ⇒ Boolean
Returns whether or not the flow is finished (all jobs were processed).
-
#queue(job) ⇒ Object
abstract
Queues job to be performed.
Methods included from JobHandler
Methods included from Redis::Findable
Methods included from Redis::Identifier
Methods included from Redis::Model
Methods inherited from Redis::Field
Instance Attribute Details
#indegree ⇒ Conflow::Redis::SortedSetField
Sorted set (Redis zset) of job ids. Each job has a score attached, which is the number of “indegree” nodes - the nodes on which given job depends. This changes dynamically and score equal to 0 means that all dependencies are fulfilled.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/conflow/flow.rb', line 35 class Flow < Conflow::Redis::Field include Conflow::Redis::Model include Conflow::Redis::Identifier include Conflow::Redis::Findable include JobHandler has_many :jobs, Conflow::Job field :queued_jobs, :set field :indegree, :sorted_set # Create new flow with given parameters # @example Simple configurable flow # class MyFlow < Conflow::Flow # def configure(id:, strict:) # run UpsertJob, params: { id: id } # run CheckerJob, params: { id: id } if strict # end # end # # MyFlow.create(id: 320, strict: false) # MyFlow.create(id: 15, strict: true) def self.create(*args) new.tap { |flow| flow.configure(*args) } end # Returns whether or not the flow is finished (all jobs were processed) # @return [Boolean] true if no pending jobs def finished? queued_jobs.size.zero? && indegree.size.zero? end # @abstract # Override this method in order to contain your flow definition inside the class. # This method will be called if flow is created using {.create} method. def configure(*args); end end |
#jobs ⇒ Array<Conflow::Job> (readonly)
Read-only array of jobs added to the flow.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/conflow/flow.rb', line 35 class Flow < Conflow::Redis::Field include Conflow::Redis::Model include Conflow::Redis::Identifier include Conflow::Redis::Findable include JobHandler has_many :jobs, Conflow::Job field :queued_jobs, :set field :indegree, :sorted_set # Create new flow with given parameters # @example Simple configurable flow # class MyFlow < Conflow::Flow # def configure(id:, strict:) # run UpsertJob, params: { id: id } # run CheckerJob, params: { id: id } if strict # end # end # # MyFlow.create(id: 320, strict: false) # MyFlow.create(id: 15, strict: true) def self.create(*args) new.tap { |flow| flow.configure(*args) } end # Returns whether or not the flow is finished (all jobs were processed) # @return [Boolean] true if no pending jobs def finished? queued_jobs.size.zero? && indegree.size.zero? end # @abstract # Override this method in order to contain your flow definition inside the class. # This method will be called if flow is created using {.create} method. def configure(*args); end end |
#queued_jobs ⇒ Conflow::Redis::SetField (readonly)
Set of jobs that are currently queued (and not yet finished).
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/conflow/flow.rb', line 35 class Flow < Conflow::Redis::Field include Conflow::Redis::Model include Conflow::Redis::Identifier include Conflow::Redis::Findable include JobHandler has_many :jobs, Conflow::Job field :queued_jobs, :set field :indegree, :sorted_set # Create new flow with given parameters # @example Simple configurable flow # class MyFlow < Conflow::Flow # def configure(id:, strict:) # run UpsertJob, params: { id: id } # run CheckerJob, params: { id: id } if strict # end # end # # MyFlow.create(id: 320, strict: false) # MyFlow.create(id: 15, strict: true) def self.create(*args) new.tap { |flow| flow.configure(*args) } end # Returns whether or not the flow is finished (all jobs were processed) # @return [Boolean] true if no pending jobs def finished? queued_jobs.size.zero? && indegree.size.zero? end # @abstract # Override this method in order to contain your flow definition inside the class. # This method will be called if flow is created using {.create} method. def configure(*args); end end |
Class Method Details
.create(*args) ⇒ Object
Create new flow with given parameters
56 57 58 |
# File 'lib/conflow/flow.rb', line 56 def self.create(*args) new.tap { |flow| flow.configure(*args) } end |
Instance Method Details
#configure(*args) ⇒ Object
Override this method in order to contain your flow definition inside the class. This method will be called if flow is created using create method.
69 |
# File 'lib/conflow/flow.rb', line 69 def configure(*args); end |
#finished? ⇒ Boolean
Returns whether or not the flow is finished (all jobs were processed)
62 63 64 |
# File 'lib/conflow/flow.rb', line 62 def finished? queued_jobs.size.zero? && indegree.size.zero? end |
#queue(job) ⇒ Object
Queues job to be performed. Both id of the flow and id of the job must be preserved in order to recreate job in worker.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/conflow/flow.rb', line 35 class Flow < Conflow::Redis::Field include Conflow::Redis::Model include Conflow::Redis::Identifier include Conflow::Redis::Findable include JobHandler has_many :jobs, Conflow::Job field :queued_jobs, :set field :indegree, :sorted_set # Create new flow with given parameters # @example Simple configurable flow # class MyFlow < Conflow::Flow # def configure(id:, strict:) # run UpsertJob, params: { id: id } # run CheckerJob, params: { id: id } if strict # end # end # # MyFlow.create(id: 320, strict: false) # MyFlow.create(id: 15, strict: true) def self.create(*args) new.tap { |flow| flow.configure(*args) } end # Returns whether or not the flow is finished (all jobs were processed) # @return [Boolean] true if no pending jobs def finished? queued_jobs.size.zero? && indegree.size.zero? end # @abstract # Override this method in order to contain your flow definition inside the class. # This method will be called if flow is created using {.create} method. def configure(*args); end end |