Module: Plines::Step
- Defined in:
- lib/plines/step.rb
Overview
This is the module that should be extended by any class that is intended to be a Plines step.
Defined Under Namespace
Modules: InstanceMethods
Classes: DependencyData, NotDeclaredInPipelineError, QlessJobOptions
Constant Summary
collapse
- DEFAULT_DEPENDENCY_FILTER =
Proc.new { true }
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.extended(klass) ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/plines/step.rb', line 33
# Hook invoked with the class that extends this module.
#
# Evaluated in the context of +klass+, it mixes in InstanceMethods, checks
# that the class's +pipeline+ is a Plines::Pipeline, registers a default
# fan-out block that wraps its argument in a one-element array, and appends
# the class to the pipeline's +step_classes+.
#
# @param klass [Class] the class extending this module
# @raise [NotDeclaredInPipelineError] if +klass.pipeline+ is not a
#   Plines::Pipeline
def self.extended(klass)
  klass.class_eval do
    include InstanceMethods

    unless pipeline.is_a?(Plines::Pipeline)
      raise NotDeclaredInPipelineError,
        "#{klass} is not nested in a pipeline module and thus " \
        "cannot be a Plines::Step. All plines steps must be " \
        "declared within pipeline modules."
    end

    # Default fan-out: the data passed in yields exactly one job data entry.
    fan_out { |d| [d] }
    pipeline.step_classes << self
  end
end
|
Instance Method Details
#depended_on_by_all_steps ⇒ Object
57
58
59
|
# File 'lib/plines/step.rb', line 57
# Records this step as the +initial_step+ of its pipeline.
#
# NOTE(review): judging by the name, the pipeline presumably makes every
# other step depend on the initial step -- confirm against Pipeline.
#
# @return [self] the value assigned
def depended_on_by_all_steps
  owning_pipeline = pipeline
  owning_pipeline.initial_step = self
end
|
#dependencies_for(job, batch_data) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/plines/step.rb', line 111
# Builds an enumerator over the jobs that +job+ depends on.
#
# Jobs yielded by +each_declared_dependency_job_for+ are produced first.
# Only if that helper yields nothing are the jobs from
# +each_initial_step_job_for+ produced instead.
#
# @param job the job whose dependencies are wanted
# @param batch_data passed through unchanged to both helpers
# @return [Enumerator] nothing is computed until it is iterated
def dependencies_for(job, batch_data)
  Enumerator.new do |yielder|
    has_dependencies = false

    each_declared_dependency_job_for(job, batch_data) do |dependency|
      has_dependencies = true
      yielder.yield dependency
    end

    unless has_dependencies
      each_initial_step_job_for(job, batch_data) do |initial_job|
        yielder.yield initial_job
      end
    end
  end
end
|
#depends_on(*klasses, &block) ⇒ Object
51
52
53
54
55
|
# File 'lib/plines/step.rb', line 51
# Declares that this step depends on each of the given steps.
#
# Every entry of +klasses+ is stored as a key in +dependency_filters+. The
# stored value is the given block, or DEFAULT_DEPENDENCY_FILTER when no
# block is passed.
#
# @param klasses the steps being depended on
# @param block optional filter stored for every listed step
# @return [Array] +klasses+
def depends_on(*klasses, &block)
  filter = block || DEFAULT_DEPENDENCY_FILTER
  klasses.each { |klass| dependency_filters[klass] = filter }
end
|
#depends_on_all_steps ⇒ Object
61
62
63
|
# File 'lib/plines/step.rb', line 61
# Records this step as the +terminal_step+ of its pipeline.
#
# NOTE(review): judging by the name, the pipeline presumably makes the
# terminal step depend on every other step -- confirm against Pipeline.
#
# @return [self] the value assigned
def depends_on_all_steps
  owning_pipeline = pipeline
  owning_pipeline.terminal_step = self
end
|
#enqueue_qless_job(qless, data, options = {}) ⇒ Object
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
# File 'lib/plines/step.rb', line 163
# Puts a job for this step onto a qless queue.
#
# Queue selection, in order:
# 1. Pipeline::AWAITING_EXTERNAL_DEPENDENCY_QUEUE when
#    +has_external_dependencies_for?(data)+ is true;
# 2. +options[:queue]+ when it is set and +processing_queue+ is +:plines+;
# 3. +processing_queue+ otherwise.
#
# The options handed to the queue are a copy of +options+ in which
# +:priority+ and +:retries+ are overwritten by the values from
# +qless_options+ when those are set, and +:tags+ is the union of the
# caller's tags and the +qless_options+ tags. The caller's hash is never
# modified, so a frozen or shared hash may be passed safely.
#
# @param qless the qless client; must respond to +queues+
# @param data the job data
# @param options [Hash] extra options for the queue's +put+
# @return whatever the queue's +put+ returns
def enqueue_qless_job(qless, data, options = {})
  queue_name =
    if has_external_dependencies_for?(data)
      Pipeline::AWAITING_EXTERNAL_DEPENDENCY_QUEUE
    elsif options[:queue] && processing_queue == :plines
      options[:queue]
    else
      processing_queue
    end

  queue = qless.queues[queue_name]

  # Work on a copy so the caller's options hash is left untouched.
  job_options = options.dup
  job_options[:priority] = qless_options.priority if qless_options.priority
  job_options[:retries] = qless_options.retries if qless_options.retries
  job_options[:tags] = Array(options[:tags]) | qless_options.tags

  queue.put(self, data, job_options)
end
|
#external_dependencies_for(data) ⇒ Object
101
102
103
104
105
106
107
108
109
|
# File 'lib/plines/step.rb', line 101
# Collects the external dependencies that apply to +data+.
#
# Every block in +external_dependency_definitions+ is called with the same
# fresh ExternalDependencyList and +data+; the list is then converted with
# +to_a+.
#
# @param data the job data handed to each definition block
# @return the result of the list's +to_a+
def external_dependencies_for(data)
  collected =
    external_dependency_definitions
      .each_with_object(ExternalDependencyList.new) do |definition, list|
        definition.call(list, data)
      end

  collected.to_a
end
|
#external_dependency_definitions ⇒ Object
153
154
155
|
# File 'lib/plines/step.rb', line 153
# The memoized list of external dependency definition blocks; an empty
# array is created on first access.
#
# @return [Array] the same array on every call
def external_dependency_definitions
  return @external_dependency_definitions if @external_dependency_definitions

  @external_dependency_definitions = []
end
|
#fan_out(&block) ⇒ Object
75
76
77
78
|
# File 'lib/plines/step.rb', line 75
# Appends +block+ to this step's list of fan-out blocks, creating the list
# on first use. +jobs_for+ applies the blocks in the order they were added.
#
# @param block the fan-out block to register
# @return [Array] the list of registered fan-out blocks
def fan_out(&block)
  (@fan_out_blocks ||= []).push(block)
end
|
#has_external_dependencies(*args, &block) ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/plines/step.rb', line 80
# Registers an external dependency definition for this step.
#
# With a block, the block is stored as given and +args+ is ignored. Without
# one, a lambda is stored that adds every name in +args+ to the dependency
# list it is called with; a trailing Hash in +args+ is removed and passed
# as the options for each name.
#
# @param args dependency names, optionally followed by an options Hash
# @param block optional definition taking the dependency list and job data
# @return [Array] +external_dependency_definitions+
def has_external_dependencies(*args, &block)
  unless block
    options = args.last.is_a?(Hash) ? args.pop : {}

    # The second argument (the job data) is not needed for static names.
    block = lambda do |deps, _|
      args.each { |name| deps.add name, options }
    end
  end

  external_dependency_definitions << block
end
|
#has_external_dependencies_for?(data) ⇒ Boolean
93
94
95
96
97
98
99
|
# File 'lib/plines/step.rb', line 93
# Reports whether any external dependency definition adds a dependency for
# +data+.
#
# Each definition is run against its own fresh ExternalDependencyList, and
# evaluation stops at the first definition whose list is non-empty.
#
# @param data the job data handed to each definition block
# @return [Boolean]
def has_external_dependencies_for?(data)
  external_dependency_definitions.any? do |definition|
    ExternalDependencyList.new.tap { |list| definition.call(list, data) }.any?
  end
end
|
#jobs_for(batch_data) ⇒ Object
126
127
128
129
130
131
132
|
# File 'lib/plines/step.rb', line 126
# Builds the jobs of this step for a batch.
#
# Starting from the single-element list +[batch_data]+, each registered
# fan-out block is applied in turn to every entry and the results are
# flattened one level. A Job is then built for each resulting entry.
#
# @param batch_data the data for the whole batch
# @return [Array] one +Job.build(self, job_data)+ result per entry
def jobs_for(batch_data)
  job_data_hashes = [batch_data]

  @fan_out_blocks.each do |fan_out_block|
    job_data_hashes = job_data_hashes.flat_map do |job_data|
      fan_out_block.call(job_data)
    end
  end

  job_data_hashes.map { |job_data| Job.build(self, job_data) }
end
|
#perform(qless_job) ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/plines/step.rb', line 134
# Runs this step for a qless job.
#
# The job batch is looked up from the job's client, this step's pipeline
# and the "_job_batch_id" entry of the job's data. While the batch reports
# +creation_in_progress?+ the job is moved back onto its own queue with
# +delay: 2+ and nothing else is done. Otherwise a completion callback is
# registered that marks the job complete in the batch, a step instance is
# built, and its +around_perform+ is invoked.
#
# @param qless_job the qless job to process
# @return [nil] when the job was re-queued; otherwise the result of
#   +around_perform+
def perform(qless_job)
  batch = JobBatch.find(
    qless_job.client,
    pipeline,
    qless_job.data.fetch("_job_batch_id")
  )

  if batch.creation_in_progress?
    # Retry later instead of running while the batch is still being created.
    qless_job.move(qless_job.queue_name, delay: 2)
    return
  end

  job_data = DynamicStruct.new(qless_job.data)
  qless_job.after_complete { batch.mark_job_as_complete(qless_job.jid) }

  # around_perform is invoked through send, so it may be non-public.
  step = new(batch, job_data, qless_job)
  step.send(:around_perform)
end
|
#pipeline ⇒ Object
185
186
187
188
189
190
191
|
# File 'lib/plines/step.rb', line 185
# Resolves and memoizes the module this step class is nested in.
#
# The class name is split on "::", the final segment (the class itself) is
# dropped, and the remaining constants are looked up one at a time starting
# from Object. A class with no enclosing namespace therefore resolves to
# Object.
#
# @return [Module] the enclosing namespace
def pipeline
  @pipeline ||= begin
    namespaces = name.split('::')
    namespaces.pop # discard the step class's own name
    namespaces.inject(Object) { |ns, mod| ns.const_get(mod) }
  end
end
|
#processing_queue ⇒ Object
181
182
183
|
# File 'lib/plines/step.rb', line 181
# The queue configured for this step in its qless options.
#
# @return the +queue+ value of +qless_options+
def processing_queue
  step_options = qless_options
  step_options.queue
end
|
#qless_options {|@qless_options| ... } ⇒ Object
157
158
159
160
161
|
# File 'lib/plines/step.rb', line 157
# Returns this step's qless job options, creating a QlessJobOptions on
# first use. When a block is given, the options object is yielded to it
# first so it can be configured.
#
# @yield [options] the memoized QlessJobOptions instance
# @return [QlessJobOptions] the same instance on every call
def qless_options
  options = (@qless_options ||= QlessJobOptions.new)
  yield options if block_given?
  options
end
|
#run_jobs_in_serial ⇒ Object
65
66
67
68
69
70
71
72
73
|
# File 'lib/plines/step.rb', line 65
# Declares a dependency of this step on itself, filtered so that a job is
# only matched with the entry that immediately precedes it.
#
# The filter passed to +depends_on+ is true only when +their_data+ equals
# the entry just before +my_data+ in +my_data_hashes+. When +my_data+ is the
# first entry, or is not found, there is no predecessor and the filter is
# false.
#
# NOTE(review): presumably this causes the step's jobs to run one after
# another in +my_data_hashes+ order -- confirm against how the pipeline
# applies dependency filters.
def run_jobs_in_serial
  depends_on step_name do |data|
    # Locate the adjacent (prior, current) pair whose current entry is this
    # job. +find+ is used instead of the return value of a bare +each_cons+
    # block, because that value is nil on older Rubies but the receiver on
    # Ruby 3.1 and later.
    pair = data.my_data_hashes.each_cons(2).find do |_prior, current|
      current == data.my_data
    end

    pair ? data.their_data == pair.first : false
  end
end
|
#step_name ⇒ Object
193
194
195
|
# File 'lib/plines/step.rb', line 193
# The unqualified name of this step class as a symbol, memoized.
#
# Only the last "::"-separated segment of the class name is used.
#
# @return [Symbol]
def step_name
  return @step_name if @step_name

  *, base_name = name.split('::')
  @step_name = base_name.to_sym
end
|