Class: Fluent::Plugin::RedisEnrichmentFilter
- Inherits:
-
Filter
- Object
- Filter
- Fluent::Plugin::RedisEnrichmentFilter
show all
- Defined in:
- lib/fluent/plugin/filter_redis_enrichment.rb
Overview
Fluentd filter plugin that enriches each record with content fetched from Redis.
Defined Under Namespace
Classes: Cache, CleanroomExpander, NoCache, PlaceholderExpander, RedisPool
Constant Summary
collapse
- DEFAULT_REDIS_HOST =
'127.0.0.1'
- DEFAULT_REDIS_PORT =
6379
- DEFAULT_REDIS_DB =
0
- DEFAULT_REDIS_PASSWORD =
nil
- DEFAULT_REDIS_TIMEOUT =
5.0
- DEFAULT_REDIS_POOL =
5
- DEFAULT_SENTINELS =
nil
- DEFAULT_SENTINEL_MASTER =
'mymaster'
- DEFAULT_SENTINEL_PASSWORD =
nil
- DEFAULT_REDIS_ROLE =
:slave
- DEFAULT_SENTINEL_PORT =
26_379
- DEFAULT_CACHE_TTL =
30 * 60
- DEFAULT_CACHE_SIZE =
5000
Instance Attribute Summary collapse
Instance Method Summary
collapse
Instance Attribute Details
#record_enrichment ⇒ Object
Returns the value of attribute record_enrichment.
77
78
79
|
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 77
# Reader for the +@record_enrichment+ instance variable.
#
# @return [Object] whatever is currently stored in +@record_enrichment+
#   (nil until something assigns it)
def record_enrichment
  @record_enrichment
end
|
Instance Method Details
#cache_options ⇒ Object
126
127
128
129
130
131
|
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 126
# Collects the cache settings into a keyword-style options hash.
#
# @return [Hash{Symbol => Object}] +:size+ taken from +cache_size+ and
#   +:ttl+ taken from +cache_ttl+, in that order
def cache_options
  { size: cache_size, ttl: cache_ttl }
end
|
#configure(conf) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 79
# Validates the plugin configuration and prepares the enrichment template.
#
# After delegating to the parent implementation, this rejects a missing or
# empty +key+, gathers every key/value pair found in the +record+ elements of
# +conf+ into +@record_enrichment+ (each value goes through
# +parse_record_value+), and creates the placeholder expander.
#
# @param conf [Object] configuration object; must respond to +elements+
# @raise [Fluent::ConfigError] when +key+ is nil/false or empty
def configure(conf)
  super

  key_missing = !key || key.empty?
  if key_missing
    raise Fluent::ConfigError,
          "key can't be empty, the value will be expanded to a redis key"
  end

  @record_enrichment = {}
  record_sections = conf.elements.select { |section| section.name == 'record' }
  record_sections.each do |section|
    section.each_pair do |name, raw_value|
      # The result is deliberately discarded.
      # NOTE(review): presumably this marks the parameter as read so Fluentd
      # does not emit an "unused parameter" warning -- confirm before removing.
      section.has_key?(name)
      @record_enrichment[name] = parse_record_value(raw_value)
    end
  end

  @placeholder_expander = PlaceholderExpander.new(log)
end
|
#filter(tag, time, record) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 111
# Enriches one event record with data looked up in Redis.
#
# The configured +@key+ template is expanded against the tag, time and a
# shallow copy of the record. The expanded key is looked up through
# +@cache.getset+, whose block reads the key from +@redis+ (presumably only
# on a cache miss -- depends on the cache implementation). The
# +@record_enrichment+ template is then expanded with that value available
# as +:redis+, and the result is merged over the record copy.
#
# @param tag [Object] event tag, passed to the expander as +:tag+
# @param time [Object] event time, passed to the expander as +:time+
# @param record [Hash] incoming record; this method only calls +dup+ on it
# @return [Hash] the record copy merged with the expanded enrichment fields
def filter(tag, time, record)
  enriched = record.dup

  key_context = { tag: tag, time: time, record: enriched }
  redis_key = @placeholder_expander.expand(@key, key_context)
  log.debug("filter_redis_enrichment: on tag:#{tag}, search #{redis_key}")

  redis = @cache.getset(redis_key) { @redis.get(redis_key) }

  # A fresh context hash is built rather than reusing key_context, so the
  # second expansion never sees anything the first one might have added.
  enrichment_context = { tag: tag, time: time, record: enriched, redis: redis }
  additions = @placeholder_expander.expand(@record_enrichment, enrichment_context)

  enriched.merge(additions)
end
|
#redis_options ⇒ Object
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 133
# Builds the option hash for the Redis connection pool.
#
# The hash always carries +:db+, +:password+, +:timeout+ and +:pool_size+.
# When +sentinels+ is set, each "host[:port]" entry becomes a
# +{ host:, port: }+ hash (port falls back to DEFAULT_SENTINEL_PORT, and
# +:password+ is added when +sentinel_password+ is set), and the result also
# carries +:sentinels+, +:name+ and +:role+. Otherwise it carries +:host+ and
# +:port+ for a direct connection.
#
# @return [Hash{Symbol => Object}] connection options
def redis_options
  base = {
    db: redis_db,
    password: redis_password,
    timeout: redis_timeout,
    pool_size: redis_pool
  }
  return base.merge(host: redis_host, port: redis_port) unless sentinels

  sentinel_nodes = sentinels.map do |address|
    host, port = address.split(':')
    node = { host: host, port: (port || DEFAULT_SENTINEL_PORT).to_i }
    node[:password] = sentinel_password if sentinel_password
    node
  end

  base.merge(sentinels: sentinel_nodes, name: sentinel_master, role: redis_role)
end
|
#shutdown ⇒ Object
105
106
107
108
109
|
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 105
# Closes the Redis connection pool, then runs the parent shutdown hook.
#
# +@redis+ is only assigned in #start, so it can still be nil here when
# #start never ran or raised before the pool was created. Guarding the call
# avoids a NoMethodError in that case, which would also have skipped +super+.
#
# @return [Object] whatever the parent +shutdown+ returns
def shutdown
  @redis.quit if @redis
  super
end
|
#start ⇒ Object
98
99
100
101
102
103
|
# File 'lib/fluent/plugin/filter_redis_enrichment.rb', line 98
# Runs the parent start hook, then creates the cache and the Redis pool.
#
# +@cache+ is built from #cache_options and +@redis+ from #redis_options;
# both option hashes are splatted as keyword arguments.
#
# @return [Object] the newly created +RedisPool+ (value of the last assignment)
def start
  super

  cache_settings = cache_options
  @cache = Cache.new(**cache_settings)

  pool_settings = redis_options
  @redis = RedisPool.new(**pool_settings)
end
|