Class: LogStash::Outputs::Base

Inherits:
Plugin
  • Object
show all
Includes:
Config::Mixin, Util::Loggable
Defined in:
lib/logstash/outputs/base.rb

Constant Summary

Constants included from Config::Mixin

Config::Mixin::PLUGIN_VERSION_0_9_0, Config::Mixin::PLUGIN_VERSION_1_0_0

Constants included from Util::SubstitutionVariables

Util::SubstitutionVariables::SUBSTITUTION_PLACEHOLDER_REGEX

Constants inherited from Plugin

Plugin::NL

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#execution_context, #params

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Config::Mixin

#config_init, included

Methods included from Util::SubstitutionVariables

#deep_replace, #replace_placeholders

Methods included from Util::Loggable

included, #logger, #slow_logger

Methods inherited from Plugin

#close, #config_name, #debug_info, #do_close, #eql?, #hash, #id, #inspect, lookup, #metric, #metric=, #reloadable?, reloadable?, #to_s

Constructor Details

#initialize(params = {}) ⇒ Base

Returns a new instance of Base.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/logstash/outputs/base.rb', line 61

# Builds the output plugin instance.
#
# Delegates to the parent constructor (forwarding +params+), runs
# +config_init+ on +@params+ (presumably populated by the parent
# constructor -- confirm against Plugin#initialize), and then rejects any
# +workers+ value other than 1.
#
# @param params [Hash] plugin parameters, forwarded to the parent constructor
# @raise [LogStash::ConfigurationError] when +workers+ is not equal to 1
def initialize(params={})
  super
  config_init(@params)

  unless workers == 1
    raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option"
  end

  # Remember whether this instance exposes #multi_receive_encoded so that
  # #multi_receive can choose the encoded delivery path.
  @receives_encoded = methods.include?(:multi_receive_encoded)

  # Single-threaded concurrency is enforced by default; a later version may
  # instead assume that output plugins are threadsafe.
  @single_worker_mutex = Mutex.new
end

Class Method Details

.concurrency(type = nil) ⇒ Object

Set or return concurrency type



30
31
32
33
34
35
36
# File 'lib/logstash/outputs/base.rb', line 30

# Class-level reader/writer for the concurrency type.
#
# Called with a truthy +type+, stores it on the class and returns it.
# Called without one, returns the stored value, falling back to +:legacy+
# when nothing has been stored.
#
# @param type [Object, nil] concurrency type to store; omit to read
# @return [Object] the stored (or newly stored) concurrency type
def self.concurrency(type=nil)
  return @concurrency || :legacy unless type

  @concurrency = type
end

.declare_threadsafe! ⇒ Object

Deprecated: Favor `concurrency :shared`.



39
40
41
# File 'lib/logstash/outputs/base.rb', line 39

# Deprecated shorthand that sets the class concurrency type to +:shared+.
#
# @return [Object] whatever +concurrency(:shared)+ returns
def self.declare_threadsafe!
  concurrency(:shared)
end

.declare_workers_not_supported!(message = nil) ⇒ Object

Deprecated: Favor `concurrency :single`. Remove in Logstash 6.0.0.



50
51
52
# File 'lib/logstash/outputs/base.rb', line 50

# Deprecated shorthand that sets the class concurrency type to +:single+.
#
# @param message [Object, nil] accepted but not used by this method
# @return [Object] whatever +concurrency(:single)+ returns
def self.declare_workers_not_supported!(message=nil)
  concurrency(:single)
end

.plugin_type ⇒ Object



56
57
58
# File 'lib/logstash/outputs/base.rb', line 56

# Names the kind of plugin this class represents.
#
# @return [String] always "output"
def self.plugin_type
  'output'
end

.threadsafe? ⇒ Boolean

Deprecated: Favor `#concurrency`.

Returns:

  • (Boolean)


44
45
46
# File 'lib/logstash/outputs/base.rb', line 44

# Deprecated predicate: reports whether the class concurrency type is
# +:shared+.
#
# @return [Boolean] true when +concurrency+ equals +:shared+
def self.threadsafe?
  current_type = concurrency
  current_type == :shared
end

Instance Method Details

#codec ⇒ Object



100
101
102
# File 'lib/logstash/outputs/base.rb', line 100

# Looks up the "codec" entry of this plugin's +params+.
#
# @return [Object, nil] the value stored under "codec" in +params+
def codec
  params['codec']
end

#concurrency ⇒ Object



104
105
106
# File 'lib/logstash/outputs/base.rb', line 104

# Instance-level view of the concurrency type declared on the class.
#
# @return [Object] the value of the class-level +concurrency+
def concurrency
  plugin_class = self.class
  plugin_class.concurrency
end

#execution_context=(context) ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/logstash/outputs/base.rb', line 108

# Assigns the execution context on this plugin and on its codec.
#
# @param context [Object] the execution context to assign
# @return [Object] the same +context+ that was passed in
def execution_context=(context)
  super
  # Codecs are created at the class level, so there is no easy way to
  # propagate an instance variable into them.
  # TODO(talevy): Codecs should have their own execution_context; for now
  #               they inherit their parent plugin's.
  @codec.execution_context = context
  context
end

#multi_receive(events) ⇒ Object

To be overridden in implementations



88
89
90
91
92
93
94
# File 'lib/logstash/outputs/base.rb', line 88

# Delivers a batch of events.
#
# When +@receives_encoded+ is truthy, the batch is encoded with
# +codec.multi_encode+ and the result is handed to +multi_receive_encoded+.
# Otherwise each event is passed to +receive+ in turn.
#
# @param events [#each] the batch of events to deliver
# @return [Object] the result of +multi_receive_encoded+ on the encoded
#   path, otherwise the result of +events.each+
def multi_receive(events)
  return self.multi_receive_encoded(codec.multi_encode(events)) if @receives_encoded

  events.each { |single_event| receive(single_event) }
end

#receive(event) ⇒ Object



82
83
84
# File 'lib/logstash/outputs/base.rb', line 82

# Handles a single event. This base implementation always raises; as the
# error message states, concrete classes must supply their own #receive.
#
# @param event [Object] the event to handle (unused by this implementation)
# @raise [RuntimeError] always
def receive(event)
  raise "#{self.class}#receive must be overridden"
end

#register ⇒ Object



77
78
79
# File 'lib/logstash/outputs/base.rb', line 77

# Registration hook. This base implementation always raises; as the error
# message states, concrete classes must supply their own #register.
#
# @raise [RuntimeError] always
def register
  raise "#{self.class}#register must be overridden"
end

#workers_not_supported(message = nil) ⇒ Object



96
97
98
# File 'lib/logstash/outputs/base.rb', line 96

# Obsolete hook that always raises, telling the caller that the plugin
# relies on the removed '#workers_not_supported' API.
#
# @param message [Object, nil] accepted but not used by this method
# @raise [RuntimeError] always, naming the plugin class in the message
def workers_not_supported(message=nil)
  plugin_name = self.class.name
  raise "This plugin (#{plugin_name}) is using the obsolete '#workers_not_supported' method. If you installed this plugin specifically on this Logstash version, it is not compatible. If you are a plugin author, please see https://www.elastic.co/guide/en/logstash/current/_how_to_write_a_logstash_output_plugin.html for more info"
end