Class: Conflow::Flow

Inherits:
Redis::Field show all
Includes:
JobHandler, Redis::Findable, Redis::Identifier, Redis::Model
Defined in:
lib/conflow/flow.rb,
lib/conflow/flow/job_handler.rb

Overview

A flow is a set of steps needed to complete a certain task. It is composed of jobs that have dependency relations with one another.

The Flow class is designed to be inherited in your application. You must supply the #queue method.

Defined Under Namespace

Modules: JobHandler

Instance Attribute Summary collapse

Attributes included from Redis::Identifier

#id

Attributes inherited from Redis::Field

#key

Class Method Summary collapse

Instance Method Summary collapse

Methods included from JobHandler

#finish, #run

Methods included from Redis::Findable

included, #initialize

Methods included from Redis::Identifier

included, #initialize

Methods included from Redis::Model

#==, #destroy!, included

Methods inherited from Redis::Field

#initialize

Instance Attribute Details

#indegree ⇒ Conflow::Redis::SortedSetField

Sorted set (Redis zset) of job ids. Each job has a score attached, which is the number of “indegree” nodes, i.e. the nodes on which the given job depends. The score changes dynamically, and a score equal to 0 means that all dependencies are fulfilled.

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/conflow/flow.rb', line 35

# A flow is a set of jobs which have dependency relations with one another.
# It is designed to be subclassed by the application.
class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters.
  #
  # Positional and keyword arguments are forwarded to {#configure} separately,
  # so that overrides declaring keyword parameters (as in the example below)
  # also work on Ruby 3, where a trailing Hash is no longer converted to keywords.
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  # @param args [Array] positional arguments passed on to {#configure}
  # @param kwargs [Hash] keyword arguments passed on to {#configure}
  # @return [Flow] the newly built flow, after {#configure} has run
  def self.create(*args, **kwargs)
    new.tap { |flow| flow.configure(*args, **kwargs) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # The default implementation does nothing.
  def configure(*args); end
end

#jobs ⇒ Array<Conflow::Job> (readonly)

Read-only array of jobs added to the flow.

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/conflow/flow.rb', line 35

# A flow is a set of jobs which have dependency relations with one another.
# It is designed to be subclassed by the application.
class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters.
  #
  # Positional and keyword arguments are forwarded to {#configure} separately,
  # so that overrides declaring keyword parameters (as in the example below)
  # also work on Ruby 3, where a trailing Hash is no longer converted to keywords.
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  # @param args [Array] positional arguments passed on to {#configure}
  # @param kwargs [Hash] keyword arguments passed on to {#configure}
  # @return [Flow] the newly built flow, after {#configure} has run
  def self.create(*args, **kwargs)
    new.tap { |flow| flow.configure(*args, **kwargs) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # The default implementation does nothing.
  def configure(*args); end
end

#queued_jobs ⇒ Conflow::Redis::SetField (readonly)

Set of jobs that are currently queued (and not yet finished).

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/conflow/flow.rb', line 35

# A flow is a set of jobs which have dependency relations with one another.
# It is designed to be subclassed by the application.
class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters.
  #
  # Positional and keyword arguments are forwarded to {#configure} separately,
  # so that overrides declaring keyword parameters (as in the example below)
  # also work on Ruby 3, where a trailing Hash is no longer converted to keywords.
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  # @param args [Array] positional arguments passed on to {#configure}
  # @param kwargs [Hash] keyword arguments passed on to {#configure}
  # @return [Flow] the newly built flow, after {#configure} has run
  def self.create(*args, **kwargs)
    new.tap { |flow| flow.configure(*args, **kwargs) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # The default implementation does nothing.
  def configure(*args); end
end

Class Method Details

.create(*args) ⇒ Object

Create new flow with given parameters

Examples:

Simple configurable flow

# Example subclass: the flow definition lives in #configure.
class MyFlow < Conflow::Flow
  # Receives the arguments that were given to MyFlow.create.
  def configure(id:, strict:)
    run UpsertJob, params: { id: id }
    # CheckerJob is only added when strict is truthy.
    run CheckerJob, params: { id: id } if strict
  end
end

# Each call builds a new flow and passes these arguments to #configure.
MyFlow.create(id: 320, strict: false)
MyFlow.create(id: 15, strict: true)


56
57
58
# File 'lib/conflow/flow.rb', line 56

# Builds a new flow and hands the given arguments to its #configure method.
#
# Keyword arguments are captured and forwarded separately from positional
# ones: on Ruby 3 a Hash collected by *args is not converted back into
# keywords, so a #configure override with keyword parameters would raise
# ArgumentError.
# @param args [Array] positional arguments passed on to #configure
# @param kwargs [Hash] keyword arguments passed on to #configure
# @return [Object] the new flow, returned after #configure has run
def self.create(*args, **kwargs)
  new.tap { |flow| flow.configure(*args, **kwargs) }
end

Instance Method Details

#configure(*args) ⇒ Object

This method is abstract.

Override this method in order to contain your flow definition inside the class. This method will be called if the flow is created using the .create method.



69
# File 'lib/conflow/flow.rb', line 69

# Hook meant to be overridden with the flow definition; called by .create.
# Accepts any arguments and does nothing by default.
# @param args [Array] arguments forwarded from .create (ignored here)
# @return [nil]
def configure(*args)
  # Intentionally empty: subclasses supply the body.
end

#finished? ⇒ Boolean

Returns whether or not the flow is finished (all jobs were processed)

Returns:

  • (Boolean)

    true if no pending jobs



62
63
64
# File 'lib/conflow/flow.rb', line 62

# Tells whether the flow has been fully processed.
# @return [Boolean] true when both queued_jobs and indegree are empty
def finished?
  # Something is still queued, so indegree does not need to be checked.
  return false unless queued_jobs.size.zero?

  indegree.size.zero?
end

#queue(job) ⇒ Object

This method is abstract.

Queues a job to be performed. Both the id of the flow and the id of the job must be preserved in order to recreate the job in the worker.

Examples:

Queue sidekiq job

# Example base flow that queues its jobs through Sidekiq.
class MyBaseFlow < Conflow::Flow
  # Enqueues FlowWorkerJob with the flow id and the id of the given job.
  def queue(job)
    Sidekiq::Client.enqueue(FlowWorkerJob, id, job.id)
  end
end

Parameters:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/conflow/flow.rb', line 35

# A flow is a set of jobs which have dependency relations with one another.
# It is designed to be subclassed by the application.
class Flow < Conflow::Redis::Field
  include Conflow::Redis::Model
  include Conflow::Redis::Identifier
  include Conflow::Redis::Findable
  include JobHandler

  has_many :jobs, Conflow::Job
  field :queued_jobs, :set
  field :indegree,    :sorted_set

  # Create new flow with given parameters.
  #
  # Positional and keyword arguments are forwarded to {#configure} separately,
  # so that overrides declaring keyword parameters (as in the example below)
  # also work on Ruby 3, where a trailing Hash is no longer converted to keywords.
  # @example Simple configurable flow
  #   class MyFlow < Conflow::Flow
  #     def configure(id:, strict:)
  #       run UpsertJob, params: { id: id }
  #       run CheckerJob, params: { id: id } if strict
  #     end
  #   end
  #
  #   MyFlow.create(id: 320, strict: false)
  #   MyFlow.create(id: 15, strict: true)
  # @param args [Array] positional arguments passed on to {#configure}
  # @param kwargs [Hash] keyword arguments passed on to {#configure}
  # @return [Flow] the newly built flow, after {#configure} has run
  def self.create(*args, **kwargs)
    new.tap { |flow| flow.configure(*args, **kwargs) }
  end

  # Returns whether or not the flow is finished (all jobs were processed)
  # @return [Boolean] true if no pending jobs
  def finished?
    queued_jobs.size.zero? && indegree.size.zero?
  end

  # @abstract
  # Override this method in order to contain your flow definition inside the class.
  # This method will be called if flow is created using {.create} method.
  # The default implementation does nothing.
  def configure(*args); end
end