Class: Aws::Xray::Worker

Inherits:
Object
Defined in:
lib/aws/xray/worker.rb

Defined Under Namespace

Classes: Configuration

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue) ⇒ Worker

Returns a new instance of Worker.



46
47
48
# File 'lib/aws/xray/worker.rb', line 46

# Builds a worker bound to the given queue.
#
# @param queue [Object] stored in @queue; #run hands it to the thread it
#   starts, which pops jobs from it
def initialize(queue)
  @queue = queue
end

Class Method Details

.post(payload) ⇒ Object

Parameters:



18
19
20
21
22
23
24
25
26
27
# File 'lib/aws/xray/worker.rb', line 18

# Enqueues a payload on the shared queue.
#
# The push is non-blocking: if the queue is already at capacity the caller
# gets QueueIsFullError right away instead of being suspended while still
# holding @post_lock.
#
# @param payload [Object] item pushed onto @queue unchanged
# @return [Object] whatever the final logger.debug call returns
# @raise [QueueIsFullError] wraps any ThreadError raised in the body, which
#   includes the one a non-blocking push raises when the queue is full
def post(payload)
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post received a job")
  @post_lock.synchronize do
    # NOTE(review): presumably rebuilds queue/worker state after a fork;
    # the helper is defined outside this view -- confirm.
    refresh_if_forked
    # `true` selects non_block mode, so a full queue raises ThreadError
    # instead of suspending the calling thread.
    @queue.push(payload, true)
  end
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post pushed a job")
rescue ThreadError => e
  raise QueueIsFullError.new(e)
end

.reset(config) ⇒ Object

Parameters:



30
31
32
33
34
# File 'lib/aws/xray/worker.rb', line 30

# Replaces the shared job queue and the set of running workers.
#
# A new SizedQueue is created first, then any workers remembered from an
# earlier call are killed, and finally config.num new workers are started
# on the new queue.
#
# @param config [#max_queue_size, #num] supplies the queue capacity and
#   the number of workers to start
# @return [Array] the newly started workers (the values returned by #run)
def reset(config)
  @queue = Thread::SizedQueue.new(config.max_queue_size)

  # Only present after a previous call; nothing to stop on the first one.
  if defined?(@workers)
    stale = @workers
    stale.each { |worker| worker.kill } unless stale.empty?
  end

  pool_size = config.num
  @workers = Array.new(pool_size) { new(@queue).run }
end

Instance Method Details

#run ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/aws/xray/worker.rb', line 50

# Starts a thread that consumes jobs from this worker's queue forever.
#
# Each iteration pops one item. A truthy item is converted with #to_s and
# handed to Client.send_payload; a nil/false item is logged and skipped.
#
# @return [Thread] the started thread, with abort_on_exception enabled so
#   an exception that aborts it is re-raised in the main thread
def run
  consumer = Thread.new(@queue) do |jobs|
    loop do
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run waits a job")
      job = jobs.pop
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received a job")

      # Falsy items carry nothing to send; note them and keep consuming.
      unless job
        Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received invalid item, ignored it")
        next
      end

      Client.send_payload(job.to_s)
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run sent a payload")
    end
  end
  consumer.tap { |thread| thread.abort_on_exception = true }
end