Class: Fluent::RedisStoreOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::RedisStoreOutput
- Defined in:
- lib/fluent/plugin/out_redis_store.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #generate_ltrim_script(key, maxlen, order) ⇒ Object
- #generate_zremrangebyrank_script(key, maxlen, order) ⇒ Object
- #get_key_from(record) ⇒ Object
- #get_score_from(record, time) ⇒ Object
- #get_value_from(record) ⇒ Object
- #initialize ⇒ RedisStoreOutput (constructor): A new instance of RedisStoreOutput.
- #operation_for_list(record) ⇒ Object
- #operation_for_publish(record) ⇒ Object
- #operation_for_set(record) ⇒ Object
- #operation_for_string(record) ⇒ Object
- #operation_for_zset(record, time) ⇒ Object
- #set_key_expire(key) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #traverse(data, key) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ RedisStoreOutput
Returns a new instance of RedisStoreOutput.
27 28 29 30 31 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 27
# Constructs the plugin instance.
#
# After delegating to the parent constructor, loads the 'redis' and
# 'msgpack' libraries. The requires run at construction time rather than
# when this file is loaded.
def initialize
  super
  require('redis')
  require('msgpack')
end
Instance Method Details
#configure(conf) ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 33
# Applies the configuration and validates that a key source is available.
#
# @param conf configuration object, handed to the parent implementation
# @raise [Fluent::ConfigError] when neither @key_path nor @key is set
def configure(conf)
  super
  # At least one of the two key sources must be present after the parent
  # has processed the configuration.
  raise Fluent::ConfigError, "either key_path or key is required" if @key_path.nil? && @key.nil?
end
#format(tag, time, record) ⇒ Object
56 57 58 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 56
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
#
# @param tag event tag
# @param time event time
# @param record event record
# @return the result of Array#to_msgpack on the triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#generate_ltrim_script(key, maxlen, order) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 162
# Builds the Lua source that caps a Redis list at +maxlen+ entries.
#
# The generated script reads the list length with LLEN and, only when it
# exceeds +maxlen+, issues LTRIM: for order 'asc' it keeps indexes
# (len - maxlen) through -1; for any other order it keeps indexes 0 through
# (maxlen - 1).
#
# +key+ and +order+ are embedded as single-quoted Lua string literals and
# are escaped so that quotes, backslashes, line breaks or NUL bytes in them
# cannot terminate the literal or inject Lua code. +maxlen+ is embedded
# verbatim as a Lua number.
#
# @param key [#to_s] list key
# @param maxlen [#to_s] maximum number of entries to keep
# @param order [#to_s] 'asc' or any other value
# @return [String] Lua script text
def generate_ltrim_script(key, maxlen, order)
  lua_escapes = { "\\" => "\\\\", "'" => "\\'", "\n" => "\\n", "\r" => "\\r", "\0" => "\\000" }
  quote = lambda do |text|
    raw = text.to_s
    # Substitute on a binary copy so ill-formed text cannot make gsub raise,
    # then restore the caller's encoding.
    escaped = raw.b.gsub(/[\\'\n\r\0]/n, lua_escapes).force_encoding(raw.encoding)
    "'#{escaped}'"
  end

  [
    "local key = #{quote.call(key)}",
    "local maxlen = #{maxlen}",
    "local order =#{quote.call(order)}",
    "local len = tonumber(redis.call('LLEN', key))",
    "if len > maxlen then",
    " if order == 'asc' then",
    " local l = len - maxlen",
    " return redis.call('LTRIM', key, l, -1)",
    " else",
    " return redis.call('LTRIM', key, 0, maxlen - 1)",
    " end",
    "end"
  ].map { |line| line + "\n" }.join
end
#generate_zremrangebyrank_script(key, maxlen, order) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 144
# Builds the Lua source that caps a Redis sorted set at +maxlen+ members.
#
# The generated script counts the members with ZCOUNT over -inf..+inf and,
# only when the count exceeds +maxlen+, removes the surplus with
# ZREMRANGEBYRANK: for order 'asc' it removes ranks 0 through
# (len - maxlen - 1); for any other order it removes ranks +maxlen+ through
# -1. Both branches leave +maxlen+ members. The rank range is inclusive, so
# the 'asc' upper bound is one less than the surplus count.
#
# +key+ and +order+ are embedded as single-quoted Lua string literals and
# are escaped so that quotes, backslashes, line breaks or NUL bytes in them
# cannot terminate the literal or inject Lua code. +maxlen+ is embedded
# verbatim as a Lua number.
#
# @param key [#to_s] sorted-set key
# @param maxlen [#to_s] maximum number of members to keep
# @param order [#to_s] 'asc' or any other value
# @return [String] Lua script text
def generate_zremrangebyrank_script(key, maxlen, order)
  lua_escapes = { "\\" => "\\\\", "'" => "\\'", "\n" => "\\n", "\r" => "\\r", "\0" => "\\000" }
  quote = lambda do |text|
    raw = text.to_s
    # Substitute on a binary copy so ill-formed text cannot make gsub raise,
    # then restore the caller's encoding.
    escaped = raw.b.gsub(/[\\'\n\r\0]/n, lua_escapes).force_encoding(raw.encoding)
    "'#{escaped}'"
  end

  [
    "local key = #{quote.call(key)}",
    "local maxlen = #{maxlen}",
    "local order =#{quote.call(order)}",
    "local len = tonumber(redis.call('ZCOUNT', key, '-inf', '+inf'))",
    "if len > maxlen then",
    " if order == 'asc' then",
    # len > maxlen holds here, so the upper rank is always >= 0.
    " return redis.call('ZREMRANGEBYRANK', key, 0, len - maxlen - 1)",
    " else",
    " return redis.call('ZREMRANGEBYRANK', key, maxlen, -1)",
    " end",
    "end"
  ].map { |line| line + "\n" }.join
end
#get_key_from(record) ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 190
# Resolves the Redis key for a record.
#
# Uses @key when it is set; otherwise looks up @key_path in the record via
# #traverse and stringifies the result. The base is then wrapped with
# @key_prefix and @key_suffix.
#
# @param record record to read the key from when @key is not set
# @return [String] the prefixed and suffixed key
# @raise [Fluent::ConfigError] when the resulting key is the empty string
def get_key_from(record)
  base = @key ? @key : traverse(record, @key_path).to_s
  full_key = @key_prefix + base + @key_suffix
  raise Fluent::ConfigError, "key is empty" if full_key == ''
  full_key
end
#get_score_from(record, time) ⇒ Object
214 215 216 217 218 219 220 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 214
# Picks the sorted-set score for a record.
#
# @param record record searched with #traverse when @score_path is set
# @param time fallback score used when @score_path is not set
# @return the value found at @score_path, or +time+
def get_score_from(record, time)
  @score_path ? traverse(record, @score_path) : time
end
#get_value_from(record) ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 202
# Extracts the value to store from a record and encodes it.
#
# The value is looked up at @value_path via #traverse, then encoded
# according to @format_type: 'json' calls #to_json, 'msgpack' calls
# #to_msgpack, and anything else returns the value untouched.
#
# @param record record to read the value from
# @return the encoded (or raw) value
def get_value_from(record)
  payload = traverse(record, @value_path)
  return payload.to_json if 'json' == @format_type
  return payload.to_msgpack if 'msgpack' == @format_type

  payload
end
#operation_for_list(record) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 114
# Stores a record's value in a Redis list.
#
# Pushes to the tail (RPUSH) when @order is 'asc' and to the head (LPUSH)
# otherwise, applies the key expiry via #set_key_expire, and, when
# @value_length is positive, evaluates the trim script from
# #generate_ltrim_script.
#
# @param record record supplying the key and value
# @return the result of the script evaluation, or nil when no trim is done
def operation_for_list(record)
  list_key = get_key_from(record)
  item = get_value_from(record)

  case @order
  when 'asc' then @redis.rpush(list_key, item)
  else @redis.lpush(list_key, item)
  end
  set_key_expire(list_key)

  @redis.eval(generate_ltrim_script(list_key, @value_length, @order)) if 0 < @value_length
end
#operation_for_publish(record) ⇒ Object
138 139 140 141 142 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 138
# Publishes a record's value with PUBLISH, using the resolved key as the
# first argument.
#
# @param record record supplying the key and value
# @return the result of the publish call
def operation_for_publish(record)
  channel = get_key_from(record)
  message = get_value_from(record)
  @redis.publish(channel, message)
end
#operation_for_set(record) ⇒ Object
107 108 109 110 111 112 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 107
# Adds a record's value to a Redis set with SADD, then applies the key
# expiry via #set_key_expire.
#
# @param record record supplying the key and value
# @return whatever #set_key_expire returns
def operation_for_set(record)
  set_name = get_key_from(record)
  member = get_value_from(record)
  @redis.sadd(set_name, member)
  set_key_expire(set_name)
end
#operation_for_string(record) ⇒ Object
130 131 132 133 134 135 136 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 130
# Writes a record's value as a plain Redis string with SET, then applies
# the key expiry via #set_key_expire.
#
# @param record record supplying the key and value
# @return whatever #set_key_expire returns
def operation_for_string(record)
  string_key = get_key_from(record)
  content = get_value_from(record)
  @redis.set(string_key, content)
  set_key_expire(string_key)
end
#operation_for_zset(record, time) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 90
# Stores a record's value in a Redis sorted set.
#
# Adds the value with ZADD using the score from #get_score_from, applies
# the key expiry via #set_key_expire, and then optionally prunes:
# - when @value_expire is positive, removes members whose score is at most
#   (current epoch seconds - @value_expire) with ZREMRANGEBYSCORE;
# - when @value_length is positive, evaluates the script from
#   #generate_zremrangebyrank_script.
#
# @param record record supplying key, value and (optionally) score
# @param time event time, passed on to #get_score_from
# @return the result of the script evaluation, or nil when no trim is done
def operation_for_zset(record, time)
  zset_key = get_key_from(record)
  member = get_value_from(record)
  rank_score = get_score_from(record, time)

  @redis.zadd(zset_key, rank_score, member)
  set_key_expire(zset_key)

  if 0 < @value_expire
    cutoff = Time.now.to_i - @value_expire
    @redis.zremrangebyscore(zset_key, '-inf', cutoff)
  end

  @redis.eval(generate_zremrangebyrank_script(zset_key, @value_length, @order)) if 0 < @value_length
end
#set_key_expire(key) ⇒ Object
222 223 224 225 226 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 222
# Sets a time-to-live on +key+ with EXPIRE when @key_expire is positive;
# does nothing otherwise.
#
# @param key key to expire
# @return the result of the expire call, or nil when no expiry is applied
def set_key_expire(key)
  @redis.expire(key, @key_expire) if 0 < @key_expire
end
#shutdown ⇒ Object
52 53 54 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 52
# Stops the plugin and closes the Redis connection.
#
# Delegates to the parent implementation first, matching #initialize,
# #configure and #start, and only then sends QUIT.
# NOTE(review): presumably the parent shutdown flushes buffered chunks
# through #write, which is why the connection is closed afterwards — confirm
# against the BufferedOutput implementation in use.
def shutdown
  super
  # @redis is only assigned in #start; skip QUIT if the plugin never started.
  @redis.quit if @redis
end
#start ⇒ Object
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 41
# Starts the plugin and creates the Redis client in @redis.
#
# The client is given :path when @path is set, otherwise :host and :port.
# In both cases it also receives :password, :timeout, :thread_safe => true
# and :db.
def start
  super
  endpoint = @path ? { :path => @path } : { :host => @host, :port => @port }
  shared = { :password => @password, :timeout => @timeout, :thread_safe => true, :db => @db }
  @redis = Redis.new(endpoint.merge(shared))
end
#traverse(data, key) ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 178
# Walks a nested structure along a dot-separated path.
#
# Each path segment is looked up with #has_key? on the current node; the
# walk stops with nil as soon as a segment is missing. An empty path yields
# +data+ itself.
#
# @param data starting node; every visited node must respond to #has_key? and #[]
# @param key [String] dot-separated path, e.g. 'a.b'
# @return the value at the path, or nil when any segment is absent
def traverse(data, key)
  key.split('.').reduce(data) do |node, segment|
    return nil unless node.has_key?(segment)

    node[segment]
  end
end
#write(chunk) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 60
# Writes every event in a buffer chunk to Redis inside one pipeline.
#
# The chunk is read as a stream of msgpack-encoded [tag, time, record]
# triples (the shape produced by #format). Each triple is unpacked and
# dispatched on @store_type to the matching operation_for_* method; an
# unrecognized @store_type does nothing for that event.
#
# A NoMethodError raised while handling a single event is printed with
# +puts+ and that event is skipped; the remaining events are still
# processed. EOFError from the unpacker is treated as the normal end of
# the chunk.
#
# @param chunk buffer chunk; must respond to #open and yield an IO-like object
# @return the result of the pipelined call
def write(chunk)
  @redis.pipelined do
    chunk.open do |io|
      begin
        MessagePack::Unpacker.new(io).each do |message|
          begin
            # Destructure the event before dispatching so each operation
            # receives the actual record and time. The tag is not used here.
            _tag, time, record = message
            case @store_type
            when 'zset'    then operation_for_zset(record, time)
            when 'set'     then operation_for_set(record)
            when 'list'    then operation_for_list(record)
            when 'string'  then operation_for_string(record)
            when 'publish' then operation_for_publish(record)
            end
          rescue NoMethodError => e
            # Best-effort: report the failure and continue with the next event.
            puts e
          end
        end
      rescue EOFError
        # EOFError always occurs when the end of the chunk is reached.
      end
    end
  end
end