Class: LogStash::Outputs::Base

Inherits:
Plugin
  • Object
show all
Includes:
Config::Mixin, Util::Loggable
Defined in:
lib/logstash/outputs/base.rb

Direct Known Subclasses

Plugins::Builtin::Pipeline::Output

Constant Summary

Constants included from Config::Mixin

Config::Mixin::PLUGIN_VERSION_0_9_0, Config::Mixin::PLUGIN_VERSION_1_0_0

Constants included from Util::SubstitutionVariables

Util::SubstitutionVariables::SUBSTITUTION_PLACEHOLDER_REGEX

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Config::Mixin

#config_init, included

Methods included from Util::SubstitutionVariables

#deep_replace, #replace_placeholders

Constructor Details

#initialize(params = {}) ⇒ Base

Returns a new instance of Base.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/outputs/base.rb', line 58

def initialize(params={})
  super
  config_init(@params)

  if self.workers != 1
    raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option"
  end

  # If we're running with a single thread we must enforce single-threaded concurrency by default
  # Maybe in a future version we'll assume output plugins are threadsafe
  @single_worker_mutex = Mutex.new

  @receives_encoded = self.methods.include?(:multi_receive_encoded)
end

Class Method Details

.concurrency(type = nil) ⇒ Object

Set or return concurrency type



27
28
29
30
31
32
33
# File 'lib/logstash/outputs/base.rb', line 27

def self.concurrency(type=nil)
  if type
    @concurrency = type
  else
    @concurrency || :legacy # default is :legacyo
  end
end

.declare_threadsafe!Object

Deprecated: Favor ‘concurrency :shared`



36
37
38
# File 'lib/logstash/outputs/base.rb', line 36

def self.declare_threadsafe!
  concurrency :shared
end

.declare_workers_not_supported!(message = nil) ⇒ Object

Deprecated: Favor ‘concurrency :single` Remove in Logstash 6.0.0



47
48
49
# File 'lib/logstash/outputs/base.rb', line 47

def self.declare_workers_not_supported!(message=nil)
  concurrency :single
end

.plugin_typeObject



53
54
55
# File 'lib/logstash/outputs/base.rb', line 53

def self.plugin_type
  "output"
end

.threadsafe?Boolean

Deprecated: Favor ‘#concurrency`

Returns:

  • (Boolean)


41
42
43
# File 'lib/logstash/outputs/base.rb', line 41

def self.threadsafe?
  concurrency == :shared
end

Instance Method Details

#codecObject



97
98
99
# File 'lib/logstash/outputs/base.rb', line 97

def codec
  params["codec"]
end

#concurrencyObject



101
102
103
# File 'lib/logstash/outputs/base.rb', line 101

def concurrency
  self.class.concurrency
end

#execution_context=(context) ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/logstash/outputs/base.rb', line 112

def execution_context=(context)
  super
  # There is no easy way to propage an instance variable into the codec, because the codec
  # are created at the class level
  # TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
  #               parent plugin's
  @codec.execution_context = context
  context
end

#metric=(metric) ⇒ Object



105
106
107
108
109
110
# File 'lib/logstash/outputs/base.rb', line 105

def metric=(metric)
  super
  # Hack to create a new metric namespace using 'plugins' as the root
  @codec.metric = metric.root.namespace(metric.namespace_name[0...-2].push(:codecs, codec.id))
  metric
end

#multi_receive(events) ⇒ Object

To be overridden in implementations



85
86
87
88
89
90
91
# File 'lib/logstash/outputs/base.rb', line 85

def multi_receive(events)
  if @receives_encoded
    self.multi_receive_encoded(codec.multi_encode(events))
  else
    events.each {|event| receive(event) }
  end
end

#receive(event) ⇒ Object



79
80
81
# File 'lib/logstash/outputs/base.rb', line 79

def receive(event)
  raise "#{self.class}#receive must be overidden"
end

#registerObject



74
75
76
# File 'lib/logstash/outputs/base.rb', line 74

def register
  raise "#{self.class}#register must be overidden"
end

#workers_not_supported(message = nil) ⇒ Object



93
94
95
# File 'lib/logstash/outputs/base.rb', line 93

def workers_not_supported(message=nil)
  raise "This plugin (#{self.class.name}) is using the obsolete '#workers_not_supported' method. If you installed this plugin specifically on this Logstash version, it is not compatible. If you are a plugin author, please see https://www.elastic.co/guide/en/logstash/current/_how_to_write_a_logstash_output_plugin.html for more info"
end