Class: SQSRun::Worker
- Inherits: Object
- Inheritance chain: Object → SQSRun::Worker
- Defined in: lib/sqsrun/worker.rb
Defined Under Namespace
Classes: TimerThread
Instance Method Summary
- #finished? ⇒ Boolean
- #init_proc(run_proc, kill_proc) ⇒ Object
- #initialize(conf) ⇒ Worker (constructor): A new instance of Worker.
- #receive! ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(conf) ⇒ Worker
Returns a new instance of Worker.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/sqsrun/worker.rb', line 7

# Builds a worker from a configuration object indexed by symbol keys.
#
# Copies the settings out of +conf+, creates the TimerThread stored in
# @extender, opens the SQS queue through RightAws, and allocates the
# mutex / condition variable pair that #receive! broadcasts on.
#
# @param conf [Hash] settings read via +conf[:key]+; the keys consulted are
#   :key_id, :secret_key, :queue, :timeout, :extend_timeout, :kill_timeout,
#   :kill_retry, :interval, :release_on_fail and :env (:env falls back to an
#   empty Hash when it is nil or false).
# @return [Worker] a new instance of Worker
def initialize(conf)
  # Required at construction time rather than at file load.
  require 'right_aws'

  # AWS credentials and target queue.
  @key_id     = conf[:key_id]
  @secret_key = conf[:secret_key]
  @queue_name = conf[:queue]

  # Timing and retry settings; the first four are handed to TimerThread below.
  @visibility_timeout = conf[:timeout]
  @extend_timeout     = conf[:extend_timeout]
  @kill_timeout       = conf[:kill_timeout]
  @kill_retry         = conf[:kill_retry]
  @interval           = conf[:interval]

  @release_on_fail = conf[:release_on_fail]
  @env             = conf[:env] || {}

  # The worker starts in the "not finished" state; #shutdown flips this.
  @finished = false

  @extender = TimerThread.new(@visibility_timeout, @extend_timeout, @kill_timeout, @kill_retry)

  @sqs   = RightAws::SqsGen2.new(@key_id, @secret_key)
  @queue = @sqs.queue(@queue_name)

  @mutex = Mutex.new
  @cond  = ConditionVariable.new
end
Instance Method Details
#finished? ⇒ Boolean
58 59 60 |
# File 'lib/sqsrun/worker.rb', line 58

# Reports the worker's @finished flag as stored (no coercion is applied).
#
# @return [Boolean] the current value of @finished
def finished?
  @finished
end
#init_proc(run_proc, kill_proc) ⇒ Object
29 30 31 32 |
# File 'lib/sqsrun/worker.rb', line 29

# Registers the two callables used by the worker.
#
# +run_proc+ is stored on the worker as @run_proc; +kill_proc+ is forwarded
# to the extender via its own #init_proc.
#
# @param run_proc [Object] stored in @run_proc
# @param kill_proc [Object] passed to @extender.init_proc
# @return [Object] whatever @extender.init_proc returns
def init_proc(run_proc, kill_proc)
  @run_proc = run_proc

  # Last expression: the extender's return value becomes this method's result.
  @extender.init_proc(kill_proc)
end
#receive! ⇒ Object
52 53 54 55 56 |
# File 'lib/sqsrun/worker.rb', line 52

# Broadcasts on @cond while holding @mutex, waking every thread currently
# waiting on that condition variable.
#
# NOTE(review): presumably this interrupts the idle wait in #run (cond_wait);
# cond_wait is not visible here, so confirm against its definition.
#
# @return [Object] the result of @cond.broadcast
def receive!
  @mutex.synchronize do
    @cond.broadcast
  end
end
#run ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/sqsrun/worker.rb', line 34

# Main polling loop.
#
# Starts the extender, then repeatedly asks the queue for a message (passing
# @visibility_timeout) until @finished becomes truthy. A received message is
# handed to #process; when nothing is received the loop calls
# cond_wait(@interval) before polling again.
#
# @return [nil]
def run
  @extender.start

  until @finished
    message = @queue.receive(@visibility_timeout)

    # Nothing to do: wait before polling again.
    unless message
      cond_wait(@interval)
      next
    end

    process(message)
  end
end
#shutdown ⇒ Object
46 47 48 49 50 |
# File 'lib/sqsrun/worker.rb', line 46

# Requests the worker to stop.
#
# Marks the worker as finished, shuts the extender down, and then calls
# #receive! to broadcast on the worker's condition variable.
#
# @return [Object] the result of #receive!
def shutdown
  # Set the flag first so the #run loop condition sees it on its next check.
  @finished = true

  @extender.shutdown

  receive!
end