Module: SDN::CLI::MQTT::Read

Included in:
SDN::CLI::MQTT
Defined in:
lib/sdn/cli/mqtt/read.rb

Instance Method Summary

Instance Method Details

#read ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/sdn/cli/mqtt/read.rb', line 5

# Receives messages from +@sdn+ in an endless loop, publishes what each one
# reports about its sending motor (and that motor's groups), and queues any
# follow-up queries the message calls for.
#
# Each message is handled inside its own +@mqtt.batch_publish+ block. A
# StandardError raised while handling a message is logged and that message is
# abandoned; EOFError is logged as fatal and exits the process with status 2.
#
# NOTE(review): the rescue clauses guard only the handling block, not the
# +@sdn.receive+ call itself, so an error raised by receive outside the block
# propagates out of this method -- confirm that is intended.
#
# @return [void] does not return unless the loop is broken by an exception
def read
  loop do
    @sdn.receive do |message|
      @mqtt.batch_publish do
        SDN.logger.info "read #{message.inspect}"

        motor = read_lookup_motor(message)
        # Ignored senders have no motor object; skip the motor updates rather
        # than calling methods on nil, but still run the response bookkeeping.
        follow_ups = motor ? read_apply_to_motor(motor, message) : []
        read_finish_response(message, follow_ups)
      rescue EOFError
        SDN.logger.fatal "EOF reading"
        exit 2
      rescue MalformedMessage => e
        # The pattern omits the first letter, presumably so it matches either
        # capitalisation of "missing data" -- those errors are not logged.
        SDN.logger.warn "ignoring malformed message: #{e}" unless e.to_s.match?(/issing data/)
      rescue => e
        SDN.logger.error "got garbage: #{e}; #{e.backtrace}"
      end
    end
  end
end

# Finds the motor a message was sent by, registering it on first sight.
#
# @param message the received message; +src+ and +node_type+ are read
# @return the motor object, or nil when the sender is '7F.7F.7F' (the UAI Plus
#   and ourselves) or a group address
private def read_lookup_motor(message)
  src = Message.print_address(message.src)
  return if src == '7F.7F.7F' || Message.is_group_address?(message.src)

  key = src.gsub('.', '')
  known = @motors[key]
  return known if known

  SDN.logger.info "found new motor #{src}"
  @motors_found = true
  publish_motor(key, message.node_type)
end

# Publishes whatever a message reports about its motor.
#
# @param motor the motor object the message came from
# @param message the received message
# @return [Array] follow-up messages that should be queued as a result
private def read_apply_to_motor(motor, message)
  follow_ups = []
  case message
  when Message::PostNodeLabel
    publish("#{motor.addr}/$name", message.label) if motor.publish(:label, message.label)
  when Message::PostMotorPosition,
    Message::ILT2::PostMotorPosition
    read_apply_position(motor, message, follow_ups)
  when Message::PostMotorStatus
    read_apply_status(motor, message, follow_ups)
  when Message::PostMotorLimits
    motor.publish(:up_limit, message.up_limit)
    motor.publish(:down_limit, message.down_limit)
  when Message::ILT2::PostMotorSettings
    motor.publish(:down_limit, message.limit)
  when Message::PostMotorDirection
    motor.publish(:direction, message.direction)
  when Message::PostMotorRollingSpeed
    motor.publish(:up_speed, message.up_speed)
    motor.publish(:down_speed, message.down_speed)
    motor.publish(:slow_speed, message.slow_speed)
  when Message::PostMotorIP,
    Message::ILT2::PostMotorIP
    motor.publish(:"ip#{message.ip}_pulses", message.position_pulses)
    if message.respond_to?(:position_percent)
      motor.publish(:"ip#{message.ip}_percent", message.position_percent)
    elsif motor.down_limit
      # no percentage in the message: derive it from the down limit
      motor.publish(:"ip#{message.ip}_percent", message.position_pulses.to_f / motor.down_limit * 100)
    end
  when Message::PostGroupAddr
    motor.add_group(message.group_index, message.group_address)
  end
  follow_ups
end

# Publishes a reported position on the motor and refreshes its groups.
private def read_apply_position(motor, message, follow_ups)
  read_track_ilt2_motion(motor, message, follow_ups) if message.is_a?(Message::ILT2::PostMotorPosition)

  motor.publish(:position_percent, message.position_percent)
  motor.publish(:position_pulses, message.position_pulses)
  motor.publish(:ip, message.ip) if message.respond_to?(:ip)
  motor.group_objects.each { |group| read_publish_group_position(group) }
end

# Derives state, direction and matching IP for an ILT2 motor from successive
# position reports, re-querying while the position keeps changing.
private def read_track_ilt2_motion(motor, message, follow_ups)
  pulses = message.position_pulses

  # keep polling while it's still moving; check prior two positions
  if motor.position_pulses == pulses && motor.last_position_pulses == pulses
    motor.publish(:state, :stopped)
  else
    motor.publish(:state, :running)
    if motor.position_pulses && motor.position_pulses != pulses
      motor.publish(:last_direction, motor.position_pulses < pulses ? :down : :up)
    end
    follow_ups << Message::ILT2::GetMotorPosition.new(message.src)
  end
  # must happen before the new position is published over position_pulses
  motor.last_position_pulses = motor.position_pulses

  # divide by 5 for some leniency
  ip = (1..16).find { |i| motor["ip#{i}_pulses"].to_i / 5 == pulses / 5 }
  motor.publish(:ip, ip)
end

# Publishes a group's position as the average of its motors, and its IP when
# every motor reports the same one.
private def read_publish_group_position(group)
  members = group.motor_objects
  percent = read_average(members.map(&:position_percent))
  pulses = read_average(members.map(&:position_pulses))

  ips = members.map(&:ip)
  ip = ips.uniq.length == 1 ? ips.first : nil
  ip = nil if ip == :nil

  group.publish(:position_percent, percent)
  group.publish(:position_pulses, pulses)
  group.publish(:ip, ip)
end

# Averages values, but only if a value is known for every entry.
#
# @return the average, or nil when the list is empty or contains nil / :nil
private def read_average(values)
  return if values.empty? || values.include?(:nil) || values.include?(nil)

  values.inject(:+) / values.length
end

# Publishes a reported status on the motor and its groups, queueing status and
# position follow-ups.
private def read_apply_status(motor, message, follow_ups)
  # if it's explicitly stopped, but we didn't ask it to, it's probably
  # changing directions so keep querying
  if message.state == :running || motor.state == :running ||
     (message.state == :stopped &&
       message.last_action_cause == :explicit_command &&
       !(motor.last_action == Message::Stop || motor.last_action.nil?))
    follow_ups << Message::GetMotorStatus.new(message.src)
  end
  # this will do one more position request after it stopped
  follow_ups << Message::GetMotorPosition.new(message.src)

  motor.publish(:state, message.state)
  motor.publish(:last_direction, message.last_direction)
  motor.publish(:last_action_source, message.last_action_source)
  motor.publish(:last_action_cause, message.last_action_cause)

  motor.group_objects.each do |group|
    group.publish(:state, read_shared_value(group.motor_objects.map(&:state)))
    group.publish(:last_direction, read_shared_value(group.motor_objects.map(&:last_direction)))
  end
end

# Returns the single value every entry agrees on, or 'mixed' otherwise.
private def read_shared_value(values)
  distinct = values.uniq
  distinct.length == 1 ? distinct.first : 'mixed'
end

# Under +@mutex+: decides whether the message is the response expected for
# +@prior_message+, queues the follow-ups on +@queues[1]+ (skipping ones
# already queued), and signals +@cond+ when there is either a matched
# response or new follow-ups.
private def read_finish_response(message, follow_ups)
  @mutex.synchronize do
    prior = @prior_message&.message
    prior_to_group = Message.is_group_address?(prior&.src) if @prior_message

    matched = @response_pending && prior&.class&.expected_response?(message)
    # a response must come from the node we addressed, or, when the prior
    # message's src was a group address, be addressed back to that group
    matched = false if !prior_to_group && message.src != prior&.dest
    matched = false if prior_to_group && message.dest != prior&.src

    if prior_to_group && matched
      # only counts as matched once no group member is left pending
      @pending_group_motors.delete(Message.print_address(message.src).gsub('.', ''))
      matched = false unless @pending_group_motors.empty?
    end

    @response_pending = @broadcast_pending if matched
    follow_ups.each do |follow_up|
      next if @queues[1].any? { |queued| queued.message == follow_up }

      @queues[1].push(MessageAndRetries.new(follow_up, 5, 1))
    end
    @cond.signal if matched || !follow_ups.empty?
  end
end