Class: EventMachine::XS::Socket

Inherits:
Connection
  • Object
show all
Defined in:
lib/em-xs/socket.rb,
lib/em-xs/helpers.rb

Constant Summary collapse

# Lookup table keyed by ::XS socket-type constant; each value is the
# Sockets::* constant paired with that type. The hash is frozen.
TYPES_MAPPING =
{
  ::XS::REQ         => Sockets::Request,
  ::XS::REP         => Sockets::Reply,
  ::XS::ROUTER      => Sockets::Router,
  ::XS::DEALER      => Sockets::Dealer,
  ::XS::SUB         => Sockets::Subscriber,
  ::XS::PUB         => Sockets::Publisher,
  ::XS::SURVEYOR    => Sockets::Surveyor,
  ::XS::RESPONDENT  => Sockets::Respondent,
  ::XS::PUSH        => Sockets::Push,
  ::XS::PULL        => Sockets::Pull
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket, socket_type, handler) ⇒ Socket

Returns a new instance of Socket.



7
8
9
10
11
# File 'lib/em-xs/socket.rb', line 7

# Remembers the three collaborators handed to the connection.
#
# @param socket      [Object] wrapped socket object that other methods delegate to
# @param socket_type [Object] socket type identifier, stored as-is
# @param handler     [Object] handler object, stored as-is
def initialize(socket, socket_type, handler)
  @socket, @socket_type = socket, socket_type
  @handler = handler
end

Instance Attribute Details

#handler ⇒ Object

Returns the value of attribute handler.



4
5
6
# File 'lib/em-xs/socket.rb', line 4

# Reader for the handler attribute.
#
# @return [Object] the current value of @handler
def handler
  @handler
end

#on_readable ⇒ Object

Returns the value of attribute on_readable.



4
5
6
# File 'lib/em-xs/socket.rb', line 4

# Reader for the on_readable attribute.
#
# @return [Object] the current value of @on_readable
def on_readable
  @on_readable
end

#on_writable ⇒ Object

Returns the value of attribute on_writable.



4
5
6
# File 'lib/em-xs/socket.rb', line 4

# Reader for the on_writable attribute.
#
# @return [Object] the current value of @on_writable
def on_writable
  @on_writable
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



5
6
7
# File 'lib/em-xs/socket.rb', line 5

# Read-only access to the wrapped socket object.
#
# @return [Object] the current value of @socket
def socket
  @socket
end

#socket_type ⇒ Object (readonly)

Returns the value of attribute socket_type.



5
6
7
# File 'lib/em-xs/socket.rb', line 5

# Read-only access to the socket type identifier.
#
# @return [Object] the current value of @socket_type
def socket_type
  @socket_type
end

Class Method Details

.get_class_for_type(socket_type) ⇒ Object



18
19
20
# File 'lib/em-xs/helpers.rb', line 18

# Looks up the entry registered in TYPES_MAPPING for a socket type.
#
# @param socket_type [Object] key to look up in TYPES_MAPPING
# @return [Object, nil] the mapped value, or nil when the type is not listed
def self.get_class_for_type(socket_type)
  TYPES_MAPPING.fetch(socket_type, nil)
end

.map_sockopt(opt, name, writer = true) ⇒ Object



13
14
15
16
17
18
# File 'lib/em-xs/socket.rb', line 13

# Defines accessor methods for one socket option.
#
# A reader called +name+ is always defined and returns getsockopt(opt).
# Unless +writer+ is falsy, a "+name+=" writer is defined as well; it
# forwards the value to @socket.setsockopt(opt, val).
#
# @param opt    [Object] option identifier captured by the generated methods
# @param name   [Symbol, String] name of the generated reader
# @param writer [Boolean] whether to also generate the writer (default true)
def self.map_sockopt(opt, name, writer = true)
  define_method(name) { getsockopt(opt) }
  return unless writer

  define_method("#{name}=") { |val| @socket.setsockopt(opt, val) }
end

Instance Method Details

#bind(address) ⇒ Object

User method



44
45
46
# File 'lib/em-xs/socket.rb', line 44

# Forwards the bind request to the wrapped socket.
#
# @param address [Object] address passed through unchanged to @socket.bind
# @return [Object] whatever @socket.bind returns
def bind(address)
  @socket.bind(address)
end

#connect(address) ⇒ Object



48
49
50
# File 'lib/em-xs/socket.rb', line 48

# Forwards the connect request to the wrapped socket.
#
# @param address [Object] address passed through unchanged to @socket.connect
# @return [Object] whatever @socket.connect returns
def connect(address)
  @socket.connect(address)
end

#getsockopt(opt) ⇒ Object



89
90
91
92
93
94
95
96
97
# File 'lib/em-xs/socket.rb', line 89

# Reads a socket option from the wrapped socket.
#
# The wrapped socket receives an empty array as its second argument and is
# expected to append the option value(s) to it. A result holding exactly one
# element is unwrapped; otherwise the array itself is returned.
#
# @param opt [Object] option identifier forwarded to @socket.getsockopt
# @return [Object, Array] the single value, or the whole result array
# @note When the result code is not ok, ::XS::Util.raise_error is invoked
#   with 'getsockopt' and the result code.
def getsockopt(opt)
  values = []
  result_code = @socket.getsockopt(opt, values)
  ::XS::Util.raise_error('getsockopt', result_code) unless ::XS::Util.resultcode_ok?(result_code)

  return values.first if values.size == 1

  values
end

#notify_readable ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/em-xs/socket.rb', line 124

# Drains every message currently available and hands each one to the handler.
#
# Messages are fetched with get_message until it returns nothing. For each
# message, further parts are collected for as long as the wrapped socket
# reports more_parts?, and the complete list of parts is then passed to
# @handler.on_readable together with this connection.
#
# @return [nil]
# @raise [RuntimeError] when the socket announces another part but
#   get_message yields none
def notify_readable
  # Cheap pre-check so nothing is fetched when the socket has nothing to read.
  return unless readable?

  loop do
    first_part = get_message
    break unless first_part

    parts = [first_part]
    while @socket.more_parts?
      next_part = get_message
      raise "Multi-part message missing a message!" unless next_part

      parts << next_part
    end

    @handler.on_readable(self, parts)
  end
end

#notify_writable ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/em-xs/socket.rb', line 152

# Reacts to a write notification.
#
# Does nothing unless writable? holds. Otherwise write notifications are
# switched off again and, when the handler implements on_writable, it is
# invoked with this connection.
#
# @return [Object, nil] the handler's on_writable result, or nil when it was
#   not called
def notify_writable
  return unless writable?

  # One successful writable event means the socket takes messages again, so
  # further write events are no longer needed.
  self.notify_writable = false

  @handler.on_writable(self) if @handler.respond_to?(:on_writable)
end

#readable? ⇒ Boolean

Returns:

  • (Boolean)


164
165
166
# File 'lib/em-xs/socket.rb', line 164

# Tells whether the POLLIN bit is set in the socket's EVENTS option.
#
# @return [Boolean] true when every bit of ::XS::POLLIN is present in the
#   value returned by getsockopt(::XS::EVENTS)
def readable?
  pending_events = getsockopt(::XS::EVENTS)
  (pending_events & ::XS::POLLIN) == ::XS::POLLIN
end

#register_readable ⇒ Object

Make this socket available for reads



109
110
111
112
113
114
115
116
# File 'lib/em-xs/socket.rb', line 109

# Makes this socket available for reads.
#
# Anything already readable is processed right away through
# notify_readable; read notifications are then switched on.
#
# @return [true]
def register_readable
  notify_readable if readable?

  # Turn on EM read notifications.
  self.notify_readable = true
end

#register_writable ⇒ Object

Subscribe this socket to write notifications



119
120
121
122
# File 'lib/em-xs/socket.rb', line 119

# Switches write notifications on for this connection by assigning true to
# notify_writable=.
#
# @return [true] the assigned value
def register_writable
  # Subscribe to EM write notifications
  self.notify_writable = true
end

#send_msg(*parts) ⇒ Object

Send a non-blocking message. parts: if only one argument is given, a single-part message is sent;

if more than one argument is given, a multipart message is sent.

Returns: true if the message was queued, false otherwise.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/em-xs/socket.rb', line 58

# Sends a message without blocking.
#
# Every argument is one message part. A single Array argument is treated as
# the list of parts, so send_msg('a', 'b') and send_msg(['a', 'b']) are
# equivalent. All parts except the last are sent with SNDMORE set.
#
# @param parts [Array<String>] the message parts, in order
# @return [Boolean] true when every part was queued; false when one of the
#   leading parts was refused, in which case write notifications are
#   switched on for this connection
# @raise [ArgumentError] when called without any message part
# @raise [RuntimeError] when the final part cannot be sent
def send_msg(*parts)
  parts = parts.first if parts.size == 1 && parts.first.is_a?(Array)
  raise ArgumentError, 'send_msg requires at least one message part' if parts.empty?

  # send_string signals failure through its result code (the final part is
  # checked the same way), so it must be tested with resultcode_ok? rather
  # than compared against false. all? stops at the first refused part.
  queued = parts[0...-1].all? do |part|
    ::XS::Util.resultcode_ok?(@socket.send_string(part, ::XS::DONTWAIT | ::XS::SNDMORE))
  end

  if queued
    # Every leading part was accepted; the last part goes out without SNDMORE.
    rc = @socket.send_string(parts.last, ::XS::DONTWAIT)
    raise "Unable to send message: #{::XS::Util.error_string}" unless ::XS::Util.resultcode_ok?(rc)
  else
    # A leading part was refused: ask to be told when the socket is writable.
    self.notify_writable = true
  end

  # NOTE(review): presumably needed because sending can leave inbound messages
  # pending without a fresh read notification -- confirm against the XS docs.
  notify_readable

  queued
end

#setsockopt(opt, value) ⇒ Object



99
100
101
# File 'lib/em-xs/socket.rb', line 99

# Forwards a socket option assignment to the wrapped socket.
#
# @param opt   [Object] option identifier passed to @socket.setsockopt
# @param value [Object] option value passed to @socket.setsockopt
# @return [Object] whatever @socket.setsockopt returns
def setsockopt(opt, value)
  @socket.setsockopt(opt, value)
end

#unbind ⇒ Object

cleanup when ending loop



104
105
106
# File 'lib/em-xs/socket.rb', line 104

# Cleanup hook: delegates to detach_and_close, which is defined outside this
# excerpt.
#
# @return [Object] whatever detach_and_close returns
def unbind
  detach_and_close
end

#writable? ⇒ Boolean

Returns:

  • (Boolean)


168
169
170
171
172
# File 'lib/em-xs/socket.rb', line 168

# Reports whether the socket can be written to.
#
# Currently hard-wired to true; the POLLOUT-based check is kept below,
# commented out, and is not evaluated.
#
# @return [Boolean] always true
def writable?
  true
  # return false
  # (getsockopt(::XS::EVENTS) & ::XS::POLLOUT) == ::XS::POLLOUT
end