5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/sdn/cli/mqtt/write.rb', line 5
# Writer loop: pulls queued messages and sends them one at a time via @sdn.
#
# Each pass of the loop, while holding @mutex:
# 1. If a response is outstanding (@response_pending holds a wall-clock
#    deadline), waits on @cond until it is cleared or the deadline passes.
#    On timeout the previous message is re-queued if it has retries left; a
#    message whose src is a group address is re-targeted to each motor still
#    listed in @pending_group_motors.
# 2. Takes the first available message, scanning @queues in order.
# 3. If the message requests an ack or is a Get* message, arms the response
#    deadline (and the broadcast deadline where applicable) and remembers
#    the message in @prior_message for a possible retry.
# 4. If nothing is queued, either issues an auto-discovery GetNodeAddr
#    (when @auto_discover and @motors_found are set) or sleeps on @cond.
#
# Outside the mutex the message is sent, keeping at least 0.1s between
# consecutive sends.
#
# NOTE(review): @response_pending is presumably cleared, and @cond
# signalled, by other threads (reader / enqueuers) -- confirm against the
# rest of the class.
#
# Does not return normally; any StandardError is logged as fatal and the
# process exits with status 1.
def write
  last_write_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    message_and_retries = nil
    @mutex.synchronize do
      # Hold off on new traffic while a response is still outstanding.
      while @response_pending
        remaining_wait = @response_pending - Time.now.to_f
        if remaining_wait < 0
          SDN.logger.debug "timed out waiting on response"
          @response_pending = nil
          @broadcast_pending = nil
          if @prior_message && @prior_message.remaining_retries != 0
            SDN.logger.debug "retrying #{@prior_message.remaining_retries} more times ..."
            if Message.is_group_address?(@prior_message.message.src) && !@pending_group_motors.empty?
              # Re-issue the request once per motor that is still pending,
              # instead of repeating the group-addressed message.
              SDN.logger.debug "re-targetting group message to individual motors"
              @pending_group_motors.each do |addr|
                new_message = @prior_message.message.dup
                new_message.src = [0, 0, 1]
                new_message.dest = Message.parse_address(addr)
                @queues[@prior_message.priority].push(
                  MessageAndRetries.new(new_message, @prior_message.remaining_retries, @prior_message.priority)
                )
              end
              @pending_group_motors = []
            else
              @queues[@prior_message.priority].push(@prior_message)
            end
            @prior_message = nil
          end
        else
          @cond.wait(@mutex, remaining_wait)
        end
      end

      # Take the first message available, scanning the queues in order.
      @queues.find { |q| message_and_retries = q.shift }

      if message_and_retries
        if message_and_retries.message.ack_requested || message_and_retries.message.class.name =~ /^SDN::Message::Get/
          @response_pending = Time.now.to_f + WAIT_TIME
          @pending_group_motors = if Message.is_group_address?(message_and_retries.message.src)
                                    group_addr = Message.print_address(message_and_retries.message.src).gsub('.', '')
                                    @groups[group_addr]&.motor_objects&.map(&:addr) || []
                                  else
                                    []
                                  end
          # `&&` binds tighter than `||`; the parentheses only make the
          # original grouping explicit.
          if message_and_retries.message.dest == BROADCAST_ADDRESS ||
             (Message.is_group_address?(message_and_retries.message.src) &&
              message_and_retries.message.is_a?(Message::GetNodeAddr))
            @broadcast_pending = Time.now.to_f + BROADCAST_WAIT
          end
        end
      end

      if @response_pending
        # @response_pending can only be set here by the branch above, so
        # message_and_retries is non-nil.
        message_and_retries.remaining_retries -= 1
        @prior_message = message_and_retries
      elsif message_and_retries
        @prior_message = nil
      elsif @auto_discover && @motors_found
        # Idle: issue a discovery request. It is created after the
        # response-deadline bookkeeping above, so no deadline is armed for it.
        message_and_retries = MessageAndRetries.new(Message::GetNodeAddr.new, 1, 2)
        @motors_found = false
      else
        # Nothing to do; sleep until signalled.
        @cond.wait(@mutex)
      end
    end
    next unless message_and_retries

    message = message_and_retries.message
    SDN.logger.info "writing #{message.inspect}"

    # Enforce a minimum of 0.1s between consecutive sends.
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_time = 0.1 - (now - last_write_at)
    sleep(sleep_time) if sleep_time > 0
    @sdn.send(message)
    # Re-sample the clock: `now` was taken before the sleep above, so reusing
    # it would under-count the gap and let the next message go out too early.
    last_write_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
rescue => e
  SDN.logger.fatal "failure writing: #{e}: #{e.backtrace}"
  exit 1
end
|