Class: PahoMqtt::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/paho_mqtt/publisher.rb

Instance Method Summary collapse

Constructor Details

#initialize(sender) ⇒ Publisher

Returns a new instance of Publisher.



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/paho_mqtt/publisher.rb', line 18

# Builds a publisher bound to +sender+, starting with one empty pending
# list and one dedicated Mutex for each acknowledgement type tracked here
# (PUBACK, PUBREC, PUBREL, PUBCOMP).
#
# @param sender [Object] collaborator stored in @sender; the other methods
#   of this class call #append_to_writing and #check_ack_alive on it
def initialize(sender)
  @sender = sender
  # The block form of Array.new yields a fresh object per slot, so the four
  # lists (and the four locks) are distinct objects, never shared.
  @waiting_puback, @waiting_pubrec, @waiting_pubrel, @waiting_pubcomp = Array.new(4) { [] }
  @puback_mutex, @pubrec_mutex, @pubrel_mutex, @pubcomp_mutex = Array.new(4) { Mutex.new }
end

Instance Method Details

#check_waiting_publisher ⇒ Object



158
159
160
161
162
163
# File 'lib/paho_mqtt/publisher.rb', line 158

# Asks the sender to run its acknowledgement check over each of the four
# pending lists, in PUBACK, PUBREC, PUBREL, PUBCOMP order.
#
# Each call receives the list, the Mutex paired with it and the matching
# MAX_* constant. What the check actually does is decided by the sender's
# #check_ack_alive, which is not defined in this class.
#
# @return [Object] whatever the final (PUBCOMP) check returns
def check_waiting_publisher
  [
    [@waiting_puback, @puback_mutex, MAX_PUBACK],
    [@waiting_pubrec, @pubrec_mutex, MAX_PUBREC],
    [@waiting_pubrel, @pubrel_mutex, MAX_PUBREL],
    [@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP]
  ].map { |pending, lock, limit| @sender.check_ack_alive(pending, lock, limit) }.last
end

#config_all_message_queue ⇒ Object



138
139
140
141
142
143
# File 'lib/paho_mqtt/publisher.rb', line 138

# Runs #config_message_queue over each of the four pending lists, in
# PUBACK, PUBREC, PUBREL, PUBCOMP order, pairing every list with its own
# Mutex and its MAX_* constant.
#
# @return [Object] whatever the final (PUBCOMP) #config_message_queue call
#   returns
def config_all_message_queue
  [
    [@waiting_puback, @puback_mutex, MAX_PUBACK],
    [@waiting_pubrec, @pubrec_mutex, MAX_PUBREC],
    [@waiting_pubrel, @pubrel_mutex, MAX_PUBREL],
    [@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP]
  ].map { |pending, lock, limit| config_message_queue(pending, lock, limit) }.last
end

#config_message_queue(queue, mutex, max_packet) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/paho_mqtt/publisher.rb', line 145

# Hands the packets stored in +queue+ back to the sender, appending at most
# +max_packet+ of them (the first ones, in queue order).
#
# The whole pass runs while holding +mutex+. Earlier code used a counter
# starting at 0 with a `<=` comparison, which let max_packet + 1 packets
# through (and one packet even for a limit of 0); the bound is now strict.
#
# @param queue [Array<Hash>] entries whose :packet value is handed to the
#   sender's #append_to_writing
# @param mutex [Mutex] lock held for the duration of the pass
# @param max_packet [Integer] upper bound on packets appended by this call;
#   zero or a negative value appends nothing
# @return [Object] the value of mutex.synchronize, i.e. +queue+ itself
def config_message_queue(queue, mutex, max_packet)
  mutex.synchronize do
    queue.each_with_index do |pck, index|
      # NOTE(review): this reads #dup and only assigns through #dup= when the
      # result is falsy. With the stock Object#dup (a copy, hence truthy)
      # nothing is ever assigned. Presumably the intent was to flag the
      # packet as a duplicate before resending -- confirm against the packet
      # classes before changing it.
      pck[:packet].dup ||= true
      @sender.append_to_writing(pck[:packet]) if index < max_packet
    end
  end
end

#do_puback(packet_id) ⇒ Object



78
79
80
81
82
83
# File 'lib/paho_mqtt/publisher.rb', line 78

# Drops every entry of the PUBACK pending list whose :id equals
# +packet_id+. The removal happens in place while holding @puback_mutex.
#
# @param packet_id [Object] value compared with == against each entry's :id
# @return [Object] MQTT_ERR_SUCCESS, whether or not anything was removed
def do_puback(packet_id)
  @puback_mutex.synchronize do
    @waiting_puback.reject! { |entry| entry[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end

#do_pubcomp(packet_id) ⇒ Object



131
132
133
134
135
136
# File 'lib/paho_mqtt/publisher.rb', line 131

# Drops every entry of the PUBCOMP pending list whose :id equals
# +packet_id+. The removal happens in place while holding @pubcomp_mutex.
#
# @param packet_id [Object] value compared with == against each entry's :id
# @return [Object] MQTT_ERR_SUCCESS, whether or not anything was removed
def do_pubcomp(packet_id)
  @pubcomp_mutex.synchronize do
    @waiting_pubcomp.reject! { |entry| entry[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end

#do_publish(qos, packet_id) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/paho_mqtt/publisher.rb', line 56

# Dispatches on +qos+: nothing is sent for 0, #send_puback is called for 1
# and #send_pubrec for 2. Any other value is rejected with PacketException.
#
# @param qos [Integer] quality-of-service level to react to
# @param packet_id [Object] id forwarded to #send_puback / #send_pubrec
# @return [Object] MQTT_ERR_SUCCESS for qos 0, 1 and 2
# @raise [PacketException] when +qos+ is none of 0, 1, 2
def do_publish(qos, packet_id)
  case qos
  when 0 then nil # nothing to acknowledge at this level
  when 1 then send_puback(packet_id)
  when 2 then send_pubrec(packet_id)
  else
    # NOTE(review): no assignment to @logger and no definition of logger?
    # is visible in this class -- confirm where they come from, otherwise
    # this path fails before PacketException is raised.
    @logger.error("The packet qos value is invalid in publish.") if logger?
    raise PacketException
  end
  MQTT_ERR_SUCCESS
end

#do_pubrec(packet_id) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/paho_mqtt/publisher.rb', line 96

# Drops every entry of the PUBREC pending list whose :id equals
# +packet_id+ (in place, while holding @pubrec_mutex), then calls
# #send_pubrel with the same id once the lock has been released.
#
# @param packet_id [Object] value compared with == against each entry's :id
#   and forwarded to #send_pubrel
# @return [Object] MQTT_ERR_SUCCESS
def do_pubrec(packet_id)
  @pubrec_mutex.synchronize do
    @waiting_pubrec.reject! { |entry| entry[:id] == packet_id }
  end
  send_pubrel(packet_id)
  MQTT_ERR_SUCCESS
end

#do_pubrel(packet_id) ⇒ Object



115
116
117
118
119
120
121
# File 'lib/paho_mqtt/publisher.rb', line 115

# Drops every entry of the PUBREL pending list whose :id equals
# +packet_id+ (in place, while holding @pubrel_mutex), then calls
# #send_pubcomp with the same id once the lock has been released.
#
# @param packet_id [Object] value compared with == against each entry's :id
#   and forwarded to #send_pubcomp
# @return [Object] MQTT_ERR_SUCCESS
def do_pubrel(packet_id)
  @pubrel_mutex.synchronize do
    @waiting_pubrel.reject! { |entry| entry[:id] == packet_id }
  end
  send_pubcomp(packet_id)
  MQTT_ERR_SUCCESS
end

#flush_publisher ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/paho_mqtt/publisher.rb', line 165

# Discards everything pending by pointing each of the four waiting lists at
# a brand-new empty Array. Each reassignment is made while holding the
# Mutex paired with that list.
#
# The previous Array objects are replaced, not emptied, so any reference
# taken to one of them before this call still sees its old contents.
#
# @return [Array] the new, empty PUBCOMP list (value of the last assignment)
def flush_publisher
  @puback_mutex.synchronize { @waiting_puback = [] }
  @pubrec_mutex.synchronize { @waiting_pubrec = [] }
  @pubrel_mutex.synchronize { @waiting_pubrel = [] }
  @pubcomp_mutex.synchronize { @waiting_pubcomp = [] }
end

#send_puback(packet_id) ⇒ Object



70
71
72
73
74
75
76
# File 'lib/paho_mqtt/publisher.rb', line 70

# Builds a PahoMqtt::Packet::Puback carrying +packet_id+ and hands it to
# the sender's #append_to_writing. Nothing is added to any pending list.
#
# @param packet_id [Object] value passed as the packet's :id
# @return [Object] MQTT_ERR_SUCCESS
def send_puback(packet_id)
  @sender.append_to_writing(PahoMqtt::Packet::Puback.new(id: packet_id))
  MQTT_ERR_SUCCESS
end

#send_pubcomp(packet_id) ⇒ Object



123
124
125
126
127
128
129
# File 'lib/paho_mqtt/publisher.rb', line 123

# Builds a PahoMqtt::Packet::Pubcomp carrying +packet_id+ and hands it to
# the sender's #append_to_writing. Nothing is added to any pending list.
#
# @param packet_id [Object] value passed as the packet's :id
# @return [Object] MQTT_ERR_SUCCESS
def send_pubcomp(packet_id)
  @sender.append_to_writing(PahoMqtt::Packet::Pubcomp.new(id: packet_id))
  MQTT_ERR_SUCCESS
end

#send_publish(topic, payload, retain, qos, new_id) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/paho_mqtt/publisher.rb', line 34

# Builds a PahoMqtt::Packet::Publish from the arguments, hands it to the
# sender's #append_to_writing, and then records it as pending when an
# acknowledgement will be awaited:
#
# * qos 1 - an entry is pushed on the PUBACK list under @puback_mutex
# * qos 2 - an entry is pushed on the PUBREC list under @pubrec_mutex
# * any other qos - nothing is recorded
#
# A pending entry is a Hash with :id, :packet and :timestamp (Time.now,
# taken while the lock is held).
#
# @param topic [Object] passed as the packet's :topic
# @param payload [Object] passed as the packet's :payload
# @param retain [Object] passed as the packet's :retain
# @param qos [Integer] passed as the packet's :qos; also selects the list
# @param new_id [Object] passed as the packet's :id and stored in the entry
# @return [Object] MQTT_ERR_SUCCESS
def send_publish(topic, payload, retain, qos, new_id)
  publish = PahoMqtt::Packet::Publish.new(
    id: new_id,
    topic: topic,
    payload: payload,
    retain: retain,
    qos: qos
  )
  @sender.append_to_writing(publish)
  case qos
  when 1
    @puback_mutex.synchronize do
      @waiting_puback << { id: new_id, packet: publish, timestamp: Time.now }
    end
  when 2
    @pubrec_mutex.synchronize do
      @waiting_pubrec << { id: new_id, packet: publish, timestamp: Time.now }
    end
  end
  MQTT_ERR_SUCCESS
end

#send_pubrec(packet_id) ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/paho_mqtt/publisher.rb', line 85

# Builds a PahoMqtt::Packet::Pubrec carrying +packet_id+, hands it to the
# sender's #append_to_writing, then pushes an entry for it on the PUBREL
# pending list while holding @pubrel_mutex.
#
# The entry is a Hash with :id, :packet and :timestamp (Time.now, taken
# while the lock is held).
#
# @param packet_id [Object] value passed as the packet's :id and stored in
#   the pending entry
# @return [Object] MQTT_ERR_SUCCESS
def send_pubrec(packet_id)
  pubrec = PahoMqtt::Packet::Pubrec.new(id: packet_id)
  @sender.append_to_writing(pubrec)
  @pubrel_mutex.synchronize do
    @waiting_pubrel << { id: packet_id, packet: pubrec, timestamp: Time.now }
  end
  MQTT_ERR_SUCCESS
end

#send_pubrel(packet_id) ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/paho_mqtt/publisher.rb', line 104

# Builds a PahoMqtt::Packet::Pubrel carrying +packet_id+, hands it to the
# sender's #append_to_writing, then pushes an entry for it on the PUBCOMP
# pending list while holding @pubcomp_mutex.
#
# The entry is a Hash with :id, :packet and :timestamp (Time.now, taken
# while the lock is held).
#
# @param packet_id [Object] value passed as the packet's :id and stored in
#   the pending entry
# @return [Object] MQTT_ERR_SUCCESS
def send_pubrel(packet_id)
  pubrel = PahoMqtt::Packet::Pubrel.new(id: packet_id)
  @sender.append_to_writing(pubrel)
  @pubcomp_mutex.synchronize do
    @waiting_pubcomp << { id: packet_id, packet: pubrel, timestamp: Time.now }
  end
  MQTT_ERR_SUCCESS
end

#sender=(sender) ⇒ Object



30
31
32
# File 'lib/paho_mqtt/publisher.rb', line 30

# Replaces the collaborator stored in @sender. Subsequent calls made by this
# object to #append_to_writing and #check_ack_alive go to the new value.
# The pending lists and their mutexes are left untouched.
#
# @param sender [Object] new value for @sender
def sender=(sender)
  @sender = sender
end