Class: Carnivore::Source::CarnFile

Inherits:
Carnivore::Source show all
Defined in:
lib/carnivore-files/carn_file.rb

Overview

Carnivore source for consumption from files

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#fetcher ⇒ Symbol (readonly)

Returns registry name of fetcher.

Returns:

  • (Symbol)

    registry name of fetcher



11
12
13
# File 'lib/carnivore-files/carn_file.rb', line 11

# Reader for the value held in @fetcher.
#
# NOTE(review): the attribute is described as a Symbol registry name, but
# #connect stores a Fetcher::Poll / Fetcher::Penguin instance in @fetcher --
# confirm which type is intended.
#
# @return [Object] current value of @fetcher
def fetcher; @fetcher; end

#message_queue ⇒ Queue (readonly)

Returns queue to hold messages.

Returns:

  • (Queue)

    queue to hold messages



13
14
15
# File 'lib/carnivore-files/carn_file.rb', line 13

# Reader for the queue held in @message_queue (#setup assigns a new Queue).
#
# @return [Queue] queue to hold messages
def message_queue; @message_queue; end

#path ⇒ String (readonly)

Returns path to file.

Returns:

  • (String)

    path to file



9
10
11
# File 'lib/carnivore-files/carn_file.rb', line 9

# Reader for the file path held in @path (#setup stores the expanded path).
#
# @return [String] path to file
def path; @path; end

Instance Method Details

#connect ⇒ Object

Start the line fetcher



29
30
31
32
33
34
35
36
37
# File 'lib/carnivore-files/carn_file.rb', line 29

# Start the line fetcher.
#
# Picks the fetcher class from args[:foundation] (:poll selects Poll, any
# other value selects Penguin), builds it with args plus :queue set to
# message_queue, stores it in @fetcher and asynchronously starts it.
#
# @return [Object] result of fetcher.async.start_fetcher
def connect
  fetcher_class =
    if args[:foundation].to_sym == :poll
      Carnivore::Files::Util::Fetcher::Poll
    else
      Carnivore::Files::Util::Fetcher::Penguin
    end
  @fetcher = fetcher_class.new(args.merge(:queue => message_queue))
  fetcher.async.start_fetcher
end

#receive(*_) ⇒ Array<Hash>

Return messages

Returns:

  • (Array<Hash>)

    return messages



40
41
42
# File 'lib/carnivore-files/carn_file.rb', line 40

# Return messages.
#
# Hands `defer` a block that pops the next entry off message_queue.
# NOTE(review): a single queue entry is popped per call; the documented
# Array<Hash> return type depends on what the fetcher enqueues -- confirm.
#
# @return [Array<Hash>] whatever `defer` returns for the popped entry
def receive(*_)
  defer do
    message_queue.pop
  end
end

#setup(*_) ⇒ Object

Setup source

Parameters:

  • args (Hash)


20
21
22
23
24
25
26
# File 'lib/carnivore-files/carn_file.rb', line 20

# Setup source.
#
# Stores the expanded form of args[:path] in @path, creates a fresh message
# queue, and fills in args[:foundation] when it is not already set
# (:poll when RUBY_PLATFORM is 'java', :penguin otherwise).
#
# @return [Symbol, nil] the defaulted foundation, or nil when one was already set
def setup(*_)
  @path = ::File.expand_path(args[:path])
  @message_queue = Queue.new
  return if args[:foundation]

  on_java = RUBY_PLATFORM == 'java'
  args[:foundation] = on_java ? :poll : :penguin
end

#transmit(payload, *args) ⇒ Object

Send payload

Parameters:

  • payload (Object)

    payload to transmit



47
48
49
# File 'lib/carnivore-files/carn_file.rb', line 47

# Send payload by writing it as a line through the fetcher.
#
# @param payload [Object] payload to transmit
# @param args [Array] extra arguments; accepted but not used here
# @return [Object] whatever fetcher.write_line returns
def transmit(payload, *args)
  line_writer = fetcher
  line_writer.write_line(payload)
end