Class: Proxy::RemoteExecution::Ssh::MQTT::DispatcherActor::Tracker

Inherits:
Object
  • Object
show all
Defined in:
lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb

Instance Method Summary collapse

Constructor Details

#initialize(limit, clock) ⇒ Tracker

Returns a new instance of Tracker.



37
38
39
40
41
42
43
44
45
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 37

# Builds an empty tracker.
#
# @param limit [Integer, nil] stored as @limit; #pending_count subtracts from
#   it and treats nil as "no cap"
# @param clock [#ping] stored as @clock; #schedule_resend calls #ping on it
def initialize(limit, clock)
  @limit = limit
  @clock = clock
  # Job definitions keyed by uuid, plus the ordered queue of uuids awaiting dispatch.
  @jobs = {}
  @pending = []
  # Three independent uuid sets: reported running, notified since the last
  # #process call, and notified before that.
  @running, @hot, @cold = Array.new(3) { Set.new }
end

Instance Method Details

#dispatch_pending ⇒ Object



86
87
88
89
90
91
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 86

# Takes as many uuids as #pending_count allows from the front of @pending,
# publishes each through #mqtt_notify and records it in @hot.
#
# @return [Integer] the value #pending_count returned
def dispatch_pending
  pending_count.times do
    uuid = @pending.first
    # Publish before dequeuing, so the uuid is still queued if publishing raises.
    mqtt_notify(uuid)
    @pending.shift
    @hot.add(uuid)
  end
end

#done(uuid) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 65

# Forgets a job: removes its definition and drops its uuid from every
# tracking collection, then calls #dispatch_pending so queued jobs can use
# any capacity that was freed.
#
# @param uuid [Object] key of the job to forget
# @return [Object] whatever #dispatch_pending returns
def done(uuid)
  @jobs.delete(uuid)
  @pending.delete(uuid)
  @running.delete(uuid)
  @hot.delete(uuid)
  @cold.delete(uuid)
  dispatch_pending
end

#mqtt_notify(uuid) ⇒ Object



101
102
103
104
105
106
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 101

# Publishes the stored job for +uuid+ over MQTT: the job's topic is the
# destination and its payload, serialized with JSON.dump, is the message.
# Does nothing when no job is stored under +uuid+.
#
# @param uuid [Object] key into @jobs
# @return [Object, nil] result of MQTT.publish, or nil when the job is unknown
def mqtt_notify(uuid)
  job = @jobs[uuid]
  return if job.nil?

  topic = job.topic
  message = JSON.dump(job.payload)
  Proxy::RemoteExecution::Ssh::MQTT.publish(topic, message)
end

#needs_processing? ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 73

# Tells whether there is anything left to act on: queued uuids that may be
# dispatched now, or uuids still held in @hot or @cold.
#
# @return [Boolean]
def needs_processing?
  return true if pending_count.positive?

  [@hot, @cold].any?(&:any?)
end

#new(uuid, topic, payload) ⇒ Object



47
48
49
50
51
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 47

# Registers a job and queues it for dispatch.
#
# Stores a JobDefinition under +uuid+ (replacing any previous one), appends
# the uuid to the pending queue and immediately calls #dispatch_pending.
#
# @param uuid [Object] key identifying the job
# @param topic [Object] handed to JobDefinition; later used as the MQTT topic
# @param payload [Object] handed to JobDefinition; later JSON-dumped and published
# @return [Object] whatever #dispatch_pending returns
def new(uuid, topic, payload)
  definition = JobDefinition.new(uuid, topic, payload)
  @jobs[uuid] = definition
  @pending.push(uuid)
  dispatch_pending
end

#pending_count ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 77

# Number of queued uuids that may be dispatched right now.
#
# Without a limit (@limit is nil) every queued uuid is eligible. Otherwise
# the uuids in @running, @hot and @cold count against @limit and only the
# remaining capacity is available.
#
# @return [Integer] a value between 0 and the length of @pending
def pending_count
  queued = @pending.count
  return queued if @limit.nil?

  occupied = [@running, @hot, @cold].sum(&:count)
  capacity = @limit - occupied
  # #running adds uuids unconditionally, so occupancy can exceed the limit;
  # clamp so a negative capacity is reported as "nothing may be dispatched"
  # rather than as a negative count.
  capacity.clamp(0, queued)
end

#process ⇒ Object



93
94
95
96
97
98
99
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 93

# Ages the notified jobs by one step.
#
# Every uuid currently in @cold gets a resend scheduled and then leaves both
# sets; the uuids in @hot become the new @cold; @hot starts over empty.
# Finally #dispatch_pending is called.
#
# @return [Object] whatever #dispatch_pending returns
def process
  @cold.each do |uuid|
    schedule_resend(uuid)
  end
  # Rotate only after scheduling: the previous @hot set becomes @cold.
  @cold, @hot = @hot, Set.new

  dispatch_pending
end

#resend(uuid) ⇒ Object



58
59
60
61
62
63
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 58

# Puts a known job back on the pending queue and calls #dispatch_pending.
#
# Nothing happens when no job is stored under +uuid+, or when the uuid is
# already in @running.
#
# @param uuid [Object] key of the job to queue again
# @return [Object, nil] result of #dispatch_pending, or nil when skipped
def resend(uuid)
  return unless @jobs[uuid]
  # #process removes a uuid from every set when it schedules the resend, so
  # #running may have claimed it in the meantime. Re-queuing it then would
  # publish a job that is already running and count it twice against the limit.
  return if @running.include?(uuid)

  @pending << uuid
  dispatch_pending
end

#resend_interval ⇒ Object



116
117
118
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 116

# Looks up the :mqtt_resend_interval entry of the plugin settings.
# NOTE(review): the unit is not visible here; the value is passed as the
# delay argument of @clock.ping in #schedule_resend - confirm it is seconds.
#
# @return [Object] the configured value
def resend_interval
  plugin_settings = settings
  plugin_settings[:mqtt_resend_interval]
end

#running(uuid) ⇒ Object



53
54
55
56
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 53

# Marks a job as running: the uuid leaves the pending queue and the hot and
# cold sets, and is added to @running.
#
# @param uuid [Object] key of the job that started running
# @return [Set] the @running set
def running(uuid)
  @pending.delete(uuid)
  @hot.delete(uuid)
  @cold.delete(uuid)
  @running.add(uuid)
end

#schedule_resend(uuid) ⇒ Object



112
113
114
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 112

# Asks the clock to ping the MQTT dispatcher singleton with +uuid+ and the
# :resend tag, using #resend_interval as the delay argument.
#
# @param uuid [Object] key of the job to resend later
# @return [Object] whatever @clock.ping returns
def schedule_resend(uuid)
  dispatcher = Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance
  @clock.ping(dispatcher, resend_interval, uuid, :resend)
end

#settings ⇒ Object



108
109
110
# File 'lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb', line 108

# Shortcut to the remote-execution SSH plugin's settings object.
#
# @return [Object] whatever Proxy::RemoteExecution::Ssh::Plugin.settings returns
def settings
  plugin = Proxy::RemoteExecution::Ssh::Plugin
  plugin.settings
end