Class: Qsagi::ConfirmedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/qsagi/confirmed_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue) ⇒ ConfirmedQueue

Returns a new instance of ConfirmedQueue.



7
8
9
10
11
12
13
# File 'lib/qsagi/confirmed_queue.rb', line 7

# Wraps +queue+ and sets up empty publisher-confirm bookkeeping.
#
# @param queue [Object] the queue to decorate; the other methods of this
#   class call +connect+, +disconnect+, +push+, +pop+ and +channel+ on it
def initialize(queue)
  # Guards @unconfirmed_messages / @nacked_messages, which are touched both
  # by #push and by the confirm callback built in #_confirm_select.
  @semaphore = Mutex.new

  # sequence number => message that has been pushed but not yet confirmed
  @unconfirmed_messages = {}
  # messages reported as nacked by the confirm callback
  @nacked_messages = []

  # Becomes true on the first #push.
  @wait_for_confirms = false

  @queue = queue
end

Instance Attribute Details

#nacked_messages ⇒ Object (readonly)

Returns the value of attribute nacked_messages.



5
6
7
# File 'lib/qsagi/confirmed_queue.rb', line 5

# Reader for the +@nacked_messages+ instance variable.
#
# The constructor initialises it to an empty Array and #_confirm_messages!
# adds to it when a confirmation arrives with +:is_nack+ set.
#
# @return [Array] the current value of +@nacked_messages+ (not a copy)
def nacked_messages
  @nacked_messages
end

Instance Method Details

#_channel ⇒ Object



41
42
43
# File 'lib/qsagi/confirmed_queue.rb', line 41

# Returns the channel of the wrapped queue.
#
# Within this class the result is sent +confirm_select+,
# +next_publish_seq_no+ and +wait_for_confirms+.
#
# @return [Object] whatever +@queue.channel+ returns
#   (NOTE(review): presumably a Bunny channel — confirm against the queue class)
def _channel
  @queue.channel
end

#_confirm_messages!(attributes) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/qsagi/confirmed_queue.rb', line 45

# Applies one confirmation (ack or nack) to the bookkeeping tables.
#
# Every affected entry is removed from +@unconfirmed_messages+; when the
# confirmation is a nack, the removed messages are also appended to
# +@nacked_messages+.
#
# This method does no locking itself; #_confirm_select invokes it while
# holding +@semaphore+.
#
# @param attributes [Hash]
# @option attributes [Integer] :delivery_tag sequence number being confirmed
# @option attributes [Boolean] :multiple when truthy, the confirmation covers
#   every tracked sequence number less than or equal to +:delivery_tag+
# @option attributes [Boolean] :is_nack when truthy, the covered messages are
#   recorded as nacked
# @return [void] the return value is not meaningful
def _confirm_messages!(attributes)
  tag = attributes[:delivery_tag]

  settled_tags =
    if attributes[:multiple]
      @unconfirmed_messages.keys.select { |sequence_number| sequence_number <= tag }
    elsif @unconfirmed_messages.key?(tag)
      [tag]
    else
      # Unknown tag: nothing to settle. Checking key? (rather than the value
      # returned by delete) keeps a stray nil out of @nacked_messages while
      # still allowing a tracked message that is itself nil.
      []
    end

  settled = settled_tags.map { |sequence_number| @unconfirmed_messages.delete(sequence_number) }

  # Append in place for both the single and the multiple case so that the
  # Array handed out by #nacked_messages is always the one being updated.
  @nacked_messages.concat(settled) if attributes[:is_nack]
end

#_confirm_select ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/qsagi/confirmed_queue.rb', line 62

# Registers a confirmation callback on the channel.
#
# The callback takes (delivery_tag, multiple, is_nack), packs them into a
# Hash and forwards it to #_confirm_messages! while holding +@semaphore+,
# the same mutex #push takes when recording a message.
#
# @return [Object] whatever +_channel.confirm_select+ returns
def _confirm_select
  on_confirm = lambda do |tag, covers_multiple, nacked|
    confirmation = {
      :delivery_tag => tag,
      :multiple => covers_multiple,
      :is_nack => nacked
    }

    @semaphore.synchronize { _confirm_messages!(confirmation) }
  end

  _channel.confirm_select(on_confirm)
end

#_wait_for_confirms? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/qsagi/confirmed_queue.rb', line 72

# Reports whether #wait_for_confirms should actually wait.
#
# The flag starts out false in the constructor and is set to true by #push.
#
# @return [Boolean] the current value of +@wait_for_confirms+
def _wait_for_confirms?
  @wait_for_confirms
end

#connect ⇒ Object



15
16
17
18
# File 'lib/qsagi/confirmed_queue.rb', line 15

# Connects the wrapped queue and then registers the confirmation callback.
#
# @return [Object] the result of #_confirm_select
def connect
  @queue.connect
  # Runs after connect; presumably the channel only exists once the queue is
  # connected — confirm against the queue class.
  _confirm_select
end

#disconnect ⇒ Object



20
21
22
# File 'lib/qsagi/confirmed_queue.rb', line 20

# Disconnects the wrapped queue.
#
# Only delegates: +@unconfirmed_messages+, +@nacked_messages+ and the
# wait flag are left as they are.
#
# @return [Object] whatever +@queue.disconnect+ returns
def disconnect
  @queue.disconnect
end

#pop(opts = {}) ⇒ Object



33
34
35
# File 'lib/qsagi/confirmed_queue.rb', line 33

# Pops from the wrapped queue; no confirmation bookkeeping is involved.
#
# @param opts [Hash] passed through unchanged to +@queue.pop+
# @return [Object] whatever +@queue.pop+ returns
def pop(opts={})
  @queue.pop(opts)
end

#push(message) ⇒ Object



24
25
26
27
28
29
30
31
# File 'lib/qsagi/confirmed_queue.rb', line 24

# Publishes +message+ through the wrapped queue and tracks it as unconfirmed.
#
# The message is stored in +@unconfirmed_messages+ under the sequence number
# the channel reports for the next publish, and it is stored *before* the
# publish call is made. From then on #wait_for_confirms will wait.
#
# @param message [Object] payload handed to +@queue.push+
# @return [true]
def push(message)
  sequence_number = _channel.next_publish_seq_no

  # Same mutex the confirm callback holds while it updates the table.
  @semaphore.synchronize { @unconfirmed_messages[sequence_number] = message }

  @queue.push(message)
  @wait_for_confirms = true
end

#wait_for_confirms ⇒ Object



37
38
39
# File 'lib/qsagi/confirmed_queue.rb', line 37

# Waits on the channel for outstanding confirmations, but only when
# #_wait_for_confirms? is true (i.e. after something has been pushed).
#
# @return [Object, nil] the result of +_channel.wait_for_confirms+, or nil
#   when there is nothing to wait for
def wait_for_confirms
  return unless _wait_for_confirms?

  _channel.wait_for_confirms
end