Class: Wukong::Store::CassandraModel::AvroWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/wukong/store/cassandra/streaming.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ AvroWriter

Reads in the protocol schema and creates the necessary encoder and writer.



30
31
32
33
34
35
36
37
38
# File 'lib/wukong/store/cassandra/streaming.rb', line 30

# Reads the Avro protocol file named by Settings.cassandra_avro_schema, selects
# the type called 'StreamingMutation' from it, and sets up a binary encoder on
# $stdout together with a datum writer for that type.
def initialize
  protocol_source = File.read(Settings.cassandra_avro_schema)
  @proto          = Avro::Protocol.parse(protocol_source)
  @schema         = @proto.types.find { |type| type.name == 'StreamingMutation' }
  # @enc = DummyEncoder.new($stdout)
  @enc            = Avro::IO::BinaryEncoder.new($stdout)
  @writer         = Avro::IO::DatumWriter.new(@schema)
  # warn [@schema, @enc].inspect
end

Instance Method Details

#put(id, hsh, timestamp = nil, ttl = 0) ⇒ Object

Iterates through each key-value pair in the hash to be inserted and writes each pair directly, one at a time.



57
58
59
60
61
62
# File 'lib/wukong/store/cassandra/streaming.rb', line 57

# Sends every attribute/value pair of +hsh+ to #write_directly under row +id+,
# one pair per call, all sharing the same timestamp and ttl.
#
# @param id        [Object]  row key passed through to #write_directly
# @param hsh       [#each]   yields attribute/value pairs
# @param timestamp [Integer, nil] falls back to Time.now.to_i when nil/false
# @param ttl       [Integer] passed through unchanged
# @return [Object] whatever +hsh.each+ returns
#
# NOTE(review): the fallback timestamp is whole seconds (Time.now.to_i) while
# #smutation stamps with Time.epoch_microseconds -- confirm which unit the
# consumer of this stream expects.
def put(id, hsh, timestamp = nil, ttl = 0)
  timestamp = Time.now.to_i unless timestamp
  hsh.each { |column, cell| write_directly(id, column, cell, timestamp, ttl) }
end

#smutation(key, name, value) ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/wukong/store/cassandra/streaming.rb', line 64

# Builds the plain Hash that #write hands to the datum writer for one column.
#
# @param key   [Object] row key, stored as given
# @param name  [#to_s]  column name, stringified
# @param value [#to_s]  column value, stringified
# @return [Hash] string-keyed hash with 'key', 'name', 'value', 'timestamp'
#   and 'ttl'; 'timestamp' is Time.epoch_microseconds (not a core Ruby method,
#   presumably defined elsewhere in the project) and 'ttl' is always 0
def smutation(key, name, value)
  mutation = { 'key' => key }
  mutation['name']      = name.to_s
  mutation['value']     = value.to_s
  mutation['timestamp'] = Time.epoch_microseconds
  mutation['ttl']       = 0
  mutation
end

#write(key, col_name, value) ⇒ Object



40
41
42
# File 'lib/wukong/store/cassandra/streaming.rb', line 40

# Wraps the given column in the Hash produced by #smutation and passes it to
# the datum writer together with the encoder.
#
# @param key      [Object] row key
# @param col_name [Object] column name
# @param value    [Object] column value
# @return [Object] whatever @writer.write returns
def write(key, col_name, value)
  datum = smutation(key, col_name, value)
  @writer.write(datum, @enc)
end

#write_directly(key, col_name, value, timestamp, ttl) ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/wukong/store/cassandra/streaming.rb', line 44

# Emits one column write straight onto the encoder, field by field, without
# going through the datum writer: key, column name and value as bytes, then
# the timestamp as a long and the ttl as an int.
#
# The column name and value are stringified first, the same way #smutation
# does, so symbol attribute names and non-string values coming from #put
# reach write_bytes as Strings. String arguments are unaffected.
#
# @param key       [String]  row key, written as given
# @param col_name  [#to_s]   column name
# @param value     [#to_s]   column value
# @param timestamp [Integer] written with write_long
# @param ttl       [Integer] written with write_int
# @return [Object] whatever @enc.write_int returns
def write_directly(key, col_name, value, timestamp, ttl)
  # Log.info "Insert(row_key => #{key}, col_name => #{col_name}, value => #{value}"
  @enc.write_bytes(key)
  @enc.write_bytes(col_name.to_s)
  @enc.write_bytes(value.to_s)
  @enc.write_long(timestamp)
  @enc.write_int(ttl)
end