Class: MiniMqtt::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mini_mqtt/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
13
14
15
# File 'lib/mini_mqtt/client.rb', line 7

# Builds a client from an options hash; no network activity happens here.
#
# @param params [Hash] connection settings
# @option params [String] :host broker host, 'localhost' when absent
# @option params [Integer] :port broker port, 1883 when absent
# @option params [String, nil] :user user name, stored as given
# @option params [String, nil] :password password, stored as given
# @option params [Integer] :keep_alive keep-alive value, 10 when absent
# @option params [String] :client_id client id; falls back to
#   generate_client_id when absent
# @option params [Boolean] :clean_session clean-session flag, true when the
#   key is missing (an explicit false is preserved)
def initialize(params = {})
  @host          = params[:host] || 'localhost'
  @port          = params[:port] || 1883
  @user          = params[:user]
  @password      = params[:password]
  @keep_alive    = params[:keep_alive] || 10
  @client_id     = params[:client_id] || generate_client_id
  # fetch (not ||) so that an explicit `clean_session: false` is kept.
  @clean_session = params.fetch(:clean_session, true)
end

Instance Attribute Details

#clean_session ⇒ Object

Returns the value of attribute clean_session.



5
6
7
# File 'lib/mini_mqtt/client.rb', line 5

# Reader for the clean-session flag held in @clean_session.
#
# @return [Object] the current value of @clean_session
def clean_session; @clean_session; end

#client_id ⇒ Object

Returns the value of attribute client_id.



5
6
7
# File 'lib/mini_mqtt/client.rb', line 5

# Reader for the client identifier held in @client_id.
#
# @return [Object] the current value of @client_id
def client_id; @client_id; end

#host ⇒ Object

Returns the value of attribute host.



5
6
7
# File 'lib/mini_mqtt/client.rb', line 5

# Reader for the broker host held in @host.
#
# @return [Object] the current value of @host
def host; @host; end

#password ⇒ Object

Returns the value of attribute password.



5
6
7
# File 'lib/mini_mqtt/client.rb', line 5

# Reader for the password held in @password.
#
# @return [Object] the current value of @password
def password; @password; end

#port ⇒ Object

Returns the value of attribute port.



5
6
7
# File 'lib/mini_mqtt/client.rb', line 5

# Reader for the broker port held in @port.
#
# @return [Object] the current value of @port
def port; @port; end

#user ⇒ Object

Returns the value of attribute user.



5
6
7
# File 'lib/mini_mqtt/client.rb', line 5

# Reader for the user name held in @user.
#
# @return [Object] the current value of @user
def user; @user; end

Instance Method Details

#connect(options = {}) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/mini_mqtt/client.rb', line 17

# Opens a TCP connection to @host:@port, sends a ConnectPacket and waits for
# the broker's reply. On acceptance the incoming-message queue is created and
# the read and keep-alive threads are started.
#
# If the handshake fails (the packet exchange raises, or the broker refuses
# the connection) the socket is closed before the error propagates, so a
# failed attempt does not leak a descriptor or leave an open socket behind.
#
# @param options [Hash] last-will settings forwarded to the ConnectPacket
# @option options [Object] :will_topic passed through as will_topic
# @option options [Object] :will_message passed through as will_message
# @option options [Object] :will_retain passed through as will_retain
# @return [Object] whatever spawn_keepalive_thread! returns
# @raise [StandardError] with connack.error as message when the broker does
#   not accept the connection
def connect(options = {})
  # Create socket and packet handler
  @socket = TCPSocket.new(@host, @port)
  @handler = PacketHandler.new(@socket)

  begin
    # Send ConnectPacket
    @handler.write_packet ConnectPacket.new(
      user: @user, password: @password, keep_alive: @keep_alive,
      client_id: @client_id, clean_session: @clean_session,
      will_topic: options[:will_topic], will_message: options[:will_message],
      will_retain: options[:will_retain]
    )

    # Receive connack packet
    connack = @handler.get_packet
    raise StandardError, connack.error unless connack.accepted?
  rescue StandardError
    # Handshake failed: release the socket, then let the caller see the error.
    @socket.close unless @socket.closed?
    raise
  end

  @received_messages = Queue.new
  @last_ping_response = Time.now
  spawn_read_thread!
  spawn_keepalive_thread!
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/mini_mqtt/client.rb', line 80

# Reports whether the client currently holds an open socket.
#
# Always answers with a real boolean: false when no socket has been created
# yet or when the socket has been closed, true otherwise.
#
# @return [Boolean]
def connected?
  !(@socket.nil? || @socket.closed?)
end

#disconnect ⇒ Object



62
63
64
65
66
67
68
# File 'lib/mini_mqtt/client.rb', line 62

# Sends a DisconnectPacket, then kills the read and keep-alive threads and
# closes the socket.
#
# Does nothing when there is no open socket (never connected, or already
# disconnected). The threads are killed and the socket is closed even when
# writing the DisconnectPacket raises; the error still reaches the caller.
#
# @return [nil]
def disconnect
  return if @socket.nil? || @socket.closed?

  begin
    @handler.write_packet DisconnectPacket.new
  ensure
    # Cleanup must not depend on the farewell packet getting through.
    @read_thread.kill if @read_thread
    @keepalive_thread.kill if @keepalive_thread
    @socket.close
  end
  nil
end

#get_message ⇒ Object



70
71
72
# File 'lib/mini_mqtt/client.rb', line 70

# Takes the next entry off @received_messages and returns it.
#
# @return [Object] the oldest queued entry
def get_message
  # deq is Queue's alias for pop.
  @received_messages.deq
end

#get_messages ⇒ Object



74
75
76
77
78
# File 'lib/mini_mqtt/client.rb', line 74

# Repeatedly fetches messages via get_message and yields each one's payload
# and topic. The loop ends when get_message returns nil or false.
#
# Without a block an Enumerator over the same [message, topic] pairs is
# returned, so no message is consumed before there is a consumer for it.
#
# @yieldparam message [Object] the fetched object's #message
# @yieldparam topic [Object] the fetched object's #topic
# @return [nil, Enumerator] nil once the loop ends; an Enumerator when no
#   block is given
def get_messages
  return enum_for(:get_messages) unless block_given?

  while (incoming = get_message)
    yield incoming.message, incoming.topic
  end
end

#publish(topic, message, options = {}) ⇒ Object



56
57
58
59
60
# File 'lib/mini_mqtt/client.rb', line 56

# Wraps the message in a PublishPacket and writes it through the handler.
#
# @param topic [Object] topic handed to the packet unchanged
# @param message [#to_s] payload; converted with to_s before sending
# @param options [Hash] optional settings
# @option options [Object] :retain forwarded as retain (nil when absent)
# @option options [Object] :qos forwarded as qos (nil when absent)
# @return [Object] whatever the handler's write_packet returns
def publish(topic, message, options = {})
  payload = message.to_s
  @handler.write_packet(
    PublishPacket.new(topic: topic, message: payload,
                      retain: options[:retain], qos: options[:qos])
  )
end

#subscribe(*params) ⇒ Object



41
42
43
44
45
46
47
48
49
50
# File 'lib/mini_mqtt/client.rb', line 41

# Subscribes to one or more topics by sending a single SubscribePacket.
#
# Each argument is either a topic (subscribed with max qos 0) or a Hash
# mapping topics to their max qos. When a topic appears more than once the
# last occurrence wins.
#
# @example
#   subscribe 'topic1', 'topic2' => 1
#
# @param params [Array<Object, Hash>] topics and/or topic => qos hashes
# @return [Object] whatever the handler's write_packet returns
# @raise [ArgumentError] when called without any topic, instead of building
#   a packet whose topics are nil
def subscribe(*params)
  raise ArgumentError, 'at least one topic is required' if params.empty?

  topics = params.each_with_object({}) do |arg, filters|
    if arg.is_a?(Hash)
      filters.merge!(arg)
    else
      filters[arg] = 0
    end
  end
  @handler.write_packet SubscribePacket.new(topics: topics)
end

#unsubscribe(*topics) ⇒ Object



52
53
54
# File 'lib/mini_mqtt/client.rb', line 52

# Sends one UnsubscribePacket covering every topic given.
#
# @param topics [Array<Object>] topics to unsubscribe from
# @return [Object] whatever the handler's write_packet returns
def unsubscribe(*topics)
  request = UnsubscribePacket.new(topics: topics)
  @handler.write_packet(request)
end