Class: Mosq::Client
- Inherits: Object (Mosq::Client < Object)
- Defined in: lib/mosq/client.rb, lib/mosq/client/bucket.rb
Defined Under Namespace
Classes: Bucket, DestroyedError
Constant Summary
- DEFAULT_PROTOCOL_TIMEOUT = 30 # seconds
Instance Attribute Summary
- #protocol_timeout ⇒ Object
  The timeout to use when waiting for protocol events, in seconds.
Class Method Summary
- .create_finalizer_for(ptr) ⇒ Object
  Part of a private API.
Instance Method Summary
- #break! ⇒ nil
  Stop iterating from within an execution of the #run_loop! method.
- #clear_event_handler(type) ⇒ Proc?
  Unregister the event handler associated with the given event type.
- #close ⇒ Object
  Gracefully close the connection with the server.
- #destroy ⇒ Object
  Free the native resources associated with this object.
- #heartbeat ⇒ Object
- #host ⇒ Object
- #initialize(*args) ⇒ Client (constructor)
  Create a new Client instance with the given properties.
- #max_in_flight ⇒ Object
- #max_poll_interval ⇒ Object
  The maximum time interval the user application should wait between yielding control back to the client object by calling methods like #run_loop! and #run_immediate!.
- #on_event(type, callable = nil, &block) {|event| ... } ⇒ Proc, ... (also: #on)
  Register a handler for events of the given type.
- #password ⇒ Object
- #port ⇒ Object
- #publish(topic, payload, qos: 0, retain: false) ⇒ Client
  Publish a message with the given topic and payload.
- #publish_many(pairs, qos: 0, retain: false) ⇒ Client
  Publish many pairs of topic/payload as messages.
- #run_immediate! ⇒ Object
  Yield control to the client object to do any connection-oriented work that needs to be done, including heartbeating.
- #run_loop!(timeout: protocol_timeout, &block) ⇒ undefined
  Fetch and handle events in a loop that blocks the calling thread.
- #ssl? ⇒ Boolean
- #start ⇒ Object
  Initiate the connection with the server.
- #subscribe(topic, qos: 0) ⇒ Client
  Subscribe to the given topic.
- #subscribe_many(topics, qos: 0) ⇒ Client
  Subscribe to many topics.
- #unsubscribe(topic) ⇒ Client
  Unsubscribe from the given topic.
- #unsubscribe_many(topics) ⇒ Client
  Unsubscribe from many topics.
- #username ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Create a new Mosq::Client instance with the given properties.
    # File 'lib/mosq/client.rb', line 14
    def initialize(*args)
      @options = Util.connection_info(*args)
      @options[:max_in_flight] ||= 20 # messages
      @options[:heartbeat]     ||= 30 # seconds

      @protocol_timeout = DEFAULT_PROTOCOL_TIMEOUT

      Util.null_check "creating the client",
        (@ptr = FFI.mosquitto_new(@options[:client_id], true, nil))

      @bucket = Bucket.new(@ptr)
      @event_handlers = {}

      @packet_id_ptr = Util.mem_ptr(:int)

      @finalizer = self.class.create_finalizer_for(@ptr)
      ObjectSpace.define_finalizer(self, @finalizer)
    end
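The constructor falls back to 20 messages for :max_in_flight and 30 seconds for :heartbeat when the caller leaves them unset. The sketch below isolates that defaulting step; `with_defaults` is a hypothetical helper, not part of the library, and the plain Hash stands in for whatever Util.connection_info returns.

```ruby
# Hypothetical helper that mirrors the `||=` defaulting in the constructor.
def with_defaults(options)
  options = options.dup            # leave the caller's hash untouched
  options[:max_in_flight] ||= 20   # messages
  options[:heartbeat]     ||= 30   # seconds
  options
end

defaults     = with_defaults({})
overrides    = with_defaults(heartbeat: 10)
explicit_nil = with_defaults(heartbeat: nil) # `||=` also replaces an explicit nil
```

Because `||=` treats nil and false alike, passing `heartbeat: nil` cannot be used to switch the heartbeat off in this scheme; it simply selects the default.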
Instance Attribute Details
#protocol_timeout ⇒ Object
The timeout to use when waiting for protocol events, in seconds. By default, this has the value of DEFAULT_PROTOCOL_TIMEOUT. When set, it affects operations like #run_loop!.
    # File 'lib/mosq/client.rb', line 144
    def protocol_timeout
      @protocol_timeout
    end
Class Method Details
.create_finalizer_for(ptr) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/mosq/client.rb', line 34
    def self.create_finalizer_for(ptr)
      Proc.new do
        FFI.mosquitto_destroy(ptr)
      end
    end
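Building the finalizer in a class method is a common Ruby idiom: the proc then closes over the pointer only, not over the instance, which matters because a finalizer that references its own object keeps that object from being collected. A minimal sketch of the idiom follows; the `Resource` class, the `log` array and the `:handle_1` symbol are illustrative stand-ins for the client, the native `mosquitto_destroy` call and the native pointer.

```ruby
class Resource
  # Built at class level, so the proc's `self` is the class itself and the
  # only captured state is `handle` and `log`, never the instance.
  def self.create_finalizer_for(handle, log)
    proc { log << handle }
  end

  def initialize(handle, log)
    ObjectSpace.define_finalizer(self, self.class.create_finalizer_for(handle, log))
  end
end

log = []
finalizer = Resource.create_finalizer_for(:handle_1, log)
receiver  = finalizer.binding.receiver # the object the proc treats as `self`
finalizer.call(12345)                  # Ruby passes the object id; the proc ignores it
```

Inspecting `receiver` confirms that the proc is bound to the `Resource` class rather than to any instance.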
Instance Method Details
#break! ⇒ nil
Stop iterating from within an execution of the #run_loop! method. Call this method only from within an event handler. It will take effect only after the handler finishes running.
    # File 'lib/mosq/client.rb', line 294
    def break!
      @breaking = true
      nil
    end
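Since #break! only sets a flag, the rest of the current handler still runs before the loop stops. The sketch below illustrates that behaviour with a hypothetical `MiniLoop` class; it assumes the loop checks the flag after each handler returns, which the listing above does not show.

```ruby
# Hypothetical stand-in for the client's event loop.
class MiniLoop
  def initialize(events)
    @events = events
  end

  def break!
    @breaking = true
    nil
  end

  def run_loop!
    @breaking = false
    @events.each do |event|
      yield event
      break if @breaking # assumed: the flag is consulted after the handler returns
    end
    nil
  end
end

seen = []
mini = MiniLoop.new([1, 2, 3, 4])
mini.run_loop! do |event|
  mini.break! if event == 2
  seen << event # still executes for event 2: break! does not unwind the handler
end
```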
#clear_event_handler(type) ⇒ Proc?
Unregister the event handler associated with the given event type. Returns the handler that was registered, or nil if there was none.
    # File 'lib/mosq/client.rb', line 137
    def clear_event_handler(type)
      @event_handlers.delete(type.to_sym)
    end
#close ⇒ Object
Gracefully close the connection with the server.
    # File 'lib/mosq/client.rb', line 89
    def close
      @ruby_socket = nil

      Util.error_check "closing the connection to #{@options[:host]}",
        FFI.mosquitto_disconnect(ptr)

      self
    rescue Mosq::FFI::Error::NoConn
      self
    end
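The method-level rescue means that closing a client that is already disconnected is treated as success rather than an error. A minimal sketch of that pattern, using a hypothetical `Link` class and a local `NoConn` error in place of Mosq::FFI::Error::NoConn:

```ruby
class NoConn < StandardError; end # stand-in for Mosq::FFI::Error::NoConn

class Link
  def initialize
    @connected = true
  end

  # Stand-in for the native disconnect call.
  def disconnect!
    raise NoConn, "not connected" unless @connected
    @connected = false
  end

  def close
    disconnect!
    self
  rescue NoConn
    self # already disconnected: nothing left to do
  end
end

link   = Link.new
first  = link.close
second = link.close # raises NoConn internally, which close swallows
```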
#destroy ⇒ Object
Free the native resources associated with this object. This will be done automatically on garbage collection if not called explicitly.
    # File 'lib/mosq/client.rb', line 102
    def destroy
      if @finalizer
        @finalizer.call
        ObjectSpace.undefine_finalizer(self)
      end
      @ptr = @finalizer = @ruby_socket = @bucket = nil

      self
    end
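Because #destroy clears @finalizer after running it, a second call finds nothing to release, and unregistering the finalizer prevents a second release at garbage collection. The sketch below demonstrates this with a hypothetical `Handle` class whose finalizer appends to a `releases` array instead of freeing native memory.

```ruby
class Handle
  def initialize(finalizer)
    @finalizer = finalizer
    ObjectSpace.define_finalizer(self, @finalizer)
  end

  def destroy
    if @finalizer
      @finalizer.call                      # release now
      ObjectSpace.undefine_finalizer(self) # and make sure GC does not release again
    end
    @finalizer = nil
    self
  end
end

releases = []
handle = Handle.new(proc { releases << :freed })
handle.destroy
handle.destroy # no-op: @finalizer is already nil
```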
#heartbeat ⇒ Object
    # File 'lib/mosq/client.rb', line 45
    def heartbeat; @options.fetch(:heartbeat); end
#host ⇒ Object
    # File 'lib/mosq/client.rb', line 42
    def host; @options.fetch(:host); end
#max_in_flight ⇒ Object
    # File 'lib/mosq/client.rb', line 46
    def max_in_flight; @options.fetch(:max_in_flight); end
#max_poll_interval ⇒ Object
The maximum time interval the user application should wait between yielding control back to the client object by calling methods like #run_loop! and #run_immediate!.
    # File 'lib/mosq/client.rb', line 51
    def max_poll_interval
      @options.fetch(:heartbeat) / 2.0
    end
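The interval is simply half the heartbeat, returned as a Float. A small worked example, written as a free-standing function for illustration (the real method reads the heartbeat from the client's options):

```ruby
# Mirrors the documented formula: half the heartbeat, as a Float.
def max_poll_interval(heartbeat)
  heartbeat / 2.0
end

default_interval = max_poll_interval(30) # 30 seconds is the constructor's default heartbeat
short_interval   = max_poll_interval(5)
```

With the default heartbeat, an application doing its own work between polls should therefore hand control back to the client at least every 15 seconds.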
#on_event(type, callable = nil, &block) {|event| ... } ⇒ Proc, ... Also known as: on
Register a handler for events of the given type. Only one handler for each event type may be registered at a time; registering another replaces the previous one. If neither a callable nor a block is given, an ArgumentError is raised. To remove a handler, use #clear_event_handler.
    # File 'lib/mosq/client.rb', line 122
    def on_event(type, callable=nil, &block)
      handler = block || callable
      raise ArgumentError, "expected block or callable as the event handler" \
        unless handler.respond_to?(:call)

      @event_handlers[type.to_sym] = handler
      handler
    end
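The registration rules can be tried in isolation. The sketch below copies the handler bookkeeping into a hypothetical `HandlerRegistry` class, so it runs without a broker or the native library; the event values passed to handlers are not modeled.

```ruby
# Hypothetical stand-in that reproduces only the handler bookkeeping.
class HandlerRegistry
  def initialize
    @event_handlers = {}
  end

  def on_event(type, callable = nil, &block)
    handler = block || callable
    raise ArgumentError, "expected block or callable as the event handler" unless handler.respond_to?(:call)
    @event_handlers[type.to_sym] = handler
  end
  alias_method :on, :on_event

  def clear_event_handler(type)
    @event_handlers.delete(type.to_sym)
  end
end

registry = HandlerRegistry.new
first   = registry.on_event(:message) { |event| event }
second  = registry.on("message", ->(event) { event }) # type is symbolized, so this replaces `first`
removed = registry.clear_event_handler(:message)       # returns the handler that was registered
missing = registry.clear_event_handler(:message)       # nothing registered any more

error = begin
  registry.on_event(:message) # neither block nor callable
rescue ArgumentError => e
  e
end
```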
#password ⇒ Object
    # File 'lib/mosq/client.rb', line 41
    def password; @options.fetch(:password); end
#port ⇒ Object
    # File 'lib/mosq/client.rb', line 43
    def port; @options.fetch(:port); end
#publish(topic, payload, qos: 0, retain: false) ⇒ Client
Publish a message with the given topic and payload.
    # File 'lib/mosq/client.rb', line 185
    def publish(topic, payload, qos: 0, retain: false)
      Util.error_check "publishing a message",
        FFI.mosquitto_publish(ptr, @packet_id_ptr,
          topic, payload.bytesize, payload, qos, retain)

      fetch_response(:publish, @packet_id_ptr.read_int)

      self
    end
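The payload length handed to the native call is `payload.bytesize`, not `payload.length`. The two differ for any string containing multibyte characters, as this short example with an illustrative UTF-8 payload shows:

```ruby
# "température: 21°C" written with escapes so the source stays ASCII-only.
payload = "temp\u00e9rature: 21\u00b0C"

chars = payload.length   # counts characters
bytes = payload.bytesize # counts bytes, which is what goes on the wire
```

Here both accented characters occupy two bytes in UTF-8, so the byte count exceeds the character count by two; sending `length` bytes would truncate the message.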
#publish_many(pairs, qos: 0, retain: false) ⇒ Client
Publish many pairs of topic/payload as messages. This is more performant than many calls to #publish, as the transactions occur concurrently.
    # File 'lib/mosq/client.rb', line 244
    def publish_many(pairs, qos: 0, retain: false)
      packet_ids = []

      pairs.each do |topic, payload|
        Util.error_check "publishing many messages",
          FFI.mosquitto_publish(ptr, @packet_id_ptr,
            topic, payload.bytesize, payload, qos, retain)

        packet_ids << @packet_id_ptr.read_int
      end

      fetch_responses(:publish, packet_ids)

      self
    end
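Because the method iterates with `|topic, payload|`, `pairs` can be a Hash or an array of two-element arrays. The sketch below shows the send-everything-then-collect shape with a hypothetical `FakeBroker`; it records messages and hands out packet ids, and the acknowledgement wait is deliberately not modeled.

```ruby
# Hypothetical stand-in for the native publish call.
class FakeBroker
  attr_reader :sent

  def initialize
    @sent = []
    @next_id = 0
  end

  # Records the message and returns a fresh packet id.
  def publish(topic, payload)
    @sent << [topic, payload]
    @next_id += 1
  end
end

def publish_many(broker, pairs)
  # Phase 1: issue every publish without waiting for a reply.
  packet_ids = pairs.map { |topic, payload| broker.publish(topic, payload) }
  # Phase 2 (not modeled): wait once for all acknowledgements.
  packet_ids
end

broker = FakeBroker.new
ids_from_hash  = publish_many(broker, { "a/1" => "x", "a/2" => "y" })
ids_from_array = publish_many(broker, [["b/1", "z"]])
```

The same two-phase shape appears in #subscribe_many and #unsubscribe_many.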
#run_immediate! ⇒ Object
Yield control to the client object to do any connection-oriented work that needs to be done, including heartbeating. This is the same as calling #run_loop! with no block and a timeout of 0.
    # File 'lib/mosq/client.rb', line 284
    def run_immediate!
      run_loop!(timeout: 0)
    end
#run_loop!(timeout: protocol_timeout, &block) ⇒ undefined
Fetch and handle events in a loop that blocks the calling thread. The loop continues until #break! is called from within an event handler or until the given timeout has elapsed. This method (or #run_immediate!) must be called at least as often as the heartbeat interval to keep the client from being disconnected: heartbeats are maintained only while control is yielded to the client. See #max_poll_interval for the recommended upper bound.
    # File 'lib/mosq/client.rb', line 274
    def run_loop!(timeout: protocol_timeout, &block)
      timeout = Float(timeout) if timeout
      fetch_events(timeout, &block)
      nil
    end
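The timeout is coerced with `Float(...)` only when it is truthy, so integers and numeric strings are accepted, nil passes through unchanged, and non-numeric input fails early. The sketch isolates that one line in a hypothetical `coerce_timeout` helper; how the event loop interprets a nil timeout is not shown in the listing and is not assumed here.

```ruby
# Hypothetical helper that mirrors the coercion line in run_loop!.
def coerce_timeout(timeout)
  timeout = Float(timeout) if timeout
  timeout
end

from_integer = coerce_timeout(5)
from_string  = coerce_timeout("0.25")
no_timeout   = coerce_timeout(nil)

rejected = begin
  coerce_timeout("soon") # Kernel#Float is strict about non-numeric strings
rescue ArgumentError => e
  e.class
end
```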
#ssl? ⇒ Boolean
    # File 'lib/mosq/client.rb', line 44
    def ssl?; @options.fetch(:ssl); end
#start ⇒ Object
Initiate the connection with the server. It is necessary to call this before any other communication.
    # File 'lib/mosq/client.rb', line 75
    def start
      start_configure

      @ruby_socket = Socket.for_fd(FFI.mosquitto_socket(ptr))
      @ruby_socket.autoclose = false

      res = fetch_response(:connect, nil)
      raise Mosq::FFI::Error::NoConn, res.fetch(:message) \
        unless res.fetch(:status) == 0

      self
    end
#subscribe(topic, qos: 0) ⇒ Client
Subscribe to the given topic. Messages with a matching topic will be delivered to the :message event handler registered with #on_event.
    # File 'lib/mosq/client.rb', line 154
    def subscribe(topic, qos: 0)
      Util.error_check "subscribing to a topic",
        FFI.mosquitto_subscribe(ptr, @packet_id_ptr, topic, qos)

      fetch_response(:subscribe, @packet_id_ptr.read_int)

      self
    end
#subscribe_many(topics, qos: 0) ⇒ Client
Subscribe to many topics. This is more performant than many calls to #subscribe, as the transactions occur concurrently.
    # File 'lib/mosq/client.rb', line 202
    def subscribe_many(topics, qos: 0)
      packet_ids = []

      topics.each do |topic|
        Util.error_check "subscribing to many topics",
          FFI.mosquitto_subscribe(ptr, @packet_id_ptr, topic, qos)

        packet_ids << @packet_id_ptr.read_int
      end

      fetch_responses(:subscribe, packet_ids)

      self
    end
#unsubscribe(topic) ⇒ Client
Unsubscribe from the given topic.
    # File 'lib/mosq/client.rb', line 168
    def unsubscribe(topic)
      Util.error_check "unsubscribing from a topic",
        FFI.mosquitto_unsubscribe(ptr, @packet_id_ptr, topic)

      fetch_response(:unsubscribe, @packet_id_ptr.read_int)

      self
    end
#unsubscribe_many(topics) ⇒ Client
Unsubscribe from many topics. This is more performant than many calls to #unsubscribe, as the transactions occur concurrently.
    # File 'lib/mosq/client.rb', line 222
    def unsubscribe_many(topics)
      packet_ids = []

      topics.each do |topic|
        # Error context corrected: this call unsubscribes.
        Util.error_check "unsubscribing from many topics",
          FFI.mosquitto_unsubscribe(ptr, @packet_id_ptr, topic)

        packet_ids << @packet_id_ptr.read_int
      end

      fetch_responses(:unsubscribe, packet_ids)

      self
    end
#username ⇒ Object
    # File 'lib/mosq/client.rb', line 40
    def username; @options.fetch(:username); end