Class: ASIR::Transport::Zmq

Inherits:
ConnectionOriented show all
Defined in:
lib/asir/transport/zmq.rb

Overview

!SLIDE ZeroMQ Transport

Constant Summary

Constants included from PayloadIO

PayloadIO::FOOTER, PayloadIO::HEADER

Constants included from UriConfig

UriConfig::S_LOCALHOST, UriConfig::S_TCP

Constants included from ASIR::ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from UriConfig

#host, #host_default, #path, #path_default, #port, #port_default, #scheme, #scheme_default, #uri

Attributes inherited from ASIR::Transport

#after_receive_message, #before_send_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose

Attributes included from Log

#_logger

Instance Method Summary collapse

Methods inherited from ConnectionOriented

#_after_connect!, #_before_close!, #_connect!, #_receive_message, #_send_message, #_server_close!, #connect!, #prepare_server!, #run_server!, #serve_connection!, #server_on_start!, #server_on_stop!, #stream

Methods included from PayloadIO

#_read_line_and_expect!, #close

Methods included from UriConfig

#_uri, #address, #protocol

Methods inherited from Stream

#_serve_stream!, #serve_stream!, #serve_stream_message!

Methods inherited from ASIR::Transport

#_subclass_responsibility, #initialize, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #stop!, #with_server_signals!

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from Initialization

#initialize

Methods included from AdditionalData

#[], #[]=, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#relative_message_delay!, #wait_for_delay!

Methods included from ASIR::ThreadVariable

included, setter

Constructor Details

This class inherits a constructor from ASIR::Transport

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



10
11
12
# File 'lib/asir/transport/zmq.rb', line 10

# Reader for the @queue instance variable.
#
# Returns whatever is currently stored in @queue.
def queue
  @queue
end

Instance Method Details

#_client_connect! ⇒ Object

!SLIDE 0MQ client.



14
15
16
17
18
19
20
# File 'lib/asir/transport/zmq.rb', line 14

# Creates a client socket from zmq_context and connects it to zmq_uri.
# A ZMQ::PUB socket is requested when one_way is truthy, ZMQ::REQ otherwise.
#
# Returns the connected socket.
#
# Raises the same kind of error that the socket layer raised, with
# "<transport class> <zmq_uri>: " prepended to the message and the original
# backtrace kept.
def _client_connect!
  sock = zmq_context.socket(one_way ? ZMQ::PUB : ZMQ::REQ)
  sock.connect(zmq_uri)
  sock
rescue ::StandardError => exc
  # Only StandardError is decorated: signals, exit requests and other
  # non-StandardError exceptions propagate untouched.
  # Re-raise through the rescued object (raise calls exc.exception(message))
  # instead of exc.class.new(message): the constructor is not re-run, so
  # Errno::* messages are not prefixed twice and error-specific state is kept.
  raise exc, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end

#_read(stream) ⇒ Object



49
50
51
# File 'lib/asir/transport/zmq.rb', line 49

# Receives one payload from the given stream.
#
# stream - object responding to recv(flags); it is called with flags 0.
#
# Returns whatever stream.recv returns.
def _read(stream)
  flags = 0
  stream.recv(flags)
end

#_receive_result(message, opaque_result) ⇒ Object



33
34
35
36
# File 'lib/asir/transport/zmq.rb', line 33

# Receives the result for a message, unless the exchange is one-way.
#
# Returns nil without calling the inherited implementation when either the
# transport's one_way or message.one_way is truthy; otherwise returns
# whatever the inherited implementation returns for the same arguments.
def _receive_result(message, opaque_result)
  if one_way || message.one_way
    nil
  else
    super
  end
end

#_send_result(message, result, result_payload, stream, message_state) ⇒ Object



38
39
40
41
# File 'lib/asir/transport/zmq.rb', line 38

# Sends the result of a message back, unless the exchange is one-way.
#
# Returns nil without calling the inherited implementation when either the
# transport's one_way or message.one_way is truthy; otherwise returns
# whatever the inherited implementation returns for the same arguments.
def _send_result(message, result, result_payload, stream, message_state)
  if one_way || message.one_way
    nil
  else
    super
  end
end

#_server! ⇒ Object

!SLIDE 0MQ server.



24
25
26
27
28
29
30
31
# File 'lib/asir/transport/zmq.rb', line 24

# Creates the server socket from zmq_context, binds it to the configured port
# on all interfaces and stores it in @server.
# A ZMQ::SUB socket subscribed to queue is used when one_way is truthy,
# ZMQ::REP otherwise.
#
# Returns the bound socket.
#
# Raises the same kind of error that the socket layer raised, with
# "<transport class> <zmq_uri>: " prepended to the message and the original
# backtrace kept.
def _server!
  sock = zmq_context.socket(one_way ? ZMQ::SUB : ZMQ::REP)
  sock.setsockopt(ZMQ::SUBSCRIBE, queue) if one_way
  # Binds the wildcard interface instead of the host from zmq_uri.
  # NOTE(review): the original author observed that tcp://localhost:PORT did
  # not work here; presumably bind expects an interface/address rather than a
  # hostname -- confirm against the zmq_tcp documentation.
  sock.bind("tcp://*:#{port}")
  @server = sock
rescue ::StandardError => exc
  # Only StandardError is decorated: signals, exit requests and other
  # non-StandardError exceptions propagate untouched.
  # Re-raise through the rescued object (raise calls exc.exception(message))
  # instead of exc.class.new(message): the constructor is not re-run, so
  # Errno::* messages are not prefixed twice and error-specific state is kept.
  raise exc, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end

#_server_accept_connection!(server) ⇒ Object

server represents a receiving ZMQ endpoint.



72
73
74
# File 'lib/asir/transport/zmq.rb', line 72

# Produces the [in_stream, out_stream] pair for a server endpoint.
#
# server - the receiving ZMQ endpoint.
#
# Returns a two-element Array: server as the input side, and server again as
# the output side unless @one_way is truthy, in which case the output is nil.
def _server_accept_connection!(server)
  out_stream = @one_way ? nil : server
  [server, out_stream]
end

#_server_close_connection!(in_stream, out_stream) ⇒ Object

Nothing to be closed for ZMQ.



84
85
86
# File 'lib/asir/transport/zmq.rb', line 84

# No-op: nothing to be closed for ZMQ.
#
# Both arguments are ignored. Returns nil.
def _server_close_connection!(in_stream, out_stream)
  nil
end

#_write(payload, stream) ⇒ Object



43
44
45
46
47
# File 'lib/asir/transport/zmq.rb', line 43

# Sends a payload on the given stream.
#
# payload - String to send. When one_way is truthy, queue_ is placed in front
#           of it. The caller's String is never modified.
# stream  - object responding to send(data, flags); called with flags 0.
#
# Returns stream.
def _write(payload, stream)
  # Prefix a copy rather than inserting into the caller's String: in-place
  # insertion raised on frozen payloads and added the prefix again each time
  # the same payload object was written.
  payload = payload.dup.insert(0, queue_) if one_way
  stream.send payload, 0
  stream
end

#queue_ ⇒ Object



66
67
68
69
# File 'lib/asir/transport/zmq.rb', line 66

# Memoized, frozen form of queue with a trailing space.
#
# Returns queue followed by a single space when queue is non-empty. For an
# empty queue the queue object itself is frozen and returned. The value is
# computed once and cached in @queue_.
def queue_
  return @queue_ if @queue_

  prefix = queue
  prefix = prefix + " " unless prefix.empty?
  @queue_ = prefix.freeze
end

#stream_eof?(stream) ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/asir/transport/zmq.rb', line 79

# End-of-stream predicate for this transport.
#
# stream - ignored.
#
# Returns false unconditionally.
def stream_eof?(stream)
  false
end

#zmq_context ⇒ Object



97
98
99
100
# File 'lib/asir/transport/zmq.rb', line 97

# Lazily created ZMQ::Context, memoized in the class variable @@zmq_context.
#
# The context is built with ZMQ::Context.new(1) on first use.
# NOTE(review): the argument 1 is presumably the I/O thread count -- confirm
# against the ZMQ binding in use.
#
# Returns the memoized context.
def zmq_context
  unless defined?(@@zmq_context) && @@zmq_context
    @@zmq_context = ZMQ::Context.new(1)
  end
  @@zmq_context
end

#zmq_uri ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/asir/transport/zmq.rb', line 88

# The transport's uri with its path component blanked, as a frozen String.
#
# The uri is parsed with URI.parse, its path is set to the empty string and
# the result is rendered back to a String. The value is computed once and
# cached in @zmq_uri.
def zmq_uri
  return @zmq_uri if @zmq_uri

  parsed = URI.parse(uri)
  parsed.path = ''
  @zmq_uri = parsed.to_s.freeze
end