Class: ZMachine::ZMQChannel
Instance Attribute Summary collapse
Attributes inherited from Channel
#handler, #selector, #socket
Instance Method Summary
collapse
Constructor Details
#initialize(type, selector) ⇒ ZMQChannel
12
13
14
15
16
17
|
# File 'lib/zmachine/zmq_channel.rb', line 12
# Builds a channel around a freshly created ZeroMQ socket.
#
# The selector is handed to the superclass constructor. The socket is created
# from +ZMachine.context+, its linger value is set to 0, and sockets of type
# +ZMQ::ROUTER+ additionally get the router-mandatory option switched on.
#
# @param type [Object] socket type passed to +create_socket+ (compared
#   against +ZMQ::ROUTER+)
# @param selector [Object] selector forwarded to the superclass
def initialize(type, selector)
  super(selector)
  @socket = ZMachine.context.create_socket(type)
  @socket.linger = 0
  # Only ROUTER sockets receive the router-mandatory option.
  return unless type == ZMQ::ROUTER

  @socket.set_router_mandatory(true)
end
|
Instance Attribute Details
#port ⇒ Object
Returns the value of attribute port.
10
11
12
|
# File 'lib/zmachine/zmq_channel.rb', line 10
# Reader for the +@port+ instance variable.
#
# In the code visible here +@port+ is only assigned by #bind, which stores
# the value returned by the socket's +bind+ call.
#
# @return [Object, nil] the stored value, or nil if nothing has been assigned
def port
@port
end
|
Instance Method Details
#bind(address) ⇒ Object
23
24
25
|
# File 'lib/zmachine/zmq_channel.rb', line 23
# Binds the underlying socket to +address+ and remembers the result.
#
# Whatever the socket's +bind+ call returns is stored in +@port+ (readable
# through #port) and also returned to the caller.
#
# @param address [Object] endpoint passed straight to the socket's +bind+
# @return [Object] the value returned by the socket's +bind+
def bind(address)
  bound = @socket.bind(address)
  @port = bound
end
|
#can_send? ⇒ Boolean
77
78
79
|
# File 'lib/zmachine/zmq_channel.rb', line 77
# Tells whether the socket's event mask currently has the POLLOUT bit set.
#
# @return [Boolean] true when +@socket.events+ includes
#   +ZMQ::Poller::POLLOUT+, false otherwise
def can_send?
  ready = @socket.events
  flag = ZMQ::Poller::POLLOUT
  (ready & flag) == flag
end
|
#close ⇒ Object
35
36
37
38
39
40
41
42
|
# File 'lib/zmachine/zmq_channel.rb', line 35
# Cancels the selector registration (if one exists) and closes the socket.
#
# When +@channel_key+ is set it is cancelled and reset to nil, so a later
# #register call creates a new registration. The socket is closed in
# either case.
#
# @return [Object] whatever the socket's +close+ returns
def close
  key = @channel_key
  if key
    key.cancel
    @channel_key = nil
  end
  @socket.close
end
|
#connect(address) ⇒ Object
27
28
29
|
# File 'lib/zmachine/zmq_channel.rb', line 27
# Connects the underlying socket to +address+.
#
# A nil (or false) address is ignored and nothing is called on the socket.
#
# @param address [Object, nil] endpoint passed to the socket's +connect+
# @return [Object, nil] the socket's +connect+ result, or nil when no
#   address was given
def connect(address)
  return unless address

  @socket.connect(address)
end
|
#current_events ⇒ Object
96
97
98
|
# File 'lib/zmachine/zmq_channel.rb', line 96
# Interest set used when this channel is registered with the selector.
#
# Always returns +SelectionKey::OP_READ+; #register passes this value to the
# +register+ call on the socket's +fd+.
#
# NOTE(review): presumably read-interest alone is enough because the ZeroMQ
# notification descriptor reports every state change as readability --
# confirm against the ZeroMQ ZMQ_FD documentation.
#
# @return [Object] +SelectionKey::OP_READ+
def current_events
SelectionKey::OP_READ
end
|
#has_more? ⇒ Boolean
73
74
75
|
# File 'lib/zmachine/zmq_channel.rb', line 73
# Tells whether the socket's event mask currently has the POLLIN bit set.
#
# #read_inbound_data uses this to decide whether to receive a message.
#
# @return [Boolean] true when +@socket.events+ includes
#   +ZMQ::Poller::POLLIN+, false otherwise
def has_more?
  ready = @socket.events
  flag = ZMQ::Poller::POLLIN
  (ready & flag) == flag
end
|
#identity=(value) ⇒ Object
31
32
33
|
# File 'lib/zmachine/zmq_channel.rb', line 31
# Sets the socket identity from +value+.
#
# The value is converted with +to_java_bytes+ before being assigned to the
# socket's +identity+ attribute.
#
# @param value [#to_java_bytes] identity to assign (e.g. a String under JRuby)
def identity=(value)
  raw = value.to_java_bytes
  @socket.identity = raw
end
|
#peer_name ⇒ Object
86
87
88
89
|
# File 'lib/zmachine/zmq_channel.rb', line 86
# Reports the remote endpoint as a +[port, host_address]+ pair.
#
# Both values are read from the object returned by +@socket.socket+:
# its +port+ and the +host_address+ of its +inet_address+.
#
# NOTE(review): this assumes +@socket+ exposes a java.net.Socket-like object
# through +#socket+ -- confirm that this holds for ZeroMQ sockets.
#
# @return [Array] two-element array +[port, host_address]+
def peer_name
  endpoint = @socket.socket
  remote_port = endpoint.port
  remote_host = endpoint.inet_address.host_address
  [remote_port, remote_host]
end
|
#read_inbound_data(buffer) ⇒ Object
55
56
57
58
|
# File 'lib/zmachine/zmq_channel.rb', line 55
# Receives one message from the socket if #has_more? reports readable data.
#
# @param buffer [Object] accepted but not used by this implementation
# @return [Object, nil] the result of +ZMsg.recv_msg(@socket)+, or nil when
#   #has_more? is false
def read_inbound_data(buffer)
  ZMsg.recv_msg(@socket) if has_more?
end
|
#register ⇒ Object
19
20
21
|
# File 'lib/zmachine/zmq_channel.rb', line 19
# Registers the socket's +fd+ with the selector, once.
#
# The registration result is cached in +@channel_key+; while that is set,
# later calls return it without registering again. #close resets it to nil.
# The channel itself is passed as the third argument of the +register+ call.
#
# @return [Object] the cached or newly created registration key
def register
  return @channel_key if @channel_key

  @channel_key = @socket.fd.register(@selector, current_events, self)
end
|
#schedule_close(after_writing) ⇒ Object
81
82
83
|
# File 'lib/zmachine/zmq_channel.rb', line 81
# Always returns true.
#
# The +after_writing+ argument is ignored and nothing is closed or queued
# here; the method only returns the literal +true+.
#
# @param after_writing [Object] accepted but not used
# @return [true]
def schedule_close(after_writing)
true
end
|
#send_data(data) ⇒ Object
44
45
46
|
# File 'lib/zmachine/zmq_channel.rb', line 44
# Tries to send +data+ immediately and queues it if that fails.
#
# When #send_msg returns a falsy value the message is appended to
# +@outbound_queue+, which #write_outbound_data drains later.
#
# @param data [Object] message handed to #send_msg
# @return [Object, nil] nil when the message was sent, otherwise the
#   outbound queue
def send_data(data)
  return if send_msg(data)

  @outbound_queue << data
end
|
#send_msg(msg) ⇒ Object
48
49
50
51
52
53
|
# File 'lib/zmachine/zmq_channel.rb', line 48
# Sends +msg+ over the socket and reports whether that worked.
#
# The message's Java +send+ method is invoked through +java_send+ with an
# explicit one-argument signature (+org.zeromq.ZMQ::Socket+) and +@socket+
# as the argument. Only +ZMQException+ is turned into a false return; any
# other exception propagates.
#
# @param msg [#java_send] message object to send
# @return [Boolean] true if the call completed, false if it raised
#   +ZMQException+
def send_msg(msg)
  signature = [org.zeromq.ZMQ::Socket]
  msg.java_send(:send, signature, @socket)
  true
rescue ZMQException
  false
end
|
#sock_name ⇒ Object
91
92
93
94
|
# File 'lib/zmachine/zmq_channel.rb', line 91
# Reports the local endpoint as a +[local_port, host_address]+ pair.
#
# Both values are read from the object returned by +@socket.socket+:
# its +local_port+ and the +host_address+ of its +local_address+.
#
# NOTE(review): this assumes +@socket+ exposes a java.net.Socket-like object
# through +#socket+ -- confirm that this holds for ZeroMQ sockets.
#
# @return [Array] two-element array +[local_port, host_address]+
def sock_name
  endpoint = @socket.socket
  own_port = endpoint.local_port
  own_host = endpoint.local_address.host_address
  [own_port, own_host]
end
|
#write_outbound_data ⇒ Object
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/zmachine/zmq_channel.rb', line 60
# Drains +@outbound_queue+ in order until it is empty or a send fails.
#
# Each message is removed from the queue only after #send_msg reports
# success; on the first failure the loop stops and that message stays at
# the head of the queue.
#
# @return [true] always, whether or not the queue was fully drained
def write_outbound_data
  until @outbound_queue.empty?
    # Peek first so a message that fails to send stays at the head.
    break unless send_msg(@outbound_queue.first)

    @outbound_queue.shift
  end
  true
end
|