Class: Fluent::Plugin::RedisOutput
- Inherits:
- Output
- Object
- Output
- Fluent::Plugin::RedisOutput
- Defined in:
- lib/fluent/plugin/out_redis.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
- DEFAULT_TTL_VALUE =
-1
Instance Attribute Summary collapse
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
14 15 16 |
# File 'lib/fluent/plugin/out_redis.rb', line 14
#
# Reader for the @redis instance variable.
#
# @return [Object] the current value of @redis (nil until it is assigned)
def redis
  @redis
end
Instance Method Details
#configure(conf) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fluent/plugin/out_redis.rb', line 31
#
# Validates the plugin configuration and prepares per-instance state.
#
# Steps, in order:
# 1. converts compat parameters for the :buffer and :inject sections,
# 2. records whether more than one worker is configured,
# 3. delegates to the superclass,
# 4. warns when the removed 'namespace' option is still present,
# 5. requires both 'tag' and 'time' chunk keys,
# 6. creates the msgpack unpacker stored in @unpacker.
#
# @param conf [Object] configuration element; must respond to has_key? and []
# @raise [Fluent::ConfigError] when 'tag' or 'time' is missing from chunk_keys
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  @running_multi_workers = system_config.workers > 1
  super

  if conf.has_key?('namespace')
    log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
  end

  raise Fluent::ConfigError, "'tag' in chunk_keys is required." unless @chunk_key_tag
  raise Fluent::ConfigError, "'time' in chunk_keys is required." unless @chunk_key_time

  @unpacker = Fluent::Engine.msgpack_factory.unpacker
end
#format(tag, time, record) ⇒ Object
63 64 65 66 |
# File 'lib/fluent/plugin/out_redis.rb', line 63
#
# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record, then the
# [tag, time, record] triple is packed with to_msgpack.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of calling to_msgpack on [tag, time, record]
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  [tag, time, record].to_msgpack
end
#formatted_to_msgpack_binary ⇒ Object
68 69 70 |
# File 'lib/fluent/plugin/out_redis.rb', line 68
#
# Always returns true.
# NOTE(review): presumably signals to Fluentd that #format already yields
# msgpack binary — confirm against the Fluentd output plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary
  true
end
#multi_workers_ready? ⇒ Boolean
72 73 74 |
# File 'lib/fluent/plugin/out_redis.rb', line 72
#
# Always returns true.
# NOTE(review): presumably declares that this plugin may run under multiple
# Fluentd workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
#shutdown ⇒ Object
58 59 60 61 |
# File 'lib/fluent/plugin/out_redis.rb', line 58
#
# Calls quit on the Redis client, then continues with the superclass shutdown.
#
# @return [Object] whatever the superclass shutdown returns
def shutdown
  # @redis is only assigned in #start; skip the quit when it was never set so
  # shutdown does not fail with NoMethodError on nil and still reaches super.
  @redis.quit if @redis
  super
end
#start ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/out_redis.rb', line 44
#
# Runs the superclass start, then creates the Redis client stored in @redis.
#
# Connection options are built from @host, @port and @db_number;
# :password is added only when @password is set.
#
# @return [Object] the newly created Redis client
def start
  super

  options = {
    host: @host,
    port: @port,
    thread_safe: true,
    db: @db_number
  }
  options[:password] = @password if @password

  @redis = Redis.new(options)
end
#write(chunk) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/fluent/plugin/out_redis.rb', line 76
#
# Writes every event of a buffer chunk to Redis inside one pipelined block.
#
# Each event is stored with mapped_hmset inside its own multi block, followed
# by expire when @ttl is positive.
#
# * When @allow_duplicate_key is falsy, events are read from the chunk's
#   msgpack stream and stored under "<tag>.<time>.<index>", or
#   "<tag>.<time>.<worker id>.<index>" when running with multiple workers.
# * When @allow_duplicate_key is truthy, every record is stored under the
#   same key: the tag.
#
# @param chunk [Object] buffer chunk; must respond to to_msgpack_stream and each
# @return [Object] the result of @redis.pipelined
def write(chunk)
  # NOTE(review): the helper name in front of `(chunk)` was missing from this
  # listing; `expand_placeholders` is assumed — confirm against the plugin source.
  tag, time = expand_placeholders(chunk)

  # NOTE(review): commands are issued on @redis itself inside the pipelined
  # and multi blocks; check whether the redis-rb version in use expects the
  # yielded pipeline/transaction object instead.
  @redis.pipelined do
    if @allow_duplicate_key
      chunk.each do |_tag, _time, record|
        @redis.multi do
          @redis.mapped_hmset "#{tag}", record
          @redis.expire "#{tag}", @ttl if @ttl > 0
        end
      end
    else
      # The key prefix does not depend on the individual event, so build it once.
      key_parts = [tag, time]
      key_parts << fluentd_worker_id if @running_multi_workers
      identifier = key_parts.join(".")

      @unpacker.feed_each(chunk.to_msgpack_stream).with_index do |record, index|
        key = "#{identifier}.#{index}"
        @redis.multi do
          # Unpacked events are [tag, time, record] triples; index 2 is the record.
          @redis.mapped_hmset key, record[2]
          @redis.expire key, @ttl if @ttl > 0
        end
      end
    end
  end
end