Class: Wukong::Store::CassandraModel::AvroWriter
- Defined in:
- lib/wukong/store/cassandra/streaming.rb
Instance Method Summary collapse
-
#initialize ⇒ AvroWriter
constructor
Reads in the protocol schema and creates the necessary encoder and writer.
-
#put(id, hsh, timestamp = nil, ttl = 0) ⇒ Object
Iterate through each key value pair in the hash to be inserted and write directly one at a time.
- #smutation(key, name, value) ⇒ Object
- #write(key, col_name, value) ⇒ Object
- #write_directly(key, col_name, value, timestamp, ttl) ⇒ Object
Constructor Details
#initialize ⇒ AvroWriter
Reads in the protocol schema and creates the necessary encoder and writer.
30 31 32 33 34 35 36 37 38 |
# File 'lib/wukong/store/cassandra/streaming.rb', line 30

# Loads the Avro protocol and prepares the encoder/writer pair used by #write.
#
# The protocol file path comes from Settings.cassandra_avro_schema. From the
# parsed protocol, the type named 'StreamingMutation' is selected as the
# schema; @schema is nil if the protocol declares no such type.
#
# The encoder is bound to $stdout, so everything this object writes goes to
# the process's standard output.
def initialize
  schema_file = Settings.cassandra_avro_schema
  @proto      = Avro::Protocol.parse(File.read(schema_file))
  @schema     = @proto.types.find { |type| type.name == 'StreamingMutation' }
  @enc        = Avro::IO::BinaryEncoder.new($stdout)
  # @enc = DummyEncoder.new($stdout)
  @writer     = Avro::IO::DatumWriter.new(@schema)
  # warn [@schema, @enc].inspect
end
Instance Method Details
#put(id, hsh, timestamp = nil, ttl = 0) ⇒ Object
Iterate through each key value pair in the hash to be inserted and write directly one at a time
57 58 59 60 61 62 |
# File 'lib/wukong/store/cassandra/streaming.rb', line 57

# Writes every key/value pair of +hsh+ as its own column for row +id+,
# one #write_directly call per pair.
#
# @param id        row key passed through to #write_directly
# @param hsh       [Hash] column name => value pairs to insert
# @param timestamp timestamp applied to every column; when nil (or false)
#                  it defaults to Time.now.to_i, i.e. whole seconds.
#                  NOTE(review): #smutation stamps with
#                  Time.epoch_microseconds instead -- confirm which unit the
#                  consumer expects.
# @param ttl       time-to-live passed through unchanged (default 0)
# @return [Hash] +hsh+ itself (the result of Hash#each)
def put(id, hsh, timestamp = nil, ttl = 0)
  timestamp ||= Time.now.to_i
  hsh.each do |attr, val|
    write_directly(id, attr, val, timestamp, ttl)
  end
end
#smutation(key, name, value) ⇒ Object
64 65 66 67 68 69 70 71 72 |
# File 'lib/wukong/store/cassandra/streaming.rb', line 64

# Builds the hash for a single streaming mutation, as handed to the datum
# writer by #write.
#
# @param key   row key, stored as given
# @param name  column name, coerced with #to_s
# @param value column value, coerced with #to_s
# @return [Hash] string-keyed hash with 'key', 'name', 'value', 'timestamp'
#   and 'ttl'. 'timestamp' is taken from Time.epoch_microseconds (not a core
#   Ruby method -- presumably a project extension; verify) and 'ttl' is
#   always 0.
def smutation(key, name, value)
  {
    'key'       => key,
    'name'      => name.to_s,
    'value'     => value.to_s,
    'timestamp' => Time.epoch_microseconds,
    'ttl'       => 0
  }
end
#write(key, col_name, value) ⇒ Object
40 41 42 |
# File 'lib/wukong/store/cassandra/streaming.rb', line 40

# Encodes one column mutation through the schema-aware datum writer.
#
# The datum is built by #smutation, which supplies the timestamp and ttl
# itself; callers cannot set either here.
#
# @param key      row key
# @param col_name column name
# @param value    column value
# @return whatever @writer.write returns
def write(key, col_name, value)
  @writer.write(smutation(key, col_name, value), @enc)
end
#write_directly(key, col_name, value, timestamp, ttl) ⇒ Object
44 45 46 47 48 49 50 51 |
# File 'lib/wukong/store/cassandra/streaming.rb', line 44

# Emits one column mutation straight onto the encoder, field by field,
# without going through @writer.
#
# Fields are written in this fixed order: key, col_name and value as bytes,
# then timestamp as a long, then ttl as an int.
#
# @param key       row key, written with write_bytes
# @param col_name  column name, written with write_bytes
# @param value     column value, written with write_bytes
# @param timestamp written with write_long
# @param ttl       written with write_int
# @return whatever @enc.write_int returns
def write_directly(key, col_name, value, timestamp, ttl)
  # Log.info "Insert(row_key => #{key}, col_name => #{col_name}, value => #{value}"
  @enc.write_bytes(key)
  @enc.write_bytes(col_name)
  @enc.write_bytes(value)
  @enc.write_long(timestamp)
  @enc.write_int(ttl)
end