Class: Fluent::AerospikeOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::AerospikeOutput
- Defined in: lib/fluent/plugin/out_aerospike.rb
Instance Attribute Summary
- #write_policy ⇒ Object (readonly)
  Returns the value of attribute write_policy.
Instance Method Summary
- #configure(conf) ⇒ Object
  This method is called before starting.
- #format(tag, time, record) ⇒ Object
  This method is called when an event reaches Fluentd.
- #get_client(address) ⇒ Object
- #initialize ⇒ AerospikeOutput (constructor)
  A new instance of AerospikeOutput.
- #shutdown ⇒ Object
  This method is called when shutting down.
- #start ⇒ Object
  This method is called when starting.
- #write(chunk) ⇒ Object
  This method is called every flush interval.
Constructor Details
#initialize ⇒ AerospikeOutput
Returns a new instance of AerospikeOutput.
    # File 'lib/fluent/plugin/out_aerospike.rb', line 20
    def initialize
      super
      require 'aerospike'
      require 'msgpack'
      require 'uuidtools'
    end
Instance Attribute Details
#write_policy ⇒ Object (readonly)
Returns the value of attribute write_policy.
    # File 'lib/fluent/plugin/out_aerospike.rb', line 18
    def write_policy
      @write_policy
    end
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
    # File 'lib/fluent/plugin/out_aerospike.rb', line 30
    def configure(conf)
      super
      # You can also refer to raw parameters via conf[name].
      # Assign the instance variable so the write_policy reader returns
      # this policy; a bare local assignment would leave the reader nil.
      @write_policy = Aerospike::WritePolicy.new(
        Aerospike::RecordExistsAction::CREATE_ONLY,
        nil,
        nil,
        @ttl,
        nil
      )
    end
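The description above says an invalid configuration should raise Fluent::ConfigError, but the listed method performs no checks. The following is a minimal sketch of what such a check could look like. The helper name `validate_conf`, the choice of required keys, and the stand-in `ConfigError` class are all assumptions made so the example runs without Fluentd loaded; a real plugin would raise Fluent::ConfigError instead.

```ruby
# Stand-in for Fluent::ConfigError so this sketch runs without Fluentd.
class ConfigError < StandardError; end

# Hypothetical helper: validates raw parameters as a configure method might.
# Which keys are actually required by the plugin is an assumption here.
def validate_conf(conf)
  address = conf['address']
  raise ConfigError, "'address' is required" if address.nil? || address.empty?
  unless address.match?(/\A[^:]+:\d+\z/)
    raise ConfigError, "'address' must look like host:port, got #{address.inspect}"
  end
  raise ConfigError, "'namespace' is required" if conf['namespace'].to_s.empty?
  conf
end
```

Failing early in configure keeps a malformed address from surfacing later as a connection error in start.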
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd. Convert the event to a raw string, for example with [tag, time, record].to_json + "\n". Alternatively, use msgpack to serialize the object: [tag, time, record].to_msgpack
    # File 'lib/fluent/plugin/out_aerospike.rb', line 61
    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end
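The plugin itself serializes with msgpack. For comparison, here is a small sketch of the JSON-line alternative that the description mentions, using only Ruby's standard json library. The method name `format_as_json_line` and the sample event values are illustrative assumptions, not part of the plugin.

```ruby
require 'json'

# Serialize one event as a single newline-terminated JSON array.
def format_as_json_line(tag, time, record)
  [tag, time, record].to_json + "\n"
end

# Round trip: the three elements come back in the same order.
line = format_as_json_line('app.access', 1_700_000_000, { 'path' => '/index' })
tag, time, record = JSON.parse(line)
```

Either encoding works as long as write decodes the chunk with the matching reader. The listed write method uses chunk.msgpack_each, so it expects the msgpack form.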
#get_client(address) ⇒ Object
    # File 'lib/fluent/plugin/out_aerospike.rb', line 83
    def get_client(address)
      host_port = address.split(':', 2)
      return Aerospike::Client.new(host_port[0], host_port[1])
    end
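The address handling in get_client can be tried in isolation. This sketch reproduces the same split with a limit of 2 and additionally converts the port to an Integer. That conversion is an assumption of the sketch; the listed code passes the port through as a String. The helper name `split_address` is hypothetical.

```ruby
# Hypothetical helper mirroring the split performed in get_client.
# A limit of 2 keeps everything after the first colon in the port part.
def split_address(address)
  host, port = address.split(':', 2)
  # Integer() raises if the port is missing or not numeric,
  # which surfaces a bad address immediately.
  [host, Integer(port)]
end

host, port = split_address('127.0.0.1:3000')
```

An address without a colon leaves the port nil, so Integer(nil) raises a TypeError rather than silently handing nil to the client constructor.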
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
    # File 'lib/fluent/plugin/out_aerospike.rb', line 51
    def shutdown
      @client.close
      super
    end
#start ⇒ Object
This method is called when starting. Open sockets or files here.
    # File 'lib/fluent/plugin/out_aerospike.rb', line 44
    def start
      super
      @client = get_client(@address)
    end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE: This method is called by an internal thread, not Fluentd's main thread, so waiting on IO here does not block other plugins. Optionally, you can use chunk.msgpack_each to deserialize the objects.
    # File 'lib/fluent/plugin/out_aerospike.rb', line 73
    def write(chunk)
      chunk.msgpack_each {|(tag,time,record)|
        # key_s = "#{time_key.nil? time : record[time_key]}-#{UUIDTools::UUID.random_create}"
        key_s = "#{record[@time_key] || time}-#{UUIDTools::UUID.random_create}"
        set_s = @set || tag
        key = Aerospike::Key.new(@namespace, set_s, key_s)
        @client.put(key, record, write_policy)
      }
    end
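The key and set selection inside write can be sketched without an Aerospike connection. The example below swaps in SecureRandom.uuid from the standard library for UUIDTools::UUID.random_create, and the helper names `build_key_string` and `choose_set` are hypothetical; only the fallback logic is taken from the listing.

```ruby
require 'securerandom'

# Key string: the record's own timestamp field if present, otherwise the
# event time, followed by a random UUID so that keys do not collide.
def build_key_string(record, time, time_key)
  "#{record[time_key] || time}-#{SecureRandom.uuid}"
end

# Set name: the configured set if given, otherwise the event tag.
def choose_set(configured_set, tag)
  configured_set || tag
end

from_record = build_key_string({ 'ts' => 42 }, 100, 'ts')
from_event  = build_key_string({}, 100, 'ts')
no_time_key = build_key_string({ 'ts' => 42 }, 100, nil)
```

Because configure builds the write policy with CREATE_ONLY, the random suffix matters: two events sharing a timestamp would otherwise map to the same key.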