Module: JqMixin
- Included in:
- Fluent::Plugin::JqFormatter, Fluent::Plugin::JqOutput, Fluent::Plugin::JqParser, Fluent::Plugin::JqTransformerFilter
- Defined in:
- lib/fluent/plugin/jq_mixin.rb
Constant Summary collapse
- JqError =
Class.new(RuntimeError)
Class Method Summary collapse
- .included(plugin) ⇒ Object
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #jq_transform(object) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_process(filter: @jq, null_input: false) ⇒ Object
Class Method Details
.included(plugin) ⇒ Object
7 8 9 |
# File 'lib/fluent/plugin/jq_mixin.rb', line 7
# Inclusion hook: declares a `jq` string configuration parameter on whatever
# includes this mixin.
#
# @param plugin [Module] the including class; must respond to `config_param`
def self.included(plugin)
  plugin.config_param(:jq, :string)
end
Instance Method Details
#configure(conf) ⇒ Object
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/fluent/plugin/jq_mixin.rb', line 11
# Configures the plugin and validates the jq filter up front.
#
# After calling `super`, a throw-away jq process is started with
# `null_input: true` and its entire output is read. If that output mentions a
# "compile error" the filter is rejected.
#
# @param conf [Object] configuration passed through to `super`
# @raise [Fluent::ConfigError] if the filter does not compile, or if anything
#   else goes wrong while probing it
def configure(conf)
  super
  probe = start_process(null_input: true)
  err = probe.read
  raise Fluent::ConfigError, "Could not parse jq filter: #{@jq}, error: #{err}" if err.match?(/compile error/m)
rescue Fluent::ConfigError
  # Already a configuration error (ours or one from `super`): pass it through
  # untouched instead of wrapping it a second time.
  raise
rescue StandardError => e
  raise Fluent::ConfigError, "Could not parse jq filter: #{@jq}, error: #{e.message}"
ensure
  probe.close if probe # if `super` fails, `probe` will be `nil`
end
#jq_transform(object) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/jq_mixin.rb', line 37
# Sends one object through the long-running jq process and returns the result.
#
# The object is serialized with MultiJson, written as a single line to the jq
# process, and one line of output is read back and parsed. The write/read pair
# is done under `@lock` so that requests and replies are not interleaved.
#
# If the pipe is broken, the process is closed and replaced (also under
# `@lock`) and the call is retried a limited number of times.
#
# @param object [Object] anything MultiJson can dump
# @return [Object] the parsed jq output
# @raise [JqError] if jq's output line is not valid JSON; the raw line is the
#   error message
# @raise [Errno::EPIPE] if the pipe is still broken after the allowed restarts
def jq_transform(object)
  max_restarts = 3
  restarts = 0
  begin
    result = @lock.synchronize do
      @jq_process.puts MultiJson.dump(object)
      @jq_process.gets
    end
    MultiJson.load result
  rescue MultiJson::ParseError
    raise JqError, result
  rescue Errno::EPIPE
    # Give up instead of spinning forever when jq keeps dying.
    raise if restarts >= max_restarts

    restarts += 1
    @lock.synchronize do
      @jq_process.close
      @jq_process = start_process
    end
    retry
  end
end
#shutdown ⇒ Object
28 29 30 31 |
# File 'lib/fluent/plugin/jq_mixin.rb', line 28
# Closes the jq process on a best-effort basis, then continues the normal
# shutdown chain via `super`.
#
# Any StandardError raised while closing is deliberately ignored so that
# `super` always runs.
def shutdown
  begin
    @jq_process.close
  rescue StandardError
    nil
  end
  super
end
#start ⇒ Object
22 23 24 25 26 |
# File 'lib/fluent/plugin/jq_mixin.rb', line 22
# Runs the parent `start`, then launches the jq process with the default
# arguments and creates the mutex stored in `@lock`.
def start
  super
  @jq_process = start_process
  @lock = Mutex.new
end
#start_process(filter: @jq, null_input: false) ⇒ Object
33 34 35 |
# File 'lib/fluent/plugin/jq_mixin.rb', line 33
# Spawns a jq process running the given filter and returns a read/write pipe
# to it.
#
# jq is invoked directly with an argument vector rather than through a shell
# command string, so quotes and shell metacharacters in the filter reach jq
# unchanged. The child's stderr is merged into its stdout, so error text is
# readable from the same pipe.
#
# @param filter [String] jq program to run (defaults to the `@jq` setting)
# @param null_input [Boolean] when true, pass `-n` to jq
# @return [IO] bidirectional pipe opened in 'r+' mode
def start_process(filter: @jq, null_input: false)
  command = ['jq']
  command << '-n' if null_input
  command.push('--unbuffered', '-c', filter.to_s)
  IO.popen(command, 'r+', err: %i[child out])
end