Class: EZMQ::Subscriber

Inherits:
Socket
  • Object
Defined in:
lib/ezmq/subscribe.rb

Overview

Subscribe socket that listens for messages with an optional topic.

Instance Attribute Summary

Attributes inherited from Socket

#context, #decode, #encode, #socket

Instance Method Summary

Methods inherited from Socket

#connect, #send

Constructor Details

#initialize(mode = :connect, **options) ⇒ Subscriber

Note:

The default behaviour is to output any messages received to STDOUT.

Creates a new Subscriber socket.

Parameters:

  • mode (:bind, :connect) (defaults to: :connect)

    (:connect) a mode for the socket.

  • options (Hash)

    optional parameters.

Options Hash (**options):

  • topic (String)

    a topic to subscribe to.



# File 'lib/ezmq/subscribe.rb', line 18

def initialize(mode = :connect, **options)
  super mode, ZMQ::SUB, options
  subscribe options[:topic] if options[:topic]
end

Instance Method Details

#listen {|message, topic| ... } ⇒ void

This method returns an undefined value.

Like #receive, but keeps looping and yields every message instead of stopping after one.

Yields:

  • (message, topic)

    passes the message body and topic to the block.

Yield Parameters:

  • message (String)

    the message received.

  • topic (String)

    the topic of the message.



# File 'lib/ezmq/subscribe.rb', line 58

def listen(&block)
  loop do
    block.call(*receive)
  end
end
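
Because #listen wraps #receive in Kernel#loop, it never returns on its own. The sketch below shows one way a caller might leave the loop, using break inside the block. FakeSubscriber is a hypothetical stand-in written for this example so that it runs without a ZeroMQ socket; only the body of #listen is copied from the source above.

```ruby
# Hypothetical stand-in for EZMQ::Subscriber, for illustration only.
# #receive is faked with an in-memory queue instead of a socket.
class FakeSubscriber
  def initialize(frames)
    @frames = frames.dup
  end

  # Returns [message, topic], like #receive does when no block is given.
  def receive
    raise StopIteration if @frames.empty? # Kernel#loop rescues this and stops
    @frames.shift
  end

  # Same body as EZMQ::Subscriber#listen.
  def listen(&block)
    loop do
      block.call(*receive)
    end
  end
end

subscriber = FakeSubscriber.new(
  [%w[sunny weather], %w[quit control], %w[rain weather]]
)

seen = []
subscriber.listen do |message, topic|
  break if topic == 'control' # break leaves #listen, ending the loop
  seen << "#{topic}: #{message}"
end
```

After the run, seen holds only the first weather message, because the control frame ends the loop before the third frame is read.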

#receive(**options) {|message, topic| ... } ⇒ Object

Note:

This method blocks until a message arrives.

Receive a message from the socket.

Parameters:

  • options (Hash)

    optional parameters.

Options Hash (**options):

  • decode (lambda)

    how to decode the message.
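
The decode option can be illustrated without a socket. The following is a minimal sketch of the selection logic only: DEFAULT_DECODE and decode_body are hypothetical names, and the identity lambda is an assumption standing in for the socket's #decode attribute, whose real default is defined in Socket and not shown here.

```ruby
require 'json'

# Assumed stand-in for the socket-wide decoder (@decode in the source).
DEFAULT_DECODE = ->(body) { body }

# Mirrors the choice made in #receive: a per-call :decode lambda wins,
# otherwise the socket-wide decoder is used.
def decode_body(body, **options)
  (options[:decode] || DEFAULT_DECODE).call(body)
end

raw  = decode_body('{"temp": 21}')
json = decode_body('{"temp": 21}', decode: ->(body) { JSON.parse(body) })
```

Here raw stays a String, while json is the parsed Hash, since the second call overrides the decoder for that one message.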

Yields:

  • (message, topic)

    passes the message body and topic to the block.

Yield Parameters:

  • message (Object)

    the message received (decoded).

  • topic (String)

    the topic of the message.

Returns:

  • (Object)

    without a block, the decoded message and its topic as a two-element Array; with a block, the value returned by the block.



# File 'lib/ezmq/subscribe.rb', line 36

def receive(**options)
  message = ''
  @socket.recv_string message

  message = message.match(/^(?<topic>[^\ ]*)\ (?<body>.*)/m)

  decoded = (options[:decode] || @decode).call message['body']
  if block_given?
    yield decoded, message['topic']
  else
    [decoded, message['topic']]
  end
end
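
The regular expression in #receive splits each raw frame at the first space: everything before it is the topic, everything after it is the body. The sketch below applies the same pattern to plain strings so the behaviour can be checked in isolation. FRAME_PATTERN and split_frame are hypothetical names used only for this example, and the nil return for a frame without a space is a choice made here, not something the library does.

```ruby
# Same pattern as in #receive. The /m flag lets the body span newlines.
FRAME_PATTERN = /^(?<topic>[^\ ]*)\ (?<body>.*)/m

# Returns [body, topic], or nil when the frame contains no space at all.
def split_frame(raw)
  match = raw.match(FRAME_PATTERN)
  return nil unless match

  [match['body'], match['topic']]
end

with_topic  = split_frame('weather sunny and warm')
empty_topic = split_frame(' no topic here')
no_space    = split_frame('heartbeat')
```

Note that a frame with no space does not match the pattern at all, so code that indexes the match result directly, as #receive does, would fail on such input.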

#subscribe(topic) ⇒ Boolean

Note:

By default, a Subscriber filters all incoming messages. Without calling subscribe at least once, no messages will be accepted. If topic was provided, #initialize calls #subscribe automatically.

Establishes a new message filter on the socket.

Parameters:

  • topic (String)

    a topic to subscribe to. Messages matching this prefix will be accepted.

Returns:

  • (Boolean)

    was subscription successful?



# File 'lib/ezmq/subscribe.rb', line 75

def subscribe(topic)
  @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0
end
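
ZeroMQ subscription filters are prefix matches on the start of the message, and an empty filter matches everything. The sketch below models that rule in plain Ruby to make the consequences concrete. accepted? is a hypothetical helper that simulates the filtering and is not part of EZMQ or ZeroMQ.

```ruby
# Illustration only: models how a SUB socket decides whether to accept a
# message, given the filters set so far.
def accepted?(raw_message, filters)
  filters.any? { |prefix| raw_message.start_with?(prefix) }
end

none     = accepted?('weather sunny', [])               # no filter set yet
exact    = accepted?('weather sunny', ['weather'])      # topic matches
longer   = accepted?('weatherman on air', ['weather'])  # prefix also matches
other    = accepted?('news headline', ['weather'])      # different topic
wildcard = accepted?('news headline', [''])             # empty prefix
```

Only the first and fourth calls are rejected. The third call shows why topics that share a prefix can leak into each other's subscriptions.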

#unsubscribe(topic) ⇒ Boolean

Removes a message filter (as set with subscribe) from the socket.

Parameters:

  • topic (String)

    the topic to unsubscribe from. If multiple filters with the same topic are set, this will only remove one.

Returns:

  • (Boolean)

    was unsubscription successful?



# File 'lib/ezmq/subscribe.rb', line 86

def unsubscribe(topic)
  @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0
end
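
The remark that only one filter is removed when the same topic was subscribed several times can be sketched with a plain Array. remove_one_filter is a hypothetical helper that models the behaviour described above. Its false return for a missing topic is a choice made for this sketch and says nothing about what #unsubscribe returns in that case.

```ruby
# Illustration only: removes a single instance of topic from the list,
# leaving any duplicates in place.
def remove_one_filter(filters, topic)
  index = filters.index(topic)
  return false if index.nil?

  filters.delete_at(index)
  true
end

filters = %w[weather weather news]
removed = remove_one_filter(filters, 'weather')
still_subscribed = filters.include?('weather')
```

After one removal the second weather filter is still in place, so matching messages would keep arriving until it is removed as well.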