Class: PgEventstore::BasicRunner

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Extensions::CallbacksExtension
Defined in:
lib/pg_eventstore/subscriptions/basic_runner.rb

Overview

Implements simple background job runner. A job execution is done via declaring a callback on a specific action. The implementation also allows you to hook into different places of life cycle of the runner by defining callbacks on various actions. Here is the list of available actions:

  • :before_runner_started. Happens before the runner’s state switches from “initial”/“stopped” to “running” and runner’s thread is started. It is also fired when the runner is restoring - right after :before_runner_restored action.

  • :after_runner_stopped. Happens after runner’s state got switched from “running”/“dead” to “stopped” and runner’s thread is terminated.

  • :before_runner_restored. Happens before runner’s state gets switched from “dead” to “running” and runner’s thread is started.

  • :process_async. Happens each @run_interval seconds within runner’s thread.

  • :after_runner_died. Happens when runner’s state switches to “dead” because of exception inside runner’s thread. Callback function must be able to accept one argument - the exception which caused the runner to die will be passed.

  • :change_state. It happens each time the runner changes the state. Callback function must be able to accept one argument - current state will be passed.

Example of BasicRunner usage:

class MyAwesomeRunner
  extend Forwardable

  def_delegators :@basic_runner, :start, :stop, :wait_for_finish, :stop_async, :restore

  class SimpleRecoveryStrategy
    include PgEventstore::RunnerRecoveryStrategy

    def initialize(restore_func)
      @attempts_count = 0
      @restore_func = restore_func
    end

    def recovers?(error)
      error.message.include?("I can not handle this any more!")
    end

    def recover(error)
      (@attempts_count < 3).tap do |res|
        @attempts_count += 1
        @restore_func.call if res
      end
    end
  end

  def initialize
    @basic_runner = PgEventstore::BasicRunner.new(
      run_interval: 1, async_shutdown_time: 2, recovery_strategies: recovery_strategies
    )
    @jobs_performed = 0
    @jobs_limit = 3
    attach_runner_callbacks
  end

  protected

  def work_harder
    @jobs_limit += 3
  end

  private

  def attach_runner_callbacks
    @basic_runner.define_callback(:change_state, :after, method(:state_changed))
    @basic_runner.define_callback(:process_async, :before, method(:process_action))
    @basic_runner.define_callback(:process_async, :after, method(:count_jobs))
    @basic_runner.define_callback(:before_runner_started, :before, method(:before_runner_started))
    @basic_runner.define_callback(:after_runner_stopped, :before, method(:after_runner_stopped))
    @basic_runner.define_callback(:after_runner_died, :before, method(:after_runner_died))
  end

  def process_action
    raise "What's the point? I can not handle this any more!" if @jobs_performed >= @jobs_limit
    puts "Doing some heavy lifting job"
    sleep 2 # Simulate long running job
  end

  def count_jobs
    @jobs_performed += 1
  end

  # @param state [String]
  def state_changed(state)
    puts "New state is #{state.inspect}"
  end

  def before_runner_started
    puts "Doing some preparations..."
  end

  def after_runner_stopped
    puts "You job is not processing any more. Total jobs performed: #{@jobs_performed}. Bye-bye!"
  end

  def after_runner_died(error)
    puts "Error occurred: #{error.inspect}"
  end

  def recovery_strategies
    [SimpleRecoveryStrategy.new(method(:work_harder))]
  end
end

runner = MyAwesomeRunner.new
runner.start # to start your background runner to process the job, defined by #process_action method
runner.stop # to stop the runner

See RunnerState for the list of available states See CallbacksExtension and Callbacks for more info about how to use callbacks

Instance Method Summary collapse

Methods included from Extensions::CallbacksExtension

#define_callback, included

Constructor Details

#initialize(run_interval:, async_shutdown_time:, recovery_strategies: []) ⇒ BasicRunner

Returns a new instance of BasicRunner.

Parameters:

  • run_interval (Integer, Float)

    seconds. Determines how often to run async task. Async task is determined by :after_runner_stopped callback

  • async_shutdown_time (Integer, Float)

    seconds. Determines how long to wait before force-shutdown the runner. It is only meaningful for the #stop_async



121
122
123
124
125
126
127
128
129
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 121

def initialize(run_interval:, async_shutdown_time:, recovery_strategies: [])
  @run_interval = run_interval
  @async_shutdown_time = async_shutdown_time
  @recovery_strategies = recovery_strategies
  @state = RunnerState.new
  @mutex = Thread::Mutex.new
  @runner = nil
  delegate_change_state_cbx
end

Instance Method Details

#restoreself

Restores the runner after its death.

Returns:

  • (self)


198
199
200
201
202
203
204
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 198

def restore
  within_state(:dead) do
    callbacks.run_callbacks(:before_runner_restored)
    _start
  end
  self
end

#startself

Start asynchronous runner. If the runner is dead - please use #restore to restart it.

Returns:

  • (self)


133
134
135
136
137
138
139
140
141
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 133

def start
  synchronize do
    return self unless @state.initial? || @state.stopped?

    callbacks.run_callbacks(:before_runner_started)
    _start
  end
  self
end

#stateString

Returns:

  • (String)


221
222
223
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 221

def state
  @state.to_s
end

#stopself

Stop asynchronous runner. This operation is immediate and it won’t be waiting for current job to finish - it will instantly halt it. If you care about the result of your async job - use #stop_async instead.

Returns:

  • (self)


146
147
148
149
150
151
152
153
154
155
156
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 146

def stop
  synchronize do
    return self unless @state.running? || @state.dead?

    @runner&.exit
    @runner = nil
    @state.stopped!
    callbacks.run_callbacks(:after_runner_stopped)
  end
  self
end

#stop_asyncself

Asynchronously stop asynchronous runner. This operation spawns another thread to gracefully stop the runner. It will wait up to @async_shutdown_time seconds before force-stopping the runner.

Returns:

  • (self)


161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 161

def stop_async
  synchronize do
    return self unless @state.running? || @state.dead?

    begin
      @state.halting!
    ensure
      Thread.new do
        stopping_at = Time.now.utc
        halt = false
        loop do
          synchronize do
            # Give the runner up to @async_shutdown_time seconds for graceful shutdown
            @runner&.exit if Time.now.utc - stopping_at > @async_shutdown_time

            unless @runner&.alive?
              @state.stopped!
              callbacks.run_callbacks(:after_runner_stopped)
            end
          ensure
            next if @runner&.alive?

            @runner = nil
            halt = true
          end
          break if halt

          sleep 0.1
        end
      end
    end
  end
  self
end

#wait_for_finishself

Wait until the runner switches the state to either “stopped” or “dead”. This operation is synchronous.

Returns:

  • (self)


208
209
210
211
212
213
214
215
216
217
218
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 208

def wait_for_finish
  loop do
    continue = synchronize do
      @state.halting? || @state.running?
    end
    break unless continue

    sleep 0.1
  end
  self
end

#within_state(state, &_blk) ⇒ Object?

Returns a result of evaluating of passed block.

Parameters:

  • state (Symbol)

Returns:

  • (Object, nil)

    a result of evaluating of passed block



227
228
229
230
231
232
233
# File 'lib/pg_eventstore/subscriptions/basic_runner.rb', line 227

def within_state(state, &_blk)
  synchronize do
    return unless @state.public_send("#{RunnerState::STATES.fetch(state)}?")

    yield
  end
end