10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/ey-core/subscribable.rb', line 10
def subscribe(&block)
return false unless read_channel_uri
gem 'faye', '~> 1.1'
require 'faye'
Faye.logger = Logger.new(STDOUT, level: "DEBUG") if ENV["DEBUG"]
uri = read_channel_uri
resource = self
url = uri.omit(:query).to_s
token = uri.query_values["token"]
subscription = uri.query_values["subscription"]
EM.run do
client = Faye::Client.new(url)
client.("Authorization", "Token #{token}")
next_ready_check = Time.now + 5
handle_output = Proc.new do |m|
next_ready_check = Time.now + 1
block.call(m)
end
deferred = client.subscribe(subscription) do |message|
handle_output.call(JSON.load(message))
end
deferred.callback do
handle_output.call({"meta" => true, "created_at" => Time.now,"message" => "log output stream connection established, waiting...\n"})
end
deferred.errback do |error|
handle_output.call({"meta" => true, "created_at" => Time.now, "message" => "failed to stream output: #{error.inspect}\n"})
EM.stop_event_loop
end
EventMachine::PeriodicTimer.new(1) do
if Time.now > next_ready_check
if resource.reload.ready?
handle_output.call({"meta" => true, "created_at" => Time.now, "message" => "#{resource} finished"})
EM.stop_event_loop
end
next_ready_check = Time.now + 5
end
end
end
end
|