Class: FluQ::Input::Base

Inherits:
Object
  • Object
show all
Includes:
Mixins::Loggable
Defined in:
lib/fluq/input/base.rb

Direct Known Subclasses

Socket

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Mixins::Loggable

#logger

Constructor Details

#initialize(reactor, options = {}) ⇒ Base

Returns a new instance of Base.

Parameters:

  • reactor (FluQ::Reactor)
  • options (Hash) (defaults to: {})

    various configuration options



12
13
14
15
16
# File 'lib/fluq/input/base.rb', line 12

# Builds the input, remembering the reactor and the effective configuration.
#
# @param reactor [FluQ::Reactor] stored in @reactor
# @param options [Hash] various configuration options; merged over `defaults`,
#   so caller-supplied keys win (Hash#merge semantics, assuming `defaults`
#   returns a Hash — it is defined outside this excerpt)
def initialize(reactor, options = {})
  super() # explicit empty parens: forward no arguments to the parent initializer
  @reactor = reactor
  @config  = defaults.merge(options)
end

Instance Attribute Details

#config ⇒ Object (readonly)



8
9
10
# File 'lib/fluq/input/base.rb', line 8

# Read-only accessor for the input's configuration.
#
# @return [Object] the value held in @config (presumably a Hash of the
#   constructor options merged over the defaults — confirm against `defaults`)
def config
  @config
end

#reactor ⇒ Object (readonly)



5
6
7
# File 'lib/fluq/input/base.rb', line 5

# Read-only accessor for the reactor this input was constructed with.
#
# @return [Object] the value held in @reactor (documented as a FluQ::Reactor
#   on the constructor)
def reactor
  @reactor
end

Instance Method Details

#flush!(buffer) ⇒ Object

Flushes and closes a buffer

Parameters:

  • buffer

    the buffer to flush and close



35
36
37
38
39
40
41
42
43
# File 'lib/fluq/input/base.rb', line 35

# Flushes a buffer into the reactor and closes it afterwards.
#
# The buffer is wrapped in a feed (`feed_klass`), whose events are handed to
# `reactor.process` in batches of at most 10,000. Any StandardError raised
# along the way is reported through `logger.crash` instead of propagating.
# The buffer is closed in every case, provided one was given.
#
# @param buffer [Object] the buffer to drain (presumably a FluQ::Buffer::Base
#   as produced by #new_buffer — confirm); a nil/false buffer is never closed
# @return [Object] whatever `each_slice` returns on success, or the result of
#   `logger.crash` when an error was rescued
def flush!(buffer)
  feed = feed_klass.new(buffer)
  feed.each_slice(10_000) { |batch| reactor.process(batch) }
rescue => ex
  logger.crash "#{self.class.name} failure: #{ex.message} (#{ex.class.name})", ex
ensure
  # Runs on both the success and the failure path.
  buffer && buffer.close
end

#name ⇒ String

Returns descriptive name.

Returns:

  • (String)

    descriptive name



19
20
21
# File 'lib/fluq/input/base.rb', line 19

# Descriptive name of this input: the last segment of the class name
# (everything after the final "::"), lower-cased. The value is computed once
# and cached in @name.
#
# @return [String] descriptive name
def name
  return @name if @name

  short_name = self.class.name.split("::").last
  @name = short_name.downcase
end

#new_buffer ⇒ FluQ::Buffer::Base

Creates a new buffer object

Returns:

  • (FluQ::Buffer::Base)



29
30
31
# File 'lib/fluq/input/base.rb', line 29

# Creates a new buffer object.
#
# The class is supplied by `buffer_klass` and is instantiated with the value
# stored under the :buffer_options key of the configuration.
#
# @return [FluQ::Buffer::Base] a freshly constructed buffer
def new_buffer
  buffer_options = config[:buffer_options]
  buffer_klass.new(buffer_options)
end

#run ⇒ Object

Start the input



24
25
# File 'lib/fluq/input/base.rb', line 24

# Starts the input.
#
# This base implementation has an empty body and therefore returns nil.
# NOTE(review): concrete inputs (e.g. the Socket subclass) presumably override
# this — confirm.
#
# @return [nil]
def run
end