Class: Rjob::WorkerProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/rjob/worker_process.rb

Overview

TODO: find a mechanism to recover jobs that were moved to the working state but never returned

Constant Summary collapse

# Per-iteration timeout. The consumer is outside this excerpt; presumably seconds — confirm.
ITERATION_TIMEOUT = 2

# Heartbeat timeout. The consumer is outside this excerpt; presumably seconds — confirm.
HEARTBEAT_TIMEOUT = 15

# StandardError subclass. Judging by the name it is raised to break out of the pubsub
# subscription — confirm at the raise site, which is not visible here.
class StopSubscription < StandardError; end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(context) ⇒ WorkerProcess

Returns a new instance of WorkerProcess.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rjob/worker_process.rb', line 16

# Prepares a worker bound to +context+.
#
# Opens a Redis connection through the context (kept in @pubsub_redis), calls
# init_worker_name, builds the job thread pool and zeroes the processed / failed /
# returned counters. The worker starts in the :new state with no leader.
#
# @param context [Object] must respond to #prefix, #create_redis_connection and
#   #config; the config must support #fetch(key, default)
def initialize(context)
  @context = context
  @prefix = context.prefix
  @pubsub_redis = context.create_redis_connection

  # Defined elsewhere in the class; presumably assigns @worker_name — confirm there.
  init_worker_name

  @iteration_no = 0
  @max_queue_size = 20
  thread_limit = context.config.fetch(:max_threads, 2)

  @subscription_thread = nil

  pool_options = {
    # Never ask for more idle threads than the configured ceiling allows.
    min_threads: [2, thread_limit].min,
    max_threads: thread_limit,
    max_queue: @max_queue_size,
    # With :abort, a rejected post raises Concurrent::RejectedExecutionError.
    fallback_policy: :abort
  }
  @thread_pool = Concurrent::ThreadPoolExecutor.new(**pool_options)

  # Three independent atomic counters, one per job outcome.
  @processed_count, @failed_count, @returned_count =
    Array.new(3) { Concurrent::AtomicFixnum.new }

  @leader = nil
  @state = :new
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



11
12
13
# File 'lib/rjob/worker_process.rb', line 11

# Read-only accessor for the worker's context.
#
# @return [Object] the object handed to #initialize, stored unchanged in @context
def context
  @context
end

#leader ⇒ Object (readonly)

Returns the value of attribute leader.



14
15
16
# File 'lib/rjob/worker_process.rb', line 14

# Read-only accessor for @leader.
#
# @return [Object, nil] nil right after construction; whatever assigns it later is
#   outside this excerpt
def leader
  @leader
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



13
14
15
# File 'lib/rjob/worker_process.rb', line 13

# Read-only accessor for the worker's lifecycle marker.
#
# @return [Symbol] :new after construction, :running once #run_forever begins,
#   :exiting after an INT signal; #run_forever stops looping when it reads :exited
def state
  @state
end

#worker_name ⇒ Object (readonly)

Returns the value of attribute worker_name.



12
13
14
# File 'lib/rjob/worker_process.rb', line 12

# Read-only accessor for @worker_name.
#
# @return [Object] presumably assigned by init_worker_name during construction —
#   that helper is not visible here, so confirm against its definition
def worker_name
  @worker_name
end

Instance Method Details

#run_forever ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rjob/worker_process.rb', line 43

# Registers the worker, then keeps calling run_iteration until @state is :exited.
#
# An INT handler is installed for the duration: the first signal only marks the worker
# as :exiting, a second one while still :exiting terminates the process with status 1.
# unregister_worker always runs on the way out, including when an exception propagates.
#
# NOTE(review): the loop stops on :exited, while the handler only sets :exiting; the
# :exiting -> :exited transition presumably happens inside run_iteration — confirm.
def run_forever
  register_worker

  on_interrupt = proc do
    if @state != :exiting
      # First interrupt: request a graceful stop.
      @state = :exiting
      puts "Exiting..."
    else
      # Interrupted again before the shutdown completed: give up immediately.
      puts "Force exit requested. Exiting immediately"
      exit 1
    end
  end
  Signal.trap("INT", on_interrupt)

  @state = :running
  loop do
    # Checked before each iteration, so nothing runs once the worker is :exited.
    break if @state == :exited

    run_iteration
  end
ensure
  unregister_worker
end