Class: OnStomp::Failover::Buffers::Receipts
- Defined in:
- lib/onstomp/failover/buffers/receipts.rb
Overview
TODO:
Quite a lot of this code is shared between Written and Receipts; we'll want to factor the common parts out.
A buffer that ensures frames are RECEIPTed against a client's connection and replays the ones that were not when the failover client reconnects.
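The lifecycle described above (tag a frame, wait for its RECEIPT, replay whatever is still pending after a reconnect) can be sketched with a small standalone model. This is only an illustration under stated assumptions: `ToyReceiptBuffer`, `track`, `acknowledge`, and `replay` are hypothetical names, frames are plain hashes, and the special handling of transactions and SUBSCRIBE frames is deliberately left out.

```ruby
# Simplified model of a receipt-tracking buffer. All names here are
# hypothetical; this is not the OnStomp implementation.
class ToyReceiptBuffer
  def initialize
    @mutex = Mutex.new
    @pending = []
    @serial = 0
  end

  # Tag the frame with a fresh receipt id and remember it until acknowledged.
  def track(frame)
    @mutex.synchronize do
      @serial += 1
      tagged = frame.merge(:receipt => @serial.to_s)
      @pending << tagged
      tagged
    end
  end

  # Drop the frame whose receipt id matches the one the broker confirmed.
  def acknowledge(receipt_id)
    @mutex.synchronize { @pending.reject! { |f| f[:receipt] == receipt_id } }
  end

  # On reconnect, hand every unacknowledged frame to the new connection.
  # The connection only needs to respond to <<.
  def replay(connection)
    @mutex.synchronize { @pending.dup }.each { |f| connection << f }
  end
end
```

In this model, a frame that was tracked but never acknowledged is the only thing `replay` sends again, which mirrors the "replays the ones that were not" behavior in the overview.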
Instance Method Summary
- #debuffer_frame(r) ⇒ Object
  Removes frames that are neither transactional nor SUBSCRIBEs from the buffer by looking the buffered frames up by their receipt header.
- #initialize(failover) ⇒ Receipts (constructor)
  A new instance of Receipts.
Methods inherited from Base
Constructor Details
#initialize(failover) ⇒ Receipts
Returns a new instance of Receipts.
    # File 'lib/onstomp/failover/buffers/receipts.rb', line 10

    def initialize failover
      super
      [:send, :commit, :abort, :subscribe].each do |ev|
        failover.__send__(:"before_#{ev}") do |f, *_|
          add_to_buffer f, {:receipt => OnStomp.next_serial}
        end
      end
      failover.before_begin do |f, *_|
        add_to_transactions f, {:receipt => OnStomp.next_serial}
      end
      # We can scrub the subscription before UNSUBSCRIBE is fully written
      # because if we replay before UNSUBSCRIBE was sent, we still don't
      # want to be subscribed when we reconnect.
      failover.before_unsubscribe do |f, *_|
        remove_subscribe_from_buffer f
      end
      failover.on_receipt { |r, *_| debuffer_frame r }
      failover.on_failover_connected { |f,c,*_| replay_buffer c }
    end
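The constructor registers one callback per event by building the hook name at runtime with `__send__(:"before_#{ev}")`. The standalone sketch below shows that pattern in isolation. `ToyFailover`, `trigger`, and `demo_tagged_buffer` are hypothetical stand-ins invented for this example, and a local counter takes the place of `OnStomp.next_serial`; none of this is the library's real event machinery.

```ruby
# Hypothetical stand-in for a failover client; not OnStomp's real class.
class ToyFailover
  EVENTS = [:send, :commit, :abort, :subscribe]

  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Define before_send, before_commit, before_abort and before_subscribe;
  # each one stores the block it is given.
  EVENTS.each do |ev|
    define_method(:"before_#{ev}") { |&block| @callbacks[ev] << block }
  end

  # Run every callback registered for the event against the frame.
  def trigger(ev, frame)
    @callbacks[ev].each { |cb| cb.call(frame) }
  end
end

# Registers the same tagging callback on every hook, then fires two events.
def demo_tagged_buffer
  serial = 0 # local counter standing in for a serial-number generator
  buffer = []
  failover = ToyFailover.new
  ToyFailover::EVENTS.each do |ev|
    failover.__send__(:"before_#{ev}") do |frame, *_|
      serial += 1
      buffer << frame.merge(:receipt => serial.to_s)
    end
  end
  failover.trigger(:send, { :command => 'SEND' })
  failover.trigger(:subscribe, { :command => 'SUBSCRIBE' })
  buffer
end
```

Calling `demo_tagged_buffer` returns the two frames, each carrying its own `:receipt` value, in the order the events fired.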
Instance Method Details
#debuffer_frame(r) ⇒ Object
Removes frames that are neither transactional nor SUBSCRIBEs from the buffer
by looking the buffered frames up by their receipt header.
    # File 'lib/onstomp/failover/buffers/receipts.rb', line 33

    def debuffer_frame r
      orig = @buffer_mutex.synchronize do
        @buffer.detect { |f| f[:receipt] == r[:'receipt-id'] }
      end
      if orig
        # COMMIT and ABORT debuffer the whole transaction sequence
        if ['COMMIT', 'ABORT'].include? orig.command
          remove_from_transactions orig
        # Otherwise, if this isn't part of a transaction, debuffer the
        # particular frame (if it's not a SUBSCRIBE)
        elsif orig.command != 'SUBSCRIBE' && !orig.header?(:transaction)
          @buffer_mutex.synchronize { @buffer.delete orig }
        end
      end
    end
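The branching in `debuffer_frame` amounts to a three-way decision about the originally buffered frame. The sketch below isolates that decision so it can be checked on its own. `ToyFrame` and `debuffer_action` are hypothetical names, and `ToyFrame#header?` is assumed here to mean simply "the header key is present", which may differ from how the real frame class defines it.

```ruby
# Hypothetical minimal frame; the real frame class is richer than this.
ToyFrame = Struct.new(:command, :headers) do
  # Assumption: a header counts as set when its key is present.
  def header?(name)
    headers.key?(name)
  end
end

# Returns what an incoming RECEIPT should do to the buffered frame it matches.
def debuffer_action(orig)
  if ['COMMIT', 'ABORT'].include?(orig.command)
    :remove_transaction # the whole transaction sequence is finished
  elsif orig.command != 'SUBSCRIBE' && !orig.header?(:transaction)
    :remove_frame       # a standalone frame that has now been confirmed
  else
    :keep               # SUBSCRIBEs and frames inside an open transaction
  end
end

debuffer_action(ToyFrame.new('SEND', {}))                      # => :remove_frame
debuffer_action(ToyFrame.new('SEND', { :transaction => 't1' })) # => :keep
debuffer_action(ToyFrame.new('SUBSCRIBE', {}))                 # => :keep
debuffer_action(ToyFrame.new('COMMIT', { :transaction => 't1' })) # => :remove_transaction
```

Keeping SUBSCRIBE frames and in-transaction frames after their receipts arrive matches the source comments: subscriptions stay buffered until UNSUBSCRIBE scrubs them, and transactional frames stay until COMMIT or ABORT removes the whole sequence.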