Class: Carnivore::Source::UnixSocket

Inherits:
Carnivore::Source show all
Defined in:
lib/carnivore-unixsocket/unixsocket.rb

Overview

Unix socket based carnivore source

Constant Summary collapse

INIT_SRV_TIMEOUT =

max time for unix socket to setup

2.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#init_argsHash (readonly)

Returns:

  • (Hash)


16
17
18
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 16

def init_args
  @init_args
end

#socketString (readonly)

Returns path to socket.

Returns:

  • (String)

    path to socket



12
13
14
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 12

def socket
  @socket
end

#srv_nameString (readonly)

Returns:

  • (String)


14
15
16
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 14

def srv_name
  @srv_name
end

Instance Method Details

#connectionCelluloid::IO::UNIXSocket

Returns:

  • (Celluloid::IO::UNIXSocket)


34
35
36
37
38
39
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 34

def connection
  unless(@connection)
    @connection = Celluloid::IO::UNIXSocket.new(socket)
  end
  @connection
end

#init_srvObject

Initialize the server



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 42

def init_srv
  callback_supervisor.supervise_as(srv_name,
    Carnivore::UnixSocket::Util::Server,
    init_args.merge(:notify_actor => current_actor)
  )
  waited = 0.0
  until(server || waited > INIT_SRV_TIMEOUT)
    sleep(0.01)
    waited += 0.01
  end
  server.async.start
end

#process(*args) ⇒ Object

Override processing to enable server only if required



78
79
80
81
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 78

def process(*args)
  init_srv
  super
end

#receive(*args) ⇒ Object

Receive messages



56
57
58
59
60
61
62
63
64
65
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 56

def receive(*args)
  wait(:new_socket_lines)
  server.return_lines.map do |line|
    begin
      MultiJson.load(line)
    rescue MultiJson::ParseError
      line
    end
  end
end

#serverUtil::Server

Returns:

  • (Util::Server)


29
30
31
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 29

def server
  callback_supervisor[srv_name]
end

#setup(args = {}) ⇒ Object

Setup the unix socket

Parameters:

  • args (Hash) (defaults to: {})


21
22
23
24
25
26
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 21

def setup(args={})
  @socket = ::File.expand_path(args[:path])
  @srv_name = "socket_srv_#{name}".to_sym
  @connection = nil
  @init_args = args
end

#transmit(payload, original_message = nil) ⇒ Object

Send message

Parameters:

  • payload (Object)
  • original_message (Carnivore::Message) (defaults to: nil)


71
72
73
74
75
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 71

def transmit(payload, original_message=nil)
  output = payload.is_a?(String) ? payload : MultiJson.dump(payload)
  connection.puts(output)
  connection.flush
end