Module: JqMixin

Constant Summary collapse

# Error raised by #jq_transform when the line read back from jq cannot be
# parsed as JSON; the exception is built from that raw jq output.
JqError =
Class.new(RuntimeError)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(plugin) ⇒ Object



7
8
9
# File 'lib/fluent/plugin/jq_mixin.rb', line 7

# Module inclusion hook: declares the +jq+ configuration parameter on the
# including plugin.
#
# @param plugin [#config_param] the class that is including this mixin
# @return [Object] whatever +plugin.config_param+ returns
def self.included(plugin)
  plugin.config_param(:jq, :string)
end

Instance Method Details

#configure(conf) ⇒ Object



11
12
13
14
15
16
17
18
19
20
# File 'lib/fluent/plugin/jq_mixin.rb', line 11

# Configures the plugin and validates the jq filter up front.
#
# After delegating to +super+, a throwaway jq process is started with
# +null_input: true+, its entire output is read, and the configuration is
# rejected if that output mentions a "compile error".
#
# @param conf [Object] the configuration object, forwarded to +super+
# @return [void]
# @raise [Fluent::ConfigError] if jq reports a compile error, or if any other
#   StandardError occurs while configuring or running the check
def configure(conf)
  super
  jq_check = start_process(null_input: true)
  output = jq_check.read
  if output =~ /compile error/m
    raise Fluent::ConfigError, "Could not parse jq filter: #{@jq}, error: #{output}"
  end
rescue Fluent::ConfigError
  # Already a configuration error (ours or one raised by `super`): pass it
  # through untouched instead of wrapping it a second time, which would
  # duplicate the "Could not parse jq filter" prefix or mislabel the failure.
  raise
rescue => e
  raise Fluent::ConfigError, "Could not parse jq filter: #{@jq}, error: #{e.message}"
ensure
  jq_check.close if jq_check # if `super` fails, `jq_check` will be `nil`
end

#jq_transform(object) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/jq_mixin.rb', line 37

# Sends +object+ through the long-running jq process and returns the result.
#
# The object is serialized with MultiJson, written as one line to jq, and a
# single line is read back and parsed. The write/read pair runs under +@lock+.
# NOTE(review): this reads exactly one line per input, so it presumably
# expects the filter to emit exactly one compact result per record - confirm.
#
# If the pipe to jq is broken, the process is closed and restarted (also under
# +@lock+) and the transform is retried, up to a bounded number of restarts so
# that a jq that cannot stay alive does not cause an endless loop.
#
# @param object [Object] any value MultiJson can serialize
# @return [Object] the parsed JSON value produced by jq
# @raise [JqError] if jq's output is not valid JSON (the message is the raw
#   output), or if jq keeps failing after the allowed number of restarts
def jq_transform(object)
  max_restarts = 3
  restarts = 0
  begin
    result = @lock.synchronize do
      @jq_process.puts MultiJson.dump(object)
      @jq_process.gets
    end
    MultiJson.load result
  rescue MultiJson::ParseError
    raise JqError, result
  rescue Errno::EPIPE
    restarts += 1
    if restarts > max_restarts
      raise JqError, "jq process could not be restarted after #{max_restarts} attempts"
    end
    # Swap the process while holding the lock so no other thread is writing
    # to or reading from it mid-replacement.
    @lock.synchronize do
      @jq_process.close
      @jq_process = start_process
    end
    retry
  end
end

#shutdown ⇒ Object



28
29
30
31
# File 'lib/fluent/plugin/jq_mixin.rb', line 28

# Closes the jq process on a best-effort basis, then delegates to +super+.
#
# Any StandardError raised while closing (including the case where no process
# was ever assigned) is deliberately ignored so that +super+ always runs.
#
# @return [Object] whatever +super+ returns
def shutdown
  begin
    @jq_process.close
  rescue StandardError
    nil
  end
  super
end

#start ⇒ Object



22
23
24
25
26
# File 'lib/fluent/plugin/jq_mixin.rb', line 22

# Starts the plugin: delegates to +super+, launches the jq process used by
# #jq_transform, and creates the mutex that guards access to it.
#
# @return [Thread::Mutex] the newly created lock (value of the last assignment)
def start
  super

  @jq_process = start_process
  # `Mutex` is the top-level alias of Thread::Mutex.
  @lock = Mutex.new
end

#start_process(filter: @jq, null_input: false) ⇒ Object



33
34
35
# File 'lib/fluent/plugin/jq_mixin.rb', line 33

# Spawns a jq process running +filter+ and returns a read/write pipe to it.
#
# jq is launched directly from an argument vector rather than through a shell
# command string, so the filter reaches jq verbatim even when it contains
# single quotes or other shell metacharacters. The child's stderr is merged
# into its stdout, so jq error messages are read from the same pipe.
#
# @param filter [String] the jq program to run (defaults to the configured +@jq+)
# @param null_input [Boolean] when true, pass +-n+ so jq does not read input
# @return [IO] a pipe opened in 'r+' mode, connected to jq's stdin and stdout
# @raise [SystemCallError] if the jq executable cannot be started
def start_process(filter: @jq, null_input: false)
  command = ['jq']
  command << '-n' if null_input
  # `to_s` keeps the old string-interpolation behavior for a nil filter.
  command.push('--unbuffered', '-c', filter.to_s)
  IO.popen(command, 'r+', err: [:child, :out])
end