Class: Carnivore::Source::UnixSocket
- Inherits: Carnivore::Source
  - Object
  - Carnivore::Source
  - Carnivore::Source::UnixSocket
- Defined in: lib/carnivore-unixsocket/unixsocket.rb
Overview
Unix socket-based Carnivore source.
Constant Summary
- INIT_SRV_TIMEOUT = 2.0
  Maximum time, in seconds, to wait for the unix socket server to finish setting up.
Instance Attribute Summary
- #init_args ⇒ Hash (readonly)
- #socket ⇒ String (readonly)
  Path to socket.
- #srv_name ⇒ String (readonly)
Instance Method Summary
- #connection ⇒ Celluloid::IO::UNIXSocket
- #init_srv ⇒ Object
  Initialize the server.
- #process(*args) ⇒ Object
  Override processing to enable server only if required.
- #receive(*args) ⇒ Object
  Receive messages.
- #server ⇒ Util::Server
- #setup(args = {}) ⇒ Object
  Set up the unix socket.
- #transmit(payload, original_message = nil) ⇒ Object
  Send message.
Instance Attribute Details
#init_args ⇒ Hash (readonly)
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 16
def init_args
  @init_args
end
#socket ⇒ String (readonly)
Returns path to socket.
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 12
def socket
  @socket
end
#srv_name ⇒ String (readonly)
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 14
def srv_name
  @srv_name
end
Instance Method Details
#connection ⇒ Celluloid::IO::UNIXSocket
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 34
def connection
  unless(@connection)
    @connection = Celluloid::IO::UNIXSocket.new(socket)
  end
  @connection
end
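The accessor above opens the socket on first use and reuses it afterwards. A minimal sketch of the same lazy-connection pattern follows, using the standard library's `UNIXSocket` in place of `Celluloid::IO::UNIXSocket`. The `LazyClient` class is hypothetical and not part of carnivore-unixsocket.

```ruby
require "socket"

# Hypothetical client illustrating lazy, memoized connection setup.
# It swaps in the stdlib UNIXSocket for Celluloid::IO::UNIXSocket.
class LazyClient
  def initialize(path)
    @path = path
    @connection = nil
  end

  # Connect on the first call only; later calls return the same socket.
  def connection
    @connection ||= UNIXSocket.new(@path)
  end
end
```

Because nothing connects until `connection` is first called, building the client never fails on a missing socket file; the error, if any, surfaces at the first write.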
#init_srv ⇒ Object
Initialize the server.
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 42
def init_srv
  callback_supervisor.supervise_as(srv_name,
    Carnivore::UnixSocket::Util::Server,
    init_args.merge(:notify_actor => current_actor)
  )
  waited = 0.0
  until(server || waited > INIT_SRV_TIMEOUT)
    sleep(0.01)
    waited += 0.01
  end
  server.async.start
end
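The wait loop in `init_srv` is a poll-with-timeout: check a condition every 10 ms until it holds or the time budget runs out. It can be sketched in isolation as below. `wait_until` is a hypothetical helper, not part of the library. Unlike the method above, the usage checks the result explicitly, since a timed-out wait would otherwise lead to calling a method on `nil`.

```ruby
# Hypothetical helper: poll the block until it returns a truthy value
# or roughly `timeout` seconds have elapsed. Returns the block's last result.
def wait_until(timeout, interval = 0.01)
  waited = 0.0
  until (result = yield) || waited > timeout
    sleep(interval)
    waited += interval
  end
  result
end

# Usage: the block stands in for the `server` lookup.
attempts = 0
ready = wait_until(2.0) do
  attempts += 1
  attempts >= 3 ? :server : nil
end
raise "server did not come up in time" if ready.nil?
```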
#process(*args) ⇒ Object
Override processing to enable server only if required.
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 78
def process(*args)
  init_srv
  super
end
#receive(*args) ⇒ Object
Receive messages.
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 56
def receive(*args)
  wait(:new_socket_lines)
  server.return_lines.map do |line|
    begin
      MultiJson.load(line)
    rescue MultiJson::ParseError
      line
    end
  end
end
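`receive` tries to decode each line as JSON and passes the raw string through when parsing fails. The sketch below shows that fallback on its own, assuming the standard library's `JSON` as a stand-in for `MultiJson`. `decode_lines` is a hypothetical name.

```ruby
require "json"

# Hypothetical helper: decode each line as JSON, keeping the raw
# string for any line that is not valid JSON.
def decode_lines(lines)
  lines.map do |line|
    begin
      JSON.parse(line)
    rescue JSON::ParserError
      line
    end
  end
end

decoded = decode_lines(['{"job":"sync"}', "plain text"])
# decoded[0] is a Hash, decoded[1] is the untouched String
```

This means consumers of the source may receive either decoded structures or plain strings and should be prepared for both.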
#server ⇒ Util::Server
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 29
def server
  callback_supervisor[srv_name]
end
#setup(args = {}) ⇒ Object
Set up the unix socket.
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 21
def setup(args={})
  @socket = ::File.expand_path(args[:path])
  @srv_name = "socket_srv_#{name}".to_sym
  @connection = nil
  @init_args = args
end
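`setup` derives two values from its inputs: the socket path and a supervisor name built from the source's name. The following sketch illustrates both, assuming the path is normalized with `File.expand_path`. That call, the `derive_settings` helper and the sample values are all illustrative assumptions.

```ruby
# Hypothetical stand-in for the values computed in setup.
def derive_settings(name, args = {})
  {
    # Assumption: the configured path is expanded to an absolute path.
    socket: File.expand_path(args[:path]),
    # Supervisor registry key, unique per source name.
    srv_name: "socket_srv_#{name}".to_sym
  }
end

settings = derive_settings("events", { path: "/tmp/../tmp/carnivore.sock" })
# settings[:srv_name] is :socket_srv_events
```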
#transmit(payload, original_message = nil) ⇒ Object
Send message.
# File 'lib/carnivore-unixsocket/unixsocket.rb', line 71
def transmit(payload, original_message=nil)
  output = payload.is_a?(String) ? payload : MultiJson.dump(payload)
  connection.puts(output)
  connection.flush
end
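`transmit` writes strings as-is and serializes anything else to JSON, one message per line. As a rough illustration, the sketch below pushes both kinds of payload through a connected socket pair, assuming stdlib `JSON` and `UNIXSocket.pair` in place of `MultiJson` and the Celluloid socket. `encode_payload` is a hypothetical helper.

```ruby
require "json"
require "socket"

# Hypothetical helper mirroring the string-or-JSON choice in transmit.
def encode_payload(payload)
  payload.is_a?(String) ? payload : JSON.generate(payload)
end

# A connected pair of unix sockets stands in for client and server.
reader, writer = UNIXSocket.pair
writer.puts(encode_payload("hello"))
writer.puts(encode_payload({ "id" => 1 }))
writer.flush

first_line = reader.gets.chomp   # the raw string
second_line = reader.gets.chomp  # the JSON-encoded hash
reader.close
writer.close
```

Since the protocol is line-delimited, a string payload containing embedded newlines would arrive as several messages on the receiving side.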