Class: Karafka::Pro::RecurringTasks::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/pro/recurring_tasks/task.rb

Overview

Represents a single recurring task that can be executed when the time comes. Tasks should be lightweight. Anything heavy should be executed by scheduling appropriate jobs here.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, cron:, previous_time: 0, enabled: true, &block) ⇒ Task



40
41
42
43
44
45
46
47
48
49
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 40

# Builds a task from its identifier, cron expression and optional code block.
#
# @param id [String] identifier of this task
# @param cron [String] cron expression; it is parsed with `Fugit::Cron.do_parse`
# @param previous_time [Object] time of the previous execution, `0` when not provided
# @param enabled [Boolean] whether the task starts enabled
# @param block [Proc] code to run on execution; a task may be defined without it
def initialize(id:, cron:, previous_time: 0, enabled: true, &block)
  # What this task is and what it runs
  @id = id
  @cron = ::Fugit::Cron.do_parse(cron)
  @executable = block

  # Time references: the creation time is kept next to the previous execution time
  @previous_time = previous_time
  @start_time = Time.now

  # State: nothing is manually triggered and nothing has changed on a fresh task
  @enabled = enabled
  @trigger = @changed = false
end

Instance Attribute Details

#cron ⇒ Fugit::Cron (readonly)



29
30
31
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 29

# @return [Fugit::Cron] parsed cron schedule of this task
def cron = @cron

#id ⇒ String (readonly)



26
27
28
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 26

# @return [String] identifier of this task
def id = @id

#previous_time ⇒ Object

Allows the previous time to be updated when restoring the materialized state.



32
33
34
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 32

# @return [Object] time of the previous execution; it is `0` by default and is set to the
#   current time after each `#call`
def previous_time = @previous_time

Instance Method Details

#call ⇒ Object

Executes the given task and publishes appropriate notification bus events.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 95

# Runs the task inside the `recurring_tasks.task.executed` instrumentation block.
# Any `StandardError` raised along the way is not propagated; it is reported through an
# `error.occurred` event instead. Regardless of the outcome, the manual trigger flag is
# cleared and the previous execution time is set to the current time.
def call
  monitor.instrument('recurring_tasks.task.executed', task: self) do
    # A cron schedule may be defined without a code block, in which case there is nothing
    # to run. This `return` leaves the whole method, not only the instrumented block.
    return unless @executable

    execute
  end
rescue StandardError => error
  monitor.instrument(
    'error.occurred',
    caller: self,
    error: error,
    task: self,
    type: 'recurring_tasks.task.execute.error'
  )
ensure
  # Runs on success, on failure and on the early return above
  @previous_time = Time.now
  @trigger = false
end

#call? ⇒ Boolean



85
86
87
88
89
90
91
92
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 85

# Tells whether this task should be executed right now.
#
# @return [Boolean] true when the task was manually triggered, or when it is enabled and
#   its next scheduled time is now or already in the past
def call?
  if @trigger
    # A manual trigger wins over everything else, including the disabled state
    true
  elsif enabled?
    # The task is due once the current time has reached the next scheduled time.
    # The schedule time stays on the left-hand side on purpose: the EtOrbi time can be
    # compared against a Time, but not the other way around.
    next_time <= Time.now
  else
    false
  end
end

#changed? ⇒ Boolean



52
53
54
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 52

# @return [Boolean] current value of the changes indicator flag; it is false on a fresh
#   task and after `#clear`
def changed? = @changed

#clear ⇒ Object

Removes the changes indicator flag



126
127
128
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 126

# Resets the changes indicator flag, so `changed?` reports false again.
def clear
  @changed = false
end

#disable ⇒ Object

Disables this task execution indefinitely



57
58
59
60
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 57

# Disables this task. It first calls `touch` (defined outside of this excerpt; presumably
# it flags the task as changed - TODO confirm) and then clears the enabled flag.
def disable
  touch
  @enabled = false
end

#enable ⇒ Object

Re-enables this task



63
64
65
66
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 63

# Enables this task. It first calls `touch` (defined outside of this excerpt; presumably
# it flags the task as changed - TODO confirm) and then sets the enabled flag.
def enable
  touch
  @enabled = true
end

#enabled? ⇒ Boolean



69
70
71
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 69

# @return [Boolean] current value of the enabled flag, as set on initialization or by
#   `#enable` / `#disable`
def enabled? = @enabled

#execute ⇒ Object

Runs the executable block without any instrumentation or error handling. Useful for debugging and testing



121
122
123
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 121

# Invokes the stored code block directly, with no instrumentation and no error handling,
# so anything it raises reaches the caller.
#
# @return [Object] whatever the code block returns
def execute = @executable.call

#next_time ⇒ EtOrbi::EoTime



80
81
82
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 80

# Computes the next scheduled time from the cron expression.
#
# @return [EtOrbi::EoTime] next time of the schedule, counted from the previous execution
#   time or, when that time converts to zero, from the time this task was created
def next_time
  never_ran = @previous_time.to_i.zero?
  reference = never_ran ? @start_time : @previous_time

  @cron.next_time(reference)
end

#to_h ⇒ Hash



131
132
133
134
135
136
137
138
139
140
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 131

# Builds a hash snapshot of this task.
#
# @return [Hash] task id, original cron expression, previous and next execution times,
#   and the changed and enabled flags
def to_h
  snapshot = {}

  snapshot[:id] = id
  # The original cron expression is exposed, not the parsed object
  snapshot[:cron] = @cron.original
  snapshot[:previous_time] = previous_time
  snapshot[:next_time] = next_time
  snapshot[:changed] = changed?
  snapshot[:enabled] = enabled?

  snapshot
end

#trigger ⇒ Object

Triggers the execution of this task at the earliest opportunity



74
75
76
77
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 74

# Requests execution of this task regardless of its schedule: once the trigger flag is
# set, `call?` returns true until `call` clears the flag. Before setting it, `touch` is
# called (defined outside of this excerpt; presumably it flags the task as changed -
# TODO confirm).
def trigger
  touch
  @trigger = true
end