Class: Fluent::Plugin::RedisEnrichmentFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_redis_enrichment.rb

Overview

filter plugin

enrich record based on redis fetched content

Defined Under Namespace

Classes: Cache, CleanroomExpander, NoCache, PlaceholderExpander, RedisPool

Constant Summary collapse

DEFAULT_REDIS_HOST =
'127.0.0.1'
DEFAULT_REDIS_PORT =
6379
DEFAULT_REDIS_DB =
0
DEFAULT_REDIS_PASSWORD =
nil
DEFAULT_REDIS_TIMEOUT =
5.0
DEFAULT_REDIS_POOL =
5
DEFAULT_SENTINELS =
nil
DEFAULT_SENTINEL_MASTER =
'mymaster'
DEFAULT_SENTINEL_PASSWORD =
nil
DEFAULT_REDIS_ROLE =
:slave
DEFAULT_SENTINEL_PORT =
26_379
DEFAULT_CACHE_TTL =
30 * 60
DEFAULT_CACHE_SIZE =
5000

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#record_enrichmentObject (readonly)

Returns the value of attribute record_enrichment.



77
78
79
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 77

def record_enrichment
  @record_enrichment
end

Instance Method Details

#cache_optionsObject



126
127
128
129
130
131
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 126

def cache_options
  {
    size: cache_size,
    ttl: cache_ttl
  }
end

#configure(conf) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 79

def configure(conf)
  super

  if !key || key.empty?
    raise Fluent::ConfigError,
          "key can't be empty, the value will be expanded to a redis key"
  end

  @record_enrichment = {}
  conf.elements.select { |element| element.name == 'record' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k)
      @record_enrichment[k] = parse_record_value(v)
    end
  end

  @placeholder_expander = PlaceholderExpander.new(log)
end

#filter(tag, time, record) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 111

def filter(tag, time, record)
  new_record = record.dup
  expanded_key = @placeholder_expander.expand(@key, { tag: tag,
                                                      time: time,
                                                      record: new_record })
  log.debug("filter_redis_enrichment: on tag:#{tag}, search #{expanded_key}")
  redis = @cache.getset(expanded_key) { @redis.get(expanded_key) }
  new_record_record_enrichment = @placeholder_expander.expand(@record_enrichment,
                                                              { tag: tag,
                                                                time: time,
                                                                record: new_record,
                                                                redis: redis })
  new_record.merge(new_record_record_enrichment)
end

#redis_optionsObject



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 133

def redis_options
  redis_options = {
    db: redis_db,
    password: redis_password,
    timeout: redis_timeout,
    pool_size: redis_pool
  }

  if sentinels
    formated_sentinels = sentinels.map do |sentinel|
      host, port = sentinel.split(':')
      port = (port || DEFAULT_SENTINEL_PORT).to_i
      formated = { host: host, port: port }
      formated[:password] = sentinel_password if sentinel_password
      formated
    end
    redis_options.update({
                           sentinels: formated_sentinels,
                           name: sentinel_master,
                           role: redis_role
                         })
  else
    redis_options.update({
                           host: redis_host,
                           port: redis_port
                         })
  end
end

#shutdownObject



105
106
107
108
109
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 105

def shutdown
  @redis.quit

  super
end

#startObject



98
99
100
101
102
103
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 98

def start
  super

  @cache = Cache.new(**cache_options)
  @redis = RedisPool.new(**redis_options)
end