Class: Mosq::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mosq/client.rb,
lib/mosq/client/bucket.rb

Defined Under Namespace

Classes: Bucket, DestroyedError

Constant Summary collapse

DEFAULT_PROTOCOL_TIMEOUT = 30 # seconds

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Client

Create a new Mosq::Client instance with the given properties.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/mosq/client.rb', line 14

# Create a new Mosq::Client instance with the given properties.
#
# The arguments are handed to Util.connection_info to build the options
# hash; :max_in_flight and :heartbeat receive defaults when not supplied.
#
# @param args connection properties, forwarded to Util.connection_info.
def initialize(*args)
  @options = Util.connection_info(*args)
  @options[:max_in_flight] ||= 20 # messages
  @options[:heartbeat]     ||= 30 # seconds

  @protocol_timeout = DEFAULT_PROTOCOL_TIMEOUT

  # Allocate the native client handle, then let Util.null_check validate it
  # before anything else wraps the pointer.
  @ptr = FFI.mosquitto_new(@options[:client_id], true, nil)
  Util.null_check "creating the client", @ptr

  @bucket         = Bucket.new(@ptr)
  @event_handlers = {}
  @packet_id_ptr  = Util.mem_ptr(:int)

  # The finalizer comes from a class-level factory that receives only the
  # pointer; it is kept in an ivar so #destroy can invoke it explicitly.
  @finalizer = self.class.create_finalizer_for(@ptr)
  ObjectSpace.define_finalizer(self, @finalizer)
end

Instance Attribute Details

#protocol_timeout ⇒ Object

The timeout to use when waiting for protocol events, in seconds. By default, this has the value of DEFAULT_PROTOCOL_TIMEOUT. When set, it affects operations like #run_loop!.



139
140
141
# File 'lib/mosq/client.rb', line 139

# The timeout used when waiting for protocol events, in seconds.
# #initialize sets it to DEFAULT_PROTOCOL_TIMEOUT, and #run_loop! uses it
# as the default for its +timeout:+ keyword.
#
# @return [Object] the current value of @protocol_timeout.
def protocol_timeout
  @protocol_timeout
end

Class Method Details

.create_finalizer_for(ptr) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



34
35
36
37
38
# File 'lib/mosq/client.rb', line 34

# @api private
#
# Build the finalizer that releases the native client behind +ptr+.
#
# This must remain a plain (non-lambda) proc: #destroy calls it with no
# arguments, while ObjectSpace finalization calls it with the object id,
# so it has to tolerate both arities.
#
# @param ptr the native client pointer to hand to FFI.mosquitto_destroy.
# @return [Proc] a proc that destroys +ptr+ when called.
def self.create_finalizer_for(ptr)
  proc { FFI.mosquitto_destroy(ptr) }
end

Instance Method Details

#break! ⇒ nil

Stop iterating from within an execution of the #run_loop! method. Call this method only from within an event handler. It will take effect only after the handler finishes running.



289
290
291
292
# File 'lib/mosq/client.rb', line 289

# Request that the current #run_loop! iteration stop. Intended to be called
# from within an event handler.
#
# @return [nil]
def break!
  # Only records the request by setting a flag; nothing is interrupted here.
  @breaking = true
  nil
end

#clear_event_handler(type) ⇒ Proc?

Unregister the event handler registered for the given event type. Returns the removed handler, or nil if none was registered.



132
133
134
# File 'lib/mosq/client.rb', line 132

# Unregister the handler stored for the given event type.
#
# @param type [#to_sym] the event type whose handler should be removed.
# @return [Proc, nil] the removed handler, or nil if none was registered.
def clear_event_handler(type)
  event_key = type.to_sym
  @event_handlers.delete(event_key)
end

#close ⇒ Object

Gracefully close the connection with the server.



84
85
86
87
88
89
90
91
92
93
# File 'lib/mosq/client.rb', line 84

# Gracefully close the connection with the server.
#
# A Mosq::FFI::Error::NoConn raised while disconnecting is swallowed, so
# closing a client that is not connected is not an error.
#
# @return [Client] self.
def close
  @ruby_socket = nil

  begin
    result = FFI.mosquitto_disconnect(ptr)
    Util.error_check "closing the connection to #{@options[:host]}", result
  rescue Mosq::FFI::Error::NoConn
    # Not connected, so there is nothing left to disconnect.
  end

  self
end

#destroy ⇒ Object

Free the native resources associated with this object. This will be done automatically on garbage collection if not called explicitly.



97
98
99
100
101
102
103
104
105
# File 'lib/mosq/client.rb', line 97

# Free the native resources associated with this object. Safe to call more
# than once: the finalizer runs only while one is still registered.
#
# @return [Client] self.
def destroy
  release = @finalizer
  if release
    # Run the cleanup now, then unregister it so it does not run again
    # when this object is garbage collected.
    release.call
    ObjectSpace.undefine_finalizer(self)
  end

  @ptr         = nil
  @finalizer   = nil
  @ruby_socket = nil
  @bucket      = nil

  self
end

#heartbeat ⇒ Object



45
# File 'lib/mosq/client.rb', line 45

# @return [Object] the :heartbeat connection option.
# @raise [KeyError] if the option is not present.
def heartbeat
  @options.fetch(:heartbeat)
end

#host ⇒ Object



42
# File 'lib/mosq/client.rb', line 42

# @return [Object] the :host connection option.
# @raise [KeyError] if the option is not present.
def host
  @options.fetch(:host)
end

#max_in_flight ⇒ Object



46
# File 'lib/mosq/client.rb', line 46

# @return [Object] the :max_in_flight connection option.
# @raise [KeyError] if the option is not present.
def max_in_flight
  @options.fetch(:max_in_flight)
end

#max_poll_interval ⇒ Object

The maximum time interval the user application should wait between yielding control back to the client object by calling methods like #run_loop! and #run_immediate!.



51
52
53
# File 'lib/mosq/client.rb', line 51

# The maximum time the application should wait between yielding control
# back to the client: half of the :heartbeat option.
#
# @return [Float] half of the heartbeat option.
# @raise [KeyError] if the :heartbeat option is not present.
def max_poll_interval
  heartbeat_interval = @options.fetch(:heartbeat)
  heartbeat_interval / 2.0
end

#on_event(type, callable = nil, &block) {|event| ... } ⇒ Proc, ... Also known as: on

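Register a handler for events of the given type. Only one handler for each event type may be registered at a time; registering another replaces the previous one. A callable or a block must be given (the block takes precedence), otherwise an ArgumentError is raised. To remove a handler, use #clear_event_handler.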

Yield Parameters:

  • event (Hash)

    The event passed to the handler.

Raises:

  • (ArgumentError)


117
118
119
120
121
122
123
124
# File 'lib/mosq/client.rb', line 117

# Register the handler for events of the given type, replacing any handler
# previously registered for that type.
#
# @param type [#to_sym] the event type to handle.
# @param callable [#call, nil] the handler; ignored when a block is given.
# @yieldparam event the event passed to the handler.
# @return [#call] the handler that was registered.
# @raise [ArgumentError] if neither a block nor a callable is supplied.
def on_event(type, callable=nil, &block)
  handler = block || callable

  unless handler.respond_to?(:call)
    raise ArgumentError, "expected block or callable as the event handler"
  end

  @event_handlers[type.to_sym] = handler
  handler
end

#password ⇒ Object



41
# File 'lib/mosq/client.rb', line 41

# @return [Object] the :password connection option.
# @raise [KeyError] if the option is not present.
def password
  @options.fetch(:password)
end

#port ⇒ Object



43
# File 'lib/mosq/client.rb', line 43

# @return [Object] the :port connection option.
# @raise [KeyError] if the option is not present.
def port
  @options.fetch(:port)
end

#publish(topic, payload, qos: 0, retain: false) ⇒ Client

Publish a message with the given topic and payload.



180
181
182
183
184
185
186
187
188
# File 'lib/mosq/client.rb', line 180

# Publish a message with the given topic and payload, then wait for the
# matching :publish response.
#
# @param topic the topic to publish on.
# @param payload [#bytesize] the message body; its byte length is passed
#   to the native call alongside it.
# @param qos the quality-of-service level passed to the native call.
# @param retain the retain flag passed to the native call.
# @return [Client] self.
def publish(topic, payload, qos: 0, retain: false)
  result = FFI.mosquitto_publish(
    ptr, @packet_id_ptr, topic, payload.bytesize, payload, qos, retain
  )
  Util.error_check "publishing a message", result

  # The native call wrote this message's packet id into @packet_id_ptr.
  packet_id = @packet_id_ptr.read_int
  fetch_response(:publish, packet_id)

  self
end

#publish_many(pairs, qos: 0, retain: false) ⇒ Client

Publish many pairs of topic/payload as messages. This is more performant than many calls to #publish, as the transactions occur concurrently.



239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/mosq/client.rb', line 239

# Publish many topic/payload pairs as messages, then wait for all of the
# matching :publish responses together.
#
# @param pairs [#each] yields a topic and a payload (responding to
#   #bytesize) for every message to publish.
# @param qos the quality-of-service level applied to every message.
# @param retain the retain flag applied to every message.
# @return [Client] self.
def publish_many(pairs, qos: 0, retain: false)
  packet_ids = []

  pairs.each do |topic, payload|
    result = FFI.mosquitto_publish(
      ptr, @packet_id_ptr, topic, payload.bytesize, payload, qos, retain
    )
    Util.error_check "publishing many messages", result

    # Collect each packet id right away; the pointer is reused by the
    # next publish call.
    packet_ids.push(@packet_id_ptr.read_int)
  end

  fetch_responses(:publish, packet_ids)

  self
end

#run_immediate! ⇒ Object

Yield control to the client object to do any connection-oriented work that needs to be done, including heartbeating. This is the same as calling #run_loop! with no block and a timeout of 0.



279
280
281
# File 'lib/mosq/client.rb', line 279

# Yield control to the client object once, without waiting: equivalent to
# calling #run_loop! with no block and a timeout of 0.
#
# @return whatever #run_loop! returns.
def run_immediate!
  run_loop!(timeout: 0)
end

#run_loop!(timeout: protocol_timeout, &block) ⇒ undefined

Fetch and handle events in a loop that blocks the calling thread. The loop will continue until the #break! method is called from within an event handler, or until the given timeout duration has elapsed. Note that this must be called at least as frequently as the heartbeat interval to ensure that the client is not disconnected: if control is not yielded to the client, transport heartbeats will not be maintained.



269
270
271
272
273
# File 'lib/mosq/client.rb', line 269

# Fetch and handle events, blocking the calling thread.
#
# @param timeout the timeout in seconds, coerced with Float(); nil or false
#   is passed through to the event fetcher unchanged. Defaults to
#   #protocol_timeout.
# @param block an optional block forwarded to the event fetcher.
# @return [nil]
def run_loop!(timeout: protocol_timeout, &block)
  # Coerce only when a timeout was actually supplied.
  timeout &&= Float(timeout)

  fetch_events(timeout, &block)
  nil
end

#ssl? ⇒ Boolean



44
# File 'lib/mosq/client.rb', line 44

# @return [Object] the :ssl connection option.
# @raise [KeyError] if the option is not present.
def ssl?
  @options.fetch(:ssl)
end

#start ⇒ Object

Initiate the connection with the server. It is necessary to call this before any other communication.

Raises:

  • (Mosq::FFI::Error::NoConn)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/mosq/client.rb', line 63

# Initiate the connection with the server. It is necessary to call this
# before any other communication.
#
# Applies the :max_in_flight option and the :username/:password
# credentials to the native client, connects using :host, :port and
# :heartbeat, then waits for the :connect response.
#
# @return [Client] self.
# @raise [Mosq::FFI::Error::NoConn] if the connect response carries a
#   non-zero status.
def start
  Util.error_check "configuring the maximum number of inflight messages",
    FFI.mosquitto_max_inflight_messages_set(ptr, @options[:max_in_flight])

  # The key must be :username; a misspelled key silently sends a nil username.
  Util.error_check "configuring the username and password",
    FFI.mosquitto_username_pw_set(ptr, @options[:username], @options[:password])

  Util.error_check "connecting to #{@options[:host]}",
    FFI.mosquitto_connect(ptr, @options[:host], @options[:port], @options[:heartbeat])

  # Wrap the native client's descriptor in a Ruby Socket. Autoclose is
  # disabled so that the Ruby object does not close a descriptor it does
  # not own when it is finalized.
  @ruby_socket = Socket.for_fd(FFI.mosquitto_socket(ptr))
  @ruby_socket.autoclose = false

  res = fetch_response(:connect, nil)
  raise Mosq::FFI::Error::NoConn, res.fetch(:message) \
    unless res.fetch(:status) == 0

  self
end

#subscribe(topic, qos: 0) ⇒ Client

Subscribe to the given topic. Messages with matching topic will be delivered to the :message event handler registered with #on_event.



149
150
151
152
153
154
155
156
# File 'lib/mosq/client.rb', line 149

# Subscribe to the given topic, then wait for the matching :subscribe
# response.
#
# @param topic the topic to subscribe to.
# @param qos the quality-of-service level passed to the native call.
# @return [Client] self.
def subscribe(topic, qos: 0)
  result = FFI.mosquitto_subscribe(ptr, @packet_id_ptr, topic, qos)
  Util.error_check "subscribing to a topic", result

  # The native call wrote this request's packet id into @packet_id_ptr.
  packet_id = @packet_id_ptr.read_int
  fetch_response(:subscribe, packet_id)

  self
end

#subscribe_many(topics, qos: 0) ⇒ Client

Subscribe to many topics. This is more performant than many calls to #subscribe, as the transactions occur concurrently.



197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/mosq/client.rb', line 197

# Subscribe to many topics, then wait for all of the matching :subscribe
# responses together.
#
# @param topics [#each] the topics to subscribe to.
# @param qos the quality-of-service level applied to every subscription.
# @return [Client] self.
def subscribe_many(topics, qos: 0)
  packet_ids = []

  topics.each do |topic|
    result = FFI.mosquitto_subscribe(ptr, @packet_id_ptr, topic, qos)
    Util.error_check "subscribing to many topics", result

    # Collect each packet id right away; the pointer is reused by the
    # next subscribe call.
    packet_ids.push(@packet_id_ptr.read_int)
  end

  fetch_responses(:subscribe, packet_ids)

  self
end

#unsubscribe(topic) ⇒ Client

Unsubscribe from the given topic.



163
164
165
166
167
168
169
170
# File 'lib/mosq/client.rb', line 163

# Unsubscribe from the given topic, then wait for the matching
# :unsubscribe response.
#
# @param topic the topic to unsubscribe from.
# @return [Client] self.
def unsubscribe(topic)
  result = FFI.mosquitto_unsubscribe(ptr, @packet_id_ptr, topic)
  Util.error_check "unsubscribing from a topic", result

  # The native call wrote this request's packet id into @packet_id_ptr.
  packet_id = @packet_id_ptr.read_int
  fetch_response(:unsubscribe, packet_id)

  self
end

#unsubscribe_many(topics) ⇒ Client

Unsubscribe from many topics. This is more performant than many calls to #unsubscribe, as the transactions occur concurrently.



217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/mosq/client.rb', line 217

# Unsubscribe from many topics, then wait for all of the matching
# :unsubscribe responses together.
#
# @param topics [#each] the topics to unsubscribe from.
# @return [Client] self.
def unsubscribe_many(topics)
  packet_ids = []
  topics.each do |topic|
    # The error context must describe unsubscribing, so that a failure here
    # is not reported as a subscribe failure.
    Util.error_check "unsubscribing from many topics",
      FFI.mosquitto_unsubscribe(ptr, @packet_id_ptr, topic)

    # Collect each packet id right away; the pointer is reused by the
    # next unsubscribe call.
    packet_ids << @packet_id_ptr.read_int
  end

  fetch_responses(:unsubscribe, packet_ids)

  self
end

#username ⇒ Object



40
# File 'lib/mosq/client.rb', line 40

# @return [Object] the :username connection option.
# @raise [KeyError] if the option is not present.
def username
  @options.fetch(:username)
end