Class: Fluent::Plugin::RedisOutput
- Inherits: Output
  - Object
  - Output
  - Fluent::Plugin::RedisOutput
- Defined in:
- lib/fluent/plugin/out_redis.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE = "memory"
- DEFAULT_TTL_VALUE = -1
Instance Attribute Summary collapse
- #redis ⇒ Object (readonly): Returns the value of attribute redis.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
14 15 16 |
# File 'lib/fluent/plugin/out_redis.rb', line 14 def redis @redis end |
Instance Method Details
#configure(conf) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/fluent/plugin/out_redis.rb', line 31
# Validates the plugin configuration and prepares the msgpack unpacker.
#
# Calls compat_parameters_convert for the :buffer and :inject sections, then
# defers to the parent class. A leftover 'namespace' key only produces a
# warning; missing 'tag' or 'time' chunk keys abort configuration.
#
# @param conf [Object] configuration passed in by the caller; must respond
#   to has_key? and []
# @raise [Fluent::ConfigError] when @chunk_key_tag or @chunk_key_time is not set
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually." if conf.has_key?('namespace')

  # Both chunk keys are consumed later when building Redis keys in #write.
  unless @chunk_key_tag
    raise Fluent::ConfigError, "'tag' in chunk_keys is required."
  end
  unless @chunk_key_time
    raise Fluent::ConfigError, "'time' in chunk_keys is required."
  end

  @unpacker = Fluent::Engine.msgpack_factory.unpacker
end
#format(tag, time, record) ⇒ Object
62 63 64 65 |
# File 'lib/fluent/plugin/out_redis.rb', line 62
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
#
# The record is first passed through inject_values_to_record, and the
# result of that call is what gets serialized.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of Array#to_msgpack on the triple
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)
  [tag, time, injected].to_msgpack
end
#formatted_to_msgpack_binary ⇒ Object
67 68 69 |
# File 'lib/fluent/plugin/out_redis.rb', line 67 def formatted_to_msgpack_binary true end |
#shutdown ⇒ Object
57 58 59 60 |
# File 'lib/fluent/plugin/out_redis.rb', line 57
# Closes the Redis connection, then runs the parent class's shutdown.
#
# @redis is only assigned in #start, so it can still be nil if shutdown is
# reached without a completed start. The guard keeps that case from raising
# NoMethodError, which would also skip the call to super.
def shutdown
  @redis.quit if @redis
  super
end
#start ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/fluent/plugin/out_redis.rb', line 43
# Runs the parent class's start, then builds the Redis client and stores it
# in @redis.
#
# Connection settings come from @host, @port and @db_number; a :password
# entry is added only when @password is set.
def start
  super

  # NOTE(review): the local variable's name was missing from the extracted
  # listing (`super = {...}`, `[:password] = ...`, `Redis.new()`); it is
  # restored here as `options`.
  options = {
    host: @host,
    port: @port,
    thread_safe: true,
    db: @db_number
  }
  options[:password] = @password if @password

  @redis = Redis.new(options)
end
#write(chunk) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/fluent/plugin/out_redis.rb', line 71
# Writes every record of a buffer chunk to Redis as a hash, inside a single
# @redis.pipelined block.
#
# When @allow_duplicate_key is not set, each record is stored under its own
# key "<tag>.<time>.<index>", where index is the record's position in the
# chunk's msgpack stream. When it is set, every record is written to the
# same key "<tag>". In both cases the key gets an expiry only when @ttl is
# greater than zero.
#
# @param chunk [Object] buffer chunk; must respond to metadata,
#   to_msgpack_stream and each
# @return [Object] whatever @redis.pipelined returns
def write(chunk)
  # NOTE(review): the extracted listing reads `tag, time = (chunk.)` — the
  # helper name and the chunk attribute were lost. Reconstructed as
  # expand_placeholders(chunk.metadata); confirm against the real source file.
  tag, time = expand_placeholders(chunk.metadata)

  # NOTE(review): commands are issued on @redis itself inside the
  # pipelined/multi blocks; newer redis-rb releases appear to expect the
  # yielded block argument instead — verify against the redis-rb version in use.
  @redis.pipelined do
    if @allow_duplicate_key
      key = tag.to_s
      chunk.each do |_tag, _time, record|
        @redis.multi do
          @redis.mapped_hmset key, record
          @redis.expire key, @ttl if @ttl > 0
        end
      end
    else
      # The prefix is the same for every record, so build it once.
      identifier = [tag, time].join(".")
      stream = chunk.to_msgpack_stream
      @unpacker.feed_each(stream).with_index do |record, index|
        key = "#{identifier}.#{index}"
        @redis.multi do
          # Unpacked entries are [tag, time, record]; only the record is stored.
          @redis.mapped_hmset key, record[2]
          @redis.expire key, @ttl if @ttl > 0
        end
      end
    end
  end
end