Class: Mosquitto::Client

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
ext/mosquitto/client.c,
lib/mosquitto/client.rb

Constant Summary

Constants included from Logging

Logging::LOG_LEVELS

Instance Attribute Summary

Attributes included from Logging

#logger

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log

Class Method Details

.Mosquitto::Client.new("some-id") ⇒ Mosquitto::Client

Note:

As per the MQTT spec, client identifiers cannot exceed 23 characters

Create a new mosquitto client instance.

Examples:

Mosquitto::Client.new("session_id") -> Mosquitto::Client
Mosquitto::Client.new(nil, true) -> Mosquitto::Client

Returns:

Parameters:

  • identifier (String)

    the client identifier. Set to nil to have a random one generated. clean_session must be true if the identifier is nil.

  • clean_session (true, false)

    set to true to instruct the broker to clean all messages and subscriptions on disconnect, false to instruct it to keep them

Returns:

Raises:



474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
# File 'ext/mosquitto/client.c', line 474

static VALUE rb_mosquitto_client_s_new(int argc, VALUE *argv, VALUE client)
{
    VALUE client_id;
    VALUE cl_session;
    char *cl_id = NULL;
    mosquitto_client_wrapper *cl = NULL;
    bool clean_session;
    rb_scan_args(argc, argv, "02", &client_id, &cl_session);
    if (NIL_P(client_id)) {
        clean_session = true;
    } else {
        clean_session = false;
        Check_Type(client_id, T_STRING);
        MosquittoEncode(client_id);
        cl_id = StringValueCStr(client_id);
    }
    client = Data_Make_Struct(rb_cMosquittoClient, mosquitto_client_wrapper, rb_mosquitto_mark_client, rb_mosquitto_free_client, cl);
    cl->mosq = mosquitto_new(cl_id, clean_session, (void *)cl);
    if (cl->mosq == NULL) {
        xfree(cl);
        switch (errno) {
            case EINVAL:
                MosquittoError("invalid input params");
                break;
            case ENOMEM:
                rb_memerror();
                break;
            default:
                return Qfalse;
        }
    }
    cl->connect_cb = Qnil;
    cl->disconnect_cb = Qnil;
    cl->publish_cb = Qnil;
    cl->message_cb = Qnil;
    cl->subscribe_cb = Qnil;
    cl->unsubscribe_cb = Qnil;
    cl->log_cb = Qnil;
    cl->callback_thread = Qnil;
    cl->callback_queue = NULL;
    cl->waiter = NULL;
    rb_obj_call_init(client, 0, NULL);
    return client;
}

Instance Method Details

#auth("username", "password") ⇒ Boolean

Note:

This must be called before calling Mosquitto::Client#connect

Configure username and password for a mosquitto instance. This is only supported by brokers that implement the MQTT spec v3.1. By default, no username or password will be sent.

Examples:

client.auth("username", "password")

Returns:

  • (Boolean)

Parameters:

  • username (String)

    the username to send, or nil to disable authentication.

  • password (String)

    the password to send. Set to nil when username is valid in order to send just a username.

Returns:

  • (true)

    on success

Raises:



668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
# File 'ext/mosquitto/client.c', line 668

static VALUE rb_mosquitto_client_auth(VALUE obj, VALUE username, VALUE password)
{
    int ret;
    MosquittoGetClient(obj);
    if (!NIL_P(username)) {
        Check_Type(username, T_STRING);
        MosquittoEncode(username);
    }
    if (!NIL_P(password)) {
        Check_Type(password, T_STRING);
        MosquittoEncode(password);
    }
    ret = mosquitto_username_pw_set(client->mosq, (NIL_P(username) ? NULL : StringValueCStr(username)), (NIL_P(password) ? NULL : StringValueCStr(password)));
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       default:
           return Qtrue;
    }
}

#connect("localhost", 1883, 10) ⇒ Boolean

Connect to an MQTT broker.

Examples:

client.connect("localhost", 1883, 10)

Returns:

  • (Boolean)

Parameters:

  • host (String)

    the hostname or ip address of the broker to connect to.

  • port (Integer)

    the network port to connect to. Usually 1883 (or 8883 for TLS)

  • keepalive (Integer)

    the number of seconds after which the broker should send a PING message to the client if no other messages have been exchanged in that time.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
# File 'ext/mosquitto/client.c', line 935

static VALUE rb_mosquitto_client_connect(VALUE obj, VALUE host, VALUE port, VALUE keepalive)
{
    struct nogvl_connect_args args;
    int ret;
    MosquittoGetClient(obj);
    Check_Type(host, T_STRING);
    MosquittoEncode(host);
    Check_Type(port, T_FIXNUM);
    Check_Type(keepalive, T_FIXNUM);
    args.mosq = client->mosq;
    args.host = StringValueCStr(host);
    args.port = NUM2INT(port);
    args.keepalive = NUM2INT(keepalive);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_connect_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_connect");
           break;
       default:
           return Qtrue;
    }
}

#connect_async("localhost", 1883, 10) ⇒ Boolean

Connect to an MQTT broker. This is a non-blocking call. If you use Mosquitto::Client#connect_async your client must use the threaded interface Mosquitto::Client#loop_start. If you need to use Mosquitto::Client#loop, you must use Mosquitto::Client#connect to connect the client.

Examples:

client.connect_async("localhost", 1883, 10)

Returns:

  • (Boolean)

Parameters:

  • host (String)

    the hostname or ip address of the broker to connect to.

  • port (Integer)

    the network port to connect to. Usually 1883 (or 8883 for TLS)

  • keepalive (Integer)

    the number of seconds after which the broker should send a PING message to the client if no other messages have been exchanged in that time.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
# File 'ext/mosquitto/client.c', line 1039

static VALUE rb_mosquitto_client_connect_async(VALUE obj, VALUE host, VALUE port, VALUE keepalive)
{
    struct nogvl_connect_args args;
    int ret;
    MosquittoGetClient(obj);
    Check_Type(host, T_STRING);
    MosquittoEncode(host);
    Check_Type(port, T_FIXNUM);
    Check_Type(keepalive, T_FIXNUM);
    args.mosq = client->mosq;
    args.host = StringValueCStr(host);
    args.port = NUM2INT(port);
    args.keepalive = NUM2INT(keepalive);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_connect_async_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_connect_async");
           break;
       default:
           return Qtrue;
    }
}

#connect_bind("localhost", 1883, 10, "10.0.0.3") ⇒ Boolean

Connect to an MQTT broker. This extends the functionality of Mosquitto::Client#connect by adding the bind_address parameter. Use this function if you need to restrict network communication over a particular interface.

Examples:

client.connect_bind("localhost", 1883, 10, "10.0.0.3")

Returns:

  • (Boolean)

Parameters:

  • host (String)

    the hostname or ip address of the broker to connect to.

  • port (Integer)

    the network port to connect to. Usually 1883 (or 8883 for TLS)

  • keepalive (Integer)

    the number of seconds after which the broker should send a PING message to the client if no other messages have been exchanged in that time.

  • bind_address (String)

    the hostname or ip address of the local network interface to bind to

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
# File 'ext/mosquitto/client.c', line 985

static VALUE rb_mosquitto_client_connect_bind(VALUE obj, VALUE host, VALUE port, VALUE keepalive, VALUE bind_address)
{
    struct nogvl_connect_args args;
    int ret;
    MosquittoGetClient(obj);
    Check_Type(host, T_STRING);
    MosquittoEncode(host);
    Check_Type(port, T_FIXNUM);
    Check_Type(keepalive, T_FIXNUM);
    Check_Type(bind_address, T_STRING);
    MosquittoEncode(bind_address);
    args.mosq = client->mosq;
    args.host = StringValueCStr(host);
    args.port = NUM2INT(port);
    args.keepalive = NUM2INT(keepalive);
    args.bind_address = StringValueCStr(bind_address);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_connect_bind_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_connect_bind");
           break;
       default:
           return Qtrue;
    }
}

#connect_bind_async("localhost", 1883, 10, "10.0.0.3") ⇒ Boolean

Connect to an MQTT broker. This is a non-blocking call. If you use Mosquitto::Client#connect_async your client must use the threaded interface Mosquitto::Client#loop_start. If you need to use Mosquitto::Client#loop, you must use Mosquitto::Client#connect to connect the client.

This extends the functionality of Mosquitto::Client#connect_async by adding the bind_address parameter. Use this function if you need to restrict network communication over a particular interface.

Examples:

client.connect_bind_async("localhost", 1883, 10, "10.0.0.3")

Returns:

  • (Boolean)

Parameters:

  • host (String)

    the hostname or ip address of the broker to connect to.

  • port (Integer)

    the network port to connect to. Usually 1883 (or 8883 for TLS)

  • keepalive (Integer)

    the number of seconds after which the broker should send a PING message to the client if no other messages have been exchanged in that time.

  • bind_address (String)

    the hostname or ip address of the local network interface to bind to

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
# File 'ext/mosquitto/client.c', line 1095

static VALUE rb_mosquitto_client_connect_bind_async(VALUE obj, VALUE host, VALUE port, VALUE keepalive, VALUE bind_address)
{
    struct nogvl_connect_args args;
    int ret;
    MosquittoGetClient(obj);
    Check_Type(host, T_STRING);
    MosquittoEncode(host);
    Check_Type(port, T_FIXNUM);
    Check_Type(keepalive, T_FIXNUM);
    Check_Type(bind_address, T_STRING);
    MosquittoEncode(bind_address);
    args.mosq = client->mosq;
    args.host = StringValueCStr(host);
    args.port = NUM2INT(port);
    args.keepalive = NUM2INT(keepalive);
    args.bind_address = StringValueCStr(bind_address);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_connect_bind_async_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_connect_bind_async");
           break;
       default:
           return Qtrue;
    }
}

#destroyBoolean

Free memory associated with a mosquitto client instance. Used in integration tests only.

Examples:

client.destroy

Returns:

  • (Boolean)

Returns:

  • (true)

    true when memory freed



1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
# File 'ext/mosquitto/client.c', line 1838

static VALUE rb_mosquitto_client_destroy(VALUE obj)
{
    MosquittoGetClient(obj);
    if (!NIL_P(client->callback_thread)) {
        mosquitto_stop_waiting_for_callbacks(client);
        mosquitto_loop_stop(client->mosq, true);
        rb_mosquitto_client_reap_event_thread(client);
    }
    mosquitto_destroy(client->mosq);
    client->mosq = NULL;
    return Qtrue;
}

#disconnecttrue

Disconnect from the broker.

Examples:

client.disconnect

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or if the client is not connected



1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
# File 'ext/mosquitto/client.c', line 1179

static VALUE rb_mosquitto_client_disconnect(VALUE obj)
{
    int ret;
    bool retried = false;
    struct timeval time;
    MosquittoGetClient(obj);
  retry_once:
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_disconnect_nogvl, (void *)client->mosq, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NO_CONN:
           RetryNotConnectedOnce();
           MosquittoError("client not connected to broker");
           break;
       default:
           return Qtrue;
    }
}

#loop(10, 10) ⇒ Boolean

The main network loop for the client. You must call this frequently in order to keep communications between the client and broker working. If incoming data is present it will then be processed. Outgoing commands, from e.g. Mosquitto::Client#publish, are normally sent immediately that their function is called, but this is not always possible. Mosquitto::Client#loop will also attempt to send any remaining outgoing messages, which also includes commands that are part of the flow for messages with QoS>0.

An alternative approach is to use Mosquitto::Client#loop_start to run the client loop in its own thread.

This calls select() to monitor the client network socket. If you want to integrate mosquitto client operation with your own select() call, use Mosquitto::Client#socket, Mosquitto::Client#loop_read, Mosquitto::Client#loop_write and Mosquitto::Client#loop_misc.

Examples:

client.loop(10, 10)

Returns:

  • (Boolean)

Parameters:

  • timeout (Integer)

    Maximum number of milliseconds to wait for network activity in the select() call before timing out. Set to 0 for instant return. Set negative to use the default of 1000ms

  • max_packets (Integer)

    this parameter is currently unused and should be set to 1 for future compatibility.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
# File 'ext/mosquitto/client.c', line 1445

static VALUE rb_mosquitto_client_loop(VALUE obj, VALUE timeout, VALUE max_packets)
{
    struct nogvl_loop_args args;
    int ret;
    struct timeval time;
    bool retried = false;
    MosquittoGetClient(obj);
    Check_Type(timeout, T_FIXNUM);
    Check_Type(max_packets, T_FIXNUM);
    args.mosq = client->mosq;
    args.timeout = NUM2INT(timeout);
    args.max_packets = NUM2INT(max_packets);
  retry_once:
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NO_CONN:
           RetryNotConnectedOnce();
           MosquittoError("client not connected to broker");
           break;
       case MOSQ_ERR_CONN_LOST:
           MosquittoError("connection to the broker was lost");
           break;
       case MOSQ_ERR_PROTOCOL:
           MosquittoError("protocol error communicating with the broker");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_loop");
           break;
       default:
           return Qtrue;
    }
}

#loop_forever(10, 1) ⇒ Boolean

This function calls Mosquitto::Client#loop for you in an infinite blocking loop. It is useful for the case where you only want to run the MQTT client loop in your program.

It handles reconnecting in case server connection is lost. If you call Mosquitto::Client#disconnect in a callback it will return.

Examples:

client.loop_forever(10, 1)

Returns:

  • (Boolean)

Parameters:

  • timeout (Integer)

    Maximum number of milliseconds to wait for network activity in the select() call before timing out. Set to 0 for instant return. Set negative to use the default of 1000ms

  • max_packets (Integer)

    this parameter is currently unused and should be set to 1 for future compatibility.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
# File 'ext/mosquitto/client.c', line 1516

static VALUE rb_mosquitto_client_loop_forever(VALUE obj, VALUE timeout, VALUE max_packets)
{
    struct nogvl_loop_args args;
    int ret;
    struct timeval time;
    bool retried = false;
    MosquittoGetClient(obj);
    Check_Type(timeout, T_FIXNUM);
    Check_Type(max_packets, T_FIXNUM);
    args.mosq = client->mosq;
    args.timeout = NUM2INT(timeout);
    args.max_packets = NUM2INT(max_packets);
  retry_once:
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_forever_nogvl, (void *)&args, rb_mosquitto_client_loop_forever_ubf, client);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NO_CONN:
           RetryNotConnectedOnce();
           MosquittoError("client not connected to broker");
           break;
       case MOSQ_ERR_CONN_LOST:
           MosquittoError("connection to the broker was lost");
           break;
       case MOSQ_ERR_PROTOCOL:
           MosquittoError("protocol error communicating with the broker");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_loop");
           break;
       default:
           return Qtrue;
    }
}

#loop_miscBoolean

Carry out miscellaneous operations required as part of the network loop. This should only be used if you are not using Mosquitto::Client#loop and are monitoring the client network socket for activity yourself.

This function deals with handling PINGs and checking whether messages need to be retried, so should be called fairly frequently.

Examples:

client.loop_misc

Returns:

  • (Boolean)

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error)

    on invalid input params or when not connected to the broker



1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
# File 'ext/mosquitto/client.c', line 1791

static VALUE rb_mosquitto_client_loop_misc(VALUE obj)
{
    int ret;
    MosquittoGetClient(obj);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_misc_nogvl, (void *)client->mosq, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NO_CONN:
           MosquittoError("client not connected to broker");
           break;
       default:
           return Qtrue;
    }
}

#loop_read(10) ⇒ Boolean

Carry out network read operations. This should only be used if you are not using Mosquitto::Client#loop and are monitoring the client network socket for activity yourself.

Examples:

client.loop_read(10)

Returns:

  • (Boolean)

Parameters:

  • max_packets (Integer)

    this parameter is currently unused and should be set to 1 for future compatibility.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
# File 'ext/mosquitto/client.c', line 1682

static VALUE rb_mosquitto_client_loop_read(VALUE obj, VALUE max_packets)
{
    struct nogvl_loop_args args;
    int ret;
    MosquittoGetClient(obj);
    Check_Type(max_packets, T_FIXNUM);
    args.mosq = client->mosq;
    args.max_packets = NUM2INT(max_packets);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_read_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NO_CONN:
           MosquittoError("client not connected to broker");
           break;
       case MOSQ_ERR_CONN_LOST:
           MosquittoError("connection to the broker was lost");
           break;
       case MOSQ_ERR_PROTOCOL:
           MosquittoError("protocol error communicating with the broker");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_loop");
           break;
       default:
           return Qtrue;
    }
}

#loop_startBoolean

This is part of the threaded client interface. Call this once to start a new thread to process network traffic. This provides an alternative to repeatedly calling Mosquitto::Client#loop yourself.

Examples:

client.loop_start

Returns:

  • (Boolean)

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error)

    on invalid input params or if thread support is not available



1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
# File 'ext/mosquitto/client.c', line 1574

static VALUE rb_mosquitto_client_loop_start(VALUE obj)
{
    int ret;
    struct timeval time;
    MosquittoGetClient(obj);
    /* Let's not spawn duplicate threaded loops */
    if (!NIL_P(client->callback_thread)) return Qtrue;
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_start_nogvl, (void *)client->mosq, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOT_SUPPORTED :
           MosquittoError("thread support is not available");
           break;
       default:
           client->waiter = MOSQ_ALLOC(mosquitto_callback_waiting_t);
           if (pthread_mutex_init(&client->callback_mutex, NULL) != 0) MosquittoError("failed to create callback thread mutex");
           if (pthread_cond_init(&client->callback_cond, NULL) != 0) MosquittoError("failed to create callback thread condition var");
           client->callback_thread = rb_thread_create(rb_mosquitto_callback_thread, client);
           /* Allow the callback thread some startup time */
           time.tv_sec  = 0;
           time.tv_usec = 100 * 1000;  /* 0.1 sec */
           rb_thread_wait_for(time);
           return Qtrue;
    }
}

#loop_stop(true) ⇒ Boolean

This is part of the threaded client interface. Call this once to stop the network thread previously created with Mosquitto::Client#loop_start. This call will block until the network thread finishes. For the network thread to end, you must have previously called Mosquitto::Client#disconnect or have set the force parameter to true.

Examples:

client.loop_stop(true)

Returns:

  • (Boolean)

Parameters:

  • force (Boolean)

    set to true to force thread cancellation. If false, Mosquitto::Client#disconnect must have already been called.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error)

    on invalid input params or if thread support is not available



1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
# File 'ext/mosquitto/client.c', line 1640

static VALUE rb_mosquitto_client_loop_stop(VALUE obj, VALUE force)
{
    struct nogvl_loop_stop_args args;
    int ret;
    MosquittoGetClient(obj);
    args.mosq = client->mosq;
    args.force = ((force == Qtrue) ? true : false);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_stop_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("Threaded main loop not running for this client. Are you sure you haven't already called Mosquitto::Client#loop_stop ?");
           break;
       case MOSQ_ERR_NOT_SUPPORTED :
           MosquittoError("thread support is not available");
           break;
       default:
           rb_mosquitto_client_reap_event_thread(client);
           return Qtrue;
    }
}

#loop_write(1) ⇒ Boolean

Carry out network write operations. This should only be used if you are not using Mosquitto::Client#loop and are monitoring the client network socket for activity yourself.

Examples:

client.loop_write(1)

Returns:

  • (Boolean)

Parameters:

  • max_packets (Integer)

    this parameter is currently unused and should be set to 1 for future compatibility.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
# File 'ext/mosquitto/client.c', line 1736

static VALUE rb_mosquitto_client_loop_write(VALUE obj, VALUE max_packets)
{
    struct nogvl_loop_args args;
    int ret;
    MosquittoGetClient(obj);
    Check_Type(max_packets, T_FIXNUM);
    args.mosq = client->mosq;
    args.max_packets = NUM2INT(max_packets);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_write_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NO_CONN:
           MosquittoError("client not connected to broker");
           break;
       case MOSQ_ERR_CONN_LOST:
           MosquittoError("connection to the broker was lost");
           break;
       case MOSQ_ERR_PROTOCOL:
           MosquittoError("protocol error communicating with the broker");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_loop");
           break;
       default:
           return Qtrue;
    }
}

#max_inflight_messages=(10) ⇒ Boolean

Set the number of QoS 1 and 2 messages that can be “in flight” at one time. An in flight message is part way through its delivery flow. Attempts to send further messages with mosquitto::Client#publish will result in the messages being queued until the number of in flight messages reduces.

A higher number here results in greater message throughput, but if set higher than the maximum in flight messages on the broker may lead to delays in the messages being acknowledged.

Set to 0 for no maximum.

Examples:

client.max_inflight_messages = 10

Returns:

  • (Boolean)

Parameters:

  • max_messages (Integer)

    the maximum number of inflight messages. Defaults to 20.

Returns:

  • (true)

    on success

Raises:



1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
# File 'ext/mosquitto/client.c', line 1921

static VALUE rb_mosquitto_client_max_inflight_messages_equals(VALUE obj, VALUE max_messages)
{
    int ret;
    MosquittoGetClient(obj);
    Check_Type(max_messages, T_FIXNUM);
    ret = mosquitto_max_inflight_messages_set(client->mosq, INT2NUM(max_messages));
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       default:
           return Qtrue;
    }
}

#message_retry=(10) ⇒ Boolean

Set the number of seconds to wait before retrying messages. This applies to publish messages with QoS>0. May be called at any time.

Examples:

client.message_retry = 10

Returns:

  • (Boolean)

Parameters:

  • message_retry (Integer)

    the number of seconds to wait for a response before retrying. Defaults to 20.

Returns:

  • (true)

    on success

Raises:



1950
1951
1952
1953
1954
1955
1956
# File 'ext/mosquitto/client.c', line 1950

static VALUE rb_mosquitto_client_message_retry_equals(VALUE obj, VALUE seconds)
{
    MosquittoGetClient(obj);
    Check_Type(seconds, T_FIXNUM);
    mosquitto_message_retry_set(client->mosq, INT2NUM(seconds));
    return Qtrue;
}

#on_connect {|rc| ... } ⇒ Boolean

Set the connect callback. This is called when the broker sends a CONNACK message in response to a connection.

Examples:

client.on_connect{|rc| p :connected }

Yields:

  • (rc)

Returns:

  • (Boolean)

Yields:

  • connect callback

Yield Parameters:

  • rc (Integer)

    the return code of the connection response, one of: 0 - success, 1 - connection refused (unacceptable protocol version), 2 - connection refused (identifier rejected) 3 - connection refused (broker unavailable)

Returns:

  • (true)

    on success

Raises:

  • (TypeError, ArgumentError)

    if callback is not a Proc or if the method arity is wrong



1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
# File 'ext/mosquitto/client.c', line 1976

static VALUE rb_mosquitto_client_on_connect(int argc, VALUE *argv, VALUE obj)
{
    VALUE proc, cb;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01&", &proc, &cb);
    MosquittoAssertCallback(cb, 1);
    if (!NIL_P(client->connect_cb)) rb_gc_unregister_address(&client->connect_cb);
    mosquitto_connect_callback_set(client->mosq, rb_mosquitto_client_on_connect_cb);
    client->connect_cb = cb;
    rb_gc_register_address(&client->connect_cb);
    return Qtrue;
}

#on_disconnect {|rc| ... } ⇒ Boolean

Set the disconnect callback. This is called when the broker has received the DISCONNECT command and has disconnected the client.

Examples:

client.on_disconnect{|rc| p :disconnected }

Yields:

  • (rc)

Returns:

  • (Boolean)

Yields:

  • disconnect callback

Yield Parameters:

  • rc (Integer)

    integer value indicating the reason for the disconnect. A value of 0 means the client has called Mosquitto::Client#disconnect. Any other value indicates that the disconnect is unexpected.

Returns:

  • (true)

    on success

Raises:

  • (TypeError, ArgumentError)

    if callback is not a Proc or if the method arity is wrong



2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
# File 'ext/mosquitto/client.c', line 2006

static VALUE rb_mosquitto_client_on_disconnect(int argc, VALUE *argv, VALUE obj)
{
    VALUE proc, cb;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01&", &proc, &cb);
    MosquittoAssertCallback(cb, 1);
    if (!NIL_P(client->disconnect_cb)) rb_gc_unregister_address(&client->disconnect_cb);
    mosquitto_disconnect_callback_set(client->mosq, rb_mosquitto_client_on_disconnect_cb);
    client->disconnect_cb = cb;
    rb_gc_register_address(&client->disconnect_cb);
    return Qtrue;
}

#on_log {|level, msg| ... } ⇒ Boolean

Set the logging callback. This should be used if you want event logging information from the client library.

Examples:

client.on_log{|level, msg| p msg }

Yields:

  • (level, msg)

Returns:

  • (Boolean)

Yields:

  • unsubscribe callback

Yield Parameters:

Returns:

  • (true)

    on success

Raises:

  • (TypeError, ArgumentError)

    if callback is not a Proc or if the method arity is wrong



2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
# File 'ext/mosquitto/client.c', line 2151

static VALUE rb_mosquitto_client_on_log(int argc, VALUE *argv, VALUE obj)
{
    VALUE proc, cb;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01&", &proc, &cb);
    MosquittoAssertCallback(cb, 2);
    if (!NIL_P(client->log_cb)) rb_gc_unregister_address(&client->log_cb);
    mosquitto_log_callback_set(client->mosq, rb_mosquitto_client_on_log_cb);
    client->log_cb = cb;
    rb_gc_register_address(&client->log_cb);
    return Qtrue;
}

#on_message {|msg| ... } ⇒ Boolean

Set the message callback. This is called when a message is received from the broker.

Examples:

client.on_message{|msg| p msg }

Yields:

  • (msg)

Returns:

  • (Boolean)

Yields:

  • message callback

Yield Parameters:

Returns:

  • (true)

    on success

Raises:

  • (TypeError, ArgumentError)

    if callback is not a Proc or if the method arity is wrong



2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
# File 'ext/mosquitto/client.c', line 2062

static VALUE rb_mosquitto_client_on_message(int argc, VALUE *argv, VALUE obj)
{
    VALUE proc, cb;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01&", &proc, &cb);
    MosquittoAssertCallback(cb, 1);
    if (!NIL_P(client->message_cb)) rb_gc_unregister_address(&client->message_cb);
    mosquitto_message_callback_set(client->mosq, rb_mosquitto_client_on_message_cb);
    client->message_cb = cb;
    rb_gc_register_address(&client->message_cb);
    return Qtrue;
}

#on_publish {|mid| ... } ⇒ Boolean

Set the publish callback. This is called when a message initiated with Mosquitto::Client#publish has been sent to the broker successfully.

Examples:

client.on_publish{|mid| p :published }

Yields:

  • (mid)

Returns:

  • (Boolean)

Yields:

  • publish callback

Yield Parameters:

  • mid (Integer)

    the message id of the sent message

Returns:

  • (true)

    on success

Raises:

  • (TypeError, ArgumentError)

    if callback is not a Proc or if the method arity is wrong



2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
# File 'ext/mosquitto/client.c', line 2034

static VALUE rb_mosquitto_client_on_publish(int argc, VALUE *argv, VALUE obj)
{
    VALUE proc, cb;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01&", &proc, &cb);
    MosquittoAssertCallback(cb, 1);
    if (!NIL_P(client->publish_cb)) rb_gc_unregister_address(&client->publish_cb);
    mosquitto_publish_callback_set(client->mosq, rb_mosquitto_client_on_publish_cb);
    client->publish_cb = cb;
    rb_gc_register_address(&client->publish_cb);
    return Qtrue;
}

#on_subscribe {|mid, granted_qos| ... } ⇒ Boolean

Set the subscribe callback. This is called when the broker responds to a subscription request.

Examples:

client.on_subscribe{|mid, granted_qos| p :subscribed }

Yields:

  • (mid, granted_qos)

Returns:

  • (Boolean)

Yields:

  • subscription callback

Yield Parameters:

  • mid (Integer)

    the message id of the subscribe message.

  • granted_qos (Array)

    an array of integers indicating the granted QoS for each of the subscriptions.

Returns:

  • (true)

    on success

Raises:

  • (TypeError, ArgumentError)

    if callback is not a Proc or if the method arity is wrong



2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
# File 'ext/mosquitto/client.c', line 2092

static VALUE rb_mosquitto_client_on_subscribe(int argc, VALUE *argv, VALUE obj)
{
    VALUE proc, cb;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01&", &proc, &cb);
    MosquittoAssertCallback(cb, 2);
    if (!NIL_P(client->subscribe_cb)) rb_gc_unregister_address(&client->subscribe_cb);
    mosquitto_subscribe_callback_set(client->mosq, rb_mosquitto_client_on_subscribe_cb);
    client->subscribe_cb = cb;
    rb_gc_register_address(&client->subscribe_cb);
    return Qtrue;
}

#on_unsubscribe {|mid| ... } ⇒ Boolean

Set the unsubscribe callback. This is called when the broker responds to a unsubscription request.

Examples:

client.on_unsubscribe{|mid| p :unsubscribed }

Yields:

  • (mid)

Returns:

  • (Boolean)

Yields:

  • unsubscribe callback

Yield Parameters:

  • mid (Integer)

    the message id of the unsubscribe message.

Returns:

  • (true)

    on success

Raises:

  • (TypeError, ArgumentError)

    if callback is not a Proc or if the method arity is wrong



2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
# File 'ext/mosquitto/client.c', line 2121

static VALUE rb_mosquitto_client_on_unsubscribe(int argc, VALUE *argv, VALUE obj)
{
    VALUE proc, cb;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01&", &proc, &cb);
    MosquittoAssertCallback(cb, 1);
    if (!NIL_P(client->unsubscribe_cb)) rb_gc_unregister_address(&client->unsubscribe_cb);
    mosquitto_unsubscribe_callback_set(client->mosq, rb_mosquitto_client_on_unsubscribe_cb);
    client->unsubscribe_cb = cb;
    rb_gc_register_address(&client->unsubscribe_cb);
    return Qtrue;
}

#publish(3, "publish", "test", Mosquitto: :AT_MOST_ONCE, true) ⇒ Boolean

Publish a message on a given topic.

Examples:

client.publish(3, "publish", "test", Mosquitto::AT_MOST_ONCE, true)

Returns:

  • (Boolean)

Parameters:

  • mid (Integer, nil)

    If not nil, the function will set this to the message id of this particular message. This can be then used with the publish callback to determine when the message has been sent. Note that although the MQTT protocol doesn’t use message ids for messages with QoS=0, libmosquitto assigns them message ids so they can be tracked with this parameter.

  • payload (String)

    Message payload to send. Max 256MB

  • qos (Mosquitto::AT_MOST_ONCE, Mosquitto::AT_LEAST_ONCE, Mosquitto::EXACTLY_ONCE)

    Quality of Service to be used for the message.

  • retain (true, false)

    set to true to make the message retained

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
# File 'ext/mosquitto/client.c', line 1226

static VALUE rb_mosquitto_client_publish(VALUE obj, VALUE mid, VALUE topic, VALUE payload, VALUE qos, VALUE retain)
{
    struct nogvl_publish_args args;
    int ret, msg_id;
    struct timeval time;
    bool retried = false;
    MosquittoGetClient(obj);
    Check_Type(topic, T_STRING);
    MosquittoEncode(topic);
    Check_Type(payload, T_STRING);
    MosquittoEncode(payload);
    Check_Type(qos, T_FIXNUM);
    if (!NIL_P(mid)) {
        Check_Type(mid, T_FIXNUM);
        msg_id = NUM2INT(mid);
    }
    args.mosq = client->mosq;
    args.mid = NIL_P(mid) ? NULL : &msg_id;
    args.topic = StringValueCStr(topic);
    args.payloadlen = (int)RSTRING_LEN(payload);
    args.payload = (const char *)(args.payloadlen == 0 ? NULL : StringValueCStr(payload));
    args.qos = NUM2INT(qos);
    args.retain = (retain == Qtrue) ? true : false;
  retry_once:
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_publish_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NO_CONN:
           RetryNotConnectedOnce();
           MosquittoError("client not connected to broker");
           break;
       case MOSQ_ERR_PROTOCOL:
           MosquittoError("protocol error communicating with broker");
           break;
       case MOSQ_ERR_PAYLOAD_SIZE:
           MosquittoError("payload too large");
           break;
       default:
           return Qtrue;
    }
}

#reconnectBoolean

Note:

It must not be called before Mosquitto::Client#connect

Reconnect to a broker.

This function provides an easy way of reconnecting to a broker after a connection has been lost. It uses the values that were provided in the Mosquitto::Client#connect call.

Examples:

client.reconnect

Returns:

  • (Boolean)

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
# File 'ext/mosquitto/client.c', line 1145

static VALUE rb_mosquitto_client_reconnect(VALUE obj)
{
    int ret;
    MosquittoGetClient(obj);
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_reconnect_nogvl, (void *)client->mosq, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_ERRNO:
           rb_sys_fail("mosquitto_reconnect");
           break;
       default:
           return Qtrue;
    }
}

#reconnect_delay_set(2, 10, true) ⇒ Boolean

Control the behaviour of the client when it has unexpectedly disconnected in Mosquitto::Client#loop_forever or after Mosquitto::Client#loop_start. The default behaviour if this function is not used is to repeatedly attempt to reconnect with a delay of 1 second until the connection succeeds.

Use reconnect_delay parameter to change the delay between successive reconnection attempts. You may also enable exponential backoff of the time between reconnections by setting reconnect_exponential_backoff to true and set an upper bound on the delay with reconnect_delay_max.

Example 1: delay=2, delay_max=10, exponential_backoff=False Delays would be: 2, 4, 6, 8, 10, 10, …

Example 2: delay=3, delay_max=30, exponential_backoff=True Delays would be: 3, 6, 12, 24, 30, 30, …

Examples:

client.reconnect_delay_set(2, 10, true)

Returns:

  • (Boolean)

Parameters:

  • delay (Integer)

    the number of seconds to wait between reconnects

  • delay_max (Integer)

    the maximum number of seconds to wait between reconnects

  • exponential_backoff (true, false)

    use exponential backoff between reconnect attempts. Set to true to enable exponential backoff.

Returns:

  • (true)

    on success

Raises:



1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
# File 'ext/mosquitto/client.c', line 1883

static VALUE rb_mosquitto_client_reconnect_delay_set(VALUE obj, VALUE delay, VALUE delay_max, VALUE exp_backoff)
{
    int ret;
    MosquittoGetClient(obj);
    Check_Type(delay, T_FIXNUM);
    Check_Type(delay_max, T_FIXNUM);
    ret = mosquitto_reconnect_delay_set(client->mosq, INT2NUM(delay), INT2NUM(delay_max), ((exp_backoff == Qtrue) ? true : false));
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       default:
           return Qtrue;
    }
}

#reinitialise("some-id") ⇒ Mosquitto::Client

Note:

As per the MQTT spec, client identifiers cannot exceed 23 characters

Allows an existing mosquitto client to be reused. Call on a mosquitto instance to close any open network connections, free memory and reinitialise the client with the new parameters.

Examples:

client.reinitialise("session_id") -> Mosquitto::Client
client.reinitialise(nil, true) -> Mosquitto::Client

Returns:

Parameters:

  • identifier (String)

    the client identifier. Set to nil to have a random one generated. clean_session must be true if the identifier is nil.

  • clean_session (true, false)

    set to true to instruct the broker to clean all messages and subscriptions on disconnect, false to instruct it to keep them

Returns:

Raises:



545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
# File 'ext/mosquitto/client.c', line 545

static VALUE rb_mosquitto_client_reinitialise(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_reinitialise_args args;
    VALUE client_id;
    int ret;
    bool clean_session;
    char *cl_id = NULL;
    MosquittoGetClient(obj);
    rb_scan_args(argc, argv, "01", &client_id);
    if (NIL_P(client_id)) {
        clean_session = true;
    } else {
        clean_session = false;
        Check_Type(client_id, T_STRING);
        MosquittoEncode(client_id);
        cl_id = StringValueCStr(client_id);
    }
    args.mosq = client->mosq;
    args.client_id = cl_id;
    args.clean_session = clean_session;
    args.obj = (void *)obj;
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_reinitialise_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       default:
           return Qtrue;
    }
}

#socketInteger

Return the socket handle for a mosquitto instance. Useful if you want to include a mosquitto client in your own select() calls.

Examples:

client.socket

Returns:

  • (Integer)

Returns:

  • (Integer)

    socket identifier, or -1 on failure



1401
1402
1403
1404
1405
1406
1407
# File 'ext/mosquitto/client.c', line 1401

static VALUE rb_mosquitto_client_socket(VALUE obj)
{
    int socket;
    MosquittoGetClient(obj);
    socket = mosquitto_socket(client->mosq);
    return INT2NUM(socket);
}

#subscribe(3, "subscribe", Mosquitto: :AT_MOST_ONCE) ⇒ Boolean

Subscribe to a topic.

Examples:

client.subscribe(3, "subscribe", Mosquitto::AT_MOST_ONCE)

Returns:

  • (Boolean)

Parameters:

  • mid (Integer, nil)

    If not nil, the function will set this to the message id of this particular message. This can be then used with the subscribe callback to determine when the message has been sent.

  • subscription (String)

    The subscription pattern

  • qos (Mosquitto::AT_MOST_ONCE, Mosquitto::AT_LEAST_ONCE, Mosquitto::EXACTLY_ONCE)

    Quality of Service to be used for the subscription

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
# File 'ext/mosquitto/client.c', line 1297

static VALUE rb_mosquitto_client_subscribe(VALUE obj, VALUE mid, VALUE subscription, VALUE qos)
{
    struct nogvl_subscribe_args args;
    int ret, msg_id;
    struct timeval time;
    bool retried = false;
    MosquittoGetClient(obj);
    Check_Type(subscription, T_STRING);
    MosquittoEncode(subscription);
    Check_Type(qos, T_FIXNUM);
    if (!NIL_P(mid)) {
        Check_Type(mid, T_FIXNUM);
        msg_id = NUM2INT(mid);
    }
    args.mosq = client->mosq;
    args.mid = NIL_P(mid) ? NULL : &msg_id;
    args.subscription = StringValueCStr(subscription);
    args.qos = NUM2INT(qos);
  retry_once:
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_subscribe_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NO_CONN:
           RetryNotConnectedOnce();
           MosquittoError("client not connected to broker");
           break;
       default:
           return Qtrue;
    }
}

#insecure=(true) ⇒ Boolean

Note:

This must be called before calling Mosquitto::Client#connect

Configure verification of the server hostname in the server certificate. If value is set to true, it is impossible to guarantee that the host you are connecting to is not impersonating your server. This can be useful in initial server testing, but makes it possible for a malicious third party to impersonate your server through DNS spoofing, for example. Do not use this function in a real system. Setting value to true makes the connection encryption pointless.

Examples:

client.insecure = true

Returns:

  • (Boolean)

Parameters:

  • insecure (true, false)

    if set to false, the default, certificate hostname checking is performed. If set to true, no hostname checking is performed and the connection is insecure.

Returns:

  • (true)

    on success

Raises:



794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
# File 'ext/mosquitto/client.c', line 794

static VALUE rb_mosquitto_client_tls_insecure_set(VALUE obj, VALUE insecure)
{
    int ret;
    MosquittoGetClient(obj);
    if (insecure != Qtrue && insecure != Qfalse) {
         rb_raise(rb_eTypeError, "changing TLS verification semantics requires a boolean value");
    }

    ret = mosquitto_tls_insecure_set(client->mosq, ((insecure == Qtrue) ? true : false));
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOT_SUPPORTED:
           MosquittoError("TLS support is not available");
       default:
           return Qtrue;
    }
}

#tls_opts_set(Mosquitto: :SSL_VERIFY_PEER, "tlsv1.2", nil) ⇒ Boolean

Note:

This must be called before calling Mosquitto::Client#connect

Set advanced SSL/TLS options.

Examples:

client.tls_opts_set(Mosquitto::SSL_VERIFY_PEER, "tlsv1.2", nil)

Returns:

  • (Boolean)

Parameters:

  • cert_reqs (Mosquitto::SSL_VERIFY_NONE, Mosquitto::SSL_VERIFY_NONE)

    an integer defining the verification requirements the client will impose on the server. The default and recommended value is Mosquitto::SSL_VERIFY_PEER. Using Mosquitto::SSL_VERIFY_NONE provides no security.

  • tls_version ("tlsv1.2", "tlsv1.1", "tlsv1")

    the version of the SSL/TLS protocol to use as a string. If nil, the default value is used.

  • ciphers (String)

    a string describing the ciphers available for use. See the ‘openssl ciphers` tool for more information. If nil, the default ciphers will be used.

Returns:

  • (true)

    on success

Raises:

See Also:

  • ciphers`


835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
# File 'ext/mosquitto/client.c', line 835

static VALUE rb_mosquitto_client_tls_opts_set(VALUE obj, VALUE cert_reqs, VALUE tls_version, VALUE ciphers)
{
    int ret;
    MosquittoGetClient(obj);
    Check_Type(cert_reqs, T_FIXNUM);
    if (!NIL_P(tls_version)) {
        Check_Type(tls_version, T_STRING);
        MosquittoEncode(tls_version);
    }
    if (!NIL_P(ciphers)) {
        Check_Type(ciphers, T_STRING);
        MosquittoEncode(ciphers);
    }

    if (NUM2INT(cert_reqs) != 0 && NUM2INT(cert_reqs) != 1) {
        MosquittoError("TLS verification requirement should be one of Mosquitto::SSL_VERIFY_NONE or Mosquitto::SSL_VERIFY_PEER");
    }

    ret = mosquitto_tls_opts_set(client->mosq, NUM2INT(cert_reqs), (NIL_P(tls_version) ? NULL : StringValueCStr(tls_version)), (NIL_P(ciphers) ? NULL : StringValueCStr(ciphers)));
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NOT_SUPPORTED:
           MosquittoError("TLS support is not available");
       default:
           return Qtrue;
    }
}

#tls_psk_set("deadbeef", "psk-id", nil) ⇒ Boolean

Note:

This must be called before calling Mosquitto::Client#connect

Configure the client for pre-shared-key based TLS support.

Examples:

client.tls_psk_set("deadbeef", "psk-id", nil)

Returns:

  • (Boolean)

Parameters:

  • psk (String)

    the pre-shared-key in hex format with no leading “0x”.

  • identity (String)

    the identity of this client. May be used as the username depending on the server settings.

  • ciphers (String)

    a string describing the ciphers available for use. See the ‘openssl ciphers` tool for more information. If nil, the default ciphers will be used.

Returns:

  • (true)

    on success

Raises:

See Also:

  • ciphers`


887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
# File 'ext/mosquitto/client.c', line 887

static VALUE rb_mosquitto_client_tls_psk_set(VALUE obj, VALUE psk, VALUE identity, VALUE ciphers)
{
    int ret;
    MosquittoGetClient(obj);
    Check_Type(psk, T_STRING);
    Check_Type(identity, T_STRING);
    if (!NIL_P(ciphers)) {
        Check_Type(ciphers, T_STRING);
        MosquittoEncode(ciphers);
    }

    ret = mosquitto_tls_psk_set(client->mosq, StringValueCStr(psk), StringValueCStr(identity), (NIL_P(ciphers) ? NULL : StringValueCStr(ciphers)));
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NOT_SUPPORTED:
           MosquittoError("TLS support is not available");
       default:
           return Qtrue;
    }
}

#tls_set('/certs/all-ca.crt') ⇒ Boolean

Note:

This must be called before calling Mosquitto::Client#connect

Configure the client for certificate based SSL/TLS support.

Cannot be used in conjunction with Mosquitto::Client#tls_psk_set.

Define the Certificate Authority certificates to be trusted (ie. the server certificate must be signed with one of these certificates) using cafile.

If the server you are connecting to requires clients to provide a certificate, define certfile and keyfile with your client certificate and private key

Examples:

client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key', 'password')

Returns:

  • (Boolean)

Parameters:

  • cafile (String)

    path to a file containing the PEM encoded trusted CA certificate files. Either cafile or capath must not be nil.

  • capath (String)

    path to a directory containing the PEM encoded trusted CA certificate files. Either cafile or capath must not be nil.

  • certfile (String)

    path to a file containing the PEM encoded certificate file for this client. If nil, keyfile must also be nil and no client certificate will be used.

  • keyfile (String)

    path to a file containing the PEM encoded private key for this client. If nil, certfile must also be NULL and no client certificate will be used.

  • password (String)

    password for encrypted keyfile

Returns:

  • (true)

    on success

Raises:



724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
# File 'ext/mosquitto/client.c', line 724

static VALUE rb_mosquitto_client_tls_set(VALUE obj, VALUE cafile, VALUE capath, VALUE certfile, VALUE keyfile, VALUE password)
{
    int ret;
    int (*pw_callback)(char *, int, int, void *) = NULL;
    MosquittoGetClient(obj);
    if (!NIL_P(cafile)) {
        Check_Type(cafile, T_STRING);
        MosquittoEncode(cafile);
    }
    if (!NIL_P(capath)) {
        Check_Type(capath, T_STRING);
        MosquittoEncode(capath);
    }
    if (!NIL_P(certfile)) {
        Check_Type(certfile, T_STRING);
        MosquittoEncode(certfile);
    }
    if (!NIL_P(keyfile)) {
        Check_Type(keyfile, T_STRING);
        MosquittoEncode(keyfile);
    }

    if (!NIL_P(password)) {
        Check_Type(password, T_STRING);
        mosquitto_tls_password = password;
        rb_gc_register_address(&mosquitto_tls_password);
        pw_callback = rb_mosquitto_tls_password_callback;
    }

    if (NIL_P(cafile) && NIL_P(capath)) MosquittoError("Either CA path or CA file is required!");
    if (NIL_P(certfile) && !NIL_P(keyfile)) MosquittoError("Key file can only be used with a certificate file!");
    if (NIL_P(keyfile) && !NIL_P(certfile)) MosquittoError("Certificate file also requires a key file!");

    ret = mosquitto_tls_set(client->mosq, (NIL_P(cafile) ? NULL : StringValueCStr(cafile)), (NIL_P(capath) ? NULL : StringValueCStr(capath)), (NIL_P(certfile) ? NULL : StringValueCStr(certfile)), (NIL_P(keyfile) ? NULL : StringValueCStr(keyfile)), pw_callback);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NOT_SUPPORTED:
           MosquittoError("TLS support is not available");
       default:
           return Qtrue;
    }
}

#unsubscribe(3, "unsubscribe") ⇒ Boolean

Unsubscribe from a topic.

Examples:

client.unsubscribe(3, "unsubscribe")

Returns:

  • (Boolean)

Parameters:

  • mid (Integer, nil)

    If not nil, the function will set this to the message id of this particular message. This can be then used with the unsubscribe callback to determine when the message has been sent.

  • subscription (String)

    the unsubscription pattern.

Returns:

  • (true)

    on success

Raises:

  • (Mosquitto::Error, SystemCallError)

    on invalid input params or system call errors



1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
# File 'ext/mosquitto/client.c', line 1355

static VALUE rb_mosquitto_client_unsubscribe(VALUE obj, VALUE mid, VALUE subscription)
{
    struct nogvl_subscribe_args args;
    int ret, msg_id;
    struct timeval time;
    bool retried = false;
    MosquittoGetClient(obj);
    Check_Type(subscription, T_STRING);
    MosquittoEncode(subscription);
    if (!NIL_P(mid)) {
        Check_Type(mid, T_FIXNUM);
        msg_id = NUM2INT(mid);
    }
    args.mosq = client->mosq;
    args.mid = NIL_P(mid) ? NULL : &msg_id;
    args.subscription = StringValueCStr(subscription);
  retry_once:
    ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_unsubscribe_nogvl, (void *)&args, RUBY_UBF_IO, 0);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_NO_CONN:
           RetryNotConnectedOnce();
           MosquittoError("client not connected to broker");
           break;
       default:
           return Qtrue;
    }
}

#wait_readable(timeout = 10) ⇒ Object



10
11
12
13
14
15
16
17
18
# File 'lib/mosquitto/client.rb', line 10

def wait_readable(timeout = 10)
  retries ||= 0
  IO.for_fd(socket).wait_readable(timeout) && sleep(2)
rescue Errno::EBADF
  retries += 1
  sleep 0.5
  raise if retries > 4
  retry
end

#want_write?Boolean

Returns true if there is data ready to be written on the socket.

Examples:

client.want_write

Returns:

  • (Boolean)

Returns:

  • (true, false)

    true if there is data ready to be written on the socket



1819
1820
1821
1822
1823
1824
1825
# File 'ext/mosquitto/client.c', line 1819

static VALUE rb_mosquitto_client_want_write(VALUE obj)
{
    bool ret;
    MosquittoGetClient(obj);
    ret = mosquitto_want_write(client->mosq);
    return (ret == true) ? Qtrue : Qfalse;
}

#will_clearBoolean

Note:

This must be called before calling Mosquitto::Client#connect

Remove a previously configured will.

Examples:

client.will_clear

Returns:

  • (Boolean)

Returns:

  • (true)

    on success

Raises:



637
638
639
640
641
642
643
644
645
646
647
648
649
# File 'ext/mosquitto/client.c', line 637

static VALUE rb_mosquitto_client_will_clear(VALUE obj)
{
    int ret;
    MosquittoGetClient(obj);
    ret = mosquitto_will_clear(client->mosq);
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       default:
           return Qtrue;
    }
}

#will_set("topic", "died", Mosquitto: :AT_MOST_ONCE, false) ⇒ Mosquitto::Client

Note:

This must be called before calling Mosquitto::Client#connect

Configure will information for a mosquitto instance. By default, clients do not have a will.

Examples:

client.will_set("will_set", "test", Mosquitto::AT_MOST_ONCE, true)

Returns:

Parameters:

Returns:

  • (true)

    on success

Raises:



597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
# File 'ext/mosquitto/client.c', line 597

static VALUE rb_mosquitto_client_will_set(VALUE obj, VALUE topic, VALUE payload, VALUE qos, VALUE retain)
{
    int ret;
    int payload_len;
    MosquittoGetClient(obj);
    Check_Type(topic, T_STRING);
    MosquittoEncode(topic);
    Check_Type(payload, T_STRING);
    MosquittoEncode(payload);
    Check_Type(qos, T_FIXNUM);
    payload_len = (int)RSTRING_LEN(payload);
    ret = mosquitto_will_set(client->mosq, StringValueCStr(topic), payload_len, (payload_len == 0 ? NULL : StringValueCStr(payload)), NUM2INT(qos), ((retain == Qtrue) ? true : false));
    switch (ret) {
       case MOSQ_ERR_INVAL:
           MosquittoError("invalid input params");
           break;
       case MOSQ_ERR_NOMEM:
           rb_memerror();
           break;
       case MOSQ_ERR_PAYLOAD_SIZE:
           MosquittoError("payload too large");
           break;
       default:
           return Qtrue;
    }
}