Class: PahoMqtt::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/paho_mqtt/publisher.rb

Instance Method Summary collapse

Constructor Details

#initialize(sender) ⇒ Publisher

Returns a new instance of Publisher.



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/paho_mqtt/publisher.rb', line 18

def initialize(sender)
  @waiting_puback  = []
  @waiting_pubrec  = []
  @waiting_pubrel  = []
  @waiting_pubcomp = []
  @puback_mutex    = Mutex.new
  @pubrec_mutex    = Mutex.new
  @pubrel_mutex    = Mutex.new
  @pubcomp_mutex   = Mutex.new
  @sender          = sender
end

Instance Method Details

#check_waiting_publisherObject



158
159
160
161
162
163
# File 'lib/paho_mqtt/publisher.rb', line 158

def check_waiting_publisher
  @sender.check_ack_alive(@waiting_puback, @puback_mutex)
  @sender.check_ack_alive(@waiting_pubrec, @pubrec_mutex)
  @sender.check_ack_alive(@waiting_pubrel, @pubrel_mutex)
  @sender.check_ack_alive(@waiting_pubcomp, @pubcomp_mutex)
end

#config_all_message_queueObject



143
144
145
146
147
148
# File 'lib/paho_mqtt/publisher.rb', line 143

def config_all_message_queue
  config_message_queue(@waiting_puback, @puback_mutex)
  config_message_queue(@waiting_pubrec, @pubrec_mutex)
  config_message_queue(@waiting_pubrel, @pubrel_mutex)
  config_message_queue(@waiting_pubcomp, @pubcomp_mutex)
end

#config_message_queue(queue, mutex) ⇒ Object



150
151
152
153
154
155
156
# File 'lib/paho_mqtt/publisher.rb', line 150

def config_message_queue(queue, mutex)
  mutex.synchronize do
    queue.each do |pck|
      pck[:timestamp] = Time.now
    end
  end
end

#do_puback(packet_id) ⇒ Object



90
91
92
93
94
95
# File 'lib/paho_mqtt/publisher.rb', line 90

def do_puback(packet_id)
  @puback_mutex.synchronize do
    @waiting_puback.delete_if { |pck| pck[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end

#do_pubcomp(packet_id) ⇒ Object



136
137
138
139
140
141
# File 'lib/paho_mqtt/publisher.rb', line 136

def do_pubcomp(packet_id)
  @pubcomp_mutex.synchronize do
    @waiting_pubcomp.delete_if { |pck| pck[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end

#do_publish(qos, packet_id) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/paho_mqtt/publisher.rb', line 68

def do_publish(qos, packet_id)
  case qos
  when 0
  when 1
    send_puback(packet_id)
  when 2
    send_pubrec(packet_id)
  else
    PahoMqtt.logger.error("The packet QoS value is invalid in publish.") if PahoMqtt.logger?
    raise PacketException.new('Invalid publish QoS value')
  end
  MQTT_ERR_SUCCESS
end

#do_pubrec(packet_id) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/paho_mqtt/publisher.rb', line 105

def do_pubrec(packet_id)
  @pubrec_mutex.synchronize do
    @waiting_pubrec.delete_if { |pck| pck[:id] == packet_id }
  end
  send_pubrel(packet_id)
  MQTT_ERR_SUCCESS
end

#do_pubrel(packet_id) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/paho_mqtt/publisher.rb', line 120

def do_pubrel(packet_id)
  @pubrel_mutex.synchronize do
    @waiting_pubrel.delete_if { |pck| pck[:id] == packet_id }
  end
  send_pubcomp(packet_id)
  MQTT_ERR_SUCCESS
end

#flush_publisherObject



165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/paho_mqtt/publisher.rb', line 165

def flush_publisher
  @puback_mutex.synchronize do
    @waiting_puback = []
  end
  @pubrec_mutex.synchronize do
    @waiting_pubrec = []
  end
  @pubrel_mutex.synchronize do
    @waiting_pubrel = []
  end
  @pubcomp_mutex.synchronize do
    @waiting_pubcomp = []
  end
end

#push_queue(waiting_queue, queue_mutex, max_packet, packet, new_id) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/paho_mqtt/publisher.rb', line 52

def push_queue(waiting_queue, queue_mutex, max_packet, packet, new_id)
  begin
    if waiting_queue.length >= max_packet
      raise FullQueueException
    end
    queue_mutex.synchronize do
      waiting_queue.push(:id => new_id, :packet => packet, :timestamp => Time.now)
    end
  rescue FullQueueException
    PahoMqtt.logger.error("#{packet.type_name} queue is full") if PahoMqtt.logger?
    sleep SELECT_TIMEOUT
    retry
  end
  MQTT_ERR_SUCCESS
end

#send_puback(packet_id) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/paho_mqtt/publisher.rb', line 82

def send_puback(packet_id)
  packet = PahoMqtt::Packet::Puback.new(
    :id => packet_id
  )
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#send_pubcomp(packet_id) ⇒ Object



128
129
130
131
132
133
134
# File 'lib/paho_mqtt/publisher.rb', line 128

def send_pubcomp(packet_id)
  packet = PahoMqtt::Packet::Pubcomp.new(
    :id => packet_id
  )
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#send_publish(topic, payload, retain, qos, new_id) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/paho_mqtt/publisher.rb', line 34

def send_publish(topic, payload, retain, qos, new_id)
  packet = PahoMqtt::Packet::Publish.new(
    :id      => new_id,
    :topic   => topic,
    :payload => payload,
    :retain  => retain,
    :qos     => qos
  )
  case qos
  when 1
    push_queue(@waiting_puback, @puback_mutex, MAX_PUBACK, packet, new_id)
  when 2
    push_queue(@waiting_pubrec, @pubrec_mutex, MAX_PUBREC, packet, new_id)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end

#send_pubrec(packet_id) ⇒ Object



97
98
99
100
101
102
103
# File 'lib/paho_mqtt/publisher.rb', line 97

def send_pubrec(packet_id)
  packet = PahoMqtt::Packet::Pubrec.new(
    :id => packet_id
  )
  push_queue(@waiting_pubrel, @pubrel_mutex, MAX_PUBREL, packet, packet_id)
  MQTT_ERR_SUCCESS
end

#send_pubrel(packet_id) ⇒ Object



113
114
115
116
117
118
# File 'lib/paho_mqtt/publisher.rb', line 113

def send_pubrel(packet_id)
  packet = PahoMqtt::Packet::Pubrel.new(
    :id => packet_id
  )
  push_queue(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP, packet, packet_id)
end

#sender=(sender) ⇒ Object



30
31
32
# File 'lib/paho_mqtt/publisher.rb', line 30

def sender=(sender)
  @sender = sender
end