Module: Plines::Step

Defined in:
lib/plines/step.rb

Overview

This is the module that should be included in any class that is intended to be a Plines step.

Defined Under Namespace

Modules: InstanceMethods

Classes: DependencyData, NotDeclaredInPipelineError, QlessJobOptions

Constant Summary collapse

# Filter stored by `depends_on` for a dependency when no block is supplied.
# It is a plain Proc (not a lambda), so it tolerates any argument list, and it
# always evaluates to true.
DEFAULT_DEPENDENCY_FILTER =
Proc.new { true }

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.extended(klass) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/plines/step.rb', line 33

# Ruby hook that runs when a class extends this module.
#
# Everything inside the `class_eval` block executes with `klass` as `self`, so
# the bare calls to `pipeline` and `fan_out` are dispatched to `klass`.
#
# @param klass [Class] the class being turned into a step
# @raise [NotDeclaredInPipelineError] when `klass.pipeline` is not a
#   Plines::Pipeline
def self.extended(klass)
  klass.class_eval do
    # Mix the instance-level behaviour into the step class.
    include InstanceMethods

    # Reject the class before it is registered anywhere.
    unless pipeline.is_a?(Plines::Pipeline)
      raise NotDeclaredInPipelineError,
        "#{klass} is not nested in a pipeline module and thus " +
        "cannot be a Plines::Step. All plines steps must be " +
        "declared within pipeline modules."
    end

    fan_out { |d| [d] } # default to one step instance
    # Register the class with its pipeline.
    pipeline.step_classes << self
  end
end

Instance Method Details

#depended_on_by_all_steps ⇒ Object



57
58
59
# File 'lib/plines/step.rb', line 57

# Records this step as the `initial_step` of its pipeline.
#
# NOTE(review): the name implies every other step then depends on this one;
# that wiring is not visible here -- confirm in the pipeline code.
#
# @return [Object] the step itself (the value that was assigned)
def depended_on_by_all_steps
  owning_pipeline = pipeline
  owning_pipeline.initial_step = self
end

#dependencies_for(job, batch_data) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/plines/step.rb', line 111

# Lazily enumerates the jobs that `job` depends on.
#
# Declared dependencies are yielded first. The initial-step jobs are only
# consulted when no declared dependency was yielded at all.
#
# Nothing is computed until the returned enumerator is iterated.
#
# @param job [Object] the job whose dependencies are wanted
# @param batch_data [Object] passed through unchanged to both helper iterators
# @return [Enumerator] yields each dependency job in turn
def dependencies_for(job, batch_data)
  Enumerator.new do |yielder|
    found_declared = false

    # Block parameters get their own names so that they never shadow the
    # `job` method parameter.
    each_declared_dependency_job_for(job, batch_data) do |dependency_job|
      found_declared = true
      yielder.yield dependency_job
    end

    unless found_declared
      each_initial_step_job_for(job, batch_data) do |initial_job|
        yielder.yield initial_job
      end
    end
  end
end

#depends_on(*klasses, &block) ⇒ Object



51
52
53
54
55
# File 'lib/plines/step.rb', line 51

# Declares that this step depends on each of the given steps.
#
# The same filter is stored for every listed step: the supplied block, or
# DEFAULT_DEPENDENCY_FILTER when no block is given.
#
# @param klasses [Array] keys under which the filter is stored
# @return [Array] the `klasses` array
def depends_on(*klasses, &block)
  filter = block || DEFAULT_DEPENDENCY_FILTER
  klasses.each { |dependency| dependency_filters[dependency] = filter }
end

#depends_on_all_steps ⇒ Object



61
62
63
# File 'lib/plines/step.rb', line 61

# Records this step as the `terminal_step` of its pipeline.
#
# NOTE(review): the name implies this step then depends on every other step;
# that wiring is not visible here -- confirm in the pipeline code.
#
# @return [Object] the step itself (the value that was assigned)
def depends_on_all_steps
  owning_pipeline = pipeline
  owning_pipeline.terminal_step = self
end

#enqueue_qless_job(qless, data, options = {}) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/plines/step.rb', line 163

# Puts a job for this step onto a queue of the given qless client.
#
# Queue selection, in order:
# 1. the awaiting-external-dependency queue, when the step has external
#    dependencies for `data`;
# 2. `options[:queue]`, but only while the step's own processing queue is
#    `:plines`;
# 3. the step's processing queue.
#
# `priority` and `retries` from `qless_options` override the passed options
# when they are set, and the step's tags are unioned with any passed tags.
#
# The caller's `options` hash is never modified (and may be frozen): all
# adjustments are made on a copy.
#
# @param qless [#queues] client whose `queues[name]` responds to `put`
# @param data [Object] the job data handed to the queue
# @param options [Hash] extra options forwarded to `put`
# @return [Object] whatever `queue.put` returns
def enqueue_qless_job(qless, data, options = {})
  queue_name = if has_external_dependencies_for?(data)
    Pipeline::AWAITING_EXTERNAL_DEPENDENCY_QUEUE
  elsif options[:queue] && processing_queue == :plines
    options[:queue]
  else
    processing_queue
  end

  queue = qless.queues[queue_name]

  # Copy before editing so the hash the caller passed in stays untouched.
  job_options = options.dup
  job_options[:priority] = qless_options.priority if qless_options.priority
  job_options[:retries] = qless_options.retries if qless_options.retries
  job_options[:tags] = Array(job_options[:tags]) | qless_options.tags

  queue.put(self, data, job_options)
end

#external_dependencies_for(data) ⇒ Object



101
102
103
104
105
106
107
108
109
# File 'lib/plines/step.rb', line 101

# Collects the external dependencies this step has for `data`.
#
# One ExternalDependencyList is shared across every definition block; each
# block is called with that list and `data`.
#
# @param data [Object] passed to every definition block
# @return [Array] the list converted with `to_a`
def external_dependencies_for(data)
  external_dependency_definitions
    .each_with_object(ExternalDependencyList.new) { |definition, collected| definition.call(collected, data) }
    .to_a
end

#external_dependency_definitions ⇒ Object



153
154
155
# File 'lib/plines/step.rb', line 153

# The blocks registered through `has_external_dependencies`.
#
# The array is created on first access and the same instance is returned on
# every later call.
#
# @return [Array] the memoized list of definition blocks
def external_dependency_definitions
  return @external_dependency_definitions if @external_dependency_definitions

  @external_dependency_definitions = []
end

#fan_out(&block) ⇒ Object



75
76
77
78
# File 'lib/plines/step.rb', line 75

# Appends a fan-out block to this step.
#
# `jobs_for` later applies the stored blocks in registration order.
#
# @return [Array] the list of fan-out blocks registered so far
def fan_out(&block)
  (@fan_out_blocks ||= []) << block
end

#has_external_dependencies(*args, &block) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/plines/step.rb', line 80

# Registers an external-dependency definition for this step.
#
# With a block, the block itself is stored and `args` are ignored. Without
# one, a definition is built from `args`: a trailing Hash is removed and used
# as the options, and every remaining argument is added to the dependency
# list with those options.
#
# @param args [Array] dependency names, optionally followed by an options Hash
# @return [Array] the list of registered definitions
def has_external_dependencies(*args, &block)
  unless block
    shared_options = args.last.is_a?(Hash) ? args.pop : {}
    block = lambda do |deps, _|
      args.each { |dependency_name| deps.add dependency_name, shared_options }
    end
  end

  external_dependency_definitions << block
end

#has_external_dependencies_for?(data) ⇒ Boolean

Returns:

  • (Boolean)


93
94
95
96
97
98
99
# File 'lib/plines/step.rb', line 93

# Whether any registered definition produces an external dependency for `data`.
#
# Each definition is run against its own fresh ExternalDependencyList, and the
# check stops at the first definition that leaves its list non-empty.
#
# @param data [Object] passed to every definition block
# @return [Boolean]
def has_external_dependencies_for?(data)
  external_dependency_definitions.any? do |definition|
    ExternalDependencyList.new.tap { |scratch| definition.call(scratch, data) }.any?
  end
end

#jobs_for(batch_data) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/plines/step.rb', line 126

# Builds the jobs this step produces for a batch.
#
# Starting from the single `batch_data` value, each registered fan-out block
# is applied in turn to every value produced so far and the results are
# flattened one level. A Job is then built for each final value.
#
# @param batch_data [Object] the seed value handed to the first fan-out block
# @return [Array] one `Job.build(self, data)` result per fanned-out value
def jobs_for(batch_data)
  expanded = [batch_data]

  @fan_out_blocks.each do |expand|
    expanded = expanded.flat_map { |payload| expand.call(payload) }
  end

  expanded.map { |payload| Job.build(self, payload) }
end

#perform(qless_job) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/plines/step.rb', line 134

# Entry point called with a qless job for this step.
#
# Looks up the job's batch, re-queues the job if the batch is still being
# created, and otherwise instantiates the step and runs it.
#
# @param qless_job [Object] must respond to `client`, `data`, `queue_name`,
#   `move`, `after_complete` and `jid`
# @return [Object, nil] nil when the job was re-queued, otherwise the result
#   of the step instance's `around_perform`
def perform(qless_job)
  # `fetch` is used so that a missing "_job_batch_id" fails loudly.
  # NOTE(review): assumes qless_job.data is Hash-like -- confirm.
  batch = JobBatch.find(qless_job.client, pipeline,
                        qless_job.data.fetch("_job_batch_id"))

  # The batch is not fully created yet: put the job back on its own queue
  # with a delay and stop here.
  # NOTE(review): the delay of 2 is presumably in seconds -- confirm in Qless.
  if batch.creation_in_progress?
    qless_job.move(qless_job.queue_name, delay: 2)
    return
  end

  job_data = DynamicStruct.new(qless_job.data)

  # Registered before the step runs; marks this job complete on the batch.
  qless_job.after_complete do
    batch.mark_job_as_complete(qless_job.jid)
  end

  # `send` is used to invoke around_perform
  # (presumably because it is not public -- confirm in InstanceMethods).
  new(batch, job_data, qless_job)
    .send(:around_perform)
end

#pipeline ⇒ Object



185
186
187
188
189
190
191
# File 'lib/plines/step.rb', line 185

# The namespace this step is nested in, resolved from the step's own name.
#
# The last segment of `name` is dropped and the remaining segments are
# resolved one by one, starting from Object. The result is memoized.
#
# @return [Object] the constant that encloses this step
def pipeline
  return @pipeline if @pipeline

  *enclosing, _own_name = name.split('::')
  @pipeline = enclosing.reduce(Object) { |scope, const_name| scope.const_get(const_name) }
end

#processing_queue ⇒ Object



181
182
183
# File 'lib/plines/step.rb', line 181

# The queue configured for this step in its qless options.
#
# @return [Object] the value of `qless_options.queue`
def processing_queue
  step_options = qless_options
  step_options.queue
end

#qless_options {|@qless_options| ... } ⇒ Object

Yields:



157
158
159
160
161
# File 'lib/plines/step.rb', line 157

# The qless job options for this step, created on first access.
#
# When a block is given it is yielded the options object, so the step can be
# configured in place.
#
# @yield [options] the memoized QlessJobOptions instance
# @return [QlessJobOptions] the same memoized instance on every call
def qless_options
  (@qless_options ||= QlessJobOptions.new).tap do |step_options|
    yield step_options if block_given?
  end
end

#run_jobs_in_serial ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/plines/step.rb', line 65

# Makes the jobs of this step run one after another.
#
# The step is declared to depend on itself, with a filter that accepts only
# the job whose data comes immediately before the current job's data in
# `my_data_hashes`.
#
# The first job has no predecessor. Its "prior data" is explicitly nil rather
# than whatever `each_cons` returns after running to completion, because that
# value is nil before Ruby 3.1 and the receiver from 3.1 on.
#
# @return [Object] whatever `depends_on` returns
def run_jobs_in_serial
  depends_on(step_name) do |data|
    # First consecutive (prior, current) pair whose second element is this
    # job's data; destructuring nil leaves prior_data as nil.
    prior_data, _current = data.my_data_hashes.each_cons(2).find do |(_, current)|
      current == data.my_data
    end

    data.their_data == prior_data
  end
end

#step_name ⇒ Object



193
194
195
# File 'lib/plines/step.rb', line 193

# The step's short name: the last `::`-separated segment of `name`, as a
# Symbol. The result is memoized.
#
# @return [Symbol]
def step_name
  return @step_name if @step_name

  @step_name = name.split('::')[-1].to_sym
end