Class: Fluent::Plugin::RedisEnrichmentFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_redis_enrichment.rb

Overview

filter plugin

enrich record based on redis fetched content

Defined Under Namespace

Modules: Cache Classes: CleanroomExpander, PlaceholderExpander, RedisPool

Constant Summary collapse

DEFAULT_REDIS_HOST =
'127.0.0.1'
DEFAULT_REDIS_PORT =
6379
DEFAULT_REDIS_DB =
0
DEFAULT_REDIS_PASSWORD =
nil
DEFAULT_REDIS_TIMEOUT =
5.0
DEFAULT_REDIS_POOL =
5
DEFAULT_SENTINELS =
nil
DEFAULT_SENTINEL_MASTER =
'mymaster'
DEFAULT_SENTINEL_PASSWORD =
nil
DEFAULT_REDIS_ROLE =
:slave
DEFAULT_SENTINEL_PORT =
26_379
DEFAULT_CACHE_TTL =
30 * 60
DEFAULT_CACHE_SIZE =
5000
DEFAULT_CACHE_TYPE =
:lazy

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#record_enrichmentObject (readonly)

Returns the value of attribute record_enrichment.



80
81
82
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 80

def record_enrichment
  @record_enrichment
end

Instance Method Details

#cache_optionsObject



130
131
132
133
134
135
136
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 130

def cache_options
  {
    type: cache_type,
    size: cache_size,
    ttl: cache_ttl
  }
end

#configure(conf) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 82

def configure(conf)
  super

  if !key || key.empty?
    raise Fluent::ConfigError,
          "key can't be empty, the value will be expanded to a redis key"
  end

  @record_enrichment = {}
  conf.elements.select { |element| element.name == 'record' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k)
      @record_enrichment[k] = parse_record_value(v)
    end
  end

  @placeholder_expander = PlaceholderExpander.new(log)
end

#filter(tag, time, record) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 115

def filter(tag, time, record)
  new_record = record.dup
  expanded_key = @placeholder_expander.expand(@key, { tag: tag,
                                                      time: time,
                                                      record: new_record })
  log.debug("filter_redis_enrichment: on tag:#{tag}, search #{expanded_key}")
  redis = @cache.get(expanded_key)
  new_record_record_enrichment = @placeholder_expander.expand(@record_enrichment,
                                                              { tag: tag,
                                                                time: time,
                                                                record: new_record,
                                                                redis: redis })
  new_record.merge(new_record_record_enrichment)
end

#redis_optionsObject



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 138

def redis_options
  redis_options = {
    db: redis_db,
    password: redis_password,
    timeout: redis_timeout,
    pool_size: redis_pool
  }

  if sentinels
    formated_sentinels = sentinels.map do |sentinel|
      host, port = sentinel.split(':')
      port = (port || DEFAULT_SENTINEL_PORT).to_i
      formated = { host: host, port: port }
      formated[:password] = sentinel_password if sentinel_password
      formated
    end
    redis_options.update({
                           sentinels: formated_sentinels,
                           name: sentinel_master,
                           role: redis_role
                         })
  else
    redis_options.update({
                           host: redis_host,
                           port: redis_port
                         })
  end
end

#shutdownObject



108
109
110
111
112
113
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 108

def shutdown
  @redis.quit
  @cache.clean

  super
end

#startObject



101
102
103
104
105
106
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 101

def start
  super

  @redis = RedisPool.new(log: log, **redis_options)
  @cache = Cache.new(redis: @redis, log: log, **cache_options)
end