Class: LogStash::JavaBasePipeline

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/java_pipeline.rb

Direct Known Subclasses

JavaPipeline

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #logger, #slow_logger

Constructor Details

#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ JavaBasePipeline

Returns a new instance of JavaBasePipeline.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/java_pipeline.rb', line 40

# Builds the compiled form of a pipeline from its configuration.
#
# In order: captures the config string and settings from +pipeline_config+,
# compiles the config string into an IR (@lir), resolves the pipeline id,
# obtains a dead letter queue writer, builds the plugin factory, creates the
# CompiledPipeline, and reads inputs, filters and outputs from it.
#
# @param pipeline_config must respond to +config_string+ and +settings+
# @param namespaced_metric not referenced anywhere in this method body
#   (NOTE(review): presumably consumed by a subclass -- confirm)
# @param agent stored in @agent and passed to the ExecutionContextFactory
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
  @logger = self.logger
  @mutex = Mutex.new
  @ephemeral_id = SecureRandom.uuid

  @pipeline_config = pipeline_config
  @config_str = pipeline_config.config_string
  @settings = pipeline_config.settings
  # SHA1 hex digest of the raw config string.
  @config_hash = Digest::SHA1.hexdigest(@config_str)

  @lir = ConfigCompiler.configToPipelineIR(
    @config_str, @settings.get_value("config.support_escapes")
  )

  # Falls back to this object's object_id when "pipeline.id" is nil or false.
  @pipeline_id = @settings.get_value("pipeline.id") || self.object_id
  @agent = agent
  # #dlq_writer reads #settings and #pipeline_id, so it has to run after both
  # are set; it also assigns @dlq_writer itself before returning the writer.
  @dlq_writer = dlq_writer
  @plugin_factory = LogStash::Plugins::PluginFactory.new(
    # use NullMetric if called in the BasePipeline context otherwise use the @metric value
    @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new),
    @logger, LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer),
    JavaFilterDelegator
  )
  @lir_execution = CompiledPipeline.new(@lir, @plugin_factory)
  # The graph string is only built when the "config.debug" setting is on AND
  # the logger is at debug level.
  if settings.get_value("config.debug") && @logger.debug?
    @logger.debug("Compiled pipeline code", default_logging_keys(:code => @lir.get_graph.to_string))
  end
  @inputs = @lir_execution.inputs
  @filters = @lir_execution.filters
  @outputs = @lir_execution.outputs
end

Instance Attribute Details

#config_hash ⇒ Object (readonly)

Returns the value of attribute config_hash.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @config_hash.
#
# @return [String] the value the constructor stored: the SHA1 hex digest
#   (Digest::SHA1.hexdigest) of the pipeline's config string
def config_hash
  @config_hash
end

#config_str ⇒ Object (readonly)

Returns the value of attribute config_str.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @config_str.
#
# @return the value the constructor took from +pipeline_config.config_string+
def config_str
  @config_str
end

#ephemeral_id ⇒ Object (readonly)

Returns the value of attribute ephemeral_id.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @ephemeral_id.
#
# @return [String] the UUID generated with SecureRandom.uuid when this
#   instance was constructed
def ephemeral_id
  @ephemeral_id
end

#execution_context ⇒ Object (readonly)

Returns the value of attribute execution_context.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @execution_context.
#
# NOTE(review): the constructor shown for this class never assigns
# @execution_context, so this returns nil unless something else sets it --
# confirm where (or whether) it is populated.
#
# @return the current value of @execution_context, or nil if never assigned
def execution_context
  @execution_context
end

#filters ⇒ Object (readonly)

Returns the value of attribute filters.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @filters.
#
# @return the value the constructor read from the compiled pipeline
#   (+@lir_execution.filters+)
def filters
  @filters
end

#inputs ⇒ Object (readonly)

Returns the value of attribute inputs.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @inputs.
#
# @return the value the constructor read from the compiled pipeline
#   (+@lir_execution.inputs+)
def inputs
  @inputs
end

#lir ⇒ Object (readonly)

Returns the value of attribute lir.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @lir.
#
# @return the result of ConfigCompiler.configToPipelineIR that the
#   constructor stored for this pipeline's config string
def lir
  @lir
end

#outputs ⇒ Object (readonly)

Returns the value of attribute outputs.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @outputs.
#
# @return the value the constructor read from the compiled pipeline
#   (+@lir_execution.outputs+)
def outputs
  @outputs
end

#pipeline_config ⇒ Object (readonly)

Returns the value of attribute pipeline_config.



38
39
40
# File 'lib/logstash/java_pipeline.rb', line 38

# Reader for @pipeline_config.
#
# @return the +pipeline_config+ object that was passed to the constructor
def pipeline_config
  @pipeline_config
end

#pipeline_id ⇒ Object (readonly)

Returns the value of attribute pipeline_id.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @pipeline_id.
#
# @return the value of the "pipeline.id" setting captured by the constructor,
#   or the instance's object_id when that setting was nil or false
def pipeline_id
  @pipeline_id
end

#settings ⇒ Object (readonly)

Returns the value of attribute settings.



37
38
39
# File 'lib/logstash/java_pipeline.rb', line 37

# Reader for @settings.
#
# @return the value the constructor took from +pipeline_config.settings+
def settings
  @settings
end

Instance Method Details

#buildCodec(name, *args) ⇒ Object



99
100
101
# File 'lib/logstash/java_pipeline.rb', line 99

# Requests a "codec" plugin from #plugin.
#
# Unlike the other build* helpers this one takes no line/column, so 0 is
# passed for both.
#
# @param name plugin name, forwarded unchanged
# @param args extra arguments, forwarded unchanged
# @return whatever #plugin returns
def buildCodec(name, *args)
  line = column = 0
  plugin("codec", name, line, column, *args)
end

#buildFilter(name, line, column, *args) ⇒ Object



91
92
93
# File 'lib/logstash/java_pipeline.rb', line 91

# Requests a "filter" plugin from #plugin.
#
# @param name plugin name, forwarded unchanged
# @param line forwarded unchanged as the plugin's line
# @param column forwarded unchanged as the plugin's column
# @param args extra arguments, forwarded unchanged
# @return whatever #plugin returns
def buildFilter(name, line, column, *args)
  plugin_type = "filter"
  plugin(plugin_type, name, line, column, *args)
end

#buildInput(name, line, column, *args) ⇒ Object



95
96
97
# File 'lib/logstash/java_pipeline.rb', line 95

# Requests an "input" plugin from #plugin.
#
# @param name plugin name, forwarded unchanged
# @param line forwarded unchanged as the plugin's line
# @param column forwarded unchanged as the plugin's column
# @param args extra arguments, forwarded unchanged
# @return whatever #plugin returns
def buildInput(name, line, column, *args)
  plugin_type = "input"
  plugin(plugin_type, name, line, column, *args)
end

#buildOutput(name, line, column, *args) ⇒ Object



87
88
89
# File 'lib/logstash/java_pipeline.rb', line 87

# Requests an "output" plugin from #plugin.
#
# @param name plugin name, forwarded unchanged
# @param line forwarded unchanged as the plugin's line
# @param column forwarded unchanged as the plugin's column
# @param args extra arguments, forwarded unchanged
# @return whatever #plugin returns
def buildOutput(name, line, column, *args)
  plugin_type = "output"
  plugin(plugin_type, name, line, column, *args)
end

#close_dlq_writer ⇒ Object



80
81
82
83
84
85
# File 'lib/logstash/java_pipeline.rb', line 80

# Closes the current @dlq_writer and, when the "dead_letter_queue.enable"
# setting is truthy, releases this pipeline's entry via
# DeadLetterQueueFactory.release.
#
# @return nil when the dead letter queue is disabled, otherwise whatever
#   DeadLetterQueueFactory.release returns
def close_dlq_writer
  @dlq_writer.close
  return unless settings.get_value("dead_letter_queue.enable")

  DeadLetterQueueFactory.release(pipeline_id)
end

#configured_as_reloadable? ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/logstash/java_pipeline.rb', line 111

# Reports the value of the "pipeline.reloadable" setting.
#
# Reads the setting through +get_value+, the same accessor every other
# settings lookup in this class uses.
#
# @return the setting's value as returned by +settings.get_value+
def configured_as_reloadable?
  settings.get_value("pipeline.reloadable")
end

#dlq_writer ⇒ Object



72
73
74
75
76
77
78
# File 'lib/logstash/java_pipeline.rb', line 72

# Obtains a dead letter queue writer, stores it in @dlq_writer and returns it.
#
# When the "dead_letter_queue.enable" setting is truthy the writer comes from
# DeadLetterQueueFactory.getWriter (keyed by pipeline id, with the configured
# path and max bytes); otherwise a new DummyDeadLetterQueueWriter is used.
# Note that every call reassigns @dlq_writer.
#
# @return the writer that was just assigned to @dlq_writer
def dlq_writer
  @dlq_writer =
    if settings.get_value("dead_letter_queue.enable")
      DeadLetterQueueFactory.getWriter(
        pipeline_id,
        settings.get_value("path.dead_letter_queue"),
        settings.get_value("dead_letter_queue.max_bytes")
      )
    else
      LogStash::Util::DummyDeadLetterQueueWriter.new
    end
end

#non_reloadable_plugins ⇒ Object



119
120
121
# File 'lib/logstash/java_pipeline.rb', line 119

# Collects every input, filter and output whose +reloadable?+ is falsy.
#
# @return the non-reloadable plugins, in inputs, filters, outputs order
def non_reloadable_plugins
  all_plugins = inputs + filters + outputs
  all_plugins.reject(&:reloadable?)
end

#plugin(plugin_type, name, line, column, *args) ⇒ Object



103
104
105
# File 'lib/logstash/java_pipeline.rb', line 103

# Delegates plugin lookup/creation to @plugin_factory, passing every
# argument through unchanged.
#
# @param plugin_type plugin category (the build* helpers pass "input",
#   "filter", "output" or "codec")
# @param name plugin name
# @param line forwarded unchanged as the plugin's line
# @param column forwarded unchanged as the plugin's column
# @param args extra arguments, forwarded unchanged
# @return whatever +@plugin_factory.plugin+ returns
def plugin(plugin_type, name, line, column, *args)
  factory = @plugin_factory
  factory.plugin(plugin_type, name, line, column, *args)
end

#reloadable? ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/logstash/java_pipeline.rb', line 107

# Tells whether the pipeline can be reloaded: it must be configured as
# reloadable AND contain only reloadable plugins.
#
# The plugin check is skipped when the configuration check is falsy, and in
# that case the falsy configuration value itself is returned.
#
# @return the result of #reloadable_plugins?, or the falsy result of
#   #configured_as_reloadable?
def reloadable?
  enabled_in_settings = configured_as_reloadable?
  return enabled_in_settings unless enabled_in_settings

  reloadable_plugins?
end

#reloadable_plugins? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/logstash/java_pipeline.rb', line 115

# Tells whether every plugin in the pipeline is reloadable, i.e. whether
# #non_reloadable_plugins comes back empty.
#
# @return [Boolean] true when there are no non-reloadable plugins
def reloadable_plugins?
  blockers = non_reloadable_plugins
  blockers.empty?
end