Class: DripDrop::ZMQXRepHandler

Inherits:
ZMQBaseHandler show all
Includes:
ZMQReadableHandler, ZMQWritableHandler
Defined in:
lib/dripdrop/handlers/zeromq.rb

Defined Under Namespace

Classes: Response

Instance Attribute Summary

Attributes included from ZMQReadableHandler

#message_class

Attributes inherited from ZMQBaseHandler

#connection

Instance Method Summary collapse

Methods included from ZMQReadableHandler

#decode_message

Methods included from ZMQWritableHandler

#on_writable

Methods inherited from ZMQBaseHandler

#add_connection, #address, #on_receive, #on_recv, #post_setup, #read_connection, #write_connection

Methods inherited from BaseHandler

#handle_error, #on_error, #print_exception

Constructor Details

#initialize(*args) ⇒ ZMQXRepHandler

Returns a new instance of ZMQXRepHandler.



182
183
184
# File 'lib/dripdrop/handlers/zeromq.rb', line 182

def initialize(*args)
  super(*args)
end

Instance Method Details

#on_readable(socket, messages) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/dripdrop/handlers/zeromq.rb', line 186

def on_readable(socket,messages)
  if @msg_format == :dripdrop
    begin
      if messages.length < 3
        raise "Expected message in at least 3 parts, got #{messages.map(&:copy_out_string).inspect}"
      end
       
      message_strings = messages.map(&:copy_out_string)
      
      # parse the message into identities, delimiter and body
      identities = []
      delimiter  = nil
      body       = nil
      # It's an identitiy if it isn't an empty string
      # Once we hit the delimiter, we know the rest after is the body
      message_strings.each_with_index do |ms,i|
        unless ms.empty?
          identities << ms
        else
          delimiter = ms
           
          unless message_strings.length == i+2
            raise "Expected body in 1 part got '#{message_strings.inspect}'"
          end
           
          body  = message_strings[i+1]
          break
        end
      end
      
      raise "Received xreq message with no body!" unless body
      message    = decode_message(body)
      raise "Received nil message! #{body}" unless message
      seq        = message.head[SEQ_CTR_KEY]
      response   = ZMQXRepHandler::Response.new(self,identities,seq,@message_class)
      @recv_cbak.call(message,response) if @recv_cbak
    rescue StandardError => e
      handle_error(e)
    end
  else
    super(socket,messages)
  end
end

#send_message(message, identities, seq) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
# File 'lib/dripdrop/handlers/zeromq.rb', line 230

def send_message(message,identities,seq)
  if message.is_a?(DripDrop::Message)
    message.head[SEQ_CTR_KEY] = seq
     
    resp  = identities + ['', message.encoded]
    super(resp)
  else
    resp  = identities + ['', message]
    super(resp)
  end
end