Class: Carnivore::Source::UnixSocket

Inherits:
Carnivore::Source show all
Defined in:
lib/carnivore-unixsocket/unixsocket.rb

Overview

Unix socket based carnivore source

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#init_args ⇒ Hash (readonly)

Returns:

  • (Hash)


15
16
17
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 15

# Reader for the arguments stored in @init_args (assigned by #setup).
#
# @return [Hash]
def init_args; @init_args; end

#socket ⇒ String (readonly)

Returns path to socket.

Returns:

  • (String)

    path to socket



11
12
13
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 11

# Reader for the socket path stored in @socket (assigned by #setup).
#
# @return [String] path to socket
def socket; @socket; end

#srv_name ⇒ String (readonly)

Returns:

  • (String)


13
14
15
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 13

# Reader for the server name stored in @srv_name (assigned by #setup).
#
# NOTE(review): the page documents the return type as String, but #setup
# stores the result of `to_sym` here -- confirm the intended type.
def srv_name; @srv_name; end

Instance Method Details

#connection ⇒ Celluloid::IO::UNIXSocket

Returns:

  • (Celluloid::IO::UNIXSocket)


34
35
36
37
38
39
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 34

# Lazily opened client connection to the socket path.
#
# The first call builds `UNIXSocket.new(socket)` and caches it in
# @connection; later calls hand back the cached object.
#
# @return [Celluloid::IO::UNIXSocket]
def connection
  @connection ||= UNIXSocket.new(socket)
end

#init_srv ⇒ Object

Initialize the server



42
43
44
45
46
47
48
49
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 42

# Initialize the server
#
# Creates a Util::Server for the socket path with this source
# (`current_self`) as its `:source`, keeps it in @server, then issues
# `start_collector!` followed by `start_server!` through the server's
# `async` proxy.
#
# @return [Object] whatever the final `async.start_server!` call returns
def init_srv
  @server = Carnivore::UnixSocket::Util::Server.new(path: socket, source: current_self)
  @server.async.start_collector!
  @server.async.start_server!
end

#process(*args) ⇒ Object

Override processing to enable server only if required



76
77
78
79
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 76

# Override processing to enable server only if required
#
# Calls #init_srv first, so the server is brought up once processing is
# requested, then forwards the received arguments to the parent
# implementation.
#
# @param args [Array] arguments passed through to the parent `process`
# @return [Object] result of the parent implementation
def process(*args)
  init_srv
  super(*args)
end

#receive(*args) ⇒ Object

Receive messages



52
53
54
55
56
57
58
59
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 52

# Receive messages
#
# Takes the value returned by `wait(:message)` and tries to decode it with
# MultiJson. If decoding raises MultiJson::ParseError the undecoded value
# is returned instead.
#
# @param args [Array] accepted but not used by this implementation
# @return [Object] decoded payload, or the raw value when it does not parse
def receive(*args)
  raw = wait(:message)
  decoded =
    begin
      MultiJson.load(raw)
    rescue MultiJson::ParseError
      raw
    end
  decoded
end

#server ⇒ Util::Server

Returns:

  • (Util::Server)


29
30
31
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 29

# Reader for the server instance stored in @server (assigned by #init_srv).
#
# @return [Util::Server]
def server; @server; end

#setup(args = {}) ⇒ Object

Setup the unix socket

Parameters:

  • args (Hash) (defaults to: {})


20
21
22
23
24
25
26
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 20

# Setup the unix socket
#
# Records the state the other methods rely on: a write mutex, the expanded
# socket path taken from `args[:path]`, a server name symbol derived from
# `name`, an empty connection slot, and the argument hash itself.
#
# @param args [Hash] setup options; `:path` is read as the socket path
# @return [Hash] the `args` hash that was passed in
def setup(args = {})
  socket_path = args[:path]
  @write_lock = Mutex.new
  @socket = ::File.expand_path(socket_path)
  @srv_name = :"socket_srv_#{name}"
  @connection = nil
  @init_args = args
end

#transmit(payload, original_message = nil) ⇒ Object

Send message

Parameters:

  • payload (Object)
  • original_message (Carnivore::Message) (defaults to: nil)


65
66
67
68
69
70
71
72
73
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 65

# Send message
#
# String payloads are written as-is; anything else is serialized with
# MultiJson.dump first. The write and flush on the connection happen inside
# a `defer` block, and the whole `defer` call is wrapped in the write lock.
#
# @param payload [Object] message to send
# @param original_message [Carnivore::Message] accepted but not used here
# @return [Object] whatever the `defer` call returns
def transmit(payload, original_message = nil)
  if payload.is_a?(String)
    encoded = payload
  else
    encoded = MultiJson.dump(payload)
  end
  @write_lock.synchronize do
    defer do
      connection.puts(encoded)
      connection.flush
    end
  end
end