Class: Conflow::Flow

Inherits:
Redis::Field show all
Includes:
JobHandler, Redis::Findable, Redis::Identifier, Redis::Model
Defined in:
lib/conflow/flow.rb,
lib/conflow/flow/job_builder.rb,
lib/conflow/flow/job_handler.rb

Overview

Flow is a set of steps needed to complete certain task. It is composed of jobs which have dependency relations with one another.

Flow class is designed to be inherited in your application. You must supply #queue method

Defined Under Namespace

Modules: JobHandler Classes: JobBuilder

Instance Attribute Summary collapse

Attributes included from Redis::Identifier

#id

Attributes inherited from Redis::Field

#key

Class Method Summary collapse

Instance Method Summary collapse

Methods included from JobHandler

#finish, #run, #start

Methods included from Redis::Findable

included, #initialize

Methods included from Redis::Identifier

included, #initialize

Methods included from Redis::Model

#==, #assign_attributes, #destroy!, included

Methods inherited from Redis::Field

#initialize

Instance Attribute Details

#indegreeConflow::Redis::SortedSetField

Sorted set (Redis zset) of job ids. Each job has a score attached, which is the number of “indegree” nodes - the nodes on which given job depends. This changes dynamically and score equal to 0 means that all dependencies are fulfilled.

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set
  field :lock,        :value

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap do |flow|
      flow.with_lock do
        flow.configure(*args)
      end
    end
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    lock.value != 1 && queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end

  # Lock prevents flow from enqueuing jobs during configuration - it could happen that first job is finished
  # before second is enqueued, therefore "finishing" the flow.
  # @api private
  def with_lock
    self.lock = 1
    yield
    self.lock = 0
    queue_available_jobs
  end
end

#jobsArray<Conflow::Job> (readonly)

Read-only array of jobs added to the flow.

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set
  field :lock,        :value

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap do |flow|
      flow.with_lock do
        flow.configure(*args)
      end
    end
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    lock.value != 1 && queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end

  # Lock prevents flow from enqueuing jobs during configuration - it could happen that first job is finished
  # before second is enqueued, therefore "finishing" the flow.
  # @api private
  def with_lock
    self.lock = 1
    yield
    self.lock = 0
    queue_available_jobs
  end
end

#queued_jobsConflow::Redis::SetField (readonly)

Set of jobs that are currently queued (and not yet finished).

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set
  field :lock,        :value

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap do |flow|
      flow.with_lock do
        flow.configure(*args)
      end
    end
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    lock.value != 1 && queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end

  # Lock prevents flow from enqueuing jobs during configuration - it could happen that first job is finished
  # before second is enqueued, therefore "finishing" the flow.
  # @api private
  def with_lock
    self.lock = 1
    yield
    self.lock = 0
    queue_available_jobs
  end
end

Class Method Details

.create(*args) ⇒ Conflow::Job

Create new flow with given parameters

Examples:

Simple configurable flow

class MyFlow < Conflow::Flow
  def configure(id:, strict:)
    run UpsertJob, params: { id: id }
    run CheckerJob, params: { id: id } if strict
  end
end

MyFlow.create(id: 320, strict: false)
MyFlow.create(id: 15, strict: true)

Parameters:

  • args (Array<Object>)

    any parameters that will be passed to #configure method

Returns:



59
60
61
62
63
64
65
# File 'lib/conflow/flow.rb', line 59

def self.create(*args)
  new.tap do |flow|
    flow.with_lock do
      flow.configure(*args)
    end
  end
end

Instance Method Details

#configure(*args) ⇒ Object

This method is abstract.

Override this method in order to contain your flow definition inside the class. This method will be called if flow is created using create method.

Parameters:

  • args (Array<Object>)

    any arguments needed to start a flow

See Also:



78
# File 'lib/conflow/flow.rb', line 78

def configure(*args); end

#finished?Boolean

Returns whether or not the flow is finished (all jobs were processed)

Returns:

  • (Boolean)

    true if no pending jobs



69
70
71
# File 'lib/conflow/flow.rb', line 69

def finished?
  lock.value != 1 && queued_jobs.size.zero? && indegree.size.zero?
end

#queue(job) ⇒ Object

This method is abstract.

Queues job to be performed. Both id of the flow and id of the job must be preserved in order to recreate job in worker.

Examples:

Queue sidekiq job

class MyBaseFlow < Conflow::Flow
  def queue(job)
    Sidekiq::Client.enqueue(FlowWorkerJob, id, job.id)
  end
end

Parameters:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set
  field :lock,        :value

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap do |flow|
      flow.with_lock do
        flow.configure(*args)
      end
    end
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    lock.value != 1 && queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end

  # Lock prevents flow from enqueuing jobs during configuration - it could happen that first job is finished
  # before second is enqueued, therefore "finishing" the flow.
  # @api private
  def with_lock
    self.lock = 1
    yield
    self.lock = 0
    queue_available_jobs
  end
end

#with_lockObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Lock prevents flow from enqueuing jobs during configuration - it could happen that first job is finished before second is enqueued, therefore “finishing” the flow.



83
84
85
86
87
88
# File 'lib/conflow/flow.rb', line 83

def with_lock
  self.lock = 1
  yield
  self.lock = 0
  queue_available_jobs
end