Class: ZMachine::ZMQChannel

Inherits:
Channel
  • Object
show all
Defined in:
lib/zmachine/zmq_channel.rb

Instance Attribute Summary collapse

Attributes inherited from Channel

#handler, #selector, #socket

Instance Method Summary collapse

Constructor Details

#initialize(type, selector) ⇒ ZMQChannel



12
13
14
15
16
17
# File 'lib/zmachine/zmq_channel.rb', line 12

def initialize(type, selector)
  super(selector)
  @socket = ZMachine.context.create_socket(type)
  @socket.linger = 0
  @socket.set_router_mandatory(true) if type == ZMQ::ROUTER
end

Instance Attribute Details

#portObject (readonly)

Returns the value of attribute port.



10
11
12
# File 'lib/zmachine/zmq_channel.rb', line 10

def port
  @port
end

Instance Method Details

#bind(address) ⇒ Object



23
24
25
# File 'lib/zmachine/zmq_channel.rb', line 23

def bind(address)
  @port = @socket.bind(address)
end

#can_send?Boolean



77
78
79
# File 'lib/zmachine/zmq_channel.rb', line 77

def can_send?
  @socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT
end

#closeObject



35
36
37
38
39
40
41
42
# File 'lib/zmachine/zmq_channel.rb', line 35

def close
  if @channel_key
    @channel_key.cancel
    @channel_key = nil
  end

  @socket.close
end

#connect(address) ⇒ Object



27
28
29
# File 'lib/zmachine/zmq_channel.rb', line 27

def connect(address)
  @socket.connect(address) if address
end

#current_eventsObject



96
97
98
# File 'lib/zmachine/zmq_channel.rb', line 96

def current_events
  SelectionKey::OP_READ
end

#has_more?Boolean



73
74
75
# File 'lib/zmachine/zmq_channel.rb', line 73

def has_more?
  @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN
end

#identity=(value) ⇒ Object



31
32
33
# File 'lib/zmachine/zmq_channel.rb', line 31

def identity=(value)
  @socket.identity = value.to_java_bytes
end

#peer_nameObject

TODO: fix me



86
87
88
89
# File 'lib/zmachine/zmq_channel.rb', line 86

def peer_name
  sock = @socket.socket
  [sock.port, sock.inet_address.host_address]
end

#read_inbound_data(buffer) ⇒ Object



55
56
57
58
# File 'lib/zmachine/zmq_channel.rb', line 55

def read_inbound_data(buffer)
  return unless has_more?
  ZMsg.recv_msg(@socket)
end

#registerObject



19
20
21
# File 'lib/zmachine/zmq_channel.rb', line 19

def register
  @channel_key ||= @socket.fd.register(@selector, current_events, self)
end

#schedule_close(after_writing) ⇒ Object



81
82
83
# File 'lib/zmachine/zmq_channel.rb', line 81

def schedule_close(after_writing)
  true
end

#send_data(data) ⇒ Object



44
45
46
# File 'lib/zmachine/zmq_channel.rb', line 44

def send_data(data)
  @outbound_queue << data unless send_msg(data)
end

#send_msg(msg) ⇒ Object



48
49
50
51
52
53
# File 'lib/zmachine/zmq_channel.rb', line 48

def send_msg(msg)
  msg.java_send(:send, [org.zeromq.ZMQ::Socket], @socket)
  return true
rescue ZMQException
  return false
end

#sock_nameObject



91
92
93
94
# File 'lib/zmachine/zmq_channel.rb', line 91

def sock_name
  sock = @socket.socket
  [sock.local_port, sock.local_address.host_address]
end

#write_outbound_dataObject



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/zmachine/zmq_channel.rb', line 60

def write_outbound_data
  until @outbound_queue.empty?
    data = @outbound_queue.first
    if send_msg(data)
      @outbound_queue.shift
    else
      break
    end
  end

  return true
end