Class: Alephant::Sequencer

Inherits:
Object
  • Object
show all
Defined in:
lib/alephant/models/sequencer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts, id) ⇒ Sequencer



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/alephant/models/sequencer.rb', line 21

def initialize(opts, id)
  @logger = ::Alephant.logger

  dynamo_db = AWS::DynamoDB.new

  @id = id
  @table_name = opts[:table_name]
  @table_conf = opts[:table_conf] || table_conf_defaults
  @table = dynamo_db.tables[@table_name]

  begin
    sleep_until_table_active
  rescue AWS::DynamoDB::Errors::ResourceNotFoundException
    @logger.error("Sequencer.initialize: DynamoDB resource was not found.")

    @table = dynamo_db.tables.create(
      @table_name,
      @table_conf[:read_units],
      @table_conf[:write_units],
      @table_conf[:schema]
    )

    @logger.info("Sequencer.initialize: Creating table with name #{@table_name}, read units #{@table_conf[:read_units]}, write units #{@table_conf[:write_units]}, schema #{@table_conf[:schema]}")

    sleep_until_table_active
  end

  @logger.info("Sequencer.initialize: end with id #{@id}")
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/alephant/models/sequencer.rb', line 6

def id
  @id
end

#table_confObject (readonly)

Returns the value of attribute table_conf.



6
7
8
# File 'lib/alephant/models/sequencer.rb', line 6

def table_conf
  @table_conf
end

#table_nameObject (readonly)

Returns the value of attribute table_name.



6
7
8
# File 'lib/alephant/models/sequencer.rb', line 6

def table_name
  @table_name
end

Instance Method Details

#get_last_seenObject



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/alephant/models/sequencer.rb', line 70

def get_last_seen
  begin
    @table.batch_get(
      ['value'],
      [@id],
      {
        :consistent_read => true
      }
    ).first["value"].to_i
  rescue Exception => e
    trace = e.backtrace.join('\n')
    @logger.error("Sequencer.get_last_seen: id #{id}\nmessage: #{e.message}\ntrace: #{trace}")
    0
  end
end

#get_sequence_id_from(data, jsonpath) ⇒ Object



64
65
66
67
68
# File 'lib/alephant/models/sequencer.rb', line 64

def get_sequence_id_from(data, jsonpath)
  jsonpath.nil? ?
    data.body['sequence_id'].to_i :
    JsonPath.on(data.body, jsonpath).first
end

#sequential?(data, jsonpath = nil) ⇒ Boolean



51
52
53
# File 'lib/alephant/models/sequencer.rb', line 51

def sequential?(data, jsonpath = nil)
  get_last_seen < get_sequence_id_from(data, jsonpath)
end

#set_last_seen(data, jsonpath) ⇒ Object



55
56
57
58
59
60
61
62
# File 'lib/alephant/models/sequencer.rb', line 55

def set_last_seen(data, jsonpath)
  last_seen_id = get_sequence_id_from(data, jsonpath)

  batch = AWS::DynamoDB::BatchWrite.new
  batch.put(@table_name, [:key => @id,:value => last_seen_id])
  batch.process!
  @logger.info("Sequencer.set_last_seen: id #{id} and last_seen_id #{last_seen_id}")
end

#sleep_until_table_activeObject



86
87
88
# File 'lib/alephant/models/sequencer.rb', line 86

def sleep_until_table_active
  sleep 1 until @table.status == :active
end

#table_conf_defaultsObject



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/alephant/models/sequencer.rb', line 8

def table_conf_defaults
  {
    :write_units => 5,
    :read_units => 10,
    :schema => {
      :hash_key => {
        :key => :string,
        :value => :string
      }
    }
  }
end