Class: Vx::Lib::Consumer::Subscriber

Inherits:
Bunny::Consumer
  • Object
show all
Includes:
Instrument
Defined in:
lib/vx/lib/consumer/subscriber.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Instrument

#instrument

Constructor Details

#initialize(*args) ⇒ Subscriber

Returns a new instance of Subscriber.



13
14
15
16
# File 'lib/vx/lib/consumer/subscriber.rb', line 13

# Builds the subscriber and prepares its per-instance lock.
#
# @param args [Array] passed through unchanged to the superclass constructor
# @return [Subscriber]
def initialize(*args)
  # Bare `super` re-sends exactly the arguments (and block) received here.
  super
  # Mutex that #in_progress, #running? and #try_graceful_shutdown operate on.
  @lock = Mutex.new
end

Instance Attribute Details

#queue_name ⇒ Object

Returns the value of attribute queue_name.



11
12
13
# File 'lib/vx/lib/consumer/subscriber.rb', line 11

# Reader for the queue_name attribute.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end

#vx_consumer_name ⇒ Object

Returns the value of attribute vx_consumer_name.



11
12
13
# File 'lib/vx/lib/consumer/subscriber.rb', line 11

# Reader for the vx_consumer_name attribute; the value is used as the
# `consumer:` payload of the instrumentation events emitted by this class.
#
# @return [Object] the current value of @vx_consumer_name
def vx_consumer_name
  @vx_consumer_name
end

Instance Method Details

#call(*args) ⇒ Object



50
51
52
53
54
55
# File 'lib/vx/lib/consumer/subscriber.rb', line 50

# Runs the delivery handler, if one is set, while holding the subscriber lock.
#
# @param args [Array] delivery arguments handed to the @on_delivery handler as-is
# @return [Integer] the result of `sleep 0`
def call(*args)
  in_progress do
    handler = @on_delivery
    handler.call(*args) if handler
    # Zero-length sleep; presumably lets other threads run before the lock
    # is released — confirm intent.
    sleep 0
  end
end

#cancel ⇒ Object



57
58
59
60
61
62
63
# File 'lib/vx/lib/consumer/subscriber.rb', line 57

# Emits a 'cancel_consumer' event and then, unless the channel is already
# closed, cancels through the superclass and closes the channel if the
# superclass call left it open.
#
# @return [Object, nil] nil when the channel was already closed (before or
#   after the superclass call); otherwise whatever `channel.close` returns
def cancel
  instrument('cancel_consumer', consumer: vx_consumer_name, channel: channel.id)
  # Nothing left to cancel once the channel is gone.
  return if closed?

  super
  channel.close unless closed?
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/vx/lib/consumer/subscriber.rb', line 65

# Reports whether this subscriber's channel is closed.
#
# @return [Boolean] whatever `channel.closed?` returns
def closed?
  channel.closed?
end

#graceful_shutdown ⇒ Object



18
19
20
21
22
23
24
# File 'lib/vx/lib/consumer/subscriber.rb', line 18

# Cancels the consumer once the subscriber lock can be taken, emitting an
# event before waiting for the lock and another after the cancel completes.
#
# @return [Object] the result of the final `instrument` call
def graceful_shutdown
  # Both events carry the same payload; the name is re-read on each emit.
  report = ->(event) { instrument(event, consumer: vx_consumer_name) }

  report.call('try_graceful_shutdown_consumer')
  in_progress do
    cancel
    report.call('graceful_shutdown_consumer')
  end
end

#in_progress ⇒ Object



40
41
42
43
44
# File 'lib/vx/lib/consumer/subscriber.rb', line 40

# Runs the given block while holding the subscriber lock; the lock is
# released when the block finishes, including when it raises.
#
# @yield work to perform under the lock
# @return [Object] the block's return value
def in_progress
  @lock.synchronize { yield }
end

#join ⇒ Object



69
70
71
# File 'lib/vx/lib/consumer/subscriber.rb', line 69

# Delegates to `join` on the channel's work pool.
# NOTE(review): presumably blocks until the pool's worker threads finish —
# confirm against the Bunny work pool implementation.
#
# @return [Object] whatever `channel.work_pool.join` returns
def join
  channel.work_pool.join
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/vx/lib/consumer/subscriber.rb', line 46

# Reports whether the subscriber lock is currently held, i.e. some thread is
# inside #in_progress (as #call and #graceful_shutdown are) or inside the
# locked section of #try_graceful_shutdown.
#
# @return [Boolean] true while @lock is locked
def running?
  @lock.locked?
end

#try_graceful_shutdown ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/vx/lib/consumer/subscriber.rb', line 26

# Non-blocking shutdown attempt: cancels the consumer only if the subscriber
# lock can be taken immediately.
#
# The 'graceful_shutdown_consumer' event is emitted before `cancel` runs.
#
# @return [Boolean] true when the lock was acquired and the cancel completed,
#   false when the lock was already held
def try_graceful_shutdown
  return false unless @lock.try_lock

  begin
    instrument('graceful_shutdown_consumer', consumer: vx_consumer_name)
    cancel
  ensure
    # Always hand the lock back, even if instrumenting or cancelling raised.
    @lock.unlock
  end
  true
end

#wait_shutdown ⇒ Object



73
74
75
76
77
78
79
# File 'lib/vx/lib/consumer/subscriber.rb', line 73

# Starts a background thread that calls `Consumer.wait_shutdown` and, once
# that returns, cancels this subscriber.
#
# @return [Thread] the watcher thread; its value is the result of `cancel`
def wait_shutdown
  watcher = Thread.new do
    # An exception raised in this thread should not be silently dropped.
    Thread.current.abort_on_exception = true
    Consumer.wait_shutdown
    cancel
  end
  watcher
end