Class: ZMachine::ZMQChannel

Inherits:
Channel
  • Object
show all
Defined in:
lib/zmachine/zmq_channel.rb

Instance Attribute Summary

Attributes inherited from Channel

#socket

Instance Method Summary collapse

Methods inherited from Channel

#close_after_writing

Constructor Details

#initialize(type) ⇒ ZMQChannel

Returns a new instance of ZMQChannel.



20
21
22
23
24
25
26
# File 'lib/zmachine/zmq_channel.rb', line 20

def initialize(type)
  super()
  @socket = ZMachine.context.create_socket(type)
  @bound = false
  @connected = false
  @closed = false
end

Instance Method Details

#bind(address, port = nil) ⇒ Object



39
40
41
42
# File 'lib/zmachine/zmq_channel.rb', line 39

def bind(address, port = nil)
  @bound = true
  @socket.bind(address)
end

#bound?Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/zmachine/zmq_channel.rb', line 44

def bound?
  @bound
end

#can_send?Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/zmachine/zmq_channel.rb', line 107

def can_send?
  super and (@socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT)
end

#close(after_writing = false) ⇒ Object



121
122
123
124
125
126
127
# File 'lib/zmachine/zmq_channel.rb', line 121

def close(after_writing = false)
  super
  @closed = true
  @connected = false
  @bound = false
  ZMachine.context.destroySocket(@socket) unless can_send?
end

#closed?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/zmachine/zmq_channel.rb', line 129

def closed?
  @closed
end

#connect(address) ⇒ Object



48
49
50
51
# File 'lib/zmachine/zmq_channel.rb', line 48

def connect(address)
  @connected = true
  @socket.connect(address)
end

#connected?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/zmachine/zmq_channel.rb', line 57

def connected?
  @connected
end

#connection_pending?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/zmachine/zmq_channel.rb', line 53

def connection_pending?
  false
end

#has_more?Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/zmachine/zmq_channel.rb', line 103

def has_more?
  @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN
end

#identityObject



31
32
33
# File 'lib/zmachine/zmq_channel.rb', line 31

def identity
  @socket ? @socket.identity : nil
end

#identity=(v) ⇒ Object



28
29
30
# File 'lib/zmachine/zmq_channel.rb', line 28

def identity=(v)
  @socket.identity = v if @socket
end

#peerObject

Raises:

  • (RuntimeError)


133
134
135
# File 'lib/zmachine/zmq_channel.rb', line 133

def peer
  raise RuntimeError.new("ZMQChannel has no peer")
end

#read_inbound_dataObject



61
62
63
64
65
66
67
# File 'lib/zmachine/zmq_channel.rb', line 61

def read_inbound_data
  data = [@socket.recv_byte_array(0)]
  while @socket.hasReceiveMore
    data << @socket.recv_byte_array(0)
  end
  data
end

#selectable_fdObject



35
36
37
# File 'lib/zmachine/zmq_channel.rb', line 35

def selectable_fd
  @socket.fd
end

#send1(a) ⇒ Object

to get around iterating over an array in #send_data we pass message parts as arguments



81
82
83
# File 'lib/zmachine/zmq_channel.rb', line 81

def send1(a)
  @socket.send_byte_array(a, ZMQ::DONTWAIT)
end

#send2(a, b) ⇒ Object



85
86
87
88
# File 'lib/zmachine/zmq_channel.rb', line 85

def send2(a, b)
  @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT)
  @socket.send_byte_array(b, ZMQ::DONTWAIT)
end

#send3(a, b, c) ⇒ Object



90
91
92
93
94
# File 'lib/zmachine/zmq_channel.rb', line 90

def send3(a, b, c)
  @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT)
  @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT)
  @socket.send_byte_array(c, ZMQ::DONTWAIT)
end

#send4(a, b, c, d) ⇒ Object



96
97
98
99
100
101
# File 'lib/zmachine/zmq_channel.rb', line 96

def send4(a, b, c, d)
  @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT)
  @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT)
  @socket.send_byte_array(c, ZMQ::SNDMORE | ZMQ::DONTWAIT)
  @socket.send_byte_array(d, ZMQ::DONTWAIT)
end

#send_data(data) ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/zmachine/zmq_channel.rb', line 69

def send_data(data)
  parts, last = data[0..-2], data.last
  parts.each do |part|
    @socket.send_byte_array(part, ZMQ::SNDMORE | ZMQ::DONTWAIT)
  end
  @socket.send_byte_array(last, ZMQ::DONTWAIT)
rescue ZMQException
  @outbound_queue << data
end

#write_outbound_dataObject



111
112
113
114
115
116
117
118
119
# File 'lib/zmachine/zmq_channel.rb', line 111

def write_outbound_data
  while can_send?
    data = @outbound_queue.shift
    send_data(data)
  end

  close if @close_scheduled
  return true
end