Class: Fluent::AerospikeOutput

Inherits:
BufferedOutput
  • Object
Defined in:
lib/fluent/plugin/out_aerospike.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ AerospikeOutput

Returns a new instance of AerospikeOutput.



# File 'lib/fluent/plugin/out_aerospike.rb', line 20

def initialize
  super
  require 'aerospike'
  require 'msgpack'
  require 'uuidtools'
end

Instance Attribute Details

#write_policy ⇒ Object (readonly)

Returns the value of attribute write_policy.



# File 'lib/fluent/plugin/out_aerospike.rb', line 18

def write_policy
  @write_policy
end

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



# File 'lib/fluent/plugin/out_aerospike.rb', line 30

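def configure(conf)
  super
  # You can also refer to raw parameters via conf[name].
  # Assign to the instance variable so that the write_policy reader
  # (and therefore #write) sees the policy instead of nil.
  @write_policy = Aerospike::WritePolicy.new(
    Aerospike::RecordExistsAction::CREATE_ONLY,
    nil,
    nil,
    @ttl,
    nil
  )
end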
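
Because write_policy is exposed through a reader backed by @write_policy, the policy has to be stored in the instance variable for #write to see it. The following is a minimal sketch of that Ruby scoping rule; PolicyHolder and both configure_* methods are hypothetical stand-ins, and a plain Hash takes the place of Aerospike::WritePolicy so the example runs without the Aerospike client.

```ruby
# Hypothetical stand-in class, not the plugin itself.
class PolicyHolder
  attr_reader :write_policy

  # Assigns a local variable; the reader is unaffected.
  def configure_with_local(ttl)
    write_policy = { ttl: ttl }
    write_policy
  end

  # Assigns the instance variable that the reader returns.
  def configure_with_ivar(ttl)
    @write_policy = { ttl: ttl }
  end
end

holder = PolicyHolder.new

holder.configure_with_local(60)
local_result = holder.write_policy # still nil: only a local was set

holder.configure_with_ivar(60)
ivar_result = holder.write_policy  # the Hash stored in @write_policy
```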

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd. Convert the event to a raw string, for example [tag, time, record].to_json + "\n". Alternatively, use msgpack to serialize the object: [tag, time, record].to_msgpack



# File 'lib/fluent/plugin/out_aerospike.rb', line 61

def format(tag, time, record)
  [tag, time, record].to_msgpack
end
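
The JSON alternative mentioned in the description can be sketched as follows. This is only an illustration using Ruby's standard json library; format_as_json_line and parse_json_lines are hypothetical helper names, and the plugin itself uses to_msgpack as shown above.

```ruby
require 'json'

# Hypothetical helper: serialize one event as a newline-terminated JSON line.
def format_as_json_line(tag, time, record)
  [tag, time, record].to_json + "\n"
end

# Hypothetical helper: split a buffer of such lines back into
# [tag, time, record] triples.
def parse_json_lines(data)
  data.each_line.map { |line| JSON.parse(line) }
end

# Two formatted events concatenated, as they would sit in a buffer chunk.
chunk = format_as_json_line('app.access', 1_700_000_000, { 'path' => '/' }) +
        format_as_json_line('app.error', 1_700_000_001, { 'code' => 500 })

events = parse_json_lines(chunk)
```

The trailing newline is what makes the events separable again when the chunk is read back; msgpack does not need it because each serialized object is self-delimiting.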

#get_client(address) ⇒ Object



# File 'lib/fluent/plugin/out_aerospike.rb', line 83

def get_client(address)
  host_port = address.split(':', 2)
  return Aerospike::Client.new(host_port[0], host_port[1])
end
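
The address handling above splits a 'host:port' string into its two parts. A slightly more defensive variant might look like the sketch below; parse_address and DEFAULT_PORT are hypothetical names that are not part of the plugin, and falling back to port 3000 (Aerospike's default service port) is an assumption about the desired behavior.

```ruby
# Assumed default: Aerospike's standard service port.
DEFAULT_PORT = 3000

# Hypothetical helper: split "host:port" into [host, integer_port].
def parse_address(address, default_port = DEFAULT_PORT)
  host, port = address.split(':', 2)
  raise ArgumentError, "missing host in #{address.inspect}" if host.nil? || host.empty?

  # Use the default when no port is given; otherwise require a decimal integer.
  port_number = if port.nil? || port.empty?
                  default_port
                else
                  Integer(port, 10)
                end
  [host, port_number]
end

with_port    = parse_address('10.0.0.5:3100')
without_port = parse_address('localhost')
```

Converting the port with Integer rather than to_i makes a malformed value such as 'localhost:abc' raise an ArgumentError instead of silently becoming 0.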

#shutdown ⇒ Object

This method is called when shutting down. Shutdown the thread and close sockets or files here.



# File 'lib/fluent/plugin/out_aerospike.rb', line 51

def shutdown
  @client.close
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



# File 'lib/fluent/plugin/out_aerospike.rb', line 44

def start
  super
  @client = get_client(@address)
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that contains multiple formatted events. Use ‘data = chunk.read’ to get all events, or ‘chunk.open {|io| … }’ to get an IO object.

NOTE: This method is called by an internal thread, not Fluentd’s main thread, so IO waits do not block other plugins. Optionally, use chunk.msgpack_each to deserialize the objects.



# File 'lib/fluent/plugin/out_aerospike.rb', line 73

def write(chunk)
  chunk.msgpack_each do |(tag, time, record)|
    # Key: the record's time_key value (or the event time) plus a random UUID.
    key_s = "#{record[@time_key] || time}-#{UUIDTools::UUID.random_create}"
    # Set: the configured set name, falling back to the event tag.
    set_s = @set || tag
    key = Aerospike::Key.new(@namespace, set_s, key_s)
    @client.put(key, record, write_policy)
  end
end
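
The key and set selection in #write can be tried in isolation with the sketch below. build_key_string and choose_set are hypothetical helper names, and SecureRandom.uuid from Ruby's standard library is swapped in for UUIDTools::UUID.random_create so that the example runs without extra gems.

```ruby
require 'securerandom'

# Hypothetical helper: prefer the record's time_key value, fall back to the
# event time, then append a random UUID so that keys do not collide.
def build_key_string(record, time, time_key)
  "#{record[time_key] || time}-#{SecureRandom.uuid}"
end

# Hypothetical helper: use the configured set name, or the event tag if none.
def choose_set(configured_set, tag)
  configured_set || tag
end

record = { 'ts' => 42, 'msg' => 'hello' }

key_from_record = build_key_string(record, 1_700_000_000, 'ts')
key_from_event  = build_key_string(record, 1_700_000_000, nil)
set_from_tag    = choose_set(nil, 'app.access')
set_from_config = choose_set('events', 'app.access')
```

The random suffix matters with a CREATE_ONLY write policy: two events that share a timestamp would otherwise map to the same key, and the second put would be rejected.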