Class: Rpush::Daemon::AppRunner
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
reflect
Methods included from Loggable
#log_error, #log_info, #log_warn
Constructor Details
#initialize(app) ⇒ AppRunner
Returns a new instance of AppRunner.
68
69
70
71
|
# File 'lib/rpush/daemon/app_runner.rb', line 68
# Builds a runner for one app. Nothing is started here: the constructor only
# stores the app and prepares an empty @loops array.
#
# @param app [Object] the app this runner will serve
def initialize(app)
  @loops = []
  @app = app
end
|
Class Attribute Details
.runners ⇒ Object
Returns the value of attribute runners.
9
10
11
|
# File 'lib/rpush/daemon/app_runner.rb', line 9
# Reader for @runners. The class-level methods in this file index the
# returned object by app id and call #keys, #values, #delete and #clear on it.
#
# @return [Object] the current value of @runners
def runners
@runners
end
|
Instance Attribute Details
#app ⇒ Object
Returns the value of attribute app.
65
66
67
|
# File 'lib/rpush/daemon/app_runner.rb', line 65
# Reader for @app, which is assigned in #initialize and replaced by #sync.
#
# @return [Object] the current value of @app
def app
@app
end
|
#batch ⇒ Object
Returns the value of attribute batch.
66
67
68
|
# File 'lib/rpush/daemon/app_runner.rb', line 66
# Reader for @batch. The value may be nil: #idle?, #batch_size and
# #batch_processed all check for a missing batch before using it.
#
# @return [Object, nil] the current value of @batch
def batch
@batch
end
|
Class Method Details
.debug ⇒ Object
53
54
55
|
# File 'lib/rpush/daemon/app_runner.rb', line 53
# Calls #debug on every registered runner.
#
# @return [Array] the value returned by each runner's #debug call, in the
#   order the runners are stored
def self.debug
  runners.values.map { |runner| runner.debug }
end
|
.enqueue(notifications) ⇒ Object
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/rpush/daemon/app_runner.rb', line 14
# Groups the notifications by app id, wraps each group in a Batch and hands
# it to the runner registered under that id. A group whose app id has no
# runner is reported through Rpush.logger and not enqueued.
#
# @param notifications [Enumerable] items responding to #app_id
# @return [Hash] the notifications grouped by app id
def self.enqueue(notifications)
  grouped = notifications.group_by(&:app_id)
  grouped.each do |app_id, group|
    batch = Batch.new(group)
    runner = runners[app_id]
    if runner
      runner.enqueue(batch)
    else
      Rpush.logger.error("No such app '#{app_id}' for notifications #{batch.describe}.")
    end
  end
end
|
.idle ⇒ Object
57
58
59
|
# File 'lib/rpush/daemon/app_runner.rb', line 57
# Lists the registered runners that currently report themselves idle.
#
# @return [Array] runners whose #idle? is truthy
def self.idle
  runners.values.select { |runner| runner.idle? }
end
|
.stop ⇒ Object
48
49
50
51
|
# File 'lib/rpush/daemon/app_runner.rb', line 48
# Stops every registered runner and then empties the registry.
#
# @return [Object] the result of runners.clear
def self.stop
  runners.values.each { |runner| runner.stop }
  runners.clear
end
|
.sync ⇒ Object
25
26
27
28
29
30
|
# File 'lib/rpush/daemon/app_runner.rb', line 25
# Brings the registry in line with Rpush::App.all: every app found is passed
# to sync_app, and every runner whose app id is no longer present is removed
# from the registry and stopped.
#
# @return [Array] the app ids whose runners were removed
def self.sync
  current_apps = Rpush::App.all
  current_apps.each { |app| sync_app(app) }

  stale_ids = runners.keys - current_apps.map(&:id)
  stale_ids.each do |app_id|
    runner = runners.delete(app_id)
    runner.stop
  end
end
|
.sync_app(app) ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/rpush/daemon/app_runner.rb', line 32
# Ensures a runner exists for the given app. An already-registered runner is
# asked to sync itself against the app; otherwise a new runner is built and
# started, and it is registered only if start did not raise.
#
# A StandardError raised by start is logged, passed to reflect(:error, ...)
# and swallowed, leaving no runner registered for that app.
#
# @param app [Object] responds to #id and #name
def self.sync_app(app)
  existing = runners[app.id]
  return existing.sync(app) if existing

  runner = new(app)
  begin
    runner.start
    runners[app.id] = runner
  rescue StandardError => e
    Rpush.logger.error("[#{app.name}] Exception raised during startup. Notifications will not be delivered for this app.")
    Rpush.logger.error(e)
    reflect(:error, e)
  end
end
|
.wait ⇒ Object
61
62
63
|
# File 'lib/rpush/daemon/app_runner.rb', line 61
# Blocks the caller until every registered runner reports idle, checking
# again after each 0.1 second sleep. Returns immediately when all runners
# are already idle or none are registered.
#
# @return [nil]
def self.wait
  sleep 0.1 until runners.values.all? { |runner| runner.idle? }
end
|
Instance Method Details
#batch_processed ⇒ Object
137
138
139
|
# File 'lib/rpush/daemon/app_runner.rb', line 137
# Reports the current batch's num_processed value.
#
# @return [Object] batch.num_processed, or 0 when there is no batch
def batch_processed
  return 0 unless batch

  batch.num_processed
end
|
#batch_size ⇒ Object
133
134
135
|
# File 'lib/rpush/daemon/app_runner.rb', line 133
# Reports the current batch's num_notifications value.
#
# @return [Object] batch.num_notifications, or 0 when there is no batch
def batch_size
  if batch
    batch.num_notifications
  else
    0
  end
end
|
#debug ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
|
# File 'lib/rpush/daemon/app_runner.rb', line 113
# Writes a multi-line status summary for this runner to Rpush.logger at info
# level: the app name followed by the dispatcher count, queue size, batch
# size, number of processed batch items and the idle flag.
#
# The heredoc text is emitted verbatim, so its layout is part of the log
# output and must not be re-indented casually.
def debug
Rpush.logger.info <<-EOS
#{@app.name}:
dispatchers: #{num_dispatchers}
queued: #{queue_size}
batch size: #{batch_size}
batch processed: #{batch_processed}
idle: #{idle?}
EOS
end
|
#decrement_dispatchers(num) ⇒ Object
105
106
107
|
# File 'lib/rpush/daemon/app_runner.rb', line 105
# Pops `num` entries off the dispatchers collection, one at a time.
#
# @param num [Integer] how many dispatchers to remove
# @return [Integer] num
def decrement_dispatchers(num)
  num.times do
    dispatchers.pop
  end
end
|
#enqueue(batch) ⇒ Object
84
85
86
87
88
89
90
|
# File 'lib/rpush/daemon/app_runner.rb', line 84
# Makes `batch` the runner's current batch, then pushes each of its
# notifications onto the queue paired with the batch, emitting a
# :notification_enqueued reflection after each push.
#
# @param batch [Object] responds to #notifications
# @return [Object] batch.notifications
def enqueue(batch)
  self.batch = batch
  batch.notifications.each do |item|
    entry = [item, batch]
    queue.push(entry)
    reflect(:notification_enqueued, item)
  end
end
|
#idle? ⇒ Boolean
125
126
127
|
# File 'lib/rpush/daemon/app_runner.rb', line 125
# Tells whether this runner has no outstanding batch work.
#
# @return [Boolean] true when there is no batch; otherwise whatever
#   batch.complete? returns
def idle?
  !batch || batch.complete?
end
|
#increment_dispatchers(num) ⇒ Object
109
110
111
|
# File 'lib/rpush/daemon/app_runner.rb', line 109
# Pushes `num` freshly built dispatcher loops onto the dispatchers
# collection, one at a time.
#
# @param num [Integer] how many dispatchers to add
# @return [Integer] num
def increment_dispatchers(num)
  num.times do
    dispatcher_loop = new_dispatcher_loop
    dispatchers.push(dispatcher_loop)
  end
end
|
#num_dispatchers ⇒ Object
141
142
143
|
# File 'lib/rpush/daemon/app_runner.rb', line 141
# Number of entries currently held in the dispatchers collection.
#
# @return [Object] whatever dispatchers.size returns
def num_dispatchers
dispatchers.size
end
|
#queue_size ⇒ Object
129
130
131
|
# File 'lib/rpush/daemon/app_runner.rb', line 129
# Number of entries currently waiting in this runner's queue.
#
# @return [Object] whatever queue.size returns
def queue_size
queue.size
end
|
#start ⇒ Object
73
74
75
76
77
|
# File 'lib/rpush/daemon/app_runner.rb', line 73
# Starts the runner: adds one dispatcher loop per configured app connection,
# starts the loops, then logs how many dispatchers were started.
def start
  increment_dispatchers(app.connections)
  start_loops
  log_info("Started, #{dispatchers_str}.")
end
|
#stop ⇒ Object
79
80
81
82
|
# File 'lib/rpush/daemon/app_runner.rb', line 79
# Shuts the runner down: the dispatchers collection is stopped first, then
# the runner's loops.
def stop
dispatchers.stop
stop_loops
end
|
#sync(app) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/rpush/daemon/app_runner.rb', line 92
# Adopts the given app and resizes the dispatcher pool to match its
# connection count, logging how many dispatchers were stopped or started.
# Does nothing beyond replacing @app when the pool is already the right size.
#
# @param app [Object] responds to #connections
# @return [nil] when no resize was needed; otherwise the result of log_info
def sync(app)
  @app = app
  diff = dispatchers.size - app.connections
  return if diff == 0

  # Both branches act on the magnitude of the difference. dispatchers_str is
  # defined outside this block, so the positive count is passed explicitly
  # rather than relying on that helper to normalise a negative value.
  count = diff.abs
  if diff > 0
    decrement_dispatchers(count)
    log_info("Stopped #{dispatchers_str(count)}. #{dispatchers_str} running.")
  else
    increment_dispatchers(count)
    log_info("Started #{dispatchers_str(count)}. #{dispatchers_str} running.")
  end
end
|