Class: Fluent::Plugin::RedisOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::RedisOutput
- Defined in:
- lib/fluent/plugin/out_redis.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Attribute Summary collapse
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
13 14 15 |
# File 'lib/fluent/plugin/out_redis.rb', line 13

# Reader for the +@redis+ instance variable.
#
# @return [Object] whatever is currently stored in +@redis+
def redis
  @redis
end
Instance Method Details
#configure(conf) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/fluent/plugin/out_redis.rb', line 29

# Applies the plugin configuration and validates the buffer chunk keys.
#
# Steps, in order: convert compat-style :buffer / :inject parameters, run the
# parent implementation, warn if the removed 'namespace' option is still
# present in +conf+, require both @chunk_key_tag and @chunk_key_time to be
# set, and finally build the msgpack unpacker stored in @unpacker.
#
# @param conf [Object] configuration object; must respond to +has_key?+ and +[]+
# @raise [Fluent::ConfigError] when 'tag' or 'time' is not among the chunk keys
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  if conf.has_key?('namespace')
    log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
  end

  # Both chunk keys are mandatory; the tag is checked first.
  unless @chunk_key_tag
    raise Fluent::ConfigError, "'tag' in chunk_keys is required."
  end
  unless @chunk_key_time
    raise Fluent::ConfigError, "'time' in chunk_keys is required."
  end

  @unpacker = Fluent::Engine.msgpack_factory.unpacker
end
#format(tag, time, record) ⇒ Object
60 61 62 63 |
# File 'lib/fluent/plugin/out_redis.rb', line 60

# Serializes one event for buffering.
#
# The record is first passed through +inject_values_to_record+, then the
# triple [tag, time, record] is converted with +to_msgpack+.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] result of calling +to_msgpack+ on the [tag, time, record] array
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)
  [tag, time, injected].to_msgpack
end
#formatted_to_msgpack_binary ⇒ Object
65 66 67 |
# File 'lib/fluent/plugin/out_redis.rb', line 65

# Always returns +true+.
#
# NOTE(review): presumably this tells the Fluentd output base class that
# #format emits msgpack binary -- confirm against the Fluentd plugin API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
#shutdown ⇒ Object
55 56 57 58 |
# File 'lib/fluent/plugin/out_redis.rb', line 55

# Sends +quit+ to the object held in @redis, then runs the parent
# implementation of #shutdown.
def shutdown
  @redis.quit

  super
end
#start ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/out_redis.rb', line 41

# Runs the parent #start, then creates the Redis client stored in @redis.
#
# The client is built from @host, @port and @db_number with
# +thread_safe: true+; a :password entry is added only when @password is set.
#
# NOTE(review): the local variable name was missing from the extracted
# listing (it read `super = { ... }` / `Redis.new()`); `options` is a
# reconstruction -- the name is local, so any identifier is equivalent.
def start
  super

  options = {
    host: @host,
    port: @port,
    thread_safe: true,
    db: @db_number
  }
  # Only pass a password when one was configured.
  options[:password] = @password if @password

  @redis = Redis.new(options)
end
#write(chunk) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/out_redis.rb', line 69

# Writes every event of +chunk+ to Redis as hashes inside one pipeline.
#
# When @allow_duplicate_key is falsy, each event is stored under its own key
# "<tag>.<time>.<index>", where index is the event's position in the chunk's
# msgpack stream and the stored hash is element 2 of the unpacked entry.
# When @allow_duplicate_key is truthy, every record is stored under the tag
# alone, so later records are written to the same key as earlier ones.
#
# @param chunk [Object] buffer chunk; must respond to +metadata+,
#   +to_msgpack_stream+ and +each+
def write(chunk)
  # NOTE(review): the helper name and the chunk attribute were missing from
  # the extracted listing (it read `tag, time = (chunk.)`). The names below
  # are a reconstruction -- confirm against lib/fluent/plugin/out_redis.rb.
  tag, time = expand_placeholders(chunk.metadata)

  @redis.pipelined do |pipeline|
    # Issue commands on the object yielded by #pipelined; fall back to the
    # client itself if the block is called without an argument.
    target = pipeline || @redis

    if @allow_duplicate_key
      key = tag.to_s
      chunk.each do |_tag, _time, record|
        target.mapped_hmset key, record
      end
    else
      # The key prefix is the same for every event in the chunk.
      identifier = [tag, time].join(".")
      stream = chunk.to_msgpack_stream
      @unpacker.feed_each(stream).with_index do |record, index|
        target.mapped_hmset "#{identifier}.#{index}", record[2]
      end
    end
  end
end