Class: Rpush::Daemon::AppRunner

Inherits:
Object
  • Object
show all
Extended by:
Reflectable
Includes:
Loggable, Reflectable
Defined in:
lib/rpush/daemon/app_runner.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Reflectable

reflect

Methods included from Loggable

#log_error, #log_info, #log_warn

Constructor Details

#initialize(app) ⇒ AppRunner

Returns a new instance of AppRunner.



68
69
70
71
# File 'lib/rpush/daemon/app_runner.rb', line 68

def initialize(app)
  @app = app
  @loops = []
end

Class Attribute Details

.runnersObject (readonly)

Returns the value of attribute runners.



9
10
11
# File 'lib/rpush/daemon/app_runner.rb', line 9

def runners
  @runners
end

Instance Attribute Details

#appObject (readonly)

Returns the value of attribute app.



65
66
67
# File 'lib/rpush/daemon/app_runner.rb', line 65

def app
  @app
end

#batchObject

Returns the value of attribute batch.



66
67
68
# File 'lib/rpush/daemon/app_runner.rb', line 66

def batch
  @batch
end

Class Method Details

.debugObject



53
54
55
# File 'lib/rpush/daemon/app_runner.rb', line 53

def self.debug
  runners.values.map(&:debug)
end

.enqueue(notifications) ⇒ Object



14
15
16
17
18
19
20
21
22
23
# File 'lib/rpush/daemon/app_runner.rb', line 14

def self.enqueue(notifications)
  notifications.group_by(&:app_id).each do |app_id, group|
    batch = Batch.new(group)
    if app = runners[app_id]
      app.enqueue(batch)
    else
      Rpush.logger.error("No such app '#{app_id}' for notifications #{batch.describe}.")
    end
  end
end

.idleObject



57
58
59
# File 'lib/rpush/daemon/app_runner.rb', line 57

def self.idle
  runners.values.select(&:idle?)
end

.stopObject



48
49
50
51
# File 'lib/rpush/daemon/app_runner.rb', line 48

def self.stop
  runners.values.map(&:stop)
  runners.clear
end

.syncObject



25
26
27
28
29
30
# File 'lib/rpush/daemon/app_runner.rb', line 25

def self.sync
  apps = Rpush::App.all
  apps.each { |app| sync_app(app) }
  removed = runners.keys - apps.map(&:id)
  removed.each { |app_id| runners.delete(app_id).stop }
end

.sync_app(app) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/rpush/daemon/app_runner.rb', line 32

def self.sync_app(app)
  if runners[app.id]
    runners[app.id].sync(app)
  else
    runner = new(app)
    begin
      runner.start
      runners[app.id] = runner
    rescue StandardError => e
      Rpush.logger.error("[#{app.name}] Exception raised during startup. Notifications will not be delivered for this app.")
      Rpush.logger.error(e)
      reflect(:error, e)
    end
  end
end

.waitObject



61
62
63
# File 'lib/rpush/daemon/app_runner.rb', line 61

def self.wait
  sleep 0.1 while !runners.values.all?(&:idle?)
end

Instance Method Details

#batch_processedObject



137
138
139
# File 'lib/rpush/daemon/app_runner.rb', line 137

def batch_processed
  batch ? batch.num_processed : 0
end

#batch_sizeObject



133
134
135
# File 'lib/rpush/daemon/app_runner.rb', line 133

def batch_size
  batch ? batch.num_notifications : 0
end

#debugObject



113
114
115
116
117
118
119
120
121
122
123
# File 'lib/rpush/daemon/app_runner.rb', line 113

def debug
  Rpush.logger.info <<-EOS

#{@app.name}:
  dispatchers: #{num_dispatchers}
  queued: #{queue_size}
  batch size: #{batch_size}
  batch processed: #{batch_processed}
  idle: #{idle?}
  EOS
end

#decrement_dispatchers(num) ⇒ Object



105
106
107
# File 'lib/rpush/daemon/app_runner.rb', line 105

def decrement_dispatchers(num)
  num.times { dispatchers.pop }
end

#enqueue(batch) ⇒ Object



84
85
86
87
88
89
90
# File 'lib/rpush/daemon/app_runner.rb', line 84

def enqueue(batch)
  self.batch = batch
  batch.notifications.each do |notification|
    queue.push([notification, batch])
    reflect(:notification_enqueued, notification)
  end
end

#idle?Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/rpush/daemon/app_runner.rb', line 125

def idle?
  batch ? batch.complete? : true
end

#increment_dispatchers(num) ⇒ Object



109
110
111
# File 'lib/rpush/daemon/app_runner.rb', line 109

def increment_dispatchers(num)
  num.times { dispatchers.push(new_dispatcher_loop) }
end

#num_dispatchersObject



141
142
143
# File 'lib/rpush/daemon/app_runner.rb', line 141

def num_dispatchers
  dispatchers.size
end

#queue_sizeObject



129
130
131
# File 'lib/rpush/daemon/app_runner.rb', line 129

def queue_size
  queue.size
end

#startObject



73
74
75
76
77
# File 'lib/rpush/daemon/app_runner.rb', line 73

def start
  app.connections.times { dispatchers.push(new_dispatcher_loop) }
  start_loops
  log_info("Started, #{dispatchers_str}.")
end

#stopObject



79
80
81
82
# File 'lib/rpush/daemon/app_runner.rb', line 79

def stop
  dispatchers.stop
  stop_loops
end

#sync(app) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/rpush/daemon/app_runner.rb', line 92

def sync(app)
  @app = app
  diff = dispatchers.size - app.connections
  return if diff == 0
  if diff > 0
    decrement_dispatchers(diff)
    log_info("Stopped #{dispatchers_str(diff)}. #{dispatchers_str} running.")
  else
    increment_dispatchers(diff.abs)
    log_info("Started #{dispatchers_str(diff)}. #{dispatchers_str} running.")
  end
end