56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# File 'lib/omf-web/rack/websocket_handler.rb', line 56
# Handles a data source registration request arriving on this connection.
#
# Looks up the data source proxy via +find_data_source+, acknowledges the
# request with an 'ok' reply, subscribes to the proxy's change notifications
# and starts a background thread that forwards queued changes to the client
# as 'datasource_update' messages (one message per action).
#
# @param args [Hash] registration request; handed to +find_data_source+ and
#   read for the 'offset' entry that is passed on to +on_changed+
# @param context [Object] not used by this method
# @return [Thread, nil] the sender thread, or nil when no data source
#   proxy was found for +args+
def on_register_data_source(args, context)
  dsp = find_data_source(args)
  return unless dsp

  debug "Received registration for datasource proxy '#{dsp}'"
  send_data({type: 'reply', status: 'ok'}.to_json)

  mutex = Mutex.new
  semaphore = ConditionVariable.new
  action_queue = {} # action => rows still waiting to be sent

  # Producer side: append the changed rows under the lock and wake the sender.
  dsp.on_changed(args['offset']) do |action, rows|
    mutex.synchronize do
      (action_queue[action] ||= []).concat(rows)
      semaphore.signal
    end
  end

  Thread.new do
    begin
      loop do
        # Take a snapshot of everything queued and empty the queue while
        # holding the lock. The actual sending happens afterwards, so the
        # on_changed callback is never blocked behind a socket write.
        batch = mutex.synchronize do
          action_queue.each_with_object({}) do |(action, rows), drained|
            next if rows.empty?
            drained[action] = rows.dup
            rows.clear
          end
        end

        batch.each do |action, rows|
          debug "Sending '#{action}' message with #{rows.length} rows"
          msg = {
            type: 'datasource_update',
            datasource: dsp.name,
            rows: rows,
            action: action
          }
          # NOTE(review): this re-tags ISO-8859-1 bytes as UTF-8 and raises
          # for characters outside ISO-8859-1, which ends this thread through
          # the rescue below - confirm this is what the client side expects.
          send_data(msg.to_json.encode("iso-8859-1").force_encoding("UTF-8"))
        end

        # Rows may have arrived while we were sending, so only block when
        # nothing is queued; the loop also guards against spurious wake-ups.
        mutex.synchronize do
          semaphore.wait(mutex) while action_queue.each_value.all?(&:empty?)
        end

        # Give further changes a chance to accumulate into the same batch.
        sleep MESSAGE_DELAY
      end
    rescue StandardError => ex
      # Any failure ends the sender thread; log it instead of dying silently.
      error "on_register_data_source - #{ex}"
      debug ex.backtrace.join("\n")
    end
  end
end
|