Class: DirtyPipeline::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/dirty_pipeline/base.rb

Defined Under Namespace

Classes: InvalidTransition

Constant Summary collapse

DEFAULT_RETRY_DELAY =

5 minutes

5 * 60
DEFAULT_CLEANUP_DELAY =

1 day

60 * 60 * 24

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subject, uuid: nil) ⇒ Base

Returns a new instance of Base.



37
38
39
40
41
42
43
# File 'lib/dirty_pipeline/base.rb', line 37

def initialize(subject, uuid: nil)
  @uuid = uuid || SecureRandom.uuid
  @subject = subject
  @storage = Storage.new(subject, self.class.pipeline_storage)
  @railway = Railway.new(subject, @uuid)
  @status = Status.success(subject)
end

Class Attribute Details

.cleanup_delayObject

Returns the value of attribute cleanup_delay.



20
21
22
# File 'lib/dirty_pipeline/base.rb', line 20

def cleanup_delay
  @cleanup_delay
end

.pipeline_storageObject

Returns the value of attribute pipeline_storage.



20
21
22
# File 'lib/dirty_pipeline/base.rb', line 20

def pipeline_storage
  @pipeline_storage
end

.retry_delayObject

Returns the value of attribute retry_delay.



20
21
22
# File 'lib/dirty_pipeline/base.rb', line 20

def retry_delay
  @retry_delay
end

.transitions_mapObject (readonly)

Returns the value of attribute transitions_map.



13
14
15
# File 'lib/dirty_pipeline/base.rb', line 13

def transitions_map
  @transitions_map
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



36
37
38
# File 'lib/dirty_pipeline/base.rb', line 36

def queue
  @queue
end

#railwayObject (readonly)

Returns the value of attribute railway.



36
37
38
# File 'lib/dirty_pipeline/base.rb', line 36

def railway
  @railway
end

#statusObject (readonly)

Returns the value of attribute status.



36
37
38
# File 'lib/dirty_pipeline/base.rb', line 36

def status
  @status
end

#storageObject (readonly)

Returns the value of attribute storage.



36
37
38
# File 'lib/dirty_pipeline/base.rb', line 36

def storage
  @storage
end

#subjectObject (readonly)

Returns the value of attribute subject.



36
37
38
# File 'lib/dirty_pipeline/base.rb', line 36

def subject
  @subject
end

#uuidObject (readonly)

Returns the value of attribute uuid.



36
37
38
# File 'lib/dirty_pipeline/base.rb', line 36

def uuid
  @uuid
end

Class Method Details

.find_subject(*args) ⇒ Object



9
10
11
# File 'lib/dirty_pipeline/base.rb', line 9

def find_subject(*args)
  fail NotImplemented
end

.inherited(child) ⇒ Object



14
15
16
17
18
19
# File 'lib/dirty_pipeline/base.rb', line 14

def inherited(child)
  child.instance_variable_set(
    :@transitions_map,
    transitions_map || Hash.new
  )
end

.transition(name, from:, to:, action: nil, attempts: 1) ⇒ Object



24
25
26
27
28
29
30
31
32
33
# File 'lib/dirty_pipeline/base.rb', line 24

def transition(name, from:, to:, action: nil, attempts: 1)
  action ||= method(name) if respond_to?(name)
  action ||= const_get(name.to_s.camelcase(:upper))
  @transitions_map[name.to_s] = {
    action: action,
    from: Array(from).map(&:to_s),
    to: to.to_s,
    attempts: attempts,
  }
end

Instance Method Details

#callObject Also known as: call_next



69
70
71
72
73
# File 'lib/dirty_pipeline/base.rb', line 69

def call
  # HANDLE ANOTHER ACTION IN PROGRESS EXPLICITLY
  return self if (enqueued_event = railway.next).nil?
  execute(load_event(enqueued_event))
end

#chain(*args, operation: :call) ⇒ Object

FIXME operation :call - argument



64
65
66
67
# File 'lib/dirty_pipeline/base.rb', line 64

def chain(*args, operation: :call)
  railway[operation] << Event.create(*args, tx_id: @uuid)
  self
end

#cleanObject



76
77
78
79
80
81
82
# File 'lib/dirty_pipeline/base.rb', line 76

def clean
  finished = railway.queue.to_a.empty?
  finished &&= railway.queue.processing_event.nil?
  return self if finished
  railway.switch_to(:undo)
  call
end

#cleanup_delayObject



104
# File 'lib/dirty_pipeline/base.rb', line 104

def cleanup_delay; self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY; end

#clear!Object



58
59
60
61
# File 'lib/dirty_pipeline/base.rb', line 58

def clear!
  storage.reset!
  reset!
end

#find_transition(name) ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/dirty_pipeline/base.rb', line 45

def find_transition(name)
  self.class.transitions_map.fetch(name.to_s).tap do |from:, **kwargs|
    next unless railway.operation.eql?(:call)
    next if from == Array(storage.status)
    next if from.include?(storage.status.to_s)
    raise InvalidTransition, "from `#{storage.status}` by `#{name}`"
  end
end

#reset!Object



54
55
56
# File 'lib/dirty_pipeline/base.rb', line 54

def reset!
  railway.clear!
end

#retryObject



84
85
86
87
# File 'lib/dirty_pipeline/base.rb', line 84

def retry
  return self if (enqueued_event = railway.queue.processing_event).nil?
  execute(load_event(enqueued_event), attempt_retry: true)
end

#retry_delayObject



107
# File 'lib/dirty_pipeline/base.rb', line 107

def retry_delay; self.class.retry_delay || DEFAULT_RETRY_DELAY; end

#schedule(operation = "call", delay = nil) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/dirty_pipeline/base.rb', line 89

def schedule(operation = "call", delay = nil)
  job_args = {
    "transaction_id" => @uuid,
    "enqueued_pipeline" => self.class.to_s,
    "find_subject_args" => find_subject_args,
    "operation" => operation,
  }

  if delay.nil?
    ::DirtyPipeline::Worker.perform_async(job_args)
  else
    ::DirtyPipeline::Worker.perform_in(delay, job_args)
  end
end

#schedule_cleanupObject



105
# File 'lib/dirty_pipeline/base.rb', line 105

def schedule_cleanup; schedule("cleanup", cleanup_delay); end

#schedule_retryObject



108
# File 'lib/dirty_pipeline/base.rb', line 108

def schedule_retry; schedule("retry",   retry_delay); end

#when_failure(tag = status.tag) {|status.data, _self| ... } ⇒ Object

Yields:

Yield Parameters:



120
121
122
123
# File 'lib/dirty_pipeline/base.rb', line 120

def when_failure(tag = status.tag)
  yield(status.data, self) if status.failure? && status.tag == tag
  self
end

#when_skipped {|nil, _self| ... } ⇒ Object

Yields:

  • (nil, _self)

Yield Parameters:



110
111
112
113
# File 'lib/dirty_pipeline/base.rb', line 110

def when_skipped
  yield(nil, self) if railway.other_transaction_in_progress?
  self
end

#when_success {|status.data, _self| ... } ⇒ Object

Yields:

Yield Parameters:



115
116
117
118
# File 'lib/dirty_pipeline/base.rb', line 115

def when_success
  yield(status.data, self) if status.success?
  self
end