Class: ZMachine::ZMQChannel
- Defined in:
- lib/zmachine/zmq_channel.rb
Instance Attribute Summary
Attributes inherited from Channel
Instance Method Summary collapse
- #bind(address, port = nil) ⇒ Object
- #bound? ⇒ Boolean
- #can_send? ⇒ Boolean
- #close(after_writing = false) ⇒ Object
- #closed? ⇒ Boolean
- #connect(address) ⇒ Object
- #connected? ⇒ Boolean
- #connection_pending? ⇒ Boolean
- #has_more? ⇒ Boolean
- #identity ⇒ Object
- #identity=(v) ⇒ Object
-
#initialize(type) ⇒ ZMQChannel
constructor
A new instance of ZMQChannel.
- #peer ⇒ Object
- #read_inbound_data ⇒ Object
- #selectable_fd ⇒ Object
-
#send1(a) ⇒ Object
To avoid iterating over an array, as #send_data does, the message parts are passed as individual arguments.
- #send2(a, b) ⇒ Object
- #send3(a, b, c) ⇒ Object
- #send4(a, b, c, d) ⇒ Object
- #send_data(data) ⇒ Object
- #write_outbound_data ⇒ Object
Methods inherited from Channel
Constructor Details
#initialize(type) ⇒ ZMQChannel
Returns a new instance of ZMQChannel.
20 21 22 23 24 25 26 |
# File 'lib/zmachine/zmq_channel.rb', line 20

# Sets up the channel around a socket created through ZMachine.context.
# A new channel starts out not bound, not connected and not closed.
#
# @param type the socket type handed to ZMachine.context.create_socket
def initialize(type)
  super()
  @socket = ZMachine.context.create_socket(type)
  @bound = @connected = @closed = false
end
Instance Method Details
#bind(address, port = nil) ⇒ Object
39 40 41 42 |
# File 'lib/zmachine/zmq_channel.rb', line 39

# Binds the underlying socket to +address+.
#
# The bound flag is raised only after the socket call has returned, so a
# bind that raises does not leave the channel reporting itself as bound.
#
# @param address the endpoint passed unchanged to the socket's bind
# @param port not used by this method
#   NOTE(review): presumably kept for signature compatibility with other
#   channel types - confirm against Channel
# @return whatever the socket's bind call returns
def bind(address, port = nil)
  result = @socket.bind(address)
  @bound = true
  result
end
#bound? ⇒ Boolean
44 45 46 |
# File 'lib/zmachine/zmq_channel.rb', line 44

# Reports the bound flag maintained by #bind and #close.
#
# @return [Boolean] the current value of the bound flag
def bound?
  @bound
end
#can_send? ⇒ Boolean
107 108 109 |
# File 'lib/zmachine/zmq_channel.rb', line 107

# Tells whether outbound data can be written right now: the parent class's
# check must pass and the socket's event mask must have the POLLOUT bit set.
#
# @return the parent's falsy result, or a Boolean for the POLLOUT test
def can_send?
  super && ((@socket.events & ZMQ::Poller::POLLOUT) == ZMQ::Poller::POLLOUT)
end
#close(after_writing = false) ⇒ Object
121 122 123 124 125 126 127 |
# File 'lib/zmachine/zmq_channel.rb', line 121

# Closes the channel: delegates to the parent implementation, marks the
# channel closed and clears the connected and bound flags. The socket is
# destroyed through ZMachine.context unless #can_send? still reports true.
#
# @param after_writing forwarded implicitly to the parent's close
# @return nil when the socket is kept, otherwise the destroySocket result
def close(after_writing = false)
  super
  @closed = true
  @connected = @bound = false
  return if can_send?

  ZMachine.context.destroySocket(@socket)
end
#closed? ⇒ Boolean
129 130 131 |
# File 'lib/zmachine/zmq_channel.rb', line 129

# Reports the closed flag, which #close sets to true.
#
# @return [Boolean] the current value of the closed flag
def closed?
  @closed
end
#connect(address) ⇒ Object
48 49 50 51 |
# File 'lib/zmachine/zmq_channel.rb', line 48

# Connects the underlying socket to +address+.
#
# The connected flag is raised only after the socket call has returned, so
# a connect that raises does not leave the channel reporting itself as
# connected.
#
# @param address the endpoint passed unchanged to the socket's connect
# @return whatever the socket's connect call returns
def connect(address)
  result = @socket.connect(address)
  @connected = true
  result
end
#connected? ⇒ Boolean
57 58 59 |
# File 'lib/zmachine/zmq_channel.rb', line 57

# Reports the connected flag maintained by #connect and #close.
#
# @return [Boolean] the current value of the connected flag
def connected?
  @connected
end
#connection_pending? ⇒ Boolean
53 54 55 |
# File 'lib/zmachine/zmq_channel.rb', line 53

# This channel never reports a connection as pending.
#
# @return [Boolean] always false
def connection_pending?
  false
end
#has_more? ⇒ Boolean
103 104 105 |
# File 'lib/zmachine/zmq_channel.rb', line 103

# Tells whether the socket's event mask currently has the POLLIN bit set.
#
# @return [Boolean] true when the POLLIN bit is present in the socket events
def has_more?
  pollin = ZMQ::Poller::POLLIN
  (@socket.events & pollin) == pollin
end
#identity ⇒ Object
31 32 33 |
# File 'lib/zmachine/zmq_channel.rb', line 31

# Reads the identity from the underlying socket.
#
# @return the socket's identity, or nil when there is no socket
def identity
  return nil unless @socket

  @socket.identity
end
#identity=(v) ⇒ Object
28 29 30 |
# File 'lib/zmachine/zmq_channel.rb', line 28

# Assigns the identity on the underlying socket; does nothing when there
# is no socket.
#
# @param v the identity value to store on the socket
def identity=(v)
  return unless @socket

  @socket.identity = v
end
#peer ⇒ Object
133 134 135 |
# File 'lib/zmachine/zmq_channel.rb', line 133

# Always fails: this channel has no peer to report.
#
# @raise [RuntimeError] on every call
def peer
  raise RuntimeError, "ZMQChannel has no peer"
end
#read_inbound_data ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/zmachine/zmq_channel.rb', line 61

# Reads one message from the socket: the first part is always received,
# then further parts are received for as long as the socket reports
# hasReceiveMore.
#
# @return [Array] the received parts, in the order they were read
def read_inbound_data
  frames = [@socket.recv_byte_array(0)]
  frames << @socket.recv_byte_array(0) while @socket.hasReceiveMore
  frames
end
#selectable_fd ⇒ Object
35 36 37 |
# File 'lib/zmachine/zmq_channel.rb', line 35

# Exposes the underlying socket's fd.
#
# @return the value of the socket's fd
def selectable_fd
  @socket.fd
end
#send1(a) ⇒ Object
To avoid iterating over an array, as #send_data does, the message parts are passed as individual arguments.
81 82 83 |
# File 'lib/zmachine/zmq_channel.rb', line 81

# Sends a single-part message with the DONTWAIT flag. The part is taken as
# a plain argument so no array has to be iterated.
#
# @param a the only message part
# @return the result of the socket's send_byte_array call
def send1(a)
  @socket.send_byte_array(a, ZMQ::DONTWAIT)
end
#send2(a, b) ⇒ Object
85 86 87 88 |
# File 'lib/zmachine/zmq_channel.rb', line 85

# Sends a two-part message. The first part carries SNDMORE | DONTWAIT, the
# final part DONTWAIT only.
#
# @param a first message part
# @param b final message part
# @return the result of sending the final part
def send2(a, b)
  more = ZMQ::SNDMORE | ZMQ::DONTWAIT
  @socket.send_byte_array(a, more)
  @socket.send_byte_array(b, ZMQ::DONTWAIT)
end
#send3(a, b, c) ⇒ Object
90 91 92 93 94 |
# File 'lib/zmachine/zmq_channel.rb', line 90

# Sends a three-part message. Every part but the last carries
# SNDMORE | DONTWAIT; the final part carries DONTWAIT only.
#
# @param a first message part
# @param b second message part
# @param c final message part
# @return the result of sending the final part
def send3(a, b, c)
  more = ZMQ::SNDMORE | ZMQ::DONTWAIT
  @socket.send_byte_array(a, more)
  @socket.send_byte_array(b, more)
  @socket.send_byte_array(c, ZMQ::DONTWAIT)
end
#send4(a, b, c, d) ⇒ Object
96 97 98 99 100 101 |
# File 'lib/zmachine/zmq_channel.rb', line 96

# Sends a four-part message. Every part but the last carries
# SNDMORE | DONTWAIT; the final part carries DONTWAIT only.
#
# @param a first message part
# @param b second message part
# @param c third message part
# @param d final message part
# @return the result of sending the final part
def send4(a, b, c, d)
  more = ZMQ::SNDMORE | ZMQ::DONTWAIT
  @socket.send_byte_array(a, more)
  @socket.send_byte_array(b, more)
  @socket.send_byte_array(c, more)
  @socket.send_byte_array(d, ZMQ::DONTWAIT)
end
#send_data(data) ⇒ Object
69 70 71 72 73 74 75 76 77 |
# File 'lib/zmachine/zmq_channel.rb', line 69

# Sends +data+ as one multipart message: every element except the last is
# sent with SNDMORE | DONTWAIT, the last with DONTWAIT only. When the socket
# raises ZMQException the whole +data+ array is appended to the outbound
# queue instead.
#
# NOTE(review): the requeue appends to the end of the queue; if this is
# reached for a message taken from the front of that queue, the ordering
# may change - confirm against the callers.
#
# @param data [Array] the message parts, in order
# @return the result of the final send, or the outbound queue after a requeue
def send_data(data)
  more = ZMQ::SNDMORE | ZMQ::DONTWAIT
  data[0...-1].each { |frame| @socket.send_byte_array(frame, more) }
  @socket.send_byte_array(data.last, ZMQ::DONTWAIT)
rescue ZMQException
  @outbound_queue << data
end
#write_outbound_data ⇒ Object
111 112 113 114 115 116 117 118 119 |
# File 'lib/zmachine/zmq_channel.rb', line 111

# Drains the outbound queue: while #can_send? holds, the message at the
# front of the queue is removed and handed to #send_data. Afterwards the
# channel is closed if a close has been scheduled.
#
# @return [Boolean] always true
def write_outbound_data
  send_data(@outbound_queue.shift) while can_send?
  close if @close_scheduled
  true
end