Module: SDN::CLI::MQTT::Read

Included in:
SDN::CLI::MQTT
Defined in:
lib/sdn/cli/mqtt/read.rb

Instance Method Summary collapse

Instance Method Details

#read ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/sdn/cli/mqtt/read.rb', line 7

# Receive loop: repeatedly asks @sdn for the next message and mirrors what it
# reports onto the sending motor (and the groups that motor belongs to),
# doing all publishing for one message inside a single @mqtt.batch_publish.
#
# For every message the loop:
# 1. resolves the sender in @motors, registering it through publish_motor
#    when the address has not been seen before;
# 2. publishes the properties carried by the message, recomputes the
#    aggregate properties of the motor's groups, and collects follow-up
#    requests (status/position polls) in +follow_ups+;
# 3. under @mutex, decides whether the message is the response that
#    @prior_message was waiting for, enqueues follow-ups that are not
#    already in @queue, and signals @cond when there is something to do.
#
# Errors raised while handling a message are rescued per message:
# EOFError logs a fatal line and exits the process with status 2,
# MalformedMessage is logged as a warning (except "missing data" ones),
# and any other StandardError is logged as an error and the loop continues.
#
# @return [void] not expected to return in normal operation
def read
  loop do
    @sdn.receive do |message|
      @mqtt.batch_publish do
        src = Message.print_address(message.src)
        # ignore the UAI Plus and ourselves
        # NOTE(review): when the sender is 7F.7F.7F or a group address the
        # condition short-circuits and `motor` stays nil; any branch below
        # that calls a method on it then raises and ends up in the generic
        # rescue at the bottom. Confirm whether that is the intended way of
        # ignoring such messages.
        if src != "7F.7F.7F" && !Message.group_address?(message.src) && !(motor = @motors[src.delete(".")])
          SDN.logger.info "Found new motor #{src}"
          @motors_found = true
          motor = publish_motor(src.delete("."), message.node_type)
        end

        # requests to enqueue once this message has been processed
        follow_ups = []
        case message
        when Message::PostNodeLabel
          publish("#{motor.addr}/$name", message.label) if motor.publish(:label, message.label) && @homie
        when Message::PostMotorPosition,
          Message::ILT2::PostMotorPosition
          if message.is_a?(Message::ILT2::PostMotorPosition)
            # keep polling while it's still moving; check prior two positions
            if motor.position_pulses == message.position_pulses &&
               motor.last_position_pulses == message.position_pulses
              motor.publish(:state, :stopped)
            else
              motor.publish(:state, :running)
              if motor.position_pulses && motor.position_pulses != message.position_pulses
                motor.publish(:last_direction,
                              (motor.position_pulses < message.position_pulses) ? :down : :up)
              end
              follow_ups << Message::ILT2::GetMotorPosition.new(message.src)
            end
            motor.last_position_pulses = motor.position_pulses
            # find the first intermediate position (1..16) matching the
            # reported pulse count
            # NOTE(review): an unset ipN_pulses that is nil becomes 0 via
            # to_i and would match positions 0..4 — confirm that is acceptable
            ip = (1..16).find do |i|
              # divide by 5 for some leniency
              motor["ip#{i}_pulses"].to_i / 5 == message.position_pulses / 5
            end
            motor.publish(:ip, ip)
          end
          motor.publish(:position_percent, message.position_percent)
          motor.publish(:position_pulses, message.position_pulses)
          motor.publish(:ip, message.ip) if message.respond_to?(:ip)
          # roll the new position up into every group this motor belongs to
          motor.group_objects.each do |group|
            positions_percent = group.motor_objects.map(&:position_percent)
            positions_pulses = group.motor_objects.map(&:position_pulses)
            ips = group.motor_objects.map(&:ip)

            position_percent = nil
            # calculate an average, but only if we know a position for
            # every shade
            if !positions_percent.include?(:nil) && !positions_percent.include?(nil)
              position_percent = positions_percent.sum / positions_percent.length
            end

            position_pulses = nil
            if !positions_pulses.include?(:nil) && !positions_pulses.include?(nil)
              position_pulses = positions_pulses.sum / positions_pulses.length
            end

            # the group only has an ip when every member agrees on it
            ip = nil
            ip = ips.first if ips.uniq.length == 1
            ip = nil if ip == :nil

            group.publish(:position_percent, position_percent)
            group.publish(:position_pulses, position_pulses)
            group.publish(:ip, ip)
          end
        when Message::PostMotorStatus
          if message.state == :running || motor.state == :running ||
             # if it's explicitly stopped, but we didn't ask it to, it's probably
             # changing directions so keep querying
             (message.state == :stopped &&
               message.last_action_cause == :explicit_command &&
               !(motor.last_action == Message::Stop || motor.last_action.nil?))
            follow_ups << Message::GetMotorStatus.new(message.src)
          end
          # this will do one more position request after it stopped
          follow_ups << Message::GetMotorPosition.new(message.src)
          motor.publish(:state, message.state)
          motor.publish(:last_direction, message.last_direction)
          # `== 0` rather than `.zero?`: an unknown position may be nil or
          # the :nil placeholder, neither of which responds to zero?
          hass_state = if message.state == :running
                         (message.last_direction == :down) ? :closing : :opening
                       elsif motor.position_percent == 0
                         :open
                       elsif motor.position_percent == 100
                         :closed
                       else
                         :stopped
                       end
          motor.publish(:hass_state, hass_state)
          motor.publish(:last_action_source, message.last_action_source)
          motor.publish(:last_action_cause, message.last_action_cause)
          # roll the new status up into every group this motor belongs to;
          # a property the members disagree on is reported as "mixed"
          motor.group_objects.each do |group|
            states = group.motor_objects.map(&:state).uniq
            state = (states.length == 1) ? states.first : "mixed"
            group.publish(:state, state)

            directions = group.motor_objects.map(&:last_direction).uniq
            direction = (directions.length == 1) ? directions.first : "mixed"
            group.publish(:last_direction, direction)

            # members at different positions are treated as half-way (50)
            positions = group.motor_objects.map(&:position_percent).uniq
            position = (positions.length == 1) ? positions.first : 50

            # `position` is nil/:nil when no member has reported a position
            # yet; calling zero? on it used to raise, which skipped the
            # queue bookkeeping below, so compare with == instead
            hass_state = if state == :running
                           (direction == :down) ? :closing : :opening
                         elsif position == 0
                           :open
                         elsif position == 100
                           :closed
                         else
                           :stopped
                         end
            group.publish(:hass_state, hass_state)
          end
        when Message::PostMotorLimits
          motor.publish(:up_limit, message.up_limit)
          motor.publish(:down_limit, message.down_limit)
        when Message::ILT2::PostMotorSettings
          motor.publish(:down_limit, message.limit)
        when Message::PostMotorDirection
          motor.publish(:direction, message.direction)
        when Message::PostMotorRollingSpeed
          motor.publish(:up_speed, message.up_speed)
          motor.publish(:down_speed, message.down_speed)
          motor.publish(:slow_speed, message.slow_speed)
        when Message::PostMotorIP,
          Message::ILT2::PostMotorIP
          motor.publish(:"ip#{message.ip}_pulses", message.position_pulses)
          if message.respond_to?(:position_percent)
            motor.publish(:"ip#{message.ip}_percent", message.position_percent)
          elsif motor.down_limit
            # no percentage in the message: derive it from the down limit
            motor.publish(:"ip#{message.ip}_percent", message.position_pulses.to_f / motor.down_limit * 100)
          end
        when Message::PostGroupAddr
          motor.add_group(message.group_index, message.group_address)
        end

        @mutex.synchronize do
          prior_message_to_group = Message.group_address?(@prior_message&.message&.src) if @prior_message

          # a response only counts when it has the expected type AND its
          # addresses line up with the message we sent
          correct_response = @response_pending && @prior_message&.message&.class&.expected_response?(message)
          correct_response = false if !prior_message_to_group && message.src != @prior_message&.message&.dest
          correct_response = false if prior_message_to_group && message.dest != @prior_message&.message&.src

          # a group request is only answered once every pending member replied
          if prior_message_to_group && correct_response
            @pending_group_motors.delete(Message.print_address(message.src).delete("."))
            correct_response = false unless @pending_group_motors.empty?
          end

          signal = correct_response || !follow_ups.empty?
          @response_pending = @broadcast_pending if correct_response
          follow_ups.each do |follow_up|
            # skip follow-ups that are already waiting in the queue
            unless @queue.any? { |mr| mr.message == follow_up }
              # NOTE(review): 5 and 1 are presumably the retry count and the
              # priority — confirm against MessageAndRetries
              @queue.push(MessageAndRetries.new(follow_up, 5, 1))
            end
          end
          @cond.signal if signal
        end
      rescue EOFError
        SDN.logger.fatal "EOF reading"
        exit 2
      rescue MalformedMessage => e
        # "issing data" matches the text regardless of the first letter's case
        SDN.logger.warn "Ignoring malformed message: #{e}" unless e.to_s.include?("issing data")
      rescue => e
        SDN.logger.error "Got garbage: #{e}; #{e.backtrace}"
      end
    end
  end
end