Class: PgEventstore::BasicRunner
- Inherits:
- Object
- Extended by:
- Forwardable
- Includes:
- Extensions::CallbacksExtension
- Defined in:
- lib/pg_eventstore/subscriptions/basic_runner.rb
Overview
Implements a simple background job runner. A job is executed by declaring a callback on a specific action. The implementation also lets you hook into different points of the runner’s life cycle by defining callbacks on various actions. Here is the list of available actions:
-
:before_runner_started. Happens before the runner’s state switches from “initial”/“stopped” to “running” and the runner’s thread is started. It is also fired when the runner is being restored, right after the :before_runner_restored action.
-
:after_runner_stopped. Happens after the runner’s state has switched from “running”/“dead” to “stopped” and the runner’s thread has been terminated.
-
:before_runner_restored. Happens before the runner’s state switches from “dead” to “running” and the runner’s thread is started.
-
:process_async. Happens every @run_interval seconds within the runner’s thread.
-
:after_runner_died. Happens when the runner’s state switches to “dead” because of an exception inside the runner’s thread. The callback must accept one argument: the exception that caused the runner to die.
-
:change_state. Happens each time the runner changes state. The callback must accept one argument: the current state.
Example of BasicRunner usage:
class MyAwesomeRunner
  extend Forwardable

  def_delegators :@basic_runner, :start, :stop, :wait_for_finish, :stop_async, :restore

  class SimpleRecoveryStrategy
    include PgEventstore::RunnerRecoveryStrategy

    def initialize(restore_func)
      @attempts_count = 0
      @restore_func = restore_func
    end

    # Only errors carrying this message are handled by the strategy
    def recovers?(error)
      error.message.include?("I can not handle this any more!")
    end

    # Returns true for the first three attempts, false afterwards
    def recover(error)
      (@attempts_count < 3).tap do |res|
        @attempts_count += 1
        @restore_func.call if res
      end
    end
  end

  def initialize
    @basic_runner = PgEventstore::BasicRunner.new(
      run_interval: 1, async_shutdown_time: 2, recovery_strategies: recovery_strategies
    )
    @jobs_performed = 0
    @jobs_limit = 3
    attach_runner_callbacks
  end

  protected

  def work_harder
    @jobs_limit += 3
  end

  private

  def attach_runner_callbacks
    @basic_runner.define_callback(:change_state, :after, method(:state_changed))
    @basic_runner.define_callback(:process_async, :before, method(:process_action))
    @basic_runner.define_callback(:process_async, :after, method(:count_jobs))
    @basic_runner.define_callback(:before_runner_started, :before, method(:before_runner_started))
    @basic_runner.define_callback(:after_runner_stopped, :before, method(:after_runner_stopped))
    @basic_runner.define_callback(:after_runner_died, :before, method(:after_runner_died))
  end

  def process_action
    raise "What's the point? I can not handle this any more!" if @jobs_performed >= @jobs_limit

    puts "Doing some heavy lifting job"
    sleep 2 # Simulate long running job
  end

  def count_jobs
    @jobs_performed += 1
  end

  # @param state [String]
  def state_changed(state)
    puts "New state is #{state.inspect}"
  end

  def before_runner_started
    puts "Doing some preparations..."
  end

  def after_runner_stopped
    puts "Your job is no longer being processed. Total jobs performed: #{@jobs_performed}. Bye-bye!"
  end

  def after_runner_died(error)
    puts "Error occurred: #{error.inspect}"
  end

  def recovery_strategies
    [SimpleRecoveryStrategy.new(method(:work_harder))]
  end
end

runner = MyAwesomeRunner.new
runner.start # starts the background runner, which processes the job defined by #process_action
runner.stop # stops the runner
See RunnerState for the list of available states. See CallbacksExtension and Callbacks for more information about how to use callbacks.
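The state changes described in the action list above can be summarized as a small transition table. The following is a minimal standalone sketch, not the library's implementation: the module name `RunnerLifecycleSketch`, its `TRANSITIONS` constant, the `next_state` helper, and the `:die` and `:finish_halting` pseudo-actions are all hypothetical, and the table is inferred only from the descriptions and method listings on this page.

```ruby
# Hypothetical model of the runner life cycle; it does not use pg_eventstore.
module RunnerLifecycleSketch
  # action => { state before => state after }
  TRANSITIONS = {
    start:          { "initial" => "running", "stopped" => "running" },
    stop:           { "running" => "stopped", "dead" => "stopped" },
    stop_async:     { "running" => "halting", "dead" => "halting" },
    restore:        { "dead" => "running" },
    # Pseudo-actions (assumptions): these are not public methods of the runner
    die:            { "running" => "dead" },
    finish_halting: { "halting" => "stopped" }
  }.freeze

  # Returns the next state, or the unchanged state when the action
  # does not apply to the current state (the action is a no-op then).
  def self.next_state(state, action)
    TRANSITIONS.fetch(action).fetch(state, state)
  end
end

puts RunnerLifecycleSketch.next_state("initial", :start)
puts RunnerLifecycleSketch.next_state("dead", :start)
```

In this sketch, calling `:start` on a "dead" runner leaves the state unchanged, which mirrors the advice to use #restore for a dead runner.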
Instance Method Summary
-
#initialize(run_interval:, async_shutdown_time:, recovery_strategies: []) ⇒ BasicRunner
constructor
A new instance of BasicRunner.
-
#restore ⇒ self
Restores the runner after its death.
-
#start ⇒ self
Start asynchronous runner.
- #state ⇒ String
-
#stop ⇒ self
Stop asynchronous runner.
-
#stop_async ⇒ self
Asynchronously stop asynchronous runner.
-
#wait_for_finish ⇒ self
Wait until the runner switches the state to either “stopped” or “dead”.
-
#within_state(state, &_blk) ⇒ Object?
The result of evaluating the passed block.
Methods included from Extensions::CallbacksExtension
Constructor Details
#initialize(run_interval:, async_shutdown_time:, recovery_strategies: []) ⇒ BasicRunner
Returns a new instance of BasicRunner.
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 121
def initialize(run_interval:, async_shutdown_time:, recovery_strategies: [])
  @run_interval = run_interval
  @async_shutdown_time = async_shutdown_time
  @recovery_strategies = recovery_strategies
  @state = RunnerState.new
  @mutex = Thread::Mutex.new
  @runner = nil
  delegate_change_state_cbx
end
Instance Method Details
#restore ⇒ self
Restores the runner after its death.
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 198
def restore
  within_state(:dead) do
    callbacks.run_callbacks(:before_runner_restored)
    _start
  end
  self
end
#start ⇒ self
Start asynchronous runner. If the runner is dead, use #restore to restart it.
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 133
def start
  synchronize do
    return self unless @state.initial? || @state.stopped?

    callbacks.run_callbacks(:before_runner_started)
    _start
  end
  self
end
#state ⇒ String
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 221
def state
  @state.to_s
end
#stop ⇒ self
Stop asynchronous runner. This operation is immediate: it does not wait for the current job to finish and halts it instantly. If you care about the result of your async job, use #stop_async instead.
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 146
def stop
  synchronize do
    return self unless @state.running? || @state.dead?

    @runner&.exit
    @runner = nil
    @state.stopped!
    callbacks.run_callbacks(:after_runner_stopped)
  end
  self
end
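To make "instantly halt" concrete: #stop calls `Thread#exit` on the runner's thread, which terminates it without letting the current job run to completion. The following standalone sketch shows that behavior with a plain Ruby thread. The helper name `run_and_kill_worker` and its return hash are hypothetical and exist only for illustration; no pg_eventstore code is involved.

```ruby
# Standalone illustration of Thread#exit semantics using plain Ruby threads.
def run_and_kill_worker
  finished = false
  cleaned_up = false
  worker = Thread.new do
    begin
      sleep 5 # simulate a long-running job
      finished = true # never reached when the thread is killed mid-job
    ensure
      cleaned_up = true # ensure blocks still run when a thread is killed
    end
  end
  sleep 0.1 # give the worker time to enter the job
  worker.exit # terminates the thread without waiting for the job
  worker.join # wait until the termination has completed
  { alive: worker.alive?, finished: finished, cleaned_up: cleaned_up }
end

p run_and_kill_worker
```

The job never reaches its last statement, so any result it would have produced is lost. This is the trade-off that #stop_async is meant to avoid.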
#stop_async ⇒ self
Asynchronously stop asynchronous runner. This operation spawns another thread to gracefully stop the runner. It will wait up to @async_shutdown_time seconds before force-stopping the runner.
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 161
def stop_async
  synchronize do
    return self unless @state.running? || @state.dead?

    begin
      @state.halting!
    ensure
      Thread.new do
        stopping_at = Time.now.utc
        halt = false
        loop do
          synchronize do
            # Give the runner up to @async_shutdown_time seconds for graceful shutdown
            @runner&.exit if Time.now.utc - stopping_at > @async_shutdown_time

            unless @runner&.alive?
              @state.stopped!
              callbacks.run_callbacks(:after_runner_stopped)
            end
          ensure
            next if @runner&.alive?

            @runner = nil
            halt = true
          end
          break if halt

          sleep 0.1
        end
      end
    end
  end
  self
end
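The core idea of the listing above is "wait for the thread to finish on its own, and kill it only after a deadline". A minimal standalone sketch of that pattern follows. The helper name `halt_with_deadline`, its `poll_interval` parameter, and its `:graceful`/`:forced` return values are hypothetical; unlike the listing, the sketch blocks the caller instead of spawning a separate stopper thread, and it uses a monotonic clock rather than `Time.now.utc`.

```ruby
# Waits for `worker` to finish by itself; kills it once the deadline passes.
# Returns :graceful if the thread ended on its own, :forced if it was killed.
def halt_with_deadline(worker, deadline_seconds, poll_interval: 0.05)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    return :graceful unless worker.alive?

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    if elapsed > deadline_seconds
      worker.exit # deadline exceeded: force-stop the thread
      worker.join
      return :forced
    end
    sleep poll_interval
  end
end

quick_job = Thread.new { sleep 0.1 }
puts halt_with_deadline(quick_job, 1)

stuck_job = Thread.new { sleep 10 }
puts halt_with_deadline(stuck_job, 0.2)
```

A job that completes within the deadline is left alone, while a job that outlives it is terminated, which corresponds to the role of @async_shutdown_time.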
#wait_for_finish ⇒ self
Wait until the runner switches the state to either “stopped” or “dead”. This operation is synchronous.
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 208
def wait_for_finish
  loop do
    continue = synchronize do
      @state.halting? || @state.running?
    end
    break unless continue

    sleep 0.1
  end
  self
end
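The listing above polls the state under a lock until it is no longer "running" or "halting". Here is a small standalone sketch of the same polling pattern, assuming a hypothetical `PollingWaitSketch` class with a plain string state; it only illustrates the blocking behavior and is not the library's code.

```ruby
# Hypothetical stand-in for a runner whose state is changed by another thread.
class PollingWaitSketch
  def initialize
    @mutex = Mutex.new
    @state = "running"
  end

  def state
    @mutex.synchronize { @state }
  end

  def state=(new_state)
    @mutex.synchronize { @state = new_state }
  end

  # Blocks the calling thread until the state is neither "running" nor "halting".
  def wait_for_finish(poll_interval: 0.1)
    loop do
      busy = @mutex.synchronize { %w[running halting].include?(@state) }
      break unless busy

      sleep poll_interval
    end
    self
  end
end

sketch = PollingWaitSketch.new
Thread.new do
  sleep 0.2
  sketch.state = "stopped" # simulate the runner stopping in the background
end
puts sketch.wait_for_finish(poll_interval: 0.05).state
```

Because the call blocks the current thread, it is typically placed after #stop_async when the caller needs to know that shutdown has completed.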
#within_state(state, &_blk) ⇒ Object?
Returns the result of evaluating the passed block.
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 227
def within_state(state, &_blk)
  synchronize do
    return unless @state.public_send("#{RunnerState::STATES.fetch(state)}?")

    yield
  end
end
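The `Object?` return type means the method returns the block's value when the runner is in the given state and nil otherwise. The standalone sketch below illustrates that guard pattern. The class `StateGuardSketch` and its `STATES` hash are hypothetical stand-ins: the real RunnerState::STATES is not shown on this page, so its shape here is an assumption.

```ruby
# Hypothetical stand-in that mimics the guard behavior of #within_state.
class StateGuardSketch
  # Assumed shape of the state lookup table; unknown keys raise KeyError via fetch.
  STATES = {
    initial: "initial", running: "running", halting: "halting",
    stopped: "stopped", dead: "dead"
  }.freeze

  def initialize(state)
    @state = STATES.fetch(state)
    @mutex = Mutex.new
  end

  # Yields only when the current state matches; returns nil otherwise.
  def within_state(state)
    @mutex.synchronize do
      return nil unless @state == STATES.fetch(state)

      yield
    end
  end
end

p StateGuardSketch.new(:dead).within_state(:dead) { "restoring" }
p StateGuardSketch.new(:running).within_state(:dead) { "restoring" }
```

This is why #restore is safe to call on a runner that is not dead: the guarded block is simply skipped.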