Class: Fluent::RedisOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::RedisOutput
- Defined in:
- lib/fluent/plugin/out_redisstore.rb
Instance Attribute Summary collapse
-
#db_number ⇒ Object
readonly
Returns the value of attribute db_number.
-
#fixed_key_value ⇒ Object
readonly
Returns the value of attribute fixed_key_value.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#key_expire ⇒ Object
readonly
Returns the value of attribute key_expire.
-
#key_name ⇒ Object
readonly
Returns the value of attribute key_name.
-
#key_prefix ⇒ Object
readonly
Returns the value of attribute key_prefix.
-
#key_suffix ⇒ Object
readonly
Returns the value of attribute key_suffix.
-
#order ⇒ Object
readonly
Returns the value of attribute order.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
-
#score_name ⇒ Object
readonly
Returns the value of attribute score_name.
-
#store_type ⇒ Object
readonly
Returns the value of attribute store_type.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#value_expire ⇒ Object
readonly
Returns the value of attribute value_expire.
-
#value_length ⇒ Object
readonly
Returns the value of attribute value_length.
-
#value_name ⇒ Object
readonly
Returns the value of attribute value_name.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #generate_ltrim_script(key, maxlen, order) ⇒ Object
- #generate_zremrangebyrank_script(key, maxlen, order) ⇒ Object
-
#initialize ⇒ RedisOutput
constructor
A new instance of RedisOutput.
- #operation_for_list(record) ⇒ Object
- #operation_for_set(record) ⇒ Object
- #operation_for_string(record) ⇒ Object
- #operation_for_zset(record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #traverse(data, key) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ RedisOutput
Returns a new instance of RedisOutput.
6 7 8 9 10 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 6
# Builds the plugin instance.
#
# Runs the parent initializer first, then loads the 'redis' and 'msgpack'
# libraries at construction time instead of at file-load time.
def initialize
  super
  %w[redis msgpack].each { |library| require library }
end
Instance Attribute Details
#db_number ⇒ Object (readonly)
Returns the value of attribute db_number.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def db_number @db_number end |
#fixed_key_value ⇒ Object (readonly)
Returns the value of attribute fixed_key_value.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def fixed_key_value @fixed_key_value end |
#host ⇒ Object (readonly)
Returns the value of attribute host.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def host @host end |
#key_expire ⇒ Object (readonly)
Returns the value of attribute key_expire.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def key_expire @key_expire end |
#key_name ⇒ Object (readonly)
Returns the value of attribute key_name.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def key_name @key_name end |
#key_prefix ⇒ Object (readonly)
Returns the value of attribute key_prefix.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def key_prefix @key_prefix end |
#key_suffix ⇒ Object (readonly)
Returns the value of attribute key_suffix.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def key_suffix @key_suffix end |
#order ⇒ Object (readonly)
Returns the value of attribute order.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def order @order end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def port @port end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def redis @redis end |
#score_name ⇒ Object (readonly)
Returns the value of attribute score_name.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def score_name @score_name end |
#store_type ⇒ Object (readonly)
Returns the value of attribute store_type.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def store_type @store_type end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def timeout @timeout end |
#value_expire ⇒ Object (readonly)
Returns the value of attribute value_expire.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def value_expire @value_expire end |
#value_length ⇒ Object (readonly)
Returns the value of attribute value_length.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def value_length @value_length end |
#value_name ⇒ Object (readonly)
Returns the value of attribute value_name.
4 5 6 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 4 def value_name @value_name end |
Instance Method Details
#configure(conf) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 12
# Copies plugin settings from +conf+ into instance variables.
#
# Settings and their defaults when the key is absent from +conf+:
#   driver          - nil
#   host            - 'localhost'
#   port            - 6379            (converted with to_i when present)
#   db_number       - nil             (converted with to_i when present)
#   timeout         - 5.0             (converted with to_f when present)
#   key_prefix      - ''
#   key_suffix      - ''
#   store_type      - 'zset'
#   fixed_key_value - nil
#   key_expire      - -1              (converted with to_i when present)
#   value_expire    - -1              (converted with to_i when present)
#   value_length    - -1              (converted with to_i when present)
#   order           - 'asc'
# key_name, score_name and value_name are read directly and are nil when the
# key is absent.
#
# @param conf [#has_key?, #[]] configuration element keyed by setting name
def configure(conf)
  super

  # Returns the configured value (optionally converted by calling the method
  # named by +convert+ on it) or +default+ when the key is not present.
  read = lambda do |name, default, convert = nil|
    if conf.has_key?(name)
      convert ? conf[name].public_send(convert) : conf[name]
    else
      default
    end
  end

  @driver    = read.call('driver', nil)
  @host      = read.call('host', 'localhost')
  @port      = read.call('port', 6379, :to_i)
  @db_number = read.call('db_number', nil, :to_i)
  @timeout   = read.call('timeout', 5.0, :to_f)

  @key_prefix      = read.call('key_prefix', '')
  @key_suffix      = read.call('key_suffix', '')
  @store_type      = read.call('store_type', 'zset')
  @key_name        = conf['key_name']
  @fixed_key_value = read.call('fixed_key_value', nil)
  @score_name      = conf['score_name']
  @value_name      = conf['value_name']
  @key_expire      = read.call('key_expire', -1, :to_i)
  @value_expire    = read.call('value_expire', -1, :to_i)
  @value_length    = read.call('value_length', -1, :to_i)
  @order           = read.call('order', 'asc')
end
#format(tag, time, record) ⇒ Object
59 60 61 62 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 59
# Serializes one event for buffering.
#
# The event is packed as a two-element array: an identifier made of the tag
# and the time joined by ".", followed by the record itself.
#
# @param tag    event tag
# @param time   event time
# @param record event payload
# @return the result of Array#to_msgpack for [identifier, record]
def format(tag, time, record)
  [[tag, time].join('.'), record].to_msgpack
end
#generate_ltrim_script(key, maxlen, order) ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 190
# Builds a Lua script that trims a Redis list down to +maxlen+ elements.
#
# When the list is longer than +maxlen+ the script keeps the last +maxlen+
# elements if +order+ is 'asc', and the first +maxlen+ elements otherwise.
#
# +key+ and +order+ are embedded as single-quoted Lua string literals.
# Backslashes, single quotes, newlines and carriage returns are escaped so
# that such characters in a key cannot terminate the literal or change the
# script.
#
# @param key    [#to_s] name of the list to trim
# @param maxlen [#to_s] maximum number of elements to keep
# @param order  [#to_s] 'asc' keeps the tail, anything else keeps the head
# @return [String] Lua source, terminated by a newline
def generate_ltrim_script(key, maxlen, order)
  lua_escapes = { "\\" => "\\\\", "'" => "\\'", "\n" => "\\n", "\r" => "\\r" }
  lua_quote = lambda do |text|
    "'" + text.to_s.gsub(/[\\'\n\r]/) { |ch| lua_escapes[ch] } + "'"
  end

  [
    "local key = #{lua_quote.call(key)}",
    "local maxlen = #{maxlen}",
    "local order =#{lua_quote.call(order)}",
    "local len = tonumber(redis.call('LLEN', key))",
    "if len > maxlen then",
    " if order == 'asc' then",
    " local l = len - maxlen",
    " return redis.call('LTRIM', key, l, -1)",
    " else",
    " return redis.call('LTRIM', key, 0, maxlen - 1)",
    " end",
    "end",
  ].join("\n") + "\n"
end
#generate_zremrangebyrank_script(key, maxlen, order) ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 172
# Builds a Lua script that trims a Redis sorted set down to +maxlen+ members.
#
# When the set holds more than +maxlen+ members the script removes the
# lowest-ranked surplus if +order+ is 'asc', and every member ranked
# +maxlen+ or above otherwise. Either way exactly +maxlen+ members remain.
#
# +key+ and +order+ are embedded as single-quoted Lua string literals.
# Backslashes, single quotes, newlines and carriage returns are escaped so
# that such characters in a key cannot terminate the literal or change the
# script.
#
# @param key    [#to_s] name of the sorted set to trim
# @param maxlen [#to_s] maximum number of members to keep
# @param order  [#to_s] 'asc' drops the lowest ranks, anything else the highest
# @return [String] Lua source, terminated by a newline
def generate_zremrangebyrank_script(key, maxlen, order)
  lua_escapes = { "\\" => "\\\\", "'" => "\\'", "\n" => "\\n", "\r" => "\\r" }
  lua_quote = lambda do |text|
    "'" + text.to_s.gsub(/[\\'\n\r]/) { |ch| lua_escapes[ch] } + "'"
  end

  [
    "local key = #{lua_quote.call(key)}",
    "local maxlen = #{maxlen}",
    "local order =#{lua_quote.call(order)}",
    "local len = tonumber(redis.call('ZCARD', key))",
    "if len > maxlen then",
    " if order == 'asc' then",
    " local l = len - maxlen",
    # ZREMRANGEBYRANK bounds are inclusive, so removing l members means
    # ranks 0 through l - 1; l is at least 1 inside this branch.
    " return redis.call('ZREMRANGEBYRANK', key, 0, l - 1)",
    " else",
    " return redis.call('ZREMRANGEBYRANK', key, maxlen, -1)",
    " end",
    "end",
  ].join("\n") + "\n"
end
#operation_for_list(record) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 134
# Stores one record in a Redis list.
#
# The Redis key is @key_prefix + key part + @key_suffix, where the key part
# is @fixed_key_value when set and otherwise the string form of the value
# found in the record at the @key_name path. The value found at the
# @value_name path is appended with RPUSH when @order is 'asc' and prepended
# with LPUSH otherwise. A positive @key_expire sets the key's expiry and a
# positive @value_length trims the list with a generated Lua script.
#
# @param record the event payload to read the key and value from
def operation_for_list(record)
  key_part  = @fixed_key_value || traverse(record, @key_name).to_s
  element   = traverse(record, @value_name)
  redis_key = @key_prefix + key_part + @key_suffix

  @order == 'asc' ? @redis.rpush(redis_key, element) : @redis.lpush(redis_key, element)
  @redis.expire(redis_key, @key_expire) if @key_expire > 0
  return unless @value_length > 0

  @redis.eval(generate_ltrim_script(redis_key, @value_length, @order))
end
#operation_for_set(record) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 119
# Stores one record in a Redis set.
#
# The Redis key is @key_prefix + key part + @key_suffix, where the key part
# is @fixed_key_value when set and otherwise the string form of the value
# found in the record at the @key_name path. The value found at the
# @value_name path is added with SADD, and a positive @key_expire sets the
# key's expiry.
#
# @param record the event payload to read the key and value from
def operation_for_set(record)
  key_part  = @fixed_key_value || traverse(record, @key_name).to_s
  member    = traverse(record, @value_name)
  redis_key = @key_prefix + key_part + @key_suffix

  @redis.sadd(redis_key, member)
  @redis.expire(redis_key, @key_expire) if @key_expire > 0
end
#operation_for_string(record) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 157
# Stores one record as a plain Redis string value.
#
# The Redis key is @key_prefix + key part + @key_suffix, where the key part
# is @fixed_key_value when set and otherwise the string form of the value
# found in the record at the @key_name path. The value found at the
# @value_name path is written with SET, and a positive @key_expire sets the
# key's expiry.
#
# @param record the event payload to read the key and value from
def operation_for_string(record)
  key_part  = @fixed_key_value || traverse(record, @key_name).to_s
  payload   = traverse(record, @value_name)
  redis_key = @key_prefix + key_part + @key_suffix

  @redis.set(redis_key, payload)
  @redis.expire(redis_key, @key_expire) if @key_expire > 0
end
#operation_for_zset(record) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 91
# Stores one record in a Redis sorted set.
#
# The Redis key is @key_prefix + key part + @key_suffix, where the key part
# is @fixed_key_value when set and otherwise the string form of the value
# found in the record at the @key_name path. The member is the value found
# at the @value_name path; its score is the value at the @score_name path
# when @score_name is set, and the current epoch second otherwise.
#
# After ZADD:
# * a positive @key_expire sets the key's expiry;
# * a positive @value_expire removes members whose score is at most
#   (current epoch second - @value_expire);
#   NOTE(review): this presumably expects scores to be epoch seconds - confirm;
# * a positive @value_length trims the set with a generated Lua script.
#
# @param record the event payload to read key, score and value from
def operation_for_zset(record)
  current_time = Time.now.to_i

  key_part  = @fixed_key_value || traverse(record, @key_name).to_s
  score     = @score_name ? traverse(record, @score_name) : current_time
  member    = traverse(record, @value_name)
  redis_key = @key_prefix + key_part + @key_suffix

  @redis.zadd(redis_key, score, member)
  @redis.expire(redis_key, @key_expire) if @key_expire > 0
  @redis.zremrangebyscore(redis_key, '-inf', current_time - @value_expire) if @value_expire > 0
  return unless @value_length > 0

  @redis.eval(generate_zremrangebyrank_script(redis_key, @value_length, @order))
end
#shutdown ⇒ Object
53 54 55 56 57 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 53
# Stops the plugin: runs the parent shutdown first, then sends QUIT on the
# Redis client held in @redis.
def shutdown
  super
  @redis.quit
end
#start ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 35
# Starts the plugin and creates the Redis client stored in @redis.
#
# The client is built from @host, @port, @db_number and @timeout with
# :thread_safe set to true. When @driver is set it is passed along as a
# symbol under :driver.
def start
  super

  client_options = {
    host: @host,
    port: @port,
    db: @db_number,
    timeout: @timeout,
    thread_safe: true
  }
  client_options[:driver] = @driver.to_sym if @driver

  @redis = Redis.new(client_options)
end
#traverse(data, key) ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 206
# Looks up a nested value by a dot-separated path.
#
# Each segment of +key+ is used as a key into the current value, starting
# from +data+. The lookup yields nil as soon as a segment is missing or the
# current value cannot be indexed by key (for example a String or Integer
# reached before the path is exhausted), so a path that does not fit the
# record is reported the same way as a missing key instead of raising
# NoMethodError.
#
# @param data [Hash] the structure to search
# @param key  [String] dot-separated path such as "user.id"
# @return the value at the path, or nil when the path cannot be followed
def traverse(data, key)
  key.split('.').reduce(data) do |node, segment|
    return nil unless node.respond_to?(:key?) && node.key?(segment)

    node[segment]
  end
end
#write(chunk) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/out_redisstore.rb', line 64
# Flushes one buffered chunk to Redis inside a single pipeline.
#
# Every MessagePack message in the chunk is expected to be a two-element
# array whose second element is the record. The record is handed to the
# operation_for_* method selected by @store_type ('zset', 'set', 'list' or
# 'string'); any other @store_type value is ignored.
#
# A NoMethodError raised while handling one message is printed with puts and
# that message is skipped; the remaining messages are still processed.
#
# @param chunk [#open] buffered chunk whose #open yields a readable IO
def write(chunk)
  @redis.pipelined do
    chunk.open do |io|
      begin
        MessagePack::Unpacker.new(io).each do |message|
          begin
            # The first element is not used here; only the record is stored.
            _tag, record = message

            case @store_type
            when 'zset'   then operation_for_zset(record)
            when 'set'    then operation_for_set(record)
            when 'list'   then operation_for_list(record)
            when 'string' then operation_for_string(record)
            end
          rescue NoMethodError => e
            puts e
          end
        end
      rescue EOFError
        # EOFError always occurs when the end of the chunk is reached.
      end
    end
  end
end