Class: XRBP::WebSocket::Plugins::MessageDispatcher

Inherits:
PluginBase
  • Object
show all
Defined in:
lib/xrbp/websocket/plugins/message_dispatcher.rb

Overview

Dispatch messages & wait for responses (w/ optional timeout). This module allows the client to track messages sent to the server, waiting for responses up to a maximum time. An overridable callback method is provided to match responses to messages. Most often the end-user will not use this plugin directly but rather through CommandDispatcher which inherits it / extends it to issue and track structured commands.

See Also:

Direct Known Subclasses

CommandDispatcher

Constant Summary collapse

DEFAULT_TIMEOUT =
10

Instance Attribute Summary collapse

Attributes inherited from PluginBase

#connection

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ MessageDispatcher

Returns a new instance of MessageDispatcher.



26
27
28
29
30
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 26

# Build a dispatcher bound to the given connection.
#
# Starts with an empty list of tracked messages and the default timeout.
#
# @param connection the connection this plugin is attached to; passed
#   through unchanged to the superclass constructor
def initialize(connection)
  super
  @messages = []
  @message_timeout = DEFAULT_TIMEOUT
end

Instance Attribute Details

#message_timeout ⇒ Object

Returns the value of attribute message_timeout.



24
25
26
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 24

# Reader for the timeout applied to tracked messages.
#
# The constructor initialises it to DEFAULT_TIMEOUT; #opened compares it
# against the elapsed time (Time difference, i.e. seconds) since a message
# was stamped.
#
# @return the current value of @message_timeout
def message_timeout
  @message_timeout
end

#messages ⇒ Object (readonly)

Returns the value of attribute messages.



23
24
25
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 23

# Reader for the list of messages currently being tracked.
#
# The constructor initialises it to an empty Array; other methods of this
# plugin append to and delete from the returned object directly.
#
# @return the @messages collection itself (not a copy)
def messages
  @messages
end

Instance Method Details

#added ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 32

# Install the dispatcher's helpers on the connection.
#
# Defines two methods on the connection via define_instance_method
# (`message_timeout=` and `msg`) and, for anything that is not a
# MultiConnection, registers a :close handler that cancels every tracked
# message.
#
# NOTE(review): the blocks below call `connections`, `next_connection`,
# `open?` and `send_data` without a receiver and test `self.kind_of?`, so
# they presumably execute with the connection as `self` - confirm against
# define_instance_method.
def added
  # Captured so the blocks below can reach this plugin instance.
  plugin = self

  # connection.message_timeout = t
  # Updates this plugin and, on a MultiConnection, the MessageDispatcher
  # plugin of every child connection.
  connection.define_instance_method(:message_timeout=) do |t|
    plugin.message_timeout = t

    connections.each{ |c|
      c.plugin(MessageDispatcher)
       .message_timeout = t
    } if self.kind_of?(MultiConnection)
  end

  # connection.msg(msg, &bl)
  # With a block: returns nil immediately and the block is stored on the
  # message as its callback. Without a block: waits on the message and
  # returns msg.result.
  connection.define_instance_method(:msg) do |msg, &bl|
    # A MultiConnection does not send itself; it forwards to next_connection.
    return next_connection.msg msg, &bl if self.kind_of?(MultiConnection)

    # Wrap raw input, then stamp the message with its connection and send
    # time (the time is what #opened checks against message_timeout).
    msg = Message.new(msg) unless msg.kind_of?(Message)
    msg.connection = self
    msg.time = Time.now
    msg.bl = bl if bl

    # Connection is not open: try to hand the message to another connection.
    unless self.open?
      if plugin.try_next(msg)
        return nil if bl
               msg.wait
        return msg.result

      else
        # Nowhere to send it: report nil to the caller's block, if any.
        msg.bl.call nil if bl
        return nil
      end
    end

    # Track the message before sending so a response can be matched to it.
    plugin.messages << msg

    send_data msg.to_s

    return nil if bl
    msg.wait
    msg.result
  end

  # Pending messages are cancelled when the connection closes.
  connection.on :close do
    plugin.cancel_all_messages
  end unless connection.kind_of?(MultiConnection)
end

#cancel_all_messages ⇒ Object



136
137
138
139
140
141
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 136

# Cancel every message that is still being tracked.
#
# @return [Array] the snapshot of messages that was iterated
def cancel_all_messages
  # cancel_message deletes from `messages`, so iterate over a snapshot
  # rather than the live collection.
  snapshot = messages.dup
  snapshot.each do |pending|
    cancel_message(pending)
  end
end

#cancel_message(msg) ⇒ Object

FIXME: There appears to be an issue causing a deadlock at process termination when subsequent pages of paginated commands time out. When messages are retrieved synchronously, the first message's block is used to wait for the results; on timeout, cancel_message is called with the _latest_ message, so the wait on the first message never gets unlocked.


129
130
131
132
133
134
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 129

# Stop tracking a message and wake up whoever is waiting on it.
#
# Both steps run while holding the connection's state mutex.
#
# @param msg the message to cancel; must respond to #signal
# @return whatever msg.signal returns
def cancel_message(msg)
  lock = connection.state_mutex
  lock.synchronize do
    messages.delete(msg)
    msg.signal
  end
end

#closed ⇒ Object



171
172
173
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 171

# Plugin hook that flags this plugin for termination.
#
# Delegates to terminate! (defined outside this view); the polling loop
# started in #opened checks terminate? on every iteration.
# NOTE(review): presumably invoked when the connection closes - confirm
# against the plugin lifecycle in PluginBase / Connection.
def closed
  terminate!
end

#match_message(msg) ⇒ Object

Should be overridden in subclasses: given a raw response, return the matching request message and the formatted response.



81
82
83
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 81

# Default response matcher: matches nothing.
#
# #message destructures the return value as `req, res`, so a subclass
# override should return a two-element [request, response] pair. Returning
# nil (as here) makes #message ignore the response.
#
# @param msg the raw response to match
# @return [nil]
def match_message(msg)
  nil
end

#message(res) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 93

# Process an incoming response.
#
# match_message pairs the response with a tracked request; when nothing
# matches the response is ignored. A matched request is removed from
# `messages`. If unlock! reports that another plugin is blocking the
# unlock, processing stops there. Otherwise the response is run through
# parse_result and handed to the request's callback (req.bl).
#
# When parse_result raises a StandardError the request is retried through
# try_next; if no retry is possible the callback receives nil. Only
# StandardError is rescued, so Interrupt, SystemExit and other
# non-StandardError exceptions propagate instead of being silently
# converted into a retry.
#
# @param res the raw response
# @return the callback's return value, or nil when processing stops early
def message(res)
  req, res = match_message(res)
  return unless req
  messages.delete(req)

  return unless unlock!(req, res)

  begin
    res = parse_result(res, req)
  rescue StandardError
    # Parsing failed: let another connection retry, else report nil.
    return if try_next(req)

    res = nil
  end

  req.bl.call(res)
end

#opened ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 145

# Schedule the timeout watcher on the connection.
#
# The work block polls the tracked messages until this plugin is flagged
# for termination or the connection reports closed. Any message older than
# @message_timeout triggers a :timeout event, is retried via try_next (or
# cancelled when no retry is possible), and causes an async close of the
# connection.
def opened
  connection.add_work do
    # XXX remove force_quit? condition check from this loop,
    #     so we're sure messages always timeout, even on force quit.
    #     Always ensure close! is called after websocket is no longer
    #     being used!
    until terminate? || connection.closed?
      checked_at = Time.now

      # Iterate a snapshot: cancel_message / try_next mutate `messages`.
      messages.dup.each do |pending|
        next unless checked_at - pending.time > @message_timeout

        connection.emit :timeout, pending
        cancel_message(pending) unless try_next(pending)

        # XXX manually close the connection as
        #     a broken pipe will not stop websocket polling
        connection.async_close!
      end

      connection.rsleep(0.1)
    end
  end
end

#parsing_plugins ⇒ Object



19
20
21
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 19

# Plugins consulted when parsing results: simply the connection's plugins.
# NOTE(review): the consumer of this list (presumably parse_result) is not
# visible in this view - confirm how it is used.
#
# @return the value of connection.plugins
def parsing_plugins
  connection.plugins
end

#try_next(msg) ⇒ Object



114
115
116
117
118
119
120
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 114

# Attempt to hand a message over to another connection.
#
# Asks the connection for the one following msg.connection. When one is
# available the message is dropped from this plugin's tracked list and
# re-sent there with its original callback.
#
# @param msg the message to re-dispatch
# @return [Boolean] true if the message was handed off, false if there is
#   no next connection
def try_next(msg)
  fallback = connection.next_connection(msg.connection)

  if fallback
    messages.delete(msg)
    fallback.msg(msg, &msg.bl)
    true
  else
    false
  end
end

#unlock!(req, res) ⇒ Object

Returns a boolean indicating whether the message/response pair is ready to be unlocked / returned to the client. Allows other plugins to block message unlocking.



87
88
89
90
91
# File 'lib/xrbp/websocket/plugins/message_dispatcher.rb', line 87

# Ask the connection's other plugins whether this request/response pair
# may be released to the client.
#
# Plugins that are this plugin itself, or that do not respond to unlock!,
# are skipped. Checking stops at the first plugin that refuses.
#
# @param req the request message
# @param res the response matched to it
# @return [Boolean] false if any other plugin's unlock! returned a falsy
#   value, true otherwise
def unlock!(req, res)
  connection.plugins.none? do |other|
    next false if other == self
    next false unless other.respond_to?(:unlock!)

    !other.unlock!(req, res)
  end
end