Class: Alephant::Sequencer::Sequencer

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/alephant/sequencer/sequencer.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sequence_table, opts = {}) ⇒ Sequencer

Returns a new instance of Sequencer.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/alephant/sequencer/sequencer.rb', line 10

# Builds a sequencer around the given sequence table.
#
# Reads the :cache, :keep_all, :id and :jsonpath entries of +opts+ into
# instance state, records the current result of #exists? in @exists, and
# emits a 'SequencerInitialized' info log entry.
#
# @param sequence_table [Object] table object the other methods delegate to
# @param opts [Hash] options; only :cache, :keep_all, :id, :jsonpath are read
# @return [Sequencer]
def initialize(sequence_table, opts = {})
  @sequence_table = sequence_table
  @cache, @keep_all, @ident, @jsonpath =
    opts.values_at(:cache, :keep_all, :id, :jsonpath)

  # exists? consults the cache and the ident, so both must be set first.
  @exists = exists?

  logger.info(
    'event'         => 'SequencerInitialized',
    'sequenceTable' => sequence_table,
    'jsonPath'      => @jsonpath,
    'id'            => @ident,
    'method'        => "#{self.class}#initialize"
  )
end

Instance Attribute Details

#cache ⇒ Object (readonly)

Returns the value of attribute cache.



8
9
10
# File 'lib/alephant/sequencer/sequencer.rb', line 8

# Read-only accessor for @cache, which the constructor fills from
# opts[:cache]. Other methods here call +get(key) { ... }+ on it.
#
# @return [Object] the stored cache object (nil if none was supplied)
def cache
  @cache
end

#ident ⇒ Object (readonly)

Returns the value of attribute ident.



8
9
10
# File 'lib/alephant/sequencer/sequencer.rb', line 8

# Read-only accessor for @ident, which the constructor fills from
# opts[:id]. It is the key passed to the cache and the sequence table.
#
# @return [Object] the stored identifier (nil if none was supplied)
def ident
  @ident
end

#jsonpath ⇒ Object (readonly)

Returns the value of attribute jsonpath.



8
9
10
# File 'lib/alephant/sequencer/sequencer.rb', line 8

# Read-only accessor for @jsonpath, which the constructor fills from
# opts[:jsonpath]. It is handed to Sequencer.sequence_id_from as the path
# used to pull a sequence id out of a message.
#
# @return [Object] the stored path (nil if none was supplied)
def jsonpath
  @jsonpath
end

#keep_all ⇒ Object (readonly)

Returns the value of attribute keep_all.



8
9
10
# File 'lib/alephant/sequencer/sequencer.rb', line 8

# Read-only accessor for @keep_all, which the constructor fills from
# opts[:keep_all]. When truthy, #validate yields to its block even for
# messages that are not sequential.
#
# @return [Object] the stored flag (nil if none was supplied)
def keep_all
  @keep_all
end

Class Method Details

.sequence_id_from(msg, path) ⇒ Object



86
87
88
# File 'lib/alephant/sequencer/sequencer.rb', line 86

# Extracts a sequence id from a message.
#
# Applies +path+ to +msg.body+ via JsonPath.on, takes the first match and
# converts it with #to_i (so a missing match, i.e. nil, becomes 0).
#
# @param msg [#body] message whose body is searched
# @param path [Object] path expression passed straight to JsonPath.on
# @return [Integer] the first match converted with to_i
def self.sequence_id_from(msg, path)
  matches = JsonPath.on(msg.body, path)
  first_match = matches.first
  first_match.to_i
end

Instance Method Details

#delete! ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/alephant/sequencer/sequencer.rb', line 56

# Removes this sequencer's entry from the sequence table.
#
# Clears the locally remembered @exists flag first, then calls
# delete_item! on the table with the ident and logs a 'SequenceIdDeleted'
# info entry once that call has returned.
#
# @return [Object] whatever the table's delete_item! returned
def delete!
  @exists = false

  deletion_result = @sequence_table.delete_item!(ident)
  logger.info(
    'event'  => 'SequenceIdDeleted',
    'id'     => ident,
    'method' => "#{self.class}#delete!"
  )
  deletion_result
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
34
35
# File 'lib/alephant/sequencer/sequencer.rb', line 31

# Reports whether a sequence entry exists for this ident.
#
# A truthy @exists is returned as-is without touching the cache. Otherwise
# the answer comes from cache.get(ident), which is given a block that asks
# the sequence table via sequence_exists(ident).
#
# @return [Object] @exists when truthy, else the value of cache.get
def exists?
  return @exists if @exists

  cache.get(ident) { @sequence_table.sequence_exists(ident) }
end

#get_last_seen(key = ident) ⇒ Object



80
81
82
83
84
# File 'lib/alephant/sequencer/sequencer.rb', line 80

# Looks up the last seen sequence value for +key+.
#
# Goes through cache.get(key), supplying a block that reads the value from
# the sequence table with sequence_for(key).
#
# @param key [Object] lookup key; defaults to this sequencer's ident
# @return [Object] whatever cache.get returns for the key
def get_last_seen(key = ident)
  cache.get(key) { @sequence_table.sequence_for(key) }
end

#sequential?(msg) ⇒ Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/alephant/sequencer/sequencer.rb', line 27

# Tells whether +msg+ carries a sequence id newer than the last one seen.
#
# The last seen value (0 when none is recorded) is compared against the id
# that Sequencer.sequence_id_from extracts from the message using jsonpath.
#
# @param msg [Object] message passed on to Sequencer.sequence_id_from
# @return [Boolean] true when the last seen value is strictly less than
#   the message's sequence id
def sequential?(msg)
  previous_id = get_last_seen || 0
  incoming_id = Sequencer.sequence_id_from(msg, jsonpath)
  previous_id < incoming_id
end

#set_last_seen(msg, last_seen_check = nil) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/alephant/sequencer/sequencer.rb', line 71

# Stores the sequence id found in +msg+ as the latest value for ident.
#
# The id is extracted with Sequencer.sequence_id_from using jsonpath and
# passed to the table's update_sequence_id. +last_seen_check+ is forwarded
# as the third argument only when #exists? is truthy; otherwise nil is sent.
#
# @param msg [Object] message passed on to Sequencer.sequence_id_from
# @param last_seen_check [Object, nil] value forwarded to the table when
#   the sequence already exists
# @return [Object] whatever update_sequence_id returns
def set_last_seen(msg, last_seen_check = nil)
  new_id = Sequencer.sequence_id_from(msg, jsonpath)

  # Stays nil unless the sequence is already known to exist.
  expected_previous = last_seen_check if exists?

  @sequence_table.update_sequence_id(ident, new_id, expected_previous)
end

#truncate! ⇒ Object



67
68
69
# File 'lib/alephant/sequencer/sequencer.rb', line 67

# Delegates to truncate! on the underlying sequence table.
#
# @return [Object] whatever the table's truncate! returns
def truncate!
  @sequence_table.truncate!
end

#validate(msg) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/alephant/sequencer/sequencer.rb', line 37

# Checks +msg+ against the last seen sequence id and yields when it should
# be processed.
#
# A message counts as sequential when the last seen value (0 when none is
# recorded) is strictly less than the id extracted from the message. The
# block is yielded to when the message is sequential or keep_all is truthy.
# Afterwards a sequential message is recorded via #set_last_seen, passing
# the last seen value read at the start; a non-sequential one is reported
# through a metric and an info log entry instead.
#
# @param msg [Object] message passed on to Sequencer.sequence_id_from
# @yield invoked with no arguments when the message should be processed
# @return [Object] the result of #set_last_seen for sequential messages,
#   otherwise the result of logger.info
def validate(msg)
  last_seen_id = get_last_seen
  in_order = (last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath)

  yield if in_order || keep_all

  return set_last_seen(msg, last_seen_id) if in_order

  logger.metric 'SequencerNonSequentialMessageCount'
  logger.info(
    'event'      => 'NonSequentialMessageReceived',
    'id'         => ident,
    'lastSeenId' => last_seen_id,
    'method'     => "#{self.class}#validate"
  )
end