Class: Alephant::Sequencer

Inherits:
Object
  • Object
show all
Defined in:
lib/alephant/models/sequencer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts, id) ⇒ Sequencer

Returns a new instance of Sequencer.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/alephant/models/sequencer.rb', line 20

def initialize(opts, id)
  @logger = ::Alephant.logger

  dynamo_db = AWS::DynamoDB.new

  @id = id
  @table_name = opts[:table_name]
  @table_conf = opts[:table_conf] || table_conf_defaults
  @table = dynamo_db.tables[@table_name]

  begin
    sleep_until_table_active
  rescue AWS::DynamoDB::Errors::ResourceNotFoundException
    @logger.error("Sequencer.initialize: DynamoDB resource was not found.")

    @table = dynamo_db.tables.create(
      @table_name,
      @table_conf[:read_units],
      @table_conf[:write_units],
      @table_conf[:schema]
    )

    @logger.info("Sequencer.initialize: Creating table with name #{@table_name}, read units #{@table_conf[:read_units]}, write units #{@table_conf[:write_units]}, schema #{@table_conf[:schema]}")

    sleep_until_table_active
  end

  @logger.info("Sequencer.initialize: end with id #{@id}")
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



5
6
7
# File 'lib/alephant/models/sequencer.rb', line 5

def id
  @id
end

#table_confObject (readonly)

Returns the value of attribute table_conf.



5
6
7
# File 'lib/alephant/models/sequencer.rb', line 5

def table_conf
  @table_conf
end

#table_nameObject (readonly)

Returns the value of attribute table_name.



5
6
7
# File 'lib/alephant/models/sequencer.rb', line 5

def table_name
  @table_name
end

Instance Method Details

#get_last_seenObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/alephant/models/sequencer.rb', line 67

def get_last_seen
  begin
    @table.batch_get(
      ['value'],
      [@id],
      {
        :consistent_read => true
      }
    ).first["value"].to_i
  rescue Exception => e
    trace = e.backtrace.join('\n')
    @logger.error("Sequencer.get_last_seen: id #{id}\nmessage: #{e.message}\ntrace: #{trace}")
    0
  end
end

#sequential?(data) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
53
54
55
56
# File 'lib/alephant/models/sequencer.rb', line 50

def sequential?(data)
  if block_given?
    yield(get_last_seen, data)
  else
    get_last_seen < data["sequence_id"].to_i
  end
end

#set_last_seen(data) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/alephant/models/sequencer.rb', line 58

def set_last_seen(data)
  last_seen_id = block_given? ? yield(data) : data["sequence_id"]

  batch = AWS::DynamoDB::BatchWrite.new
  batch.put(@table_name, [:key => @id,:value => last_seen_id])
  batch.process!
  @logger.info("Sequencer.set_last_seen: id #{id} and last_seen_id #{last_seen_id}")
end

#sleep_until_table_activeObject



83
84
85
# File 'lib/alephant/models/sequencer.rb', line 83

def sleep_until_table_active
  sleep 1 until @table.status == :active
end

#table_conf_defaultsObject



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/alephant/models/sequencer.rb', line 7

def table_conf_defaults
  {
    :write_units => 5,
    :read_units => 10,
    :schema => {
      :hash_key => {
        :key => :string,
        :value => :string
      }
    }
  }
end