Class: HotBunnies::Queue::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/hot_bunnies/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, queue_name, options = {}) ⇒ Subscription

Returns a new instance of Subscription.



64
65
66
67
68
69
70
# File 'lib/hot_bunnies/queue.rb', line 64

# Sets up a subscription for +queue_name+ on +channel+.
#
# channel    - stored as @channel; #cancel later calls basic_cancel on it.
# queue_name - stored as @queue_name.
# options    - Hash; only :ack is read here (kept in @ack, false when absent).
#
# The cancellation flag starts out false and is held in a Java AtomicBoolean.
def initialize(channel, queue_name, options = {})
  @queue_name = queue_name
  @channel    = channel
  @ack        = options.fetch(:ack) { false }

  atomic_boolean = java.util.concurrent.atomic.AtomicBoolean
  @cancelled     = atomic_boolean.new(false)
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



62
63
64
# File 'lib/hot_bunnies/queue.rb', line 62

# Read-only accessor for the channel this subscription was constructed with
# (the value the constructor stored in @channel).
def channel
  @channel
end

#consumer_tag ⇒ Object (readonly)

Returns the value of attribute consumer_tag.



62
63
64
# File 'lib/hot_bunnies/queue.rb', line 62

# Read-only accessor for @consumer_tag.
#
# NOTE(review): nothing in the visible part of this class assigns
# @consumer_tag; #active? and #cancel read @subscriber.consumer_tag instead.
# Unless code outside this view sets it, this returns nil — confirm.
def consumer_tag
  @consumer_tag
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



62
63
64
# File 'lib/hot_bunnies/queue.rb', line 62

# Read-only accessor for the queue name this subscription was constructed
# with (the value the constructor stored in @queue_name).
def queue_name
  @queue_name
end

Instance Method Details

#active? ⇒ Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/hot_bunnies/queue.rb', line 105

# Tells whether this subscription currently has a live consumer.
#
# It is active only when all three hold: the cancellation flag is unset, a
# subscriber object exists, and that subscriber has a consumer tag.
# Always returns true or false.
def active?
  return false if @cancelled.get
  return false if @subscriber.nil?

  !@subscriber.consumer_tag.nil?
end

#cancel ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/hot_bunnies/queue.rb', line 88

# Cancels the consumer on the channel and marks this subscription cancelled.
#
# Steps, in order: basic_cancel is issued for the subscriber's consumer tag,
# the cancellation flag is set, and maybe_shutdown_executor is called (its
# result is this method's return value).
#
# Raises a RuntimeError when the subscription is not active (see #active?).
def cancel
  unless active?
    raise 'Can\'t cancel: the subscriber haven\'t received an OK yet'
  end

  tag = @subscriber.consumer_tag
  @channel.basic_cancel(tag)

  # Per the original author's note (MK): the RabbitMQ Java client does not
  # clear consumer_tag on cancelled consumers, so cancellation is tracked
  # here explicitly. Sharing consumers between threads is a can of worms,
  # but someone will almost certainly do it, hence the atomic flag.
  @cancelled.set(true)

  maybe_shutdown_executor
end

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/hot_bunnies/queue.rb', line 101

# Reports the current value of the cancellation flag.
#
# The flag is the atomic boolean created (as false) in the constructor and
# set to true by #cancel once basic_cancel has been issued.
def cancelled?
  @cancelled.get
end

#each(options = {}, &block) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/hot_bunnies/queue.rb', line 72

# Attaches +block+ as the message listener, either by calling #run directly
# on the calling thread or by submitting it to an executor.
# (#run is defined outside this view.)
#
# Options read here:
#   :blocking - defaults to true. When truthy, #run is invoked directly and
#               its result is returned.
#   :executor - only consulted when :blocking is falsy. A supplied executor
#               is used as-is and flagged so #shutdown! leaves it alone;
#               otherwise a new single-thread executor is created and
#               flagged for shutdown.
#
# In the non-blocking case the return value is whatever the executor's
# submit returns.
#
# Raises a RuntimeError if a subscriber is already set.
def each(options = {}, &block)
  if @subscriber
    raise 'The subscription already has a message listener'
  end

  return run(&block) if options.fetch(:blocking, true)

  supplied = options[:executor]
  # Only executors created here are ours to shut down later.
  @shut_down_executor = !supplied
  @executor = supplied || java.util.concurrent.Executors.new_single_thread_executor
  @executor.submit { run(&block) }
end

#shutdown! ⇒ Object



109
110
111
112
113
# File 'lib/hot_bunnies/queue.rb', line 109

# Stops the executor, but only when one exists and it is flagged as owned by
# this subscription (@shut_down_executor truthy, i.e. #each created it).
# Caller-supplied executors are left untouched.
#
# Returns the result of shutdown_now, or nil when nothing was shut down.
def shutdown!
  return unless @executor && @shut_down_executor

  @executor.shutdown_now
end