Class: ASIR::Transport::Zmq
- Inherits:
-
ConnectionOriented
- Object
- ASIR::Transport
- Stream
- ConnectionOriented
- ASIR::Transport::Zmq
- Defined in:
- lib/asir/transport/zmq.rb
Overview
!SLIDE ZeroMQ Transport
Constant Summary
Constants included from PayloadIO
PayloadIO::FOOTER, PayloadIO::HEADER
Constants included from UriConfig
UriConfig::S_LOCALHOST, UriConfig::S_TCP
Constants included from ASIR::ThreadVariable
ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER
Instance Attribute Summary collapse
-
#queue ⇒ Object
Returns the value of attribute queue.
Attributes included from UriConfig
#host, #host_default, #path, #path_default, #port, #port_default, #scheme, #scheme_default, #uri
Attributes inherited from ASIR::Transport
#after_receive_message, #before_send_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose
Attributes included from Log
Instance Method Summary collapse
-
#_client_connect! ⇒ Object
!SLIDE 0MQ client.
- #_read(stream) ⇒ Object
- #_receive_result(message, opaque_result) ⇒ Object
- #_send_result(message, result, result_payload, stream, message_state) ⇒ Object
-
#_server! ⇒ Object
!SLIDE 0MQ server.
-
#_server_accept_connection!(server) ⇒ Object
server represents a receiving ZMQ endpoint.
-
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for ZMQ.
- #_write(payload, stream) ⇒ Object
- #queue_ ⇒ Object
- #stream_eof?(stream) ⇒ Boolean
- #zmq_context ⇒ Object
- #zmq_uri ⇒ Object
Methods inherited from ConnectionOriented
#_after_connect!, #_before_close!, #_connect!, #_receive_message, #_send_message, #_server_close!, #connect!, #prepare_server!, #run_server!, #serve_connection!, #server_on_start!, #server_on_stop!, #stream
Methods included from PayloadIO
#_read_line_and_expect!, #close
Methods included from UriConfig
Methods inherited from Stream
#_serve_stream!, #serve_stream!, #serve_stream_message!
Methods inherited from ASIR::Transport
#_subclass_responsibility, #initialize, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #stop!, #with_server_signals!
Methods included from Log
#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included
Methods included from Initialization
Methods included from AdditionalData
#[], #[]=, #additional_data, #additional_data!, #additional_data=, included
Methods included from Message::Delay
#relative_message_delay!, #wait_for_delay!
Methods included from ASIR::ThreadVariable
Constructor Details
This class inherits a constructor from ASIR::Transport
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
10 11 12 |
# File 'lib/asir/transport/zmq.rb', line 10 def queue @queue end |
Instance Method Details
#_client_connect! ⇒ Object
!SLIDE 0MQ client.
14 15 16 17 18 19 20 |
# File 'lib/asir/transport/zmq.rb', line 14 def _client_connect! sock = zmq_context.socket(one_way ? ZMQ::PUB : ZMQ::REQ) sock.connect(zmq_uri) sock rescue ::Exception => exc raise exc.class, "#{self.class} #{zmq_uri}: #{exc.}", exc.backtrace end |
#_read(stream) ⇒ Object
49 50 51 |
# File 'lib/asir/transport/zmq.rb', line 49 def _read stream stream.recv 0 end |
#_receive_result(message, opaque_result) ⇒ Object
33 34 35 36 |
# File 'lib/asir/transport/zmq.rb', line 33 def _receive_result , opaque_result return nil if one_way || .one_way super end |
#_send_result(message, result, result_payload, stream, message_state) ⇒ Object
38 39 40 41 |
# File 'lib/asir/transport/zmq.rb', line 38 def _send_result , result, result_payload, stream, return nil if one_way || .one_way super end |
#_server! ⇒ Object
!SLIDE 0MQ server.
24 25 26 27 28 29 30 31 |
# File 'lib/asir/transport/zmq.rb', line 24 def _server! sock = zmq_context.socket(one_way ? ZMQ::SUB : ZMQ::REP) sock.setsockopt(ZMQ::SUBSCRIBE, queue) if one_way sock.bind("tcp://*:#{port}") # WTF?: why doesn't tcp://localhost:PORT work? @server = sock rescue ::Exception => exc raise exc.class, "#{self.class} #{zmq_uri}: #{exc.}", exc.backtrace end |
#_server_accept_connection!(server) ⇒ Object
server represents a receiving ZMQ endpoint.
72 73 74 |
# File 'lib/asir/transport/zmq.rb', line 72 def _server_accept_connection! server [ server, @one_way ? nil : server ] end |
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for ZMQ.
84 85 86 |
# File 'lib/asir/transport/zmq.rb', line 84 def _server_close_connection! in_stream, out_stream # NOTHING end |
#_write(payload, stream) ⇒ Object
43 44 45 46 47 |
# File 'lib/asir/transport/zmq.rb', line 43 def _write payload, stream payload.insert(0, queue_) if one_way stream.send payload, 0 stream end |
#queue_ ⇒ Object
66 67 68 69 |
# File 'lib/asir/transport/zmq.rb', line 66 def queue_ @queue_ ||= (queue.empty? ? queue : queue + " ").freeze end |
#stream_eof?(stream) ⇒ Boolean
79 80 81 |
# File 'lib/asir/transport/zmq.rb', line 79 def stream_eof? stream false end |
#zmq_context ⇒ Object
97 98 99 100 |
# File 'lib/asir/transport/zmq.rb', line 97 def zmq_context @@zmq_context ||= ZMQ::Context.new(1) end |
#zmq_uri ⇒ Object
88 89 90 91 92 93 94 95 |
# File 'lib/asir/transport/zmq.rb', line 88 def zmq_uri @zmq_uri ||= ( u = URI.parse(uri) u.path = '' u.to_s.freeze ) end |