Class: Alephant::Sequencer::Sequencer
- Inherits:
- Object
- Hierarchy: Object → Alephant::Sequencer::Sequencer
- Includes:
- Logger
- Defined in:
- lib/alephant/sequencer/sequencer.rb
Instance Attribute Summary collapse
-
#cache ⇒ Object
readonly
Returns the value of attribute cache.
-
#ident ⇒ Object
readonly
Returns the value of attribute ident.
-
#jsonpath ⇒ Object
readonly
Returns the value of attribute jsonpath.
-
#keep_all ⇒ Object
readonly
Returns the value of attribute keep_all.
Class Method Summary collapse
- .sequence_id_from(msg, path) ⇒ Object
Instance Method Summary collapse
- #delete! ⇒ Object
- #exists? ⇒ Boolean
- #get_last_seen(key = ident) ⇒ Object
-
#initialize(sequence_table, opts = {}) ⇒ Sequencer
constructor
A new instance of Sequencer.
- #sequential?(msg) ⇒ Boolean
- #set_last_seen(msg, last_seen_check = nil) ⇒ Object
- #truncate! ⇒ Object
- #validate(msg) ⇒ Object
Constructor Details
#initialize(sequence_table, opts = {}) ⇒ Sequencer
Returns a new instance of Sequencer.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/alephant/sequencer/sequencer.rb', line 10
#
# Builds a sequencer around +sequence_table+ and logs that it was created.
#
# @param sequence_table [Object] backing table; stored and used by the other
#   methods for existence checks, lookups, updates and deletion
# @param opts [Hash] optional settings
# @option opts [Object] :cache    exposed through #cache
# @option opts [Object] :keep_all exposed through #keep_all
# @option opts [Object] :id       exposed through #ident
# @option opts [Object] :jsonpath exposed through #jsonpath
def initialize(sequence_table, opts = {})
  @sequence_table = sequence_table
  @jsonpath = opts[:jsonpath]
  @keep_all = opts[:keep_all]
  @cache    = opts[:cache]
  @ident    = opts[:id]

  # exists? reads @cache, @ident and @sequence_table, so it has to run after
  # those are assigned. @exists is still nil at this point, which forces a
  # real lookup rather than a memoised answer.
  @exists = exists?

  logger.info(
    'event'         => 'SequencerInitialized',
    'sequenceTable' => sequence_table,
    'jsonPath'      => @jsonpath,
    'id'            => @ident,
    'method'        => "#{self.class}#initialize"
  )
end
Instance Attribute Details
#cache ⇒ Object (readonly)
Returns the value of attribute cache.
8 9 10 |
# File 'lib/alephant/sequencer/sequencer.rb', line 8
#
# Read-only reader for @cache, which the constructor takes from opts[:cache].
#
# @return [Object] the stored cache (nil when none was supplied)
def cache
  @cache
end
#ident ⇒ Object (readonly)
Returns the value of attribute ident.
8 9 10 |
# File 'lib/alephant/sequencer/sequencer.rb', line 8
#
# Read-only reader for @ident, which the constructor takes from opts[:id].
#
# @return [Object] the stored identifier (nil when none was supplied)
def ident
  @ident
end
#jsonpath ⇒ Object (readonly)
Returns the value of attribute jsonpath.
8 9 10 |
# File 'lib/alephant/sequencer/sequencer.rb', line 8
#
# Read-only reader for @jsonpath, which the constructor takes from
# opts[:jsonpath].
#
# @return [Object] the stored path expression (nil when none was supplied)
def jsonpath
  @jsonpath
end
#keep_all ⇒ Object (readonly)
Returns the value of attribute keep_all.
8 9 10 |
# File 'lib/alephant/sequencer/sequencer.rb', line 8
#
# Read-only reader for @keep_all, which the constructor takes from
# opts[:keep_all].
#
# @return [Object] the stored flag (nil when none was supplied)
def keep_all
  @keep_all
end
Class Method Details
.sequence_id_from(msg, path) ⇒ Object
86 87 88 |
# File 'lib/alephant/sequencer/sequencer.rb', line 86
#
# Pulls the sequence id out of a message.
#
# @param msg [#body] message whose +body+ is handed to JsonPath
# @param path [Object] path expression passed to JsonPath.on
# @return [Integer] the first match converted with +to_i+
def self.sequence_id_from(msg, path)
  matches = JsonPath.on(msg.body, path)
  # to_i on the first match; should +first+ be nil, nil.to_i yields 0.
  matches.first.to_i
end
Instance Method Details
#delete! ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/alephant/sequencer/sequencer.rb', line 56
#
# Deletes this sequencer's item from the sequence table and logs the deletion.
#
# The memoised existence flag is cleared only once +delete_item!+ has
# returned, so a delete that raises does not leave the instance believing the
# item is already gone.
#
# NOTE(review): any value held in +cache+ is not invalidated here; confirm
# whether the cache needs clearing as well.
#
# @return [Object] whatever +delete_item!+ returns
def delete!
  @sequence_table.delete_item!(ident).tap do
    @exists = false
    logger.info(
      'event'  => 'SequenceIdDeleted',
      'id'     => ident,
      'method' => "#{self.class}#delete!"
    )
  end
end
#exists? ⇒ Boolean
31 32 33 34 35 |
# File 'lib/alephant/sequencer/sequencer.rb', line 31
#
# Tells whether a sequence entry exists for +ident+.
#
# A truthy memoised @exists short-circuits; otherwise the answer comes from
# +cache+, falling back to the sequence table through the block.
#
# The existence flag is cached under its own "exists:"-prefixed key.
# get_last_seen stores the sequence id under the bare +ident+, so sharing that
# key would let a cached existence flag be read back as a sequence id.
#
# @return [Boolean] truthy when the sequence is known to exist
def exists?
  return @exists if @exists

  cache.get("exists:#{ident}") { @sequence_table.sequence_exists(ident) }
end
#get_last_seen(key = ident) ⇒ Object
80 81 82 83 84 |
# File 'lib/alephant/sequencer/sequencer.rb', line 80
#
# Looks up the last seen sequence value for +key+ through the cache; the
# block supplies the table's value to +cache.get+.
#
# @param key [Object] lookup key, defaulting to this sequencer's +ident+
# @return [Object] whatever +cache.get+ returns for +key+
def get_last_seen(key = ident)
  cache.get(key) { @sequence_table.sequence_for(key) }
end
#sequential?(msg) ⇒ Boolean
27 28 29 |
# File 'lib/alephant/sequencer/sequencer.rb', line 27
#
# Checks whether +msg+ carries a sequence id strictly greater than the last
# one seen. A missing last-seen value is treated as 0.
#
# @param msg [Object] message handed to Sequencer.sequence_id_from
# @return [Boolean] true when the message's id is newer than the last seen id
def sequential?(msg)
  previous_id = get_last_seen || 0
  incoming_id = Sequencer.sequence_id_from(msg, jsonpath)
  previous_id < incoming_id
end
#set_last_seen(msg, last_seen_check = nil) ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/alephant/sequencer/sequencer.rb', line 71
#
# Records the sequence id carried by +msg+ as the latest one for +ident+.
#
# @param msg [Object] message handed to Sequencer.sequence_id_from
# @param last_seen_check [Object, nil] value forwarded as the third argument
#   of +update_sequence_id+, but only when the sequence already exists
# @return [Object] whatever +update_sequence_id+ returns
def set_last_seen(msg, last_seen_check = nil)
  new_id = Sequencer.sequence_id_from(msg, jsonpath)
  key = ident
  # Stays nil when the sequence does not exist yet, so no check value is sent.
  expected = last_seen_check if exists?

  @sequence_table.update_sequence_id(key, new_id, expected)
end
#truncate! ⇒ Object
67 68 69 |
# File 'lib/alephant/sequencer/sequencer.rb', line 67
#
# Truncates the backing sequence table.
#
# The memoised existence flag is reset afterwards, as delete! does, so that
# exists? performs a fresh lookup instead of answering from a flag set before
# the truncation.
#
# NOTE(review): any value held in +cache+ is not invalidated here; confirm
# whether the cache needs clearing as well.
#
# @return [Object] whatever the table's +truncate!+ returns
def truncate!
  @sequence_table.truncate!.tap { @exists = false }
end
#validate(msg) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/alephant/sequencer/sequencer.rb', line 37
#
# Runs the caller's block for +msg+ when the message is in sequence (or when
# +keep_all+ is set), then either records the new sequence id or reports the
# message as non-sequential.
#
# @param msg [Object] message handed to Sequencer.sequence_id_from
# @yield invoked when the message is sequential or +keep_all+ is truthy
# @return [Object] the result of set_last_seen for a sequential message,
#   otherwise the result of the +logger.info+ call
def validate(msg)
  previous_id = get_last_seen
  incoming_id = Sequencer.sequence_id_from(msg, jsonpath)
  # A missing last-seen value counts as 0.
  in_order = (previous_id || 0) < incoming_id

  # The block runs before the id is persisted.
  yield if in_order || keep_all

  return set_last_seen(msg, previous_id) if in_order

  logger.metric 'SequencerNonSequentialMessageCount'
  logger.info(
    'event'      => 'NonSequentialMessageReceived',
    'id'         => ident,
    'lastSeenId' => previous_id,
    'method'     => "#{self.class}#validate"
  )
end