17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
# File 'lib/simplepubsub.rb', line 17
# Starts the pub/sub WebSocket server. The whole server runs inside EM.run,
# so this call does not return until the reactor is stopped (the 'shutdown'
# message below calls EM.stop).
#
# Each incoming message is split once on ':' followed by whitespace and
# handled according to the part before the colon:
#
#   subscribe to topic: <pattern>   register this connection for <pattern>
#   shutdown                        stop the reactor
#   <topic>: <message>              publish <message> on <topic>
#
# @param host [String] interface handed to the WebSocket server
# @param port [Integer] port handed to the WebSocket server; also the
#   fallback port for a broker entry that has no ':port' part
# @param brokers [Array<String>] 'host' or 'host:port' entries; every
#   locally originated message is forwarded to each of them
# @param debug [Boolean] when true, every raw incoming message is printed
def self.start(host: '0.0.0.0', port: 59000, brokers: [], debug: false)
  EM.run do
    # subscription pattern => list of connections subscribed to it
    subscribers = {}

    WebSocket::EventMachine::Server.start(host: host, port: port) do |ws|
      ws.onopen do
      end

      ws.onmessage do |msg, _type|
        puts 'msg: ' + msg.inspect if debug
        next if msg.empty?

        # Anything not starting with a word character or ':' is discarded
        # (an empty string splits to [] and matches no branch below).
        msg = '' unless msg[0][/[\w:]/]
        a = msg.lstrip.split(/\s*:\s/, 2)

        # NOTE(review): not read anywhere in this method; kept because code
        # outside this view may query it on the connection.
        def ws.subscriber?()
          false
        end

        if a.first == 'subscribe to topic'
          # Rewrite the wildcard characters so the pattern can later be
          # evaluated as an XPath expression against the registry document.
          topic = a.last.rstrip.gsub('+', '*')
                   .gsub('#', '*//').gsub(/\bor/, '|')
          subscribers[topic] ||= []
          subscribers[topic] << ws

          # Records only the most recent pattern on the connection; kept for
          # backward compatibility, cleanup in onclose no longer relies on it.
          def ws.subscriber_topic=(topic) @topic = topic end
          def ws.subscriber_topic() @topic end
          ws.subscriber_topic = topic

        elsif a.first == 'shutdown'
          puts 'shutting down ...'
          EM.stop

        elsif a.length > 1 && a.first != ''
          current_topic, message = a

          if current_topic[0] == ':'
            # A leading ':name/' has the shape of the prefix built for broker
            # forwarding below: strip it and do not forward again.
            current_topic.sub!(/:\w+\//, '')
          elsif brokers.any?
            # Same for every broker, so build it once.
            fqm = ":%s/%s: %s" % [Socket.gethostname, current_topic, message]

            brokers.each do |broker|
              hostx, portx = broker.split(':', 2)
              portx ||= port

              begin
                SPSPub.notice fqm, host: hostx, port: portx
              rescue
                # Best effort: an unreachable broker must not stop delivery.
                puts "warning couldn\'t send to %s:%s" % [hostx, portx]
              end
            end
          end

          # Skip topics that start with '/' or whose final character is
          # outside [a-zA-Z0-9/_ ].
          if current_topic[0] != '/' && current_topic !~ /[^a-zA-Z0-9\/_ ]$/
            begin
              reg = XMLRegistry.new
              reg[current_topic] = message
            rescue => e
              puts 'simplepubsub.rb warning: ' + e.inspect
            end

            # XMLRegistry.new itself failed, so there is no document to match
            # subscriptions against; without this guard reg.doc would raise
            # NoMethodError inside the reactor callback.
            next if reg.nil?

            subscribers.each do |pattern, conns|
              # Purely numeric path segments get an 'x' prefix.
              # NOTE(review): presumably mirrors how XMLRegistry names numeric
              # keys (XML element names cannot start with a digit) - confirm.
              xpath = pattern.split('/')
                             .map { |x| x.to_i.to_s == x ? x.prepend('x') : x }
                             .join('/')

              # When the expression ends in a word character, select the
              # node's text; patterns ending in a wildcard are left as is.
              node = reg.doc.root.xpath xpath.sub(/\S\b$/, '\0/text()')

              if node.any?
                conns.each { |x| x.send current_topic + ': ' + message }
              end
            end
          end
        end
      end

      ws.onclose do
        # A connection can subscribe to several patterns, so remove it from
        # every list rather than only the last pattern it registered, and
        # drop patterns that no longer have any listeners.
        subscribers.delete_if do |_pattern, conns|
          conns.delete ws
          conns.empty?
        end
      end
    end
  end
end
|