7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/sdn/cli/mqtt/write.rb', line 7
# Writer loop: takes messages off @queue and sends them through @sdn,
# keeping at least 0.1s between consecutive sends.
#
# While a response is outstanding (@response_pending holds a wall-clock
# deadline), the loop waits on @cond until the deadline is cleared or passes.
# On timeout the previous message is re-queued if it still has retries left;
# a timed-out group message is re-queued as one message per motor in
# @pending_group_motors instead.
#
# Never returns normally. Any StandardError is logged at fatal level and the
# process exits with status 1.
def write
  last_write_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    message_and_retries = nil
    @mutex.synchronize do
      # Block here until no response is outstanding. @cond.wait can return
      # early, so the deadline is re-evaluated on every pass.
      while @response_pending
        remaining_wait = @response_pending - Time.now.to_f
        if remaining_wait.negative?
          SDN.logger.debug "Timed out waiting on response"
          @response_pending = nil
          @broadcast_pending = nil
          # NOTE(review): `!= 0` also lets a negative retry count through;
          # confirm callers never enqueue a message with 0 retries.
          if @prior_message && @prior_message.remaining_retries != 0
            SDN.logger.debug "Retrying #{@prior_message.remaining_retries} more times ..."
            if Message.group_address?(@prior_message.message.src) && !@pending_group_motors.empty?
              SDN.logger.debug "Re-targetting group message to individual motors"
              @pending_group_motors.each do |addr|
                new_message = @prior_message.message.dup
                new_message.src = [0, 0, 1]
                new_message.dest = Message.parse_address(addr)
                @queue.push(MessageAndRetries.new(new_message,
                                                  @prior_message.remaining_retries,
                                                  @prior_message.priority))
              end
              @pending_group_motors = []
            else
              @queue.push(@prior_message)
            end
            @prior_message = nil
          end
        else
          @cond.wait(@mutex, remaining_wait)
        end
      end

      message_and_retries = @queue.shift
      # Messages that request an ack, and any SDN::Message::Get* message,
      # arm the response deadline.
      if message_and_retries && (
           message_and_retries.message.ack_requested ||
           /\ASDN::Message::Get/.match?(message_and_retries.message.class.name))
        @response_pending = Time.now.to_f + WAIT_TIME
        @pending_group_motors = if Message.group_address?(message_and_retries.message.src)
                                  group_addr = Message.print_address(message_and_retries.message.src).delete(".")
                                  @groups[group_addr]&.motor_objects&.map(&:addr) || []
                                else
                                  []
                                end
        if message_and_retries.message.dest == BROADCAST_ADDRESS || (
             Message.group_address?(message_and_retries.message.src) &&
             message_and_retries.message.is_a?(Message::GetNodeAddr))
          @broadcast_pending = Time.now.to_f + BROADCAST_WAIT
        end
      end

      if @response_pending
        # Only reachable with a message in hand: the wait loop above left
        # @response_pending nil, so it was just set for this message.
        message_and_retries.remaining_retries -= 1
        @prior_message = message_and_retries
      elsif message_and_retries
        @prior_message = nil
      elsif @auto_discover && @motors_found
        # Queue is empty: issue a discovery request instead of idling.
        message_and_retries = MessageAndRetries.new(Message::GetNodeAddr.new, 1, 50)
        @motors_found = false
      else
        # Nothing to send; sleep until @cond is signalled.
        @cond.wait(@mutex)
      end
    end
    next unless message_and_retries

    message = message_and_retries.message
    # Enforce the minimum gap between consecutive messages.
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_time = 0.1 - (now - last_write_at)
    sleep(sleep_time) if sleep_time.positive?
    @sdn.send(message)
    # Take the timestamp after the send; reusing the pre-sleep `now` would
    # let the next message go out with almost no gap.
    last_write_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
rescue => e
  SDN.logger.fatal "Failure writing: #{e}: #{e.backtrace}"
  exit 1
end
|