17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/simplepubsub.rb', line 17
# Starts the publish/subscribe WebSocket server inside an EventMachine
# reactor (EM.run).
#
# Clients send plain-text messages in one of two forms:
#
# * "subscribe to topic: <pattern>" registers the connection for later
#   messages. Within the pattern '+' is rewritten to '*', '#' to '*//' and
#   the whole word 'or' to '|'; the result is later evaluated as an XPath
#   expression.
# * "<topic>: <message>" publishes <message> under <topic>. The message is
#   stored in a fresh XMLRegistry and sent (as "<topic>: <message>") to the
#   connections of every subscription whose XPath finds a node in it.
#
# A published topic starting with ':' is treated as already forwarded by
# another server (":<hostname>/<topic>"); the prefix is stripped and the
# message is not forwarded again. Any other published message is forwarded
# to every entry of +brokers+ via SPSPub.notice.
#
# @param host [String] address the WebSocket server binds to
# @param port [Integer] port the server listens on; also used for brokers
#   given without an explicit port
# @param brokers [Array<String>] "host" or "host:port" of servers to
#   forward published messages to
def self.start(host: '0.0.0.0', port: 59000, brokers: [])
  EM.run do
    # subscription pattern (String) => connections subscribed to it
    subscribers = {}

    WebSocket::EventMachine::Server.start(host: host, port: port) do |ws|
      ws.onopen do
      end

      ws.onmessage do |msg, _type|
        # Discard anything that does not start with a word character or ':'.
        # Matching on msg.to_s (instead of msg[0][...]) keeps an empty frame
        # from raising NoMethodError on nil.
        msg = '' unless msg.to_s =~ /\A[\w:]/

        # Split "<head>: <rest>" on the first colon that is followed by
        # whitespace.
        a = msg.lstrip.split(/\s*:\s/, 2)

        def ws.subscriber?()
          false
        end

        if a.first == 'subscribe to topic'
          # \bor\b: only the stand-alone word "or" becomes the XPath union
          # operator; words that merely start with "or" are left intact.
          topic = a.last.rstrip.gsub('+', '*').gsub('#', '*//').gsub(/\bor\b/, '|')

          (subscribers[topic] ||= []) << ws

          # Remember the most recent subscription on the connection itself.
          def ws.subscriber_topic=(topic)
            @topic = topic
          end

          def ws.subscriber_topic()
            @topic
          end

          ws.subscriber_topic = topic
        elsif a.length > 1 && a.first != ''
          current_topic, message = a

          if current_topic[0] == ':'
            # Strip the ":<hostname>/" prefix added by the forwarding server.
            # The hostname comes from Socket.gethostname and may contain '-'
            # or '.', so accept everything up to the first '/'.
            current_topic.sub!(/\A:[^\/]+\//, '')
          elsif brokers.any?
            # The forwarded text is the same for every broker; build it once.
            fqm = format(':%s/%s: %s', Socket.gethostname, current_topic, message)

            brokers.each do |broker|
              hostx, portx = broker.split(':', 2)
              portx ||= port

              begin
                SPSPub.notice fqm, host: hostx, port: portx
              rescue StandardError
                # Best effort: an unreachable broker must not stop delivery.
                puts format("warning couldn't send to %s:%s", hostx, portx)
              end
            end
          end

          # Skip topics that start with '/' or whose last character is not
          # a letter, digit, '/', '_' or space.
          if current_topic[0] != '/' && current_topic !~ /[^a-zA-Z0-9\/_ ]$/
            reg = nil

            begin
              reg = XMLRegistry.new
              reg[current_topic] = message
            rescue StandardError => e
              puts 'simplepubsub.rb warning: ' + e.inspect
            end

            # If the registry could not even be created there is nothing to
            # match subscriptions against.
            next if reg.nil?

            subscribers.each do |pattern, conns|
              # Purely numeric path segments get an 'x' prefix
              # (presumably mirroring how XMLRegistry names such keys -
              # TODO confirm).
              xpath = pattern.split('/')
                             .map { |x| x.to_i.to_s == x ? "x#{x}" : x }
                             .join('/')

              # A pattern ending in a word character selects the text node.
              node = reg.doc.root.xpath xpath.sub(/\S\b$/, '\0/text()')

              conns.each { |x| x.send current_topic + ': ' + message } if node.any?
            end
          end
        end
      end

      ws.onclose do
        # A connection may have subscribed to several patterns; remove it
        # from all of them so closed sockets are never sent to again.
        subscribers.each_value { |conns| conns.delete ws }
      end
    end
  end
end
|