Module: SDN::CLI::MQTT::Subscriptions

Included in:
SDN::CLI::MQTT
Defined in:
lib/sdn/cli/mqtt/subscriptions.rb

Instance Method Summary collapse

Instance Method Details

#handle_message(topic, value) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/sdn/cli/mqtt/subscriptions.rb', line 5

def handle_message(topic, value)
  SDN.logger.info "got #{value.inspect} at #{topic}"
  if (match = topic.match(%r{^#{Regexp.escape(@base_topic)}/(?<addr>\h{6})/(?<property>discover|label|control|jog-(?<jog_type>pulses|ms)|position-pulses|position-percent|ip|reset|(?<speed_type>up-speed|down-speed|slow-speed)|up-limit|down-limit|direction|ip(?<ip>\d+)-(?<ip_type>pulses|percent)|groups)/set$}))
    addr = Message.parse_address(match[:addr])
    property = match[:property]
    # not homie compliant; allows linking the position-percent property
    # directly to an OpenHAB rollershutter channel
    if property == 'position-percent' && value =~ /^(?:UP|DOWN|STOP)$/i
      property = "control"
      value = value.downcase
    end
    mqtt_addr = Message.print_address(addr).gsub('.', '')
    motor = @motors[mqtt_addr]
    is_group = Message.is_group_address?(addr)
    group = @groups[mqtt_addr]
    follow_up = motor&.node_type == :st50ilt2 ? Message::ILT2::GetMotorPosition.new(addr) :
      Message::GetMotorStatus.new(addr)
    ns = motor&.node_type == :st50ilt2 ? Message::ILT2 : Message

    message = case property
      when 'discover'
        follow_up = nil
        if value == "discover"
          # discovery is low priority, and longer timeout
          enqueue(MessageAndRetries.new(Message::GetNodeAddr.new(addr), 1, 2), 2)
        end
        nil
      when 'label'
        follow_up = Message::GetNodeLabel.new(addr)
        ns::SetNodeLabel.new(addr, value) unless is_group
      when 'control'
        case value
        when 'up', 'down'
          (motor&.node_type == :st50ilt2 ? ns::SetMotorPosition : Message::MoveTo).
            new(addr, "#{value}_limit".to_sym)
        when 'stop'
          motor&.node_type == :st50ilt2 ? ns::SetMotorPosition.new(addr, :stop) : Message::Stop.new(addr)
        when 'next_ip'
          motor&.node_type == :st50ilt2 ? ns::SetMotorPosition.new(addr, :next_ip_down) :
            Message::MoveOf.new(addr, :next_ip)
        when 'previous_ip'
          motor&.node_type == :st50ilt2 ? ns::SetMotorPosition.new(addr, :next_ip_up) :
            Message::MoveOf.new(addr, :previous_ip)
        when 'wink'
          Message::Wink.new(addr)
        when 'refresh'
          follow_up = nil
          (motor&.node_type == :st50ilt2 ? ns::GetMotorPosition : Message::GetMotorStatus).
            new(addr)
        end
      when /jog-(?:pulses|ms)/
        value = value.to_i
        (motor&.node_type == :st50ilt2 ? ns::SetMotorPosition : Message::MoveOf).
          new(addr, "jog_#{value < 0 ? :up : :down }_#{match[:jog_type]}".to_sym, value.abs)
      when 'reset'
        return unless Message::SetFactoryDefault::RESET.keys.include?(value.to_sym)
        Message::SetFactoryDefault.new(addr, value.to_sym)
      when 'position-pulses', 'position-percent', 'ip'
        if value == 'REFRESH'
          follow_up = nil
          (motor&.node_type == :st50ilt2 ? ns::GetMotorPosition : Message::GetMotorStatus).
            new(addr)
        else
          (motor&.node_type == :st50ilt2 ? ns::SetMotorPosition : Message::MoveTo).
            new(addr, property.sub('position-', 'position_').to_sym, value.to_i)
        end
      when 'direction'
        return if is_group
        follow_up = Message::GetMotorDirection.new(addr)
        return unless %w{standard reversed}.include?(value)
        Message::SetMotorDirection.new(addr, value.to_sym)
      when 'up-limit', 'down-limit'
        return if is_group
        if %w{delete current_position jog_ms jog_pulses}.include?(value)
          type = value.to_sym
          value = 10
        else
          type = :specified_position
        end
        target = property == 'up-limit' ? :up : :down
        follow_up = Message::GetMotorLimits.new(addr)
        Message::SetMotorLimits.new(addr, type, target, value.to_i)
      when /^ip\d-(?:pulses|percent)$/
        return if is_group
        ip = match[:ip].to_i
        return unless (1..16).include?(ip)
        follow_up = ns::GetMotorIP.new(addr, ip)

        if motor&.node_type == :st50ilt2
          value = if value == 'delete'
            nil
          elsif value == 'current_position'
            motor.position_pulses
          elsif match[:ip_type] == 'pulses'
            value.to_i
          else
            value.to_f / motor.down_limit * 100
          end
          ns::SetMotorIP.new(addr, ip, value)
        else
          type = if value == 'delete'
            :delete
          elsif value == 'current_position'
            :current_position
          elsif match[:ip_type] == 'pulses'
            :position_pulses
          else
            :position_percent
          end
          Message::SetMotorIP.new(addr, type, ip, value.to_i)
        end
      when 'up-speed', 'down-speed', 'slow-speed'
        return if is_group
        return unless motor
        follow_up = Message::GetMotorRollingSpeed.new(addr)
        message = Message::SetMotorRollingSpeed.new(addr,
          up_speed: motor.up_speed,
          down_speed: motor.down_speed,
          slow_speed: motor.slow_speed)
        message.send(:"#{property.sub('-', '_')}=", value.to_i)
        message
      when 'groups'
        return if is_group
        return unless motor
        messages = motor.set_groups(value)
        @mutex.synchronize do
          messages.each { |m| @queues[0].push(MessageAndRetries.new(m, 5, 0)) }
          @cond.signal
        end
        nil
    end

    if motor
      motor.last_action = message.class if [Message::MoveTo, Message::Move, Message::Wink, Message::Stop].include?(message.class)
    end

    if message
      message.ack_requested = true if message.class.name !~ /^SDN::Message::Get/
      @mutex.synchronize do
        @queues[0].push(MessageAndRetries.new(message, 5, 0))
        if follow_up
          @queues[1].push(MessageAndRetries.new(follow_up, 5, 1)) unless @queues[1].any? { |mr| mr.message == follow_up }
        end
        @cond.signal
      end
    end
  end
end