Class: Cabin::Outputs::ZeroMQ

Inherits:
Object
  • Object
show all
Defined in:
lib/cabin/outputs/zeromq.rb

Overview

Output to a zeromq socket.

Constant Summary collapse

DEFAULTS =
{
  :topology => "pushpull",
  :hwm => 0, # zeromq default: no limit
  :linger => -1, # zeromq default: wait until all messages are sent.
  :topic => ""
}
CONTEXT =
ZMQ::Context.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(addresses, options = {}) ⇒ ZeroMQ

Create a new ZeroMQ output.

arguments: addresses A list of addresses to connect to. These are round-robined by zeromq.

:topology Either ‘pushpull’ or ‘pubsub’. Specifies which zeromq socket type to use. Default pushpull. :hwm Specifies the High Water Mark for the socket. Default 0, which means there is none. :linger Specifies the linger time in milliseconds for the socket. Default -1, meaning wait forever for the socket to close. :topic Specifies the topic for a pubsub topology. This can be a string or a proc with the event as the only argument.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/cabin/outputs/zeromq.rb', line 26

def initialize(addresses, options={})
  options = DEFAULTS.merge(options)

  @topology = options[:topology].to_s
  case @topology
  when "pushpull"
    socket_type = ZMQ::PUSH
  when "pubsub"
    socket_type = ZMQ::PUB
  end

  @topic = options[:topic]
  @socket = CONTEXT.socket(socket_type)
  
  Array(addresses).each do |address|
    error_check @socket.connect(address), "connecting to #{address}"
  end

  error_check @socket.setsockopt(ZMQ::LINGER, options[:linger]), "while setting ZMQ::LINGER to #{options[:linger]}"
  error_check @socket.setsockopt(ZMQ::HWM, options[:hwm]), "while setting ZMQ::HWM to #{options[:hwm]}"

  #TODO use cabin's teardown when it exists
  at_exit do
    teardown
  end

  #define_finalizer
end

Instance Attribute Details

#socketObject (readonly)

Returns the value of attribute socket.



15
16
17
# File 'lib/cabin/outputs/zeromq.rb', line 15

def socket
  @socket
end

#topicObject (readonly)

Returns the value of attribute topic.



15
16
17
# File 'lib/cabin/outputs/zeromq.rb', line 15

def topic
  @topic
end

#topologyObject (readonly)

Returns the value of attribute topology.



15
16
17
# File 'lib/cabin/outputs/zeromq.rb', line 15

def topology
  @topology
end

Instance Method Details

#<<(event) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/cabin/outputs/zeromq.rb', line 67

def <<(event)
  if @socket.name == "PUB"
    topic = @topic.is_a?(Proc) ? @topic.call(event) : @topic
    error_check @socket.send_string(topic, ZMQ::SNDMORE), "in topic send_string"
  end
  error_check @socket.send_string(event.inspect), "in send_string"
end

#hwmObject



61
62
63
64
65
# File 'lib/cabin/outputs/zeromq.rb', line 61

def hwm
  array = []
  error_check @socket.getsockopt(ZMQ::HWM, array), "while getting ZMQ::HWM"
  array.first
end

#lingerObject



55
56
57
58
59
# File 'lib/cabin/outputs/zeromq.rb', line 55

def linger
  array = []
  error_check @socket.getsockopt(ZMQ::LINGER, array), "while getting ZMQ::LINGER"
  array.first
end

#teardownObject



75
76
77
# File 'lib/cabin/outputs/zeromq.rb', line 75

def teardown
  @socket.close if @socket
end