Class: Mosq::Client
- Inherits: Object
  - Object
    - Mosq::Client
- Defined in: lib/mosq/client.rb, lib/mosq/client/bucket.rb
Defined Under Namespace
Classes: Bucket, DestroyedError
Constant Summary
- DEFAULT_PROTOCOL_TIMEOUT = 30 # seconds
Instance Attribute Summary
- #protocol_timeout ⇒ Object
  The timeout to use when waiting for protocol events, in seconds.
Class Method Summary
- .create_finalizer_for(ptr) ⇒ Object (private API)
Instance Method Summary
- #break! ⇒ nil
  Stop iterating from within an execution of the #run_loop! method.
- #clear_event_handler(type) ⇒ Proc?
  Unregister the event handler for the given event type.
- #close ⇒ Object
  Gracefully close the connection with the server.
- #destroy ⇒ Object
  Free the native resources associated with this object.
- #heartbeat ⇒ Object
- #host ⇒ Object
- #initialize(*args) ⇒ Client (constructor)
  Create a new Client instance with the given properties.
- #max_in_flight ⇒ Object
- #max_poll_interval ⇒ Object
  The maximum time interval the user application should wait between yielding control back to the client object by calling methods like #run_loop! and #run_immediate!.
- #on_event(type, callable = nil, &block) {|event| ... } ⇒ Proc, ... (also: #on)
  Register a handler for events of the given type.
- #password ⇒ Object
- #port ⇒ Object
- #publish(topic, payload, qos: 0, retain: false) ⇒ Client
  Publish a message with the given topic and payload.
- #publish_many(pairs, qos: 0, retain: false) ⇒ Client
  Publish many pairs of topic/payload as messages.
- #run_immediate! ⇒ Object
  Yield control to the client object to do any connection-oriented work that needs to be done, including heartbeating.
- #run_loop!(timeout: protocol_timeout, &block) ⇒ undefined
  Fetch and handle events in a loop that blocks the calling thread.
- #ssl? ⇒ Boolean
- #start ⇒ Object
  Initiate the connection with the server.
- #subscribe(topic, qos: 0) ⇒ Client
  Subscribe to the given topic.
- #subscribe_many(topics, qos: 0) ⇒ Client
  Subscribe to many topics.
- #unsubscribe(topic) ⇒ Client
  Unsubscribe from the given topic.
- #unsubscribe_many(topics) ⇒ Client
  Unsubscribe from many topics.
- #username ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Create a new Mosq::Client instance with the given properties.
# File 'lib/mosq/client.rb', line 14
def initialize(*args)
  @options = Util.connection_info(*args)
  @options[:max_in_flight] ||= 20 # messages
  @options[:heartbeat]     ||= 30 # seconds
  @protocol_timeout = DEFAULT_PROTOCOL_TIMEOUT

  Util.null_check "creating the client",
    (@ptr = FFI.mosquitto_new(@options[:client_id], true, nil))

  @bucket = Bucket.new(@ptr)
  @event_handlers = {}
  @packet_id_ptr = Util.mem_ptr(:int)

  @finalizer = self.class.create_finalizer_for(@ptr)
  ObjectSpace.define_finalizer(self, @finalizer)
end
Instance Attribute Details
#protocol_timeout ⇒ Object
The timeout to use when waiting for protocol events, in seconds. By default, this has the value of DEFAULT_PROTOCOL_TIMEOUT. When set, it affects operations like #run_loop!.
# File 'lib/mosq/client.rb', line 139
def protocol_timeout
  @protocol_timeout
end
Class Method Details
.create_finalizer_for(ptr) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/mosq/client.rb', line 34
def self.create_finalizer_for(ptr)
  Proc.new do
    FFI.mosquitto_destroy(ptr)
  end
end
Instance Method Details
#break! ⇒ nil
Stop iterating from within an execution of the #run_loop! method. Call this method only from within an event handler. It will take effect only after the handler finishes running.
# File 'lib/mosq/client.rb', line 289
def break!
  @breaking = true
  nil
end
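The deferred effect of #break! can be pictured with a small stand-in. The `LoopSketch` class below is hypothetical and not part of Mosq. It assumes only what the description states: the flag set by #break! is checked after the current handler returns, so the rest of the handler body still runs.

```ruby
# Hypothetical stand-in for illustration; not the Mosq implementation.
class LoopSketch
  attr_reader :handled

  def initialize(events)
    @events = events
    @handled = []
    @breaking = false
  end

  # Same shape as the documented method: set a flag and return nil.
  def break!
    @breaking = true
    nil
  end

  # Assumed loop: the flag is inspected only once the handler has returned.
  def run_loop!
    @breaking = false
    @events.each do |event|
      yield event
      @handled << event
      break if @breaking
    end
    nil
  end
end

loop_sketch = LoopSketch.new([:a, :b, :c])
after_break = []

loop_sketch.run_loop! do |event|
  if event == :b
    loop_sketch.break!
    # Still executes: breaking takes effect after the handler finishes.
    after_break << :still_running
  end
end
```

In this sketch the event `:c` is never handled, while the statement following `break!` inside the handler still runs.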
#clear_event_handler(type) ⇒ Proc?
Unregister the event handler for the given event type. Returns the removed handler, or nil if no handler was registered for that type.
# File 'lib/mosq/client.rb', line 132
def clear_event_handler(type)
  @event_handlers.delete(type.to_sym)
end
#close ⇒ Object
Gracefully close the connection with the server.
# File 'lib/mosq/client.rb', line 84
def close
  @ruby_socket = nil

  Util.error_check "closing the connection to #{@options[:host]}",
    FFI.mosquitto_disconnect(ptr)

  self
rescue Mosq::FFI::Error::NoConn
  self
end
#destroy ⇒ Object
Free the native resources associated with this object. This will be done automatically on garbage collection if not called explicitly.
# File 'lib/mosq/client.rb', line 97
def destroy
  if @finalizer
    @finalizer.call
    ObjectSpace.undefine_finalizer(self)
  end
  @ptr = @finalizer = @ruby_socket = @bucket = nil

  self
end
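The explicit-free-plus-finalizer pattern used by #destroy can be tried in isolation. The sketch below uses a hypothetical `NativeHandle` class that appends to a Ruby array instead of freeing native memory. It shows that a second call to `destroy` is a no-op, and that building the finalizer in a class method keeps the proc from capturing `self`.

```ruby
# Hypothetical stand-in; the "native resource" is just an entry in a log array.
class NativeHandle
  # Built in a class method so the proc closes over `log` only, never the
  # instance itself (a finalizer that references its object blocks collection).
  def self.create_finalizer_for(log)
    proc { log << :freed }
  end

  def initialize(log)
    @finalizer = self.class.create_finalizer_for(log)
    ObjectSpace.define_finalizer(self, @finalizer)
  end

  def destroy
    if @finalizer
      @finalizer.call                       # free immediately
      ObjectSpace.undefine_finalizer(self)  # so GC does not free a second time
    end
    @finalizer = nil
    self
  end
end

log = []
handle = NativeHandle.new(log)
handle.destroy
handle.destroy # second call does nothing because @finalizer is now nil
```

After both calls the log holds a single `:freed` entry.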
#heartbeat ⇒ Object
# File 'lib/mosq/client.rb', line 45
def heartbeat; @options.fetch(:heartbeat); end
#host ⇒ Object
# File 'lib/mosq/client.rb', line 42
def host; @options.fetch(:host); end
#max_in_flight ⇒ Object
# File 'lib/mosq/client.rb', line 46
def max_in_flight; @options.fetch(:max_in_flight); end
#max_poll_interval ⇒ Object
The maximum time interval the user application should wait between yielding control back to the client object by calling methods like #run_loop! and #run_immediate!.
# File 'lib/mosq/client.rb', line 51
def max_poll_interval
  @options.fetch(:heartbeat) / 2.0
end
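As a worked example of the formula above, the default heartbeat of 30 seconds set in the constructor gives a poll interval of 15 seconds. The `options` hash here is a plain local stand-in for the client's internal options.

```ruby
# Stand-in for the client's options, using the constructor's default heartbeat.
options = { heartbeat: 30 } # seconds

# Same arithmetic as #max_poll_interval: half the heartbeat, as a Float.
max_poll_interval = options.fetch(:heartbeat) / 2.0

# A shorter heartbeat tightens the polling budget proportionally.
short_interval = { heartbeat: 5 }.fetch(:heartbeat) / 2.0
```

Dividing by `2.0` rather than `2` keeps the result a Float, so an odd heartbeat such as 5 yields 2.5 instead of being truncated.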
#on_event(type, callable = nil, &block) {|event| ... } ⇒ Proc, ... Also known as: on
Register a handler for events of the given type. Only one handler for each event type may be registered at a time; registering another replaces the previous one. If both a callable and a block are given, the block is used. If neither is given, ArgumentError is raised; use #clear_event_handler to remove a handler.
# File 'lib/mosq/client.rb', line 117
def on_event(type, callable=nil, &block)
  handler = block || callable
  raise ArgumentError, "expected block or callable as the event handler" \
    unless handler.respond_to?(:call)

  @event_handlers[type.to_sym] = handler
  handler
end
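The registration rules can be exercised without a broker. The `HandlerRegistry` class below is a hypothetical stand-in that copies the registration and clearing logic from the listings on this page. Its `dispatch` method and the hash-shaped event with `:type` and `:topic` keys are assumptions made for the example, not documented Mosq behavior.

```ruby
# Hypothetical stand-in; only on_event and clear_event_handler mirror the listings.
class HandlerRegistry
  def initialize
    @event_handlers = {}
  end

  def on_event(type, callable = nil, &block)
    handler = block || callable
    unless handler.respond_to?(:call)
      raise ArgumentError, "expected block or callable as the event handler"
    end

    @event_handlers[type.to_sym] = handler
    handler
  end

  def clear_event_handler(type)
    @event_handlers.delete(type.to_sym)
  end

  # Assumed dispatch step: look up the handler by the event's :type key.
  def dispatch(event)
    handler = @event_handlers[event.fetch(:type)]
    handler&.call(event)
  end
end

registry = HandlerRegistry.new
first = ->(event) { "first saw #{event[:topic]}" }

registry.on_event(:message, first)
# A String type maps to the same slot via to_sym, replacing the first handler.
registry.on_event("message") { |event| "second saw #{event[:topic]}" }

result = registry.dispatch({ type: :message, topic: "sensors/temp" })
removed = registry.clear_event_handler(:message)
after_clear = registry.dispatch({ type: :message, topic: "sensors/temp" })

missing_handler_error =
  begin
    registry.on_event(:message)
  rescue ArgumentError => e
    e.class
  end
```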
#password ⇒ Object
# File 'lib/mosq/client.rb', line 41
def password; @options.fetch(:password); end
#port ⇒ Object
# File 'lib/mosq/client.rb', line 43
def port; @options.fetch(:port); end
#publish(topic, payload, qos: 0, retain: false) ⇒ Client
Publish a message with the given topic and payload.
# File 'lib/mosq/client.rb', line 180
def publish(topic, payload, qos: 0, retain: false)
  Util.error_check "publishing a message",
    FFI.mosquitto_publish(ptr, @packet_id_ptr,
      topic, payload.bytesize, payload, qos, retain)

  fetch_response(:publish, @packet_id_ptr.read_int)

  self
end
#publish_many(pairs, qos: 0, retain: false) ⇒ Client
Publish many pairs of topic/payload as messages. This is more performant than many calls to #publish, as the transactions occur concurrently.
# File 'lib/mosq/client.rb', line 239
def publish_many(pairs, qos: 0, retain: false)
  packet_ids = []
  pairs.each do |topic, payload|
    Util.error_check "publishing many messages",
      FFI.mosquitto_publish(ptr, @packet_id_ptr,
        topic, payload.bytesize, payload, qos, retain)

    packet_ids << @packet_id_ptr.read_int
  end

  fetch_responses(:publish, packet_ids)

  self
end
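The difference between repeated #publish calls and one #publish_many call comes down to how often the caller blocks waiting for acknowledgements. The sketch below models that with a hypothetical `FakeBroker` that counts blocking waits. The class and its methods are invented for illustration and say nothing about real network timing.

```ruby
# Hypothetical stand-in that hands out packet ids and counts blocking waits.
class FakeBroker
  attr_reader :waits

  def initialize
    @next_id = 0
    @waits = 0
  end

  # Pretend to send a message; returns the packet id assigned to it.
  def send_publish(_topic, _payload)
    @next_id += 1
  end

  # Pretend to block until every listed packet id is acknowledged.
  def wait_for_acks(packet_ids)
    @waits += 1
    packet_ids
  end
end

# One send, one wait, repeated: the shape of calling publish in a loop.
def publish_each(broker, pairs)
  pairs.each do |topic, payload|
    broker.wait_for_acks([broker.send_publish(topic, payload)])
  end
end

# All sends first, then a single wait: the shape of publish_many.
def publish_batched(broker, pairs)
  packet_ids = pairs.map { |topic, payload| broker.send_publish(topic, payload) }
  broker.wait_for_acks(packet_ids)
end

pairs = { "sensors/a" => "1", "sensors/b" => "2", "sensors/c" => "3" }

sequential = FakeBroker.new
publish_each(sequential, pairs)

batched = FakeBroker.new
acked = publish_batched(batched, pairs)
```

Because the pairs are destructured as `|topic, payload|`, either a Hash or an Array of two-element arrays works as input in this sketch, as it does in the listing above.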
#run_immediate! ⇒ Object
Yield control to the client object to do any connection-oriented work that needs to be done, including heartbeating. This is the same as calling #run_loop! with no block and a timeout of 0.
# File 'lib/mosq/client.rb', line 279
def run_immediate!
  run_loop!(timeout: 0)
end
#run_loop!(timeout: protocol_timeout, &block) ⇒ undefined
Fetch and handle events in a loop that blocks the calling thread. The loop continues until #break! is called from within an event handler, or until the given timeout has elapsed. This method (or #run_immediate!) must be called at least as frequently as the heartbeat interval to keep the client from being disconnected: if control is not yielded to the client, transport heartbeats will not be maintained. See #max_poll_interval for the recommended upper bound.
# File 'lib/mosq/client.rb', line 269
def run_loop!(timeout: protocol_timeout, &block)
  timeout = Float(timeout) if timeout
  fetch_events(timeout, &block)
  nil
end
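The first line of #run_loop! coerces the timeout with `Kernel#Float`. The sketch below isolates that step in a hypothetical helper, `normalize_timeout`, to show which inputs are accepted. How the event loop treats a nil timeout is not shown on this page, so the sketch only demonstrates that nil is passed through unchanged.

```ruby
# Hypothetical helper isolating the coercion performed at the top of run_loop!.
def normalize_timeout(timeout)
  timeout = Float(timeout) if timeout
  timeout
end

from_integer = normalize_timeout(30)    # Integer becomes a Float
from_string  = normalize_timeout("1.5") # numeric strings are accepted
from_zero    = normalize_timeout(0)     # 0 is truthy in Ruby, so it is coerced too
from_nil     = normalize_timeout(nil)   # nil skips the coercion entirely

# Non-numeric input fails fast, before any event handling starts.
bad_input_error =
  begin
    normalize_timeout("soon")
  rescue ArgumentError => e
    e.class
  end
```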
#ssl? ⇒ Boolean
# File 'lib/mosq/client.rb', line 44
def ssl?; @options.fetch(:ssl); end
#start ⇒ Object
Initiate the connection with the server. It is necessary to call this before any other communication.
# File 'lib/mosq/client.rb', line 63
def start
  Util.error_check "configuring the maximum number of inflight messages",
    FFI.(ptr, @options[:max_in_flight])

  Util.error_check "configuring the username and password",
    FFI.mosquitto_username_pw_set(ptr, @options[:username], @options[:password])

  Util.error_check "connecting to #{@options[:host]}",
    FFI.mosquitto_connect(ptr, @options[:host], @options[:port], @options[:heartbeat])

  @ruby_socket = Socket.for_fd(FFI.mosquitto_socket(ptr))
  @ruby_socket.autoclose = false

  res = fetch_response(:connect, nil)
  raise Mosq::FFI::Error::NoConn, res.fetch(:message) \
    unless res.fetch(:status) == 0

  self
end
#subscribe(topic, qos: 0) ⇒ Client
Subscribe to the given topic. Messages with matching topic will be delivered to the :message event handler registered with #on_event.
# File 'lib/mosq/client.rb', line 149
def subscribe(topic, qos: 0)
  Util.error_check "subscribing to a topic",
    FFI.mosquitto_subscribe(ptr, @packet_id_ptr, topic, qos)

  fetch_response(:subscribe, @packet_id_ptr.read_int)

  self
end
#subscribe_many(topics, qos: 0) ⇒ Client
Subscribe to many topics. This is more performant than many calls to #subscribe, as the transactions occur concurrently.
# File 'lib/mosq/client.rb', line 197
def subscribe_many(topics, qos: 0)
  packet_ids = []
  topics.each do |topic|
    Util.error_check "subscribing to many topics",
      FFI.mosquitto_subscribe(ptr, @packet_id_ptr, topic, qos)

    packet_ids << @packet_id_ptr.read_int
  end

  fetch_responses(:subscribe, packet_ids)

  self
end
#unsubscribe(topic) ⇒ Client
Unsubscribe from the given topic.
# File 'lib/mosq/client.rb', line 163
def unsubscribe(topic)
  Util.error_check "unsubscribing from a topic",
    FFI.mosquitto_unsubscribe(ptr, @packet_id_ptr, topic)

  fetch_response(:unsubscribe, @packet_id_ptr.read_int)

  self
end
#unsubscribe_many(topics) ⇒ Client
Unsubscribe from many topics. This is more performant than many calls to #unsubscribe, as the transactions occur concurrently.
# File 'lib/mosq/client.rb', line 217
def unsubscribe_many(topics)
  packet_ids = []
  topics.each do |topic|
    Util.error_check "unsubscribing from many topics",
      FFI.mosquitto_unsubscribe(ptr, @packet_id_ptr, topic)

    packet_ids << @packet_id_ptr.read_int
  end

  fetch_responses(:unsubscribe, packet_ids)

  self
end
#username ⇒ Object
# File 'lib/mosq/client.rb', line 40
def username; @options.fetch(:username); end