Class: EventMachine::ZeroMQ::Socket

Inherits:
Connection
  • Object
show all
Defined in:
lib/em-zeromq/socket.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket, socket_type, handler) ⇒ Socket

Returns a new instance of Socket.



7
8
9
10
11
# File 'lib/em-zeromq/socket.rb', line 7

# Stores the three collaborators this wrapper works with.
#
# @param socket the underlying socket object; other methods of this class
#   call bind, connect, getsockopt, setsockopt and send_string on it
# @param socket_type stored unchanged and exposed through #socket_type
# @param handler object that receives on_readable callbacks (and
#   on_writable, when it responds to it)
def initialize(socket, socket_type, handler)
  @socket      = socket
  @socket_type = socket_type
  @handler     = handler
end

Instance Attribute Details

#handler ⇒ Object

Returns the value of attribute handler.



4
5
6
# File 'lib/em-zeromq/socket.rb', line 4

# Reader for @handler, which #initialize sets from its handler argument.
def handler
  @handler
end

#on_readable ⇒ Object

Returns the value of attribute on_readable.



4
5
6
# File 'lib/em-zeromq/socket.rb', line 4

# Reader for @on_readable. #initialize does not assign this variable, so it
# is nil until it is set elsewhere.
def on_readable
  @on_readable
end

#on_writable ⇒ Object

Returns the value of attribute on_writable.



4
5
6
# File 'lib/em-zeromq/socket.rb', line 4

# Reader for @on_writable. #initialize does not assign this variable, so it
# is nil until it is set elsewhere.
def on_writable
  @on_writable
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



5
6
7
# File 'lib/em-zeromq/socket.rb', line 5

# Read-only access to @socket, the object passed as the first argument to
# #initialize.
def socket
  @socket
end

#socket_type ⇒ Object (readonly)

Returns the value of attribute socket_type.



5
6
7
# File 'lib/em-zeromq/socket.rb', line 5

# Read-only access to @socket_type, the value passed as the second argument
# to #initialize.
def socket_type
  @socket_type
end

Class Method Details

.map_sockopt(opt, name) ⇒ Object



13
14
15
16
# File 'lib/em-zeromq/socket.rb', line 13

# Generates a reader/writer pair for one socket option.
#
# The reader goes through #getsockopt; the writer calls setsockopt on the
# wrapped socket directly.
#
# @param opt option identifier handed to getsockopt/setsockopt
# @param name base name of the generated methods (+name+ and +name=+)
def self.map_sockopt(opt, name)
  reader = name
  writer = "#{name}="

  define_method(reader) do
    getsockopt(opt)
  end

  define_method(writer) do |val|
    @socket.setsockopt(opt, val)
  end
end

Instance Method Details

#bind(address) ⇒ Object

User method



31
32
33
# File 'lib/em-zeromq/socket.rb', line 31

# Delegates to the wrapped socket's #bind.
#
# @param address passed through unchanged
# @return whatever @socket.bind returns
def bind(address)
  @socket.bind(address)
end

#connect(address) ⇒ Object



35
36
37
# File 'lib/em-zeromq/socket.rb', line 35

# Delegates to the wrapped socket's #connect.
#
# @param address passed through unchanged
# @return whatever @socket.connect returns
def connect(address)
  @socket.connect(address)
end

#getsockopt(opt) ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/em-zeromq/socket.rb', line 86

# Reads a socket option from the wrapped socket.
#
# The wrapped socket fills an output array; a single collected value is
# unwrapped, anything else is returned as the array itself.
#
# @param opt option identifier passed to @socket.getsockopt
# @return the lone value, or the array of values
# @raise [ZMQOperationFailed] when the result code is not OK
def getsockopt(opt)
  values = []
  status = @socket.getsockopt(opt, values)

  failed = !ZMQ::Util.resultcode_ok?(status)
  raise ZMQOperationFailed, "getsockopt: #{ZMQ::Util.error_string}" if failed

  return values.first if values.size == 1

  values
end

#notify_readable ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/em-zeromq/socket.rb', line 121

# Drains every pending message from the socket and hands each one to the
# handler as an array of parts.
#
# @return [nil]
# @raise [RuntimeError] when the socket reports more parts but none can be
#   fetched
def notify_readable
  # Cheap pre-check so no message is fetched when nothing is pending. It may
  # not be strictly required, but it follows the documented usage.
  return unless readable?

  loop do
    first_part = get_message
    break unless first_part

    parts = [first_part]
    while @socket.more_parts?
      next_part = get_message
      raise "Multi-part message missing a message!" unless next_part

      parts << next_part
    end

    @handler.on_readable(self, parts)
  end
end

#notify_writable ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/em-zeromq/socket.rb', line 149

# Handles a writable event: switches write notifications off and, when the
# handler implements on_writable, forwards the event to it.
#
# @return the handler's on_writable result, or nil when nothing was called
def notify_writable
  return unless writable?

  # Once a writable event has been received the socket should be accepting
  # messages again, so stop triggering write events.
  self.notify_writable = false

  listener = @handler
  listener.on_writable(self) if listener.respond_to?(:on_writable)
end

#readable? ⇒ Boolean

Returns:

  • (Boolean)


161
162
163
# File 'lib/em-zeromq/socket.rb', line 161

# Tests whether the POLLIN bit is set in the socket's ZMQ::EVENTS option.
#
# @return [Boolean]
def readable?
  events = getsockopt(ZMQ::EVENTS)
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end

#register_readable ⇒ Object

Make this socket available for reads



106
107
108
109
110
111
112
113
# File 'lib/em-zeromq/socket.rb', line 106

# Makes this socket available for reads: processes anything already
# pending, then turns on read notifications.
#
# @return [true]
def register_readable
  # ZMQ is event triggered, so messages that are already waiting are
  # handled right away rather than left for a later notification.
  notify_readable if readable?

  # Subscribe to EM read notifications
  self.notify_readable = true
end

#register_writable ⇒ Object

Trigger on_writable when socket is writable



116
117
118
119
# File 'lib/em-zeromq/socket.rb', line 116

# Turns on write notifications for this connection by setting
# notify_writable to true. #notify_writable switches them off again once a
# writable event has been handled.
def register_writable
  # Subscribe to EM write notifications
  self.notify_writable = true
end

#send_msg(*parts) ⇒ Object

Sends a non-blocking message. parts: if only one argument is given, a single-part message is sent;

if more than one argument is given, a multipart message is sent.

return: true if the message was queued, false otherwise



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/em-zeromq/socket.rb', line 55

# Sends a message without blocking.
#
# One argument sends a single-part message; several arguments (or a single
# Array of parts) send a multipart message, with every part but the last
# flagged ZMQ::SNDMORE.
#
# @param parts [Array] the message parts, given either as separate
#   arguments or as one Array
# @return [Boolean] true when every part was queued; false when one of the
#   leading parts was refused, in which case write notifications are turned
#   on so the caller can retry from the writable callback
# @raise [ArgumentError] when no part is given
# @raise [RuntimeError] when the final part cannot be sent
def send_msg(*parts)
  # Accept send_msg(["a", "b"]) as well as send_msg("a", "b").
  parts = parts[0] if parts.size == 1 && parts[0].is_a?(Array)
  raise ArgumentError, "send_msg requires at least one message part" if parts.empty?

  *leading, last = parts

  # send_string reports an integer result code (the final part is checked
  # the same way), so success is judged with resultcode_ok? and not by
  # comparing against false. all? stops at the first part that is refused.
  queued = leading.all? do |part|
    ZMQ::Util.resultcode_ok?(@socket.send_string(part, ZMQ::NOBLOCK | ZMQ::SNDMORE))
  end

  if queued
    # All the previous parts were queued, send the last one.
    rc = @socket.send_string(last, ZMQ::NOBLOCK)
    raise "Unable to send message: #{ZMQ::Util.error_string}" unless ZMQ::Util.resultcode_ok?(rc)
  else
    # A leading part was refused: register the socket for writability.
    self.notify_writable = true
  end

  # Check for incoming messages on the next reactor tick.
  EM::next_tick { notify_readable }

  queued
end

#setsockopt(opt, value) ⇒ Object



96
97
98
# File 'lib/em-zeromq/socket.rb', line 96

# Delegates to the wrapped socket's #setsockopt.
#
# @param opt option identifier, passed through unchanged
# @param value new option value, passed through unchanged
# @return whatever @socket.setsockopt returns (the result is not checked
#   here)
def setsockopt(opt, value)
  @socket.setsockopt(opt, value)
end

#subscribe(what = '') ⇒ Object



39
40
41
42
# File 'lib/em-zeromq/socket.rb', line 39

# Adds a subscription on a SUB socket.
#
# @param what subscription value handed to ZMQ::SUBSCRIBE; defaults to ''
# @return whatever @socket.setsockopt returns
# @raise [RuntimeError] when the wrapped socket's name is not 'SUB'
def subscribe(what = '')
  if @socket.name != 'SUB'
    raise "only valid on sub socket type (was #{@socket.name})"
  end

  @socket.setsockopt(ZMQ::SUBSCRIBE, what)
end

#unbind ⇒ Object

cleanup when ending loop



101
102
103
# File 'lib/em-zeromq/socket.rb', line 101

# Cleanup hook: calls detach_and_close, which is defined outside this
# excerpt.
# NOTE(review): presumably invoked by EventMachine when the connection is
# torn down -- confirm against the Connection base class.
def unbind
  detach_and_close
end

#unsubscribe(what) ⇒ Object



44
45
46
47
# File 'lib/em-zeromq/socket.rb', line 44

# Removes a subscription from a SUB socket.
#
# @param what subscription value handed to ZMQ::UNSUBSCRIBE
# @return whatever @socket.setsockopt returns
# @raise [RuntimeError] when the wrapped socket's name is not 'SUB'
def unsubscribe(what)
  if @socket.name != 'SUB'
    raise "only valid on sub socket type (was #{@socket.name})"
  end

  @socket.setsockopt(ZMQ::UNSUBSCRIBE, what)
end

#writable? ⇒ Boolean

Returns:

  • (Boolean)


165
166
167
168
169
# File 'lib/em-zeromq/socket.rb', line 165

# Always reports the socket as writable.
#
# @return [Boolean] always true
def writable?
  # ZMQ::EVENTS has issues in ZMQ HEAD, we'll ignore this till they're fixed
  # (getsockopt(ZMQ::EVENTS) & ZMQ::POLLOUT) == ZMQ::POLLOUT
  true
end