Class: OnStomp::Failover::Buffers::Receipts

Inherits:
Base
  • Object
Defined in:
lib/onstomp/failover/buffers/receipts.rb

Overview

TODO:

Quite a lot of this code is shared between Written and Receipts; we'll want to factor the common stuff out.

A buffer that ensures frames are acknowledged with a RECEIPT on a client's connection, and replays the ones that were not acknowledged when the failover client reconnects.
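The idea can be illustrated without the library. What follows is a minimal, self-contained sketch and not OnStomp's implementation: `ToyReceiptBuffer`, its methods, and the hash-based frames are hypothetical names used only for illustration.

```ruby
# Toy model of a receipt-tracking buffer. Hypothetical; not OnStomp's code.
class ToyReceiptBuffer
  def initialize
    @buffer = []
    @serial = 0
  end

  # Stamp the frame with a unique receipt id and remember it until
  # the broker acknowledges it.
  def add(frame)
    @serial += 1
    frame[:receipt] = @serial.to_s
    @buffer << frame
    frame
  end

  # A RECEIPT arrived: forget the frame it acknowledges.
  def acknowledge(receipt_id)
    @buffer.reject! { |f| f[:receipt] == receipt_id }
  end

  # After a reconnect, hand every unacknowledged frame back for resending.
  def replay
    @buffer.each { |f| yield f }
  end
end

buffer = ToyReceiptBuffer.new
first  = buffer.add({:command => 'SEND', :body => 'a'})
second = buffer.add({:command => 'SEND', :body => 'b'})
buffer.acknowledge(first[:receipt]) # broker confirmed only the first frame

replayed = []
buffer.replay { |f| replayed << f[:body] }
# replayed now holds only the body of the unacknowledged frame
```

The real class additionally treats transactions and subscriptions specially, as the methods below show.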

Instance Method Summary

Methods inherited from Base

#buffered

Constructor Details

#initialize(failover) ⇒ Receipts

Returns a new instance of Receipts.



# File 'lib/onstomp/failover/buffers/receipts.rb', line 10

def initialize failover
  super
  [:send, :commit, :abort, :subscribe].each do |ev|
    failover.__send__(:"before_#{ev}") do |f, *_|
      add_to_buffer f, {:receipt => OnStomp.next_serial}
    end
  end
  failover.before_begin do |f, *_|
    add_to_transactions f, {:receipt => OnStomp.next_serial}
  end
  # We can scrub the subscription before UNSUBSCRIBE is fully written
  # because if we replay before UNSUBSCRIBE was sent, we still don't
  # want to be subscribed when we reconnect.
  failover.before_unsubscribe do |f, *_|
    remove_subscribe_from_buffer f
  end
  failover.on_receipt { |r, *_| debuffer_frame r }
  failover.on_failover_connected { |f,c,*_| replay_buffer c }
end
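The UNSUBSCRIBE handling in the constructor can be pictured with a small stand-alone sketch. It assumes that the buffered SUBSCRIBE is matched to the UNSUBSCRIBE by its `id` header; the body of `remove_subscribe_from_buffer` is not shown on this page, so that matching rule, `ToyFrame`, and the sample frames are all assumptions made for illustration.

```ruby
# Hypothetical stand-in for a STOMP frame; not OnStomp's frame class.
ToyFrame = Struct.new(:command, :headers)

buffer = [
  ToyFrame.new('SUBSCRIBE', {:id => 'sub-1', :destination => '/queue/a'}),
  ToyFrame.new('SEND',      {:destination => '/queue/a'}),
  ToyFrame.new('SUBSCRIBE', {:id => 'sub-2', :destination => '/queue/b'})
]
mutex = Mutex.new

unsubscribe = ToyFrame.new('UNSUBSCRIBE', {:id => 'sub-1'})

# Drop the matching SUBSCRIBE as soon as the UNSUBSCRIBE is about to be
# sent, so a later replay does not re-establish the subscription.
mutex.synchronize do
  buffer.reject! do |f|
    f.command == 'SUBSCRIBE' && f.headers[:id] == unsubscribe.headers[:id]
  end
end

remaining = buffer.map { |f| [f.command, f.headers[:id]] }
```

After the block runs, only the SEND frame and the `sub-2` subscription remain to be replayed.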

Instance Method Details

#debuffer_frame(r) ⇒ Object

Removes frames that are neither transactional nor SUBSCRIBEs from the buffer, looking the buffered frame up by matching its receipt header against the receipt-id header of the RECEIPT frame r.



# File 'lib/onstomp/failover/buffers/receipts.rb', line 33

def debuffer_frame r
  orig = @buffer_mutex.synchronize do
    @buffer.detect { |f| f[:receipt] == r[:'receipt-id'] }
  end
  if orig
    # COMMIT and ABORT debuffer the whole transaction sequence
    if ['COMMIT', 'ABORT'].include? orig.command
      remove_from_transactions orig
    # Otherwise, if this isn't part of a transaction, debuffer the
    # particular frame (if it's not a SUBSCRIBE)
    elsif orig.command != 'SUBSCRIBE' && !orig.header?(:transaction)
      @buffer_mutex.synchronize { @buffer.delete orig }
    end
  end
end
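The branching in debuffer_frame can be summarized as a three-way decision. The sketch below restates it in isolation; `SketchFrame` and `debuffer_action` are hypothetical names, and the `header?` helper is a simplified assumption about what the library's method of the same name checks.

```ruby
# Hypothetical frame with just enough behavior for the decision below.
SketchFrame = Struct.new(:command, :headers) do
  # Assumed semantics: the header is present and not empty.
  def header?(name)
    value = headers[name]
    !value.nil? && !value.to_s.empty?
  end
end

# Mirrors the branches of debuffer_frame without touching a real buffer.
def debuffer_action(frame)
  if ['COMMIT', 'ABORT'].include?(frame.command)
    :drop_transaction # the whole transaction sequence is finished
  elsif frame.command != 'SUBSCRIBE' && !frame.header?(:transaction)
    :drop_frame       # a plain, acknowledged frame
  else
    :keep             # SUBSCRIBEs and frames inside an open transaction
  end
end

commit    = debuffer_action(SketchFrame.new('COMMIT', {:transaction => 't1'}))
plain     = debuffer_action(SketchFrame.new('SEND', {}))
in_tx     = debuffer_action(SketchFrame.new('SEND', {:transaction => 't1'}))
subscribe = debuffer_action(SketchFrame.new('SUBSCRIBE', {:id => 'sub-1'}))
```

Keeping SUBSCRIBE frames and in-transaction frames buffered even after their RECEIPT means a reconnect can restore subscriptions and replay a transaction that was never committed or aborted.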