Class: DirtyPipeline::Base
- Inherits:
-
Object
- Object
- DirtyPipeline::Base
show all
- Defined in:
- lib/dirty_pipeline/base.rb
Defined Under Namespace
Classes: InvalidTransition
Constant Summary
collapse
- DEFAULT_RETRY_DELAY =
5 * 60
- DEFAULT_CLEANUP_DELAY =
60 * 60 * 24
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(subject, uuid: nil) ⇒ Base
Returns a new instance of Base.
37
38
39
40
41
42
43
|
# File 'lib/dirty_pipeline/base.rb', line 37
# Builds a pipeline instance bound to +subject+.
#
# Keeps the subject, picks the transaction id (the given +uuid+, or a fresh
# SecureRandom.uuid when none is passed), and creates the collaborators:
# a Storage (configured with the class-level +pipeline_storage+), a Railway
# keyed by the transaction id, and an initial successful Status.
#
# @param subject [Object] the object the pipeline operates on
# @param uuid [String, nil] transaction id to reuse; generated when nil
def initialize(subject, uuid: nil)
  @subject = subject
  @uuid = uuid || SecureRandom.uuid
  storage_backend = self.class.pipeline_storage
  @storage = Storage.new(subject, storage_backend)
  @railway = Railway.new(subject, @uuid)
  @status = Status.success(subject)
end
|
Class Attribute Details
.cleanup_delay ⇒ Object
Returns the value of attribute cleanup_delay.
20
21
22
|
# File 'lib/dirty_pipeline/base.rb', line 20
# Class-level reader for +@cleanup_delay+.
#
# The instance method #cleanup_delay falls back to DEFAULT_CLEANUP_DELAY
# when this returns nil.
#
# @return [Object, nil] the configured delay, or nil when none was set
#   (presumably seconds - TODO confirm against the worker's perform_in)
def cleanup_delay
@cleanup_delay
end
|
.pipeline_storage ⇒ Object
Returns the value of attribute pipeline_storage.
20
21
22
|
# File 'lib/dirty_pipeline/base.rb', line 20
# Class-level reader for +@pipeline_storage+.
#
# #initialize passes this value as the second argument to Storage.new.
#
# @return [Object, nil] the configured storage setting, or nil when none was set
def pipeline_storage
@pipeline_storage
end
|
.retry_delay ⇒ Object
Returns the value of attribute retry_delay.
20
21
22
|
# File 'lib/dirty_pipeline/base.rb', line 20
# Class-level reader for +@retry_delay+.
#
# The instance method #retry_delay falls back to DEFAULT_RETRY_DELAY when
# this returns nil.
#
# @return [Object, nil] the configured delay, or nil when none was set
#   (presumably seconds - TODO confirm against the worker's perform_in)
def retry_delay
@retry_delay
end
|
.transitions_map ⇒ Object
Returns the value of attribute transitions_map.
13
14
15
|
# File 'lib/dirty_pipeline/base.rb', line 13
# Class-level reader for +@transitions_map+.
#
# The map is seeded on subclasses by .inherited and filled by .transition,
# which stores one entry per transition under the transition name as a String.
#
# @return [Hash, nil] the transitions registered on this class, or nil when
#   the variable was never assigned
def transitions_map
@transitions_map
end
|
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
36
37
38
|
# File 'lib/dirty_pipeline/base.rb', line 36
# Reader for +@queue+.
#
# NOTE(review): #initialize, as shown here, does not assign @queue, so this
# returns nil unless something else sets it - confirm whether it is still used.
#
# @return [Object, nil]
def queue
@queue
end
|
#railway ⇒ Object
Returns the value of attribute railway.
36
37
38
|
# File 'lib/dirty_pipeline/base.rb', line 36
# Reader for +@railway+, the Railway created in #initialize from the subject
# and the transaction uuid.
#
# @return [Railway]
def railway
@railway
end
|
#status ⇒ Object
Returns the value of attribute status.
36
37
38
|
# File 'lib/dirty_pipeline/base.rb', line 36
# Reader for +@status+, which #initialize sets to Status.success(subject).
#
# The when_success / when_failure helpers inspect this object.
#
# @return [Status]
def status
@status
end
|
#storage ⇒ Object
Returns the value of attribute storage.
36
37
38
|
# File 'lib/dirty_pipeline/base.rb', line 36
# Reader for +@storage+, the Storage created in #initialize from the subject
# and the class-level pipeline_storage setting.
#
# @return [Storage]
def storage
@storage
end
|
#subject ⇒ Object
Returns the value of attribute subject.
36
37
38
|
# File 'lib/dirty_pipeline/base.rb', line 36
# Reader for +@subject+, the object passed to #initialize.
#
# @return [Object]
def subject
@subject
end
|
#uuid ⇒ Object
Returns the value of attribute uuid.
36
37
38
|
# File 'lib/dirty_pipeline/base.rb', line 36
# Reader for +@uuid+, the transaction id: either the value passed to
# #initialize or one generated there with SecureRandom.uuid.
#
# @return [String]
def uuid
@uuid
end
|
Class Method Details
.find_subject(*args) ⇒ Object
9
10
11
|
# File 'lib/dirty_pipeline/base.rb', line 9
# Abstract hook: concrete pipelines must override this to look up their subject.
#
# The previous body referenced the constant +NotImplemented+, which core Ruby
# does not define, so callers got a NameError instead of the intended
# "not implemented" signal. Ruby's built-in NotImplementedError is raised now.
#
# @param args [Array] lookup arguments (presumably the "find_subject_args"
#   enqueued by #schedule - TODO confirm against the worker)
# @raise [NotImplementedError] always, until a subclass overrides the method
def find_subject(*args)
  raise NotImplementedError, "#{self}.find_subject must be implemented by the pipeline class"
end
|
.inherited(child) ⇒ Object
14
15
16
17
18
19
|
# File 'lib/dirty_pipeline/base.rb', line 14
# Subclass hook: seeds the child's +@transitions_map+.
#
# The child starts with a copy of the parent's transitions (or an empty Hash
# when the parent has none). The copy matters: handing over the parent's Hash
# itself would let transitions declared on one subclass leak into the parent
# and into sibling subclasses. A shallow copy is enough because .transition
# assigns a fresh entry per name rather than mutating existing entries.
#
# @param child [Class] the class that just inherited from this one
# @return [Hash] the map installed on the child
def inherited(child)
  inherited_map = (transitions_map || {}).dup
  child.instance_variable_set(:@transitions_map, inherited_map)
end
|
.transition(name, from:, to:, action: nil, attempts: 1) ⇒ Object
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/dirty_pipeline/base.rb', line 24
# Registers a transition in the class-level transitions map.
#
# The action is resolved in this order: the explicit +action:+ argument, then
# a method of the same name when the class responds to it, then a constant
# named after the camel-cased transition name.
# NOTE(review): String#camelcase is not core Ruby - presumably supplied by
# ActiveSupport or a project extension; confirm.
#
# @param name [Symbol, String] transition name; stored as a String key
# @param from [Object, Array] source state(s); stored as an Array of Strings
# @param to [Object] destination state; stored as a String
# @param action [Object, nil] callable/handler for the transition
# @param attempts [Integer] stored as-is under :attempts
# @return [Hash] the entry that was stored
def transition(name, from:, to:, action: nil, attempts: 1)
  if respond_to?(name)
    action ||= method(name)
  end
  action ||= const_get(name.to_s.camelcase(:upper))

  source_states = Array(from).map { |state| state.to_s }
  entry = { action: action, from: source_states, to: to.to_s, attempts: attempts }
  @transitions_map[name.to_s] = entry
end
|
Instance Method Details
#call ⇒ Object
Also known as:
call_next
69
70
71
72
73
|
# File 'lib/dirty_pipeline/base.rb', line 69
# Takes the next event from the railway and executes it.
#
# @return [self, Object] self when the railway has no next event; otherwise
#   whatever #execute returns for the loaded event
def call
  enqueued_event = railway.next
  return self if enqueued_event.nil?

  execute(load_event(enqueued_event))
end
|
#chain(*args, operation: :call) ⇒ Object
FIXME operation :call - argument
64
65
66
67
|
# File 'lib/dirty_pipeline/base.rb', line 64
# Appends a new event to one of the railway's queues.
#
# The event is built with Event.create from +args+ and tagged with this
# pipeline's transaction uuid.
#
# @param args [Array] forwarded to Event.create
# @param operation [Symbol] which railway queue receives the event
# @return [self] to allow chaining
def chain(*args, operation: :call)
  target = railway[operation]
  target << Event.create(*args, tx_id: @uuid)
  self
end
|
#clean ⇒ Object
76
77
78
79
80
81
82
|
# File 'lib/dirty_pipeline/base.rb', line 76
# Switches the railway to :undo and runs it when work is still outstanding.
#
# Nothing is outstanding when the railway queue is empty and has no event
# being processed; in that case the pipeline is returned untouched.
#
# @return [self, Object] self when there is nothing to clean; otherwise the
#   result of #call after switching the railway to :undo
def clean
  return self if railway.queue.to_a.empty? && railway.queue.processing_event.nil?

  railway.switch_to(:undo)
  call
end
|
#cleanup_delay ⇒ Object
104
|
# File 'lib/dirty_pipeline/base.rb', line 104
# Delay used when scheduling the cleanup job.
#
# @return [Object] the class-level cleanup_delay when set, otherwise
#   DEFAULT_CLEANUP_DELAY
def cleanup_delay
  configured = self.class.cleanup_delay
  configured || DEFAULT_CLEANUP_DELAY
end
|
#clear! ⇒ Object
58
59
60
61
|
# File 'lib/dirty_pipeline/base.rb', line 58
# Resets both persisted state and the railway.
#
# Calls storage.reset! first, then #reset! (which clears the railway).
#
# @return [Object] whatever #reset! returns
def clear!
storage.reset!
reset!
end
|
#find_transition(name) ⇒ Object
45
46
47
48
49
50
51
52
|
# File 'lib/dirty_pipeline/base.rb', line 45
# Looks up a registered transition and validates it against the current state.
#
# Validation only happens while the railway is in the :call operation. The
# transition is allowed when its +:from+ list equals Array(storage.status)
# (which covers an empty list with a nil status) or contains the current
# status as a String.
#
# The transition Hash is read with an explicit fetch instead of a block with
# keyword parameters (+|from:, **kwargs|+): Ruby 3 no longer converts a
# yielded Hash into keywords, so that form raises ArgumentError there.
#
# @param name [Symbol, String] transition name
# @return [Hash] the transition entry (:action, :from, :to, :attempts)
# @raise [KeyError] when no transition with that name is registered
# @raise [InvalidTransition] when the current status is not an allowed source
def find_transition(name)
  transition = self.class.transitions_map.fetch(name.to_s)
  return transition unless railway.operation.eql?(:call)

  allowed_sources = transition.fetch(:from)
  current_status = storage.status
  return transition if allowed_sources == Array(current_status)
  return transition if allowed_sources.include?(current_status.to_s)

  raise InvalidTransition, "from `#{current_status}` by `#{name}`"
end
|
#reset! ⇒ Object
54
55
56
|
# File 'lib/dirty_pipeline/base.rb', line 54
# Clears the railway by delegating to railway.clear!.
#
# @return [Object] whatever railway.clear! returns
def reset!
railway.clear!
end
|
#retry ⇒ Object
84
85
86
87
|
# File 'lib/dirty_pipeline/base.rb', line 84
# Re-executes the event the railway queue currently marks as processing.
#
# @return [self, Object] self when no event is being processed; otherwise
#   the result of #execute called with attempt_retry: true
def retry
  enqueued_event = railway.queue.processing_event
  return self if enqueued_event.nil?

  execute(load_event(enqueued_event), attempt_retry: true)
end
|
#retry_delay ⇒ Object
107
|
# File 'lib/dirty_pipeline/base.rb', line 107
# Delay used when scheduling a retry job.
#
# @return [Object] the class-level retry_delay when set, otherwise
#   DEFAULT_RETRY_DELAY
def retry_delay
  configured = self.class.retry_delay
  configured || DEFAULT_RETRY_DELAY
end
|
#schedule(operation = "call", delay = nil) ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/dirty_pipeline/base.rb', line 89
# Enqueues a DirtyPipeline::Worker job for this pipeline.
#
# The job payload carries the transaction uuid, the pipeline class name, the
# arguments needed to find the subject again, and the operation name.
#
# @param operation [String] operation name placed in the payload
# @param delay [Object, nil] when nil the job is enqueued with perform_async;
#   otherwise it is passed to perform_in as the delay
# @return [Object] whatever the worker's enqueue call returns
def schedule(operation = "call", delay = nil)
  job_args = {
    "transaction_id" => @uuid,
    "enqueued_pipeline" => self.class.to_s,
    "find_subject_args" => find_subject_args,
    "operation" => operation,
  }
  return ::DirtyPipeline::Worker.perform_async(job_args) if delay.nil?

  ::DirtyPipeline::Worker.perform_in(delay, job_args)
end
|
#schedule_cleanup ⇒ Object
105
|
# File 'lib/dirty_pipeline/base.rb', line 105
# Schedules the "cleanup" operation after #cleanup_delay.
#
# @return [Object] whatever #schedule returns
def schedule_cleanup
  delay = cleanup_delay
  schedule("cleanup", delay)
end
|
#schedule_retry ⇒ Object
108
|
# File 'lib/dirty_pipeline/base.rb', line 108
# Schedules the "retry" operation after #retry_delay.
#
# @return [Object] whatever #schedule returns
def schedule_retry
  delay = retry_delay
  schedule("retry", delay)
end
|
#when_failure(tag = status.tag) {|status.data, _self| ... } ⇒ Object
120
121
122
123
|
# File 'lib/dirty_pipeline/base.rb', line 120
# Yields to the block when the current status is a failure with the given tag.
#
# With no argument the tag defaults to the status' own tag, so the block runs
# for any failure.
#
# @param tag [Object] failure tag to match against status.tag
# @yield [data, pipeline] status.data and this pipeline
# @return [self] to allow chaining
def when_failure(tag = status.tag)
  tag_matched = status.failure? && status.tag == tag
  yield(status.data, self) if tag_matched
  self
end
|
#when_skipped {|nil, _self| ... } ⇒ Object
110
111
112
113
|
# File 'lib/dirty_pipeline/base.rb', line 110
# Yields to the block when the railway reports another transaction in progress.
#
# @yield [data, pipeline] nil and this pipeline
# @return [self] to allow chaining
def when_skipped
  return self unless railway.other_transaction_in_progress?

  yield(nil, self)
  self
end
|
#when_success {|status.data, _self| ... } ⇒ Object
115
116
117
118
|
# File 'lib/dirty_pipeline/base.rb', line 115
# Yields to the block when the current status is a success.
#
# @yield [data, pipeline] status.data and this pipeline
# @return [self] to allow chaining
def when_success
  return self unless status.success?

  yield(status.data, self)
  self
end
|