Class: SQSRun::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/sqsrun/worker.rb

Defined Under Namespace

Classes: TimerThread

Instance Method Summary collapse

Constructor Details

#initialize(conf) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/sqsrun/worker.rb', line 7

def initialize(conf)
  require 'right_aws'
  @key_id = conf[:key_id]
  @secret_key = conf[:secret_key]
  @queue_name = conf[:queue]
  @visibility_timeout = conf[:timeout]
  @extend_timeout = conf[:extend_timeout]
  @kill_timeout = conf[:kill_timeout]
  @kill_retry = conf[:kill_retry]
  @interval = conf[:interval]
  @release_on_fail = conf[:release_on_fail]
  @env = conf[:env] || {}
  @finished = false

  @extender = TimerThread.new(@visibility_timeout, @extend_timeout, @kill_timeout, @kill_retry)
  @sqs = RightAws::SqsGen2.new(@key_id, @secret_key)
  @queue = @sqs.queue(@queue_name)

  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Method Details

#finished?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/sqsrun/worker.rb', line 58

def finished?
  @finished
end

#init_proc(run_proc, kill_proc) ⇒ Object



29
30
31
32
# File 'lib/sqsrun/worker.rb', line 29

def init_proc(run_proc, kill_proc)
  @run_proc = run_proc
  @extender.init_proc(kill_proc)
end

#receive!Object



52
53
54
55
56
# File 'lib/sqsrun/worker.rb', line 52

def receive!
  @mutex.synchronize {
    @cond.broadcast
  }
end

#runObject



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/sqsrun/worker.rb', line 34

def run
  @extender.start
  until @finished
    msg = @queue.receive(@visibility_timeout)
    if msg
      process(msg)
    else
      cond_wait(@interval)
    end
  end
end

#shutdownObject



46
47
48
49
50
# File 'lib/sqsrun/worker.rb', line 46

def shutdown
  @finished = true
  @extender.shutdown
  receive!
end