Class: ASIR::Transport::Zmq

Inherits:
ConnectionOriented
  • Object
show all
Defined in:
lib/asir/transport/zmq.rb

Overview

!SLIDE ZeroMQ Transport

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



10
11
12
# File 'lib/asir/transport/zmq.rb', line 10

# Reader for the +@queue+ instance variable (nil when it has not been set).
def queue; @queue; end

Instance Method Details

#_client_connect! ⇒ Object

!SLIDE 0MQ client.



14
15
16
17
18
19
20
# File 'lib/asir/transport/zmq.rb', line 14

# Creates a client-side socket from +zmq_context+ and connects it to +zmq_uri+.
#
# The socket type is ZMQ::PUB when +one_way+ is truthy, ZMQ::REQ otherwise.
#
# Returns the connected socket.
#
# Raises the same class as any StandardError raised while creating or
# connecting the socket, with the message prefixed by this object's class
# and +zmq_uri+; the original backtrace is kept.
def _client_connect!
  sock = zmq_context.socket(one_way ? ZMQ::PUB : ZMQ::REQ)
  sock.connect(zmq_uri)
  sock
rescue ::StandardError => exc
  # Only StandardError is re-wrapped.  Rebuilding other exceptions from a
  # message string is lossy or outright fails: SystemExit would lose its exit
  # status and SignalException rejects a message that is not a signal name.
  # Those now propagate untouched.
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end

#_read(stream, context) ⇒ Object



52
53
54
# File 'lib/asir/transport/zmq.rb', line 52

# Reads from +stream+ by calling its +recv+ with the argument 0 and returns
# whatever +recv+ returns.  +context+ is accepted but not used here.
# NOTE(review): the 0 is presumably the 0MQ "no flags" value -- confirm
# against the ZMQ binding in use.
def _read(stream, context)
  stream.recv(0)
end

#_receive_result(message, opaque_result) ⇒ Object



33
34
35
36
# File 'lib/asir/transport/zmq.rb', line 33

# Returns nil without delegating when either this transport (+one_way+) or
# the +message+ itself is one-way; otherwise defers to the inherited
# implementation with the same arguments.
def _receive_result(message, opaque_result)
  super unless one_way || message.one_way
end

#_send_result(message, result, result_payload, stream, message_state) ⇒ Object



38
39
40
41
# File 'lib/asir/transport/zmq.rb', line 38

# Returns nil without delegating when either this transport (+one_way+) or
# the +message+ itself is one-way; otherwise defers to the inherited
# implementation with the same arguments.
def _send_result(message, result, result_payload, stream, message_state)
  super unless one_way || message.one_way
end

#_server! ⇒ Object

!SLIDE 0MQ server.



24
25
26
27
28
29
30
31
# File 'lib/asir/transport/zmq.rb', line 24

# Creates the server-side socket from +zmq_context+, binds it on +port+ and
# stores it in +@server+.
#
# The socket type is ZMQ::SUB when +one_way+ is truthy (subscribed to
# +queue+ via ZMQ::SUBSCRIBE), ZMQ::REP otherwise.
#
# Returns the bound socket.
#
# Raises the same class as any StandardError raised while setting up the
# socket, with the message prefixed by this object's class and +zmq_uri+;
# the original backtrace is kept.
def _server!
  sock = zmq_context.socket(one_way ? ZMQ::SUB : ZMQ::REP)
  sock.setsockopt(ZMQ::SUBSCRIBE, queue) if one_way
  # Binds on the wildcard interface.
  # NOTE(review): 0MQ's tcp transport documents the bind endpoint as an
  # interface ("*", an IP address or an interface name), which is presumably
  # why tcp://localhost:PORT did not work here -- confirm against zmq_tcp(7).
  sock.bind("tcp://*:#{port}")
  @server = sock
rescue ::StandardError => exc
  # Only StandardError is re-wrapped.  Rebuilding other exceptions from a
  # message string is lossy or outright fails: SystemExit would lose its exit
  # status and SignalException rejects a message that is not a signal name.
  # Those now propagate untouched.
  raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
end

#_server_accept_connection!(server) ⇒ Object

server represents a receiving ZMQ endpoint.



75
76
77
# File 'lib/asir/transport/zmq.rb', line 75

# Builds the [in_stream, out_stream] pair for +server+: the input side is
# always +server+ itself, and the output side is +server+ again unless
# +@one_way+ is truthy, in which case it is nil.
def _server_accept_connection!(server)
  out_stream = @one_way ? nil : server
  [server, out_stream]
end

#_server_close_connection!(in_stream, out_stream) ⇒ Object

Nothing to be closed for ZMQ.



87
88
89
# File 'lib/asir/transport/zmq.rb', line 87

# Deliberate no-op: neither +in_stream+ nor +out_stream+ is touched.
# Always returns nil.
def _server_close_connection!(in_stream, out_stream)
  nil
end

#_write(payload, stream, context) ⇒ Object



43
44
45
46
47
48
49
50
# File 'lib/asir/transport/zmq.rb', line 43

# Sends +payload+ on +stream+ and returns +stream+.
#
# When +one_way+ is truthy the data sent is prefixed with a queue string:
# +context[:queue]+ or +context[:zmq_queue]+ when +context+ supplies one,
# otherwise +queue_+.  The prefix is applied to a copy, so the caller's
# +payload+ is never modified -- a frozen payload is accepted, and sending
# the same payload object again does not accumulate prefixes.
#
# When +one_way+ is falsy +payload+ is sent unchanged.
def _write payload, stream, context
  if one_way
    q = context && (context[:queue] || context[:zmq_queue])
    # dup + insert keeps the payload's own encoding, exactly as the in-place
    # insert did, without touching the caller's string.
    payload = payload.dup.insert(0, q || queue_)
  end
  stream.send payload, 0
  stream
end

#queue_ ⇒ Object



69
70
71
72
# File 'lib/asir/transport/zmq.rb', line 69

# Memoized, frozen prefix string derived from +queue+: +queue+ followed by a
# single space, or +queue+ itself when it is empty.  In the empty case the
# string returned by +queue+ is frozen directly rather than copied.
def queue_
  @queue_ ||= begin
    prefix = queue
    prefix += " " unless prefix.empty?
    prefix.freeze
  end
end

#stream_eof?(stream) ⇒ Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/asir/transport/zmq.rb', line 82

# Always answers false, whatever +stream+ is; the argument is not inspected.
def stream_eof?(stream)
  false
end

#zmq_context ⇒ Object



100
101
102
103
# File 'lib/asir/transport/zmq.rb', line 100

# Lazily creates the ZMQ::Context on first use and keeps it in the class
# variable +@@zmq_context+, so later calls return the same object.
# NOTE(review): the 1 passed to ZMQ::Context.new is presumably the I/O
# thread count -- confirm against the ZMQ binding in use.
def zmq_context
  @@zmq_context ||= ZMQ::Context.new(1)
end

#zmq_uri ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/asir/transport/zmq.rb', line 91

# Memoized, frozen string form of +uri+ with its path component blanked out
# (e.g. "tcp://host:1234/name" becomes "tcp://host:1234").
def zmq_uri
  @zmq_uri ||= begin
    parsed = URI.parse(uri)
    parsed.path = ''
    parsed.to_s.freeze
  end
end