Class: LogStash::JavaBasePipeline

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/java_pipeline.rb

Direct Known Subclasses

JavaPipeline

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #logger, #slow_logger

Constructor Details

#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ JavaBasePipeline

Returns a new instance of JavaBasePipeline.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/logstash/java_pipeline.rb', line 34

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
  @logger = self.logger
  @ephemeral_id = SecureRandom.uuid

  @pipeline_config = pipeline_config
  @config_str = pipeline_config.config_string
  @settings = pipeline_config.settings
  @config_hash = Digest::SHA1.hexdigest(@config_str)

  @lir = ConfigCompiler.configToPipelineIR(
    @config_str, @settings.get_value("config.support_escapes")
  )

  @pipeline_id = @settings.get_value("pipeline.id") || self.object_id
  @agent = agent
  @dlq_writer = dlq_writer
  @plugin_factory = LogStash::Plugins::PluginFactory.new(
    # use NullMetric if called in the BasePipeline context otherwise use the @metric value
    @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new),
    LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer),
    JavaFilterDelegator
  )
  @lir_execution = CompiledPipeline.new(@lir, @plugin_factory)
  if settings.get_value("config.debug") && @logger.debug?
    @logger.debug("Compiled pipeline code", default_logging_keys(:code => @lir.get_graph.to_string))
  end
  @inputs = @lir_execution.inputs
  @filters = @lir_execution.filters
  @outputs = @lir_execution.outputs
end

Instance Attribute Details

#config_hashObject (readonly)

Returns the value of attribute config_hash.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def config_hash
  @config_hash
end

#config_strObject (readonly)

Returns the value of attribute config_str.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def config_str
  @config_str
end

#ephemeral_idObject (readonly)

Returns the value of attribute ephemeral_id.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def ephemeral_id
  @ephemeral_id
end

#filtersObject (readonly)

Returns the value of attribute filters.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def filters
  @filters
end

#inputsObject (readonly)

Returns the value of attribute inputs.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def inputs
  @inputs
end

#lirObject (readonly)

Returns the value of attribute lir.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def lir
  @lir
end

#outputsObject (readonly)

Returns the value of attribute outputs.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def outputs
  @outputs
end

#pipeline_configObject (readonly)

Returns the value of attribute pipeline_config.



32
33
34
# File 'lib/logstash/java_pipeline.rb', line 32

def pipeline_config
  @pipeline_config
end

#pipeline_idObject (readonly)

Returns the value of attribute pipeline_id.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def pipeline_id
  @pipeline_id
end

#settingsObject (readonly)

Returns the value of attribute settings.



31
32
33
# File 'lib/logstash/java_pipeline.rb', line 31

def settings
  @settings
end

Instance Method Details

#buildCodec(name, *args) ⇒ Object



92
93
94
# File 'lib/logstash/java_pipeline.rb', line 92

def buildCodec(name, *args)
 plugin("codec", name, 0, 0, *args)
end

#buildFilter(name, line, column, *args) ⇒ Object



84
85
86
# File 'lib/logstash/java_pipeline.rb', line 84

def buildFilter(name, line, column, *args)
  plugin("filter", name, line, column, *args)
end

#buildInput(name, line, column, *args) ⇒ Object



88
89
90
# File 'lib/logstash/java_pipeline.rb', line 88

def buildInput(name, line, column, *args)
  plugin("input", name, line, column, *args)
end

#buildOutput(name, line, column, *args) ⇒ Object



80
81
82
# File 'lib/logstash/java_pipeline.rb', line 80

def buildOutput(name, line, column, *args)
  plugin("output", name, line, column, *args)
end

#close_dlq_writerObject



73
74
75
76
77
78
# File 'lib/logstash/java_pipeline.rb', line 73

def close_dlq_writer
  @dlq_writer.close
  if settings.get_value("dead_letter_queue.enable")
    DeadLetterQueueFactory.release(pipeline_id)
  end
end

#configured_as_reloadable?Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/logstash/java_pipeline.rb', line 104

def configured_as_reloadable?
  settings.get("pipeline.reloadable")
end

#dlq_writerObject



65
66
67
68
69
70
71
# File 'lib/logstash/java_pipeline.rb', line 65

def dlq_writer
  if settings.get_value("dead_letter_queue.enable")
    @dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"), settings.get_value("dead_letter_queue.max_bytes"))
  else
    @dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
  end
end

#non_reloadable_pluginsObject



112
113
114
# File 'lib/logstash/java_pipeline.rb', line 112

def non_reloadable_plugins
  (inputs + filters + outputs).select { |plugin| !plugin.reloadable? }
end

#plugin(plugin_type, name, line, column, *args) ⇒ Object



96
97
98
# File 'lib/logstash/java_pipeline.rb', line 96

def plugin(plugin_type, name, line, column, *args)
  @plugin_factory.plugin(plugin_type, name, line, column, *args)
end

#reloadable?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/logstash/java_pipeline.rb', line 100

def reloadable?
  configured_as_reloadable? && reloadable_plugins?
end

#reloadable_plugins?Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/logstash/java_pipeline.rb', line 108

def reloadable_plugins?
  non_reloadable_plugins.empty?
end