Class: Alephant::Sequencer
- Inherits:
-
Object
- Object
- Alephant::Sequencer
- Defined in:
- lib/alephant/models/sequencer.rb
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#table_conf ⇒ Object
readonly
Returns the value of attribute table_conf.
-
#table_name ⇒ Object
readonly
Returns the value of attribute table_name.
Instance Method Summary collapse
- #get_last_seen ⇒ Object
- #get_sequence_id_from(data, jsonpath) ⇒ Object
-
#initialize(opts, id) ⇒ Sequencer
constructor
A new instance of Sequencer.
- #sequential?(data, jsonpath = nil) ⇒ Boolean
- #set_last_seen(data, jsonpath) ⇒ Object
- #sleep_until_table_active ⇒ Object
- #table_conf_defaults ⇒ Object
Constructor Details
#initialize(opts, id) ⇒ Sequencer
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/alephant/models/sequencer.rb', line 21 def initialize(opts, id) @logger = ::Alephant.logger dynamo_db = AWS::DynamoDB.new @id = id @table_name = opts[:table_name] @table_conf = opts[:table_conf] || table_conf_defaults @table = dynamo_db.tables[@table_name] begin sleep_until_table_active rescue AWS::DynamoDB::Errors::ResourceNotFoundException @logger.error("Sequencer.initialize: DynamoDB resource was not found.") @table = dynamo_db.tables.create( @table_name, @table_conf[:read_units], @table_conf[:write_units], @table_conf[:schema] ) @logger.info("Sequencer.initialize: Creating table with name #{@table_name}, read units #{@table_conf[:read_units]}, write units #{@table_conf[:write_units]}, schema #{@table_conf[:schema]}") sleep_until_table_active end @logger.info("Sequencer.initialize: end with id #{@id}") end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
6 7 8 |
# File 'lib/alephant/models/sequencer.rb', line 6 def id @id end |
#table_conf ⇒ Object (readonly)
Returns the value of attribute table_conf.
6 7 8 |
# File 'lib/alephant/models/sequencer.rb', line 6 def table_conf @table_conf end |
#table_name ⇒ Object (readonly)
Returns the value of attribute table_name.
6 7 8 |
# File 'lib/alephant/models/sequencer.rb', line 6 def table_name @table_name end |
Instance Method Details
#get_last_seen ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/alephant/models/sequencer.rb', line 70 def get_last_seen begin @table.batch_get( ['value'], [@id], { :consistent_read => true } ).first["value"].to_i rescue Exception => e trace = e.backtrace.join('\n') @logger.error("Sequencer.get_last_seen: id #{id}\nmessage: #{e.message}\ntrace: #{trace}") 0 end end |
#get_sequence_id_from(data, jsonpath) ⇒ Object
64 65 66 67 68 |
# File 'lib/alephant/models/sequencer.rb', line 64 def get_sequence_id_from(data, jsonpath) jsonpath.nil? ? data.body['sequence_id'].to_i : JsonPath.on(data.body, jsonpath).first end |
#sequential?(data, jsonpath = nil) ⇒ Boolean
51 52 53 |
# File 'lib/alephant/models/sequencer.rb', line 51 def sequential?(data, jsonpath = nil) get_last_seen < get_sequence_id_from(data, jsonpath) end |
#set_last_seen(data, jsonpath) ⇒ Object
55 56 57 58 59 60 61 62 |
# File 'lib/alephant/models/sequencer.rb', line 55 def set_last_seen(data, jsonpath) last_seen_id = get_sequence_id_from(data, jsonpath) batch = AWS::DynamoDB::BatchWrite.new batch.put(@table_name, [:key => @id,:value => last_seen_id]) batch.process! @logger.info("Sequencer.set_last_seen: id #{id} and last_seen_id #{last_seen_id}") end |
#sleep_until_table_active ⇒ Object
86 87 88 |
# File 'lib/alephant/models/sequencer.rb', line 86 def sleep_until_table_active sleep 1 until @table.status == :active end |
#table_conf_defaults ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/alephant/models/sequencer.rb', line 8 def table_conf_defaults { :write_units => 5, :read_units => 10, :schema => { :hash_key => { :key => :string, :value => :string } } } end |