Class: Conflow::Flow

Inherits:
Redis::Field show all
Includes:
JobHandler, Redis::Findable, Redis::Identifier, Redis::Model
Defined in:
lib/conflow/flow.rb,
lib/conflow/flow/job_builder.rb,
lib/conflow/flow/job_handler.rb

Overview

Flow is a set of steps needed to complete certain task. It is composed of jobs which have dependency relations with one another.

Flow class is designed to be inherited in your application. You must supply #queue method

Defined Under Namespace

Modules: JobHandler Classes: JobBuilder

Instance Attribute Summary collapse

Attributes included from Redis::Identifier

#id

Attributes inherited from Redis::Field

#key

Class Method Summary collapse

Instance Method Summary collapse

Methods included from JobHandler

#finish, #run, #start

Methods included from Redis::Findable

included, #initialize

Methods included from Redis::Identifier

included, #initialize

Methods included from Redis::Model

#==, #assign_attributes, #destroy!, included

Methods inherited from Redis::Field

#initialize

Instance Attribute Details

#indegreeConflow::Redis::SortedSetField

Sorted set (Redis zset) of job ids. Each job has a score attached, which is the number of “indegree” nodes - the nodes on which given job depends. This changes dynamically and score equal to 0 means that all dependencies are fulfilled.

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap { |flow| flow.configure(*args) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end
end

#jobsArray<Conflow::Job> (readonly)

Read-only array of jobs added to the flow.

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap { |flow| flow.configure(*args) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end
end

#queued_jobsConflow::Redis::SetField (readonly)

Set of jobs that are currently queued (and not yet finished).

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap { |flow| flow.configure(*args) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end
end

Class Method Details

.create(*args) ⇒ Conflow::Job

Create new flow with given parameters

Examples:

Simple configurable flow

class MyFlow < Conflow::Flow
  def configure(id:, strict:)
    run UpsertJob, params: { id: id }
    run CheckerJob, params: { id: id } if strict
  end
end

MyFlow.create(id: 320, strict: false)
MyFlow.create(id: 15, strict: true)

Parameters:

  • args (Array<Object>)

    any parameters that will be passed to #configure method

Returns:



58
59
60
# File 'lib/conflow/flow.rb', line 58

def self.create(*args)
  new.tap { |flow| flow.configure(*args) }
end

Instance Method Details

#configure(*args) ⇒ Object

This method is abstract.

Override this method in order to contain your flow definition inside the class. This method will be called if flow is created using create method.

Parameters:

  • args (Array<Object>)

    any arguments needed to start a flow

See Also:



73
# File 'lib/conflow/flow.rb', line 73

def configure(*args); end

#finished?Boolean

Returns whether or not the flow is finished (all jobs were processed)

Returns:

  • (Boolean)

    true if no pending jobs



64
65
66
# File 'lib/conflow/flow.rb', line 64

def finished?
  queued_jobs.size.zero? && indegree.size.zero?
end

#queue(job) ⇒ Object

This method is abstract.

Queues job to be performed. Both id of the flow and id of the job must be preserved in order to recreate job in worker.

Examples:

Queue sidekiq job

class MyBaseFlow < Conflow::Flow
  def queue(job)
    Sidekiq::Client.enqueue(FlowWorkerJob, id, job.id)
  end
end

Parameters:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/conflow/flow.rb', line 35

class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters
  # @param args [Array<Object>] any parameters that will be passed to {#configure} method
  # @return [Conflow::Job] job object representing created job
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  def self.create(*args)
    new.tap { |flow| flow.configure(*args) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # @param args [Array<Object>] any arguments needed to start a flow
  # @see create
  def configure(*args); end
end