Class: Alephant::Sequencer
- Inherits: Object
- Inheritance chain: Object → Alephant::Sequencer
- Defined in:
- lib/alephant/models/sequencer.rb
Instance Attribute Summary
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#table_conf ⇒ Object
readonly
Returns the value of attribute table_conf.
-
#table_name ⇒ Object
readonly
Returns the value of attribute table_name.
Instance Method Summary
- #get_last_seen ⇒ Object
-
#initialize(opts, id) ⇒ Sequencer
constructor
A new instance of Sequencer.
- #sequential?(data) ⇒ Boolean
- #set_last_seen(data) ⇒ Object
- #sleep_until_table_active ⇒ Object
- #table_conf_defaults ⇒ Object
Constructor Details
#initialize(opts, id) ⇒ Sequencer
Returns a new instance of Sequencer.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/alephant/models/sequencer.rb', line 20
#
# Builds a sequencer for +id+ backed by the DynamoDB table named in +opts+.
#
# The table handle is looked up by name and polled until it reports an
# active status. If that poll raises
# AWS::DynamoDB::Errors::ResourceNotFoundException, the table is created
# from the table configuration and polled again.
#
# @param opts [Hash] read via +opts[:table_name]+ and +opts[:table_conf]+;
#   when +:table_conf+ is nil or false, +table_conf_defaults+ is used
# @param id [Object] stored in +@id+ and used by the other methods as the item key
def initialize(opts, id)
  @logger = ::Alephant.logger
  dynamo_db = AWS::DynamoDB.new

  @id = id
  @table_name = opts[:table_name]
  @table_conf = opts[:table_conf] || table_conf_defaults
  @table = dynamo_db.tables[@table_name]

  begin
    sleep_until_table_active
  rescue AWS::DynamoDB::Errors::ResourceNotFoundException
    @logger.error("Sequencer.initialize: DynamoDB resource was not found.")

    conf = @table_conf
    @table = dynamo_db.tables.create(@table_name, conf[:read_units], conf[:write_units], conf[:schema])

    # This message is emitted after the create call has returned.
    @logger.info("Sequencer.initialize: Creating table with name #{@table_name}, read units #{@table_conf[:read_units]}, write units #{@table_conf[:write_units]}, schema #{@table_conf[:schema]}")
    sleep_until_table_active
  end

  @logger.info("Sequencer.initialize: end with id #{@id}")
end
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
5 6 7 |
# File 'lib/alephant/models/sequencer.rb', line 5
#
# Read-only accessor for the +@id+ instance variable.
#
# @return [Object] the current value of +@id+
def id
  @id
end
#table_conf ⇒ Object (readonly)
Returns the value of attribute table_conf.
5 6 7 |
# File 'lib/alephant/models/sequencer.rb', line 5
#
# Read-only accessor for the +@table_conf+ instance variable.
#
# @return [Object] the current value of +@table_conf+
def table_conf
  @table_conf
end
#table_name ⇒ Object (readonly)
Returns the value of attribute table_name.
5 6 7 |
# File 'lib/alephant/models/sequencer.rb', line 5
#
# Read-only accessor for the +@table_name+ instance variable.
#
# @return [Object] the current value of +@table_name+
def table_name
  @table_name
end
Instance Method Details
#get_last_seen ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/alephant/models/sequencer.rb', line 67
#
# Reads the last sequence value stored for this sequencer.
#
# Performs a consistent-read +batch_get+ on +@table+ for the "value"
# attribute of the item keyed by +@id+, takes the first result and converts
# its "value" with +to_i+.
#
# Any StandardError raised along the way is logged through +@logger+ and
# turned into a return value of 0. This includes the NoMethodError that
# occurs when the lookup yields no first item. Exceptions outside
# StandardError (for example Interrupt or SystemExit) are deliberately not
# rescued, so they propagate to the caller.
#
# @return [Integer] the stored value, or 0 when it could not be read
def get_last_seen
  item = @table.batch_get(['value'], [@id], { :consistent_read => true }).first
  item["value"].to_i
rescue StandardError => e
  # Double quotes are required so the frames are joined by a real newline;
  # Array() guards against an exception that carries no backtrace.
  trace = Array(e.backtrace).join("\n")
  @logger.error("Sequencer.get_last_seen: id #{@id}\nmessage: #{e.message}\ntrace: #{trace}")
  0
end
#sequential?(data) ⇒ Boolean
50 51 52 53 54 55 56 |
# File 'lib/alephant/models/sequencer.rb', line 50
#
# Decides whether +data+ comes after the last value seen.
#
# With a block, the block is called with the result of +get_last_seen+ and
# +data+, and its result is returned unchanged. Without a block, the result
# of +get_last_seen+ must be strictly less than +data["sequence_id"].to_i+.
#
# @param data [Object] indexed with "sequence_id" when no block is given
# @yield [last_seen, data] optional custom comparison
# @return [Boolean, Object] the comparison result, or whatever the block returns
def sequential?(data)
  return yield(get_last_seen, data) if block_given?

  # Read the stored value first, then inspect data, as the comparison requires.
  last_seen = get_last_seen
  last_seen < data["sequence_id"].to_i
end
#set_last_seen(data) ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/alephant/models/sequencer.rb', line 58
#
# Stores a new last-seen value for this sequencer.
#
# The value is the block's result for +data+ when a block is given,
# otherwise +data["sequence_id"]+. It is written through an
# AWS::DynamoDB::BatchWrite as a single item with +:key+ set to +@id+ and
# +:value+ set to that value, targeting +@table_name+. The write is then
# logged at info level.
#
# @param data [Object] indexed with "sequence_id" when no block is given
# @yield [data] optional extractor for the value to store
# @return [Object] whatever +@logger.info+ returns
def set_last_seen(data)
  last_seen_id = if block_given?
                   yield(data)
                 else
                   data["sequence_id"]
                 end

  item = { :key => @id, :value => last_seen_id }
  writer = AWS::DynamoDB::BatchWrite.new
  writer.put(@table_name, [item])
  writer.process!

  @logger.info("Sequencer.set_last_seen: id #{id} and last_seen_id #{last_seen_id}")
end
#sleep_until_table_active ⇒ Object
83 84 85 |
# File 'lib/alephant/models/sequencer.rb', line 83
#
# Blocks until +@table.status+ equals +:active+.
#
# The status is checked first; while it is anything else the method sleeps
# for one second and checks again. There is no upper bound on the number of
# checks.
#
# @return [nil]
def sleep_until_table_active
  loop do
    break if @table.status == :active

    sleep 1
  end
end
#table_conf_defaults ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/alephant/models/sequencer.rb', line 7
#
# Table configuration used when the constructor options carry no
# +:table_conf+.
#
# @return [Hash] a fresh hash with +:write_units+ 5, +:read_units+ 10 and a
#   +:schema+ whose +:hash_key+ maps +:key+ and +:value+ to +:string+
def table_conf_defaults
  hash_key = { :key => :string, :value => :string }
  schema = { :hash_key => hash_key }

  { :write_units => 5, :read_units => 10, :schema => schema }
end