Class: DripDrop::ZMQSubHandler

Inherits:
ZMQBaseHandler show all
Includes:
ZMQReadableHandler
Defined in:
lib/dripdrop/handlers/zeromq.rb

Instance Attribute Summary collapse

Attributes included from ZMQReadableHandler

#message_class

Attributes inherited from ZMQBaseHandler

#connection

Instance Method Summary collapse

Methods included from ZMQReadableHandler

#decode_message

Methods inherited from ZMQBaseHandler

#add_connection, #address, #on_receive, #on_recv, #read_connection, #write_connection

Methods inherited from BaseHandler

#handle_error, #on_error, #print_exception

Constructor Details

#initialize(*args) ⇒ ZMQSubHandler

Returns a new instance of ZMQSubHandler.



126
127
128
129
# File 'lib/dripdrop/handlers/zeromq.rb', line 126

def initialize(*args)
  super(*args)
  self.topic_filter = @opts[:topic_filter]
end

Instance Attribute Details

#topic_filterObject

Returns the value of attribute topic_filter.



124
125
126
# File 'lib/dripdrop/handlers/zeromq.rb', line 124

def topic_filter
  @topic_filter
end

Instance Method Details

#on_readable(socket, messages) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/dripdrop/handlers/zeromq.rb', line 131

def on_readable(socket, messages)
  begin
    if @msg_format == :dripdrop
      unless messages.length == 2
        return false
      end
      topic = messages.shift.copy_out_string
      if @topic_filter.nil? || topic.match(@topic_filter)
        body  = messages.shift.copy_out_string
        @recv_cbak.call(decode_message(body))
      end
    else
      super(socket,messages)
    end
  rescue StandardError => e
    handle_error(e)
  end
end

#post_setupObject



150
151
152
153
# File 'lib/dripdrop/handlers/zeromq.rb', line 150

def post_setup
  super
  @connection.socket.setsockopt(ZMQ::SUBSCRIBE, '')
end