Class: HotBunnies::Queue::Subscription

Inherits:
Object
  • Object
show all
Includes:
JavaConcurrent
Defined in:
lib/hot_bunnies/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, queue_name, options = {}) ⇒ Subscription

Returns a new instance of Subscription.



76
77
78
79
80
81
82
# File 'lib/hot_bunnies/queue.rb', line 76

def initialize(channel, queue_name, options={})
  @channel    = channel
  @queue_name = queue_name
  @ack        = options.fetch(:ack, false)

  @cancelled  = AtomicBoolean.new(false)
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



74
75
76
# File 'lib/hot_bunnies/queue.rb', line 74

def channel
  @channel
end

#consumer_tagObject (readonly)

Returns the value of attribute consumer_tag.



74
75
76
# File 'lib/hot_bunnies/queue.rb', line 74

def consumer_tag
  @consumer_tag
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



74
75
76
# File 'lib/hot_bunnies/queue.rb', line 74

def queue_name
  @queue_name
end

Instance Method Details

#active?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/hot_bunnies/queue.rb', line 114

def active?
  !@cancelled.get && !@consumer.nil? && !@consumer.consumer_tag.nil?
end

#cancelObject



97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/hot_bunnies/queue.rb', line 97

def cancel
  raise 'Can\'t cancel: the subscriber haven\'t received an OK yet' if !self.active?
  @consumer.cancel

  # RabbitMQ Java client won't clear consumer_tag from cancelled consumers,
  # so we have to do this. Sharing consumers
  # between threads in general is a can of worms but someone somewhere
  # will almost certainly do it, so. MK.
  @cancelled.set(true)

  maybe_shutdown_executor
end

#cancelled?Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/hot_bunnies/queue.rb', line 110

def cancelled?
  @cancelled.get
end

#each(options = {}, &block) ⇒ Object Also known as: each_message



84
85
86
87
88
# File 'lib/hot_bunnies/queue.rb', line 84

def each(options={}, &block)
  raise 'The subscription already has a message listener' if @consumer
  start(create_consumer(options, block))
  nil
end

#shutdown!Object Also known as: shut_down!



118
119
120
121
122
# File 'lib/hot_bunnies/queue.rb', line 118

def shutdown!
  if @executor && @shut_down_executor
    @executor.shutdown_now
  end
end

#start(consumer) ⇒ Object



91
92
93
94
95
# File 'lib/hot_bunnies/queue.rb', line 91

def start(consumer)
  @consumer = consumer
  @consumer_tag = @channel.basic_consume(@queue_name, !@ack, @consumer)
  @consumer.start
end