EventMachine ZMTP1.0 protocol (ZMQ2.x protocol)

This is an implementation of ZMTP 1.0, the ZMQ 2.x transport protocol, using the facilities provided by EventMachine. It includes implementations of the ZMQ socket types that try to resemble the originals, though not exhaustively. You may also define your own behaviour. The library is tested against native ZMQ using ffi-rzmq.

Installation

Add this line to your application's Gemfile:

gem 'em-zmq-tp10'

And then execute:

$ bundle

Or install it yourself as:

$ gem install em-zmq-tp10

Usage

The library provides callback-oriented classes that try to emulate the behaviour of the standard ZMQ classes while being already integrated with the EventMachine event loop.

The main difference in behaviour lies in high-water-mark handling and balancing. For DEALER and REQ, ZMQ provides round-robin load balancing until the high-water mark is reached; after that, the send operation blocks or returns the error EAGAIN. This implementation does round-robin until a fixed internal high-water mark (2048 bytes) is reached, and then pushes to a common queue until the user-defined watermark is reached.
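
The balancing described above can be modelled roughly as follows. This is a sketch, not the library's code: SketchBalancer, Peer and buffered_bytes are hypothetical names, the user-defined watermark is assumed to count messages, and peer buffers never drain in this model.

```ruby
# Hypothetical model of "round-robin, then common queue".
# None of these names belong to the library.
class SketchBalancer
  PEER_BUFFER_LIMIT = 2048 # bytes; the fixed internal watermark from the text

  Peer = Struct.new(:name, :buffered_bytes)

  attr_reader :peers, :queue

  def initialize(peer_names, hwm:)
    @peers = peer_names.map { |name| Peer.new(name, 0) }
    @hwm   = hwm # user-defined watermark (assumed to count messages)
    @queue = []  # common queue shared by all peers
    @next  = 0   # index of the peer to try first
  end

  # Returns the chosen peer's name, :queued, or false when everything is full.
  def send_message(message)
    size = message.sum(&:bytesize)
    # Try each peer once, starting where the previous call stopped.
    @peers.size.times do
      peer = @peers[@next]
      @next = (@next + 1) % @peers.size
      if peer.buffered_bytes < PEER_BUFFER_LIMIT
        peer.buffered_bytes += size
        return peer.name
      end
    end
    # Every peer buffer is full: fall back to the common queue.
    return false if @queue.size >= @hwm
    @queue << message
    :queued
  end
end

balancer = SketchBalancer.new(%w[a b], hwm: 1)
big = ['x' * 2048]
balancer.send_message(big) # => "a"
balancer.send_message(big) # => "b"
balancer.send_message(big) # => :queued
balancer.send_message(big) # => false
```

What happens once the common queue is full depends on the configured strategy; the model simply rejects the message.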

This internal per-connection buffer is handled by EventMachine itself, and there is no precise control over it, so if a peer disconnects, all messages pushed to this buffer are lost. Compared to raw ZMQ, you therefore lose not only the contents of the OS's internal socket buffer but the EventMachine buffer as well :(

But since ZMQ never claimed durability, it is not a big issue (for me).

There are two strategies for high-water-mark handling: drop_first and drop_last. drop_last ignores any attempt to send a message while the queue is full; this is the default strategy for ZMQ if you use nonblocking sending. drop_first drops the earliest message in the queue, so that a newly inserted message has a better chance of being sent. You can react to such drops by overriding cancel_message (or cancel_request for Req). I like this strategy more because old requests tend to be less useful, but drop_last is still the default for "compatibility".
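
The two strategies can be illustrated with a small standalone queue. This is only a sketch: BoundedQueue and its dropped list are hypothetical names invented here, and the library's own queue may be organised differently. Only the strategy names and the cancel_message hook come from the description above.

```ruby
# Hypothetical bounded queue showing drop_last vs drop_first.
# Not the library's internal implementation.
class BoundedQueue
  STRATEGIES = %i[drop_first drop_last].freeze

  attr_reader :items, :dropped

  def initialize(hwm:, strategy: :drop_last)
    raise ArgumentError, "unknown strategy #{strategy}" unless STRATEGIES.include?(strategy)
    @hwm = hwm
    @strategy = strategy
    @items = []
    @dropped = []
  end

  # Returns true if the message was queued, false if it was rejected.
  def push(message)
    if @items.size >= @hwm
      # drop_last: refuse the new message and keep the queue as it is.
      return false if @strategy == :drop_last
      # drop_first: evict the oldest message to make room for the new one.
      cancel_message(@items.shift)
    end
    @items << message
    true
  end

  # Stand-in for the overridable hook; here it only records the drop.
  def cancel_message(message)
    @dropped << message
  end
end

last = BoundedQueue.new(hwm: 2, strategy: :drop_last)
%w[a b c].each { |m| last.push(m) }
last.items # => ["a", "b"]

first = BoundedQueue.new(hwm: 2, strategy: :drop_first)
%w[a b c].each { |m| first.push(m) }
first.items   # => ["b", "c"]
first.dropped # => ["a"]
```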

There are also simplified classes without an internal queue (PreDealer, PreReq, PreRouter, PreRep, PrePub), so that you can implement your own queueing strategy.

And you can do any crazy thing using the base EM::Protocols::Zmq2::Socket class.

Socket

Base class. It provides the #connect and #bind methods for establishing endpoints. These methods can be called outside the EM event loop (even before EM.run is called), because they use EM.schedule. TCP and IPC endpoints are supported and fully interoperable with native ZMQ. INPROC is supported as well, but you should treat such endpoints as connections inside EventMachine's context, so you cannot connect to native ZMQ inproc endpoints.

Dealer

  class MyPreDealer < EM::Protocols::Zmq2::PreDealer
    def receive_message(message)
      puts "Message received: #{message.inspect}"
    end
  end
  dealer = MyPreDealer.new
  dealer.connect('tcp://127.0.0.1:8000')
  dealer.bind('ipc://dealer.sock')
  EM.schedule {
    if !dealer.send_message(['asdf','fdas'])
      puts "Could not send message (no free peers)"
    end
  }

  class MyDealer < EM::Protocols::Zmq2::Dealer
    def receive_message(message)
      puts "Message received: #{message.inspect}"
    end
  end
  dealer = MyDealer.new(hwm: 1000, hwm_strategy: :drop_last)
  dealer.connect('tcp://127.0.0.1:8000')
  EM.schedule {
    if !dealer.send_message(['asdf','fdas'])
      puts "No free peers and outgoing queue is full"
    end
  }

  dealer = EM::Protocols::Zmq2::DealerCb.new do |message|
    puts "Received message #{message.inspect}"
  end
  dealer.connect('ipc://rep')
  EM.schedule {
    dealer.send_message(['hello','world'])
  }

Req

  class MyPreReq < EM::Protocols::Zmq2::PreReq
    def receive_reply(message, data, request_id)
      puts "Received message #{message} and stored data #{data}"
    end
  end
  req = MyPreReq.new
  req.bind(...)
  req.connect(...)
  if request_id = req.send_request(['this is', 'message'], 'saved_data')
    puts "Message sent"
  else
    puts "there is no free peer"
  end

  class MyReq < EM::Protocols::Zmq2::Req
    def receive_reply(message, data, request_id)
      puts "Received message #{message} and stored data #{data}"
    end
  end
  req = MyReq.new
  req.bind(...)
  req.connect(...)
  if request_id = req.send_request(['hi'], 'ho')
    puts "Message sent"
  end

  req = EM::Protocols::Zmq2::ReqCb.new
  req.bind('ipc://req')
  timer = nil
  request_id = req.send_request(['hello', 'world']) do |message|
    EM.cancel_timer(timer)
    puts "Message #{message}"
  end
  if request_id
    timer = EM.add_timer(1) {
      req.cancel_request(request_id)
    }
  end

  req = EM::Protocols::Zmq2::ReqDefer.new
  req.bind('ipc://req')
  data = {hi: 'ho'}
  deferable = req.send_request(['hello', 'world'], data) do |reply, data|
    puts "Reply received #{reply} #{data}"
  end
  deferable.timeout 1
  deferable.errback do
    puts "Message canceled"
  end
  deferable.callback do |reply, data|
    puts "Another callback #{reply} #{data}"
  end

Router

Router stores the peer identity in a message, as a ZMQ router does, and it sends a message to the peer whose identity equals the first message string. PreRouter does no queue caching; Router saves messages in a per-peer queue, controlled by the high-water-mark strategy.

  class MyPreRouter < EM::Protocols::Zmq2::PreRouter
    def receive_message(message)
      puts "Received message #{message} (and it contains envelope)"
    end
  end
  router = MyPreRouter.new
  router.bind(...)
  router.send_message(message)

  class MyRouter < EM::Protocols::Zmq2::Router
    def receive_message(message)
      puts "Received message #{message}"
      message[-1] = 'reply'
      send_message(message)
    end
  end
  router = MyRouter.new(hwm: 1000, hwm_strategy: :drop_first)
  router.bind(...)
  router.send_message(message)
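
The identity handling can be pictured with a standalone model. It is a sketch under assumptions: SketchRouter, wrap_incoming and outboxes are hypothetical names, and real peers are replaced by plain arrays. It only shows that the first string of a message selects the destination peer, which is why replacing the last part of a received message and sending it back reaches the original sender.

```ruby
# Hypothetical model of identity-based routing; not the library's code.
class SketchRouter
  attr_reader :outboxes

  def initialize(identities)
    # One outbox per known peer identity, standing in for a connection.
    @outboxes = identities.map { |id| [id, []] }.to_h
  end

  # Incoming side: the sender's identity is stored as the first string.
  def wrap_incoming(identity, frames)
    [identity, *frames]
  end

  # Outgoing side: the first string selects the peer and is stripped.
  # Returns false when no peer has that identity.
  def send_message(message)
    identity, *rest = message
    outbox = @outboxes[identity]
    return false unless outbox
    outbox << rest
    true
  end
end

router = SketchRouter.new(%w[alice bob])
message = router.wrap_incoming('alice', ['', 'ping'])
message[-1] = 'reply'         # keep the envelope, replace the body
router.send_message(message)  # => true
router.outboxes['alice']      # => [["", "reply"]]
```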

Rep

REP differs from Router mainly in its method signatures.

  class EchoBangPreRep < EM::Protocols::Zmq2::PreRep
    def receive_request(message, envelope)
      message << "!"
      if send_reply(message, envelope)
        puts "reply sent successfully"
      end
    end
  end
  rep = EchoBangPreRep.new
  rep.bind('ipc://rep')

  class EchoBangRep < EM::Protocols::Zmq2::Rep
    def receive_request(message, envelope)
      message << "!"
      if send_reply(message, envelope)
        puts "reply sent successfully"
      end
    end
  end
  rep = EchoBangRep.new
  rep.bind('ipc://rep')

Sub

Unlike the ZMQ SUB socket, this Sub accepts not only strings but also Regexps and procs for subscribing. Note that, as in ZMQ 2.x, filtering occurs on the Sub side.

Since subscriptions can be defined with a callback passed to the :subscribe option or to the subscribe or subscribe_many methods, you can use this class without overriding any methods.

  class MySub < EM::Protocols::Zmq2::Sub
    def receive_message(message)
      puts "default handler"
    end
  end
  sub = MySub.new(subscribe: ['this', 'that'])
  sub.subscribe(/^callback/i) do |message|
    puts "Callback subscribe #{message}"
  end
  sub.subscribe_many(
    proc{|s| s.end_with?("END")} => proc{|message| puts "TILL END #{message}"},
    '' => nil # also to default
  )
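
How the three kinds of subscription might be matched on the Sub side can be sketched as follows. subscription_matches? and deliver? are hypothetical helpers, not library methods, and the sketch assumes that matching is done against the first string of the message, with plain strings treated as prefixes in the ZMQ manner.

```ruby
# Hypothetical matcher for String, Regexp and Proc subscriptions.
# Not the library's code.
def subscription_matches?(subscription, first_frame)
  case subscription
  when String then first_frame.start_with?(subscription) # ZMQ-style prefix match
  when Regexp then subscription.match?(first_frame)
  when Proc   then !!subscription.call(first_frame)
  else false
  end
end

# A message is delivered if any subscription matches its first string.
def deliver?(subscriptions, message)
  subscriptions.any? { |sub| subscription_matches?(sub, message.first) }
end

subscriptions = ['this', /^callback/i, proc { |s| s.end_with?('END') }]
deliver?(subscriptions, ['this one', 'body'])   # => true
deliver?(subscriptions, ['CALLBACK x', 'body']) # => true
deliver?(subscriptions, ['till the END'])       # => true
deliver?(subscriptions, ['other'])              # => false
```

An empty string is a prefix of every string, so subscribing to '' matches every message.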

Pub

PrePub sends messages only to connected peers that are not busy. send_message returns true if there is at least one peer with a short EventMachine outgoing queue to which the message has been scheduled.

Pub tries to queue messages for all connected peers and for disconnected peers that have an explicit identity set.

Since there is no incoming data, there is no need to override any methods.

  pub = EM::Protocols::Zmq2::PrePub.new
  pub.bind(...)
  pub.send_message(['hi', 'you'])

  pub = EM::Protocols::Zmq2::Pub.new
  pub.bind(...)
  pub.send_message(['hi', 'you'])

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Added some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request