Class: ASIR::Transport::Zmq
- Inherits: ConnectionOriented
  - Object
  - ConnectionOriented
  - ASIR::Transport::Zmq
- Defined in: lib/asir/transport/zmq.rb
Overview
!SLIDE ZeroMQ Transport
Instance Attribute Summary
- #queue ⇒ Object
  Returns the value of attribute queue.
Instance Method Summary
- #_client_connect! ⇒ Object
  !SLIDE 0MQ client.
- #_read(stream, context) ⇒ Object
- #_receive_result(message, opaque_result) ⇒ Object
- #_send_result(message, result, result_payload, stream, message_state) ⇒ Object
- #_server! ⇒ Object
  !SLIDE 0MQ server.
- #_server_accept_connection!(server) ⇒ Object
  server represents a receiving ZMQ endpoint.
- #_server_close_connection!(in_stream, out_stream) ⇒ Object
  Nothing to be closed for ZMQ.
- #_write(payload, stream, context) ⇒ Object
- #queue_ ⇒ Object
- #stream_eof?(stream) ⇒ Boolean
- #zmq_context ⇒ Object
- #zmq_uri ⇒ Object
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
10 11 12 |
# File 'lib/asir/transport/zmq.rb', line 10
def queue
  @queue
end
Instance Method Details
#_client_connect! ⇒ Object
!SLIDE 0MQ client.
14 15 16 17 18 19 20 |
# File 'lib/asir/transport/zmq.rb', line 14
def _client_connect!
  sock = zmq_context.socket(one_way ? ZMQ::PUB : ZMQ::REQ)
  sock.connect(zmq_uri)
  sock
rescue ::Exception => exc
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end
#_read(stream, context) ⇒ Object
52 53 54 |
# File 'lib/asir/transport/zmq.rb', line 52
def _read stream, context
  stream.recv 0
end
#_receive_result(message, opaque_result) ⇒ Object
33 34 35 36 |
# File 'lib/asir/transport/zmq.rb', line 33
def _receive_result message, opaque_result
  return nil if one_way || message.one_way
  super
end
#_send_result(message, result, result_payload, stream, message_state) ⇒ Object
38 39 40 41 |
# File 'lib/asir/transport/zmq.rb', line 38
def _send_result message, result, result_payload, stream, message_state
  return nil if one_way || message.one_way
  super
end
#_server! ⇒ Object
!SLIDE 0MQ server.
24 25 26 27 28 29 30 31 |
# File 'lib/asir/transport/zmq.rb', line 24
def _server!
  sock = zmq_context.socket(one_way ? ZMQ::SUB : ZMQ::REP)
  sock.setsockopt(ZMQ::SUBSCRIBE, queue) if one_way
  sock.bind("tcp://*:#{port}") # WTF?: why doesn't tcp://localhost:PORT work?
  @server = sock
rescue ::Exception => exc
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end
#_server_accept_connection!(server) ⇒ Object
server represents a receiving ZMQ endpoint.
75 76 77 |
# File 'lib/asir/transport/zmq.rb', line 75
def _server_accept_connection! server
  [ server, @one_way ? nil : server ]
end
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for ZMQ.
87 88 89 |
# File 'lib/asir/transport/zmq.rb', line 87
def _server_close_connection! in_stream, out_stream
  # NOTHING
end
#_write(payload, stream, context) ⇒ Object
43 44 45 46 47 48 49 50 |
# File 'lib/asir/transport/zmq.rb', line 43
def _write payload, stream, context
  if one_way
    q = context && (context[:queue] || context[:zmq_queue])
    payload.insert(0, q || queue_)
  end
  stream.send payload, 0
  stream
end
#queue_ ⇒ Object
69 70 71 72 |
# File 'lib/asir/transport/zmq.rb', line 69
def queue_
  @queue_ ||=
    (queue.empty? ? queue : queue + " ").freeze
end
#stream_eof?(stream) ⇒ Boolean
82 83 84 |
# File 'lib/asir/transport/zmq.rb', line 82
def stream_eof? stream
  false
end
#zmq_context ⇒ Object
100 101 102 103 |
# File 'lib/asir/transport/zmq.rb', line 100
def zmq_context
  @@zmq_context ||=
    ZMQ::Context.new(1)
end
#zmq_uri ⇒ Object
91 92 93 94 95 96 97 98 |
# File 'lib/asir/transport/zmq.rb', line 91
def zmq_uri
  @zmq_uri ||=
    (
    u = URI.parse(uri)
    u.path = ''
    u.to_s.freeze
    )
end