Class: Fluffle::Confirmer

Inherits:
Object
  • Object
show all
Defined in:
lib/fluffle/confirmer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel:) ⇒ Confirmer

Returns a new instance of Confirmer.



5
6
7
8
9
# File 'lib/fluffle/confirmer.rb', line 5

# Builds a confirmer bound to the given channel.
#
# @param channel [Object] channel object; stored as-is in `@channel`
def initialize(channel:)
  # Registry of in-flight confirms; starts out empty.
  @pending_confirms = Concurrent::Map.new

  @channel = channel
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



3
4
5
# File 'lib/fluffle/confirmer.rb', line 3

# Reader for the channel this confirmer was constructed with.
#
# @return [Object] the value held in `@channel`
def channel
  @channel
end

Instance Method Details

#confirm_select ⇒ Object

Enables confirms on the channel and sets up a callback that receives each confirm and unblocks the corresponding `with_confirmation` call.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fluffle/confirmer.rb', line 13

# Puts the channel into confirmation mode and registers the callback that
# resolves the IVars stored in `@pending_confirms`.
#
# The callback is invoked with `(tag, multiple, nack)`:
# * the pending IVar registered under `tag` is removed and set to `nack`;
# * when `multiple` is truthy the broker is confirming every outstanding
#   publish up to and including `tag`, so every pending IVar with a smaller
#   tag is removed and set to `nack` as well. Without this, batched confirms
#   would leave the earlier publishes blocked until they time out.
#
# @return [Object] whatever `@channel.confirm_select` returns
def confirm_select
  handle_confirm = ->(tag, multiple, nack) do
    confirmed_tags = [tag]

    if multiple
      # Snapshot the keys so entries can be deleted while walking them.
      earlier_tags = @pending_confirms.keys.select { |pending| pending < tag }
      confirmed_tags.concat(earlier_tags)
    end

    confirmed_tags.each do |confirmed_tag|
      # Deleting first guarantees each IVar is set at most once.
      ivar = @pending_confirms.delete confirmed_tag

      if ivar
        ivar.set nack
      elsif confirmed_tag == tag
        # Only the tag named by the broker is expected to be pending.
        # NOTE(review): `logger` is not defined in the visible part of this
        # class — confirm it is provided elsewhere, otherwise this branch
        # raises NoMethodError.
        self.logger.error "Missing confirm IVar: tag=#{tag}"
      end
    end
  end

  # Set the channel in confirmation mode so that we can receive confirms
  # of published messages
  @channel.confirm_select handle_confirm
end

#with_confirmation(timeout:) ⇒ Object

Wraps a block (which should publish a message) with a blocking check that a confirmation was received from the RabbitMQ server indicating that the message was received and routed successfully.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/fluffle/confirmer.rb', line 32

# Registers a pending confirm under the channel's next publish sequence
# number, runs the block (which should publish a message) and then blocks
# until that confirm is resolved or `timeout` elapses.
#
# @param timeout [Numeric, nil] handed to the IVar wait — presumably seconds;
#   verify against callers
# @yield the block expected to publish the message
# @return [Object] the block's return value
# @raise [Errors::ConfirmTimeoutError] if the confirm IVar is still
#   incomplete after waiting
# @raise [Errors::NackError] if the confirm IVar was resolved with a truthy
#   (nack) value
def with_confirmation(timeout:)
  tag = @channel.next_publish_seq_no
  confirm_ivar = Concurrent::IVar.new
  @pending_confirms[tag] = confirm_ivar

  result = yield

  # Wait first and only then inspect the IVar. Reading the value before the
  # completion check let a confirm that landed in between be seen as
  # "complete" with a stale nil value, turning a nack into a success.
  confirm_ivar.wait timeout

  raise Errors::ConfirmTimeoutError, 'Timed out waiting for confirm' if confirm_ivar.incomplete?
  raise Errors::NackError, 'Received nack from confirmation' if confirm_ivar.value

  result
end