Class: Majordomo::Worker

Inherits:
Object
  • Object
show all
Defined in:
ext/majordomo/worker.c

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.Majordomo::Worker.new("tcp: //0.0.0.0:5555", "service") ⇒ Majordomo::Worker .Majordomo::Worker.new("tcp: //0.0.0.0:5555", "service", true) ⇒ Majordomo::Worker

Creates a new Majordomo::Worker instance. A broker URI and service identifier is required and an optional verbose flag can be passed to the initializer.

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.broker                                                    =>  "tcp://0.0.0.0:5555"
wk.heartbeat                                                 =>  2500
wk.recv                                                      =>  "request"

Overloads:



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'ext/majordomo/worker.c', line 77

static VALUE rb_majordomo_worker_s_new(int argc, VALUE *argv, VALUE klass)
{
    rb_majordomo_worker_t *worker = NULL;
    struct nogvl_md_worker_new_args args;
    VALUE obj, broker, service, verbose;
    rb_scan_args(argc, argv, "21", &broker, &service, &verbose);
    if (verbose == Qnil)
        verbose = Qfalse;
    Check_Type(broker, T_STRING);
    Check_Type(service, T_STRING);
    obj = Data_Make_Struct(klass, rb_majordomo_worker_t, rb_mark_majordomo_worker, rb_free_majordomo_worker, worker);

    args.broker = RSTRING_PTR(broker);
    args.service = RSTRING_PTR(service);
    args.verbose = (verbose == Qtrue ? 1 : 0);
    worker->worker = (mdp_worker_t *)rb_thread_blocking_region(rb_nogvl_mdp_worker_new, (void *)&args, RUBY_UBF_IO, 0);
    worker->broker = rb_str_new4(broker);
    worker->service = rb_str_new4(service);
    worker->heartbeat = INT2NUM(MAJORDOMO_WORKER_HEARTBEAT);
    worker->reconnect = INT2NUM(MAJORDOMO_WORKER_RECONNECT);
#ifndef HAVE_RB_THREAD_BLOCKING_REGION
    worker->recv_buffer = zlist_new();
#endif
    rb_obj_call_init(obj, 0, NULL);
    return obj;
}

Instance Method Details

#brokerString

Returns the URI of the broker this worker is connected to.

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.broker                                                    =>  "tcp://0.0.0.0:5555"

Returns:

  • (String)


115
116
117
118
# File 'ext/majordomo/worker.c', line 115

static VALUE rb_majordomo_worker_broker(VALUE obj){
    GetMajordomoWorker(obj);
    return worker->broker;
}

#closenil

Close the worker connection to the broker.

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.close                                                     =>  nil

Returns:

  • (nil)


344
345
346
347
348
349
350
# File 'ext/majordomo/worker.c', line 344

static VALUE rb_majordomo_worker_close(VALUE obj){
    VALUE ret;
    GetMajordomoWorker(obj);
    ret = rb_thread_blocking_region(rb_nogvl_mdp_worker_close, (void *)worker->worker, RUBY_UBF_IO, 0);
    worker->worker = NULL;
    return ret;
}

#heartbeatFixnum

Returns the worker heartbeat delay (in msecs).

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.heartbeat                                                 =>  2500

Returns:

  • (Fixnum)


147
148
149
150
# File 'ext/majordomo/worker.c', line 147

static VALUE rb_majordomo_worker_heartbeat(VALUE obj){
    GetMajordomoWorker(obj);
    return worker->heartbeat;
}

#heartbeat=(val) ⇒ nil

Sets the worker heartbeat delay (in msecs).

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.heartbeat = 100                                           =>  nil
wk.heartbeat                                                 =>  100

Returns:

  • (nil)


180
181
182
183
184
185
186
# File 'ext/majordomo/worker.c', line 180

static VALUE rb_majordomo_worker_heartbeat_equals(VALUE obj, VALUE heartbeat){
    GetMajordomoWorker(obj);
    Check_Type(heartbeat, T_FIXNUM);
    mdp_worker_set_heartbeat(worker->worker, FIX2INT(heartbeat));
    worker->heartbeat = heartbeat;
    return Qnil;
}

#reconnectFixnum

Returns the worker reconnect delay (in msecs).

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.reconnect                                                 =>  2500

Returns:

  • (Fixnum)


163
164
165
166
# File 'ext/majordomo/worker.c', line 163

static VALUE rb_majordomo_worker_reconnect(VALUE obj){
    GetMajordomoWorker(obj);
    return worker->reconnect;
}

#reconnect=(100) ⇒ nil

Sets the worker reconnect delay (in msecs).

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.reconnect = 100                                           =>  nil
wk.reconnect                                                 =>  100

Returns:

  • (nil)


200
201
202
203
204
205
206
# File 'ext/majordomo/worker.c', line 200

static VALUE rb_majordomo_worker_reconnect_equals(VALUE obj, VALUE reconnect){
    GetMajordomoWorker(obj);
    Check_Type(reconnect, T_FIXNUM);
    mdp_worker_set_reconnect(worker->worker, FIX2INT(reconnect));
    worker->reconnect = reconnect;
    return Qnil;
}

#recvString?

Receives a client request form the broker. Valid requests are of type String and NilClass

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.recv                                                      =>  ["request", "reply"]

Returns:

  • (String, nil)


252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'ext/majordomo/worker.c', line 252

static VALUE rb_majordomo_worker_recv(VALUE obj){
    VALUE req, reply;
    struct nogvl_md_worker_recv_args args;
    GetMajordomoWorker(obj);
    args.worker = worker;
    args.reply = NULL;
    zmsg_t *request = (zmsg_t *)rb_thread_blocking_region(rb_nogvl_mdp_worker_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (!request)
        return Qnil;
    req = MajordomoEncode(rb_str_new2(zmsg_popstr(request)));
    zmsg_destroy(&request);
    reply = rb_str_new(zframe_data(args.reply), zframe_size(args.reply));
    zframe_destroy(&args.reply);
    return rb_ary_new3(2, req, reply);
}

#send(message, reply_to) ⇒ Boolean

Send a reply to a client request. Returns true if the send was succfessful.

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
req, reply_to = wk.recv                                      =>  ["request", "reply"]
wk.send("reply", reply_to)                                   =>  true

Returns:

  • (Boolean)


312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'ext/majordomo/worker.c', line 312

static VALUE rb_majordomo_worker_send(VALUE obj, VALUE message, VALUE reply_to){
    struct nogvl_md_worker_send_args args;
    GetMajordomoWorker(obj);
    args.worker = worker->worker;
    args.progress = zmsg_new();
    if (!args.progress)
        return Qfalse;
    if (zmsg_pushmem(args.progress, RSTRING_PTR(message), RSTRING_LEN(message)) == -1) {
        zmsg_destroy(&args.progress);
        return Qfalse;
    }
    args.reply_to = zframe_new(RSTRING_PTR(reply_to), RSTRING_LEN(reply_to));
    if (!args.reply_to) {
        zmsg_destroy(&args.progress);
        return Qfalse;
    }
    rb_thread_blocking_region(rb_nogvl_mdp_worker_send, (void *)&args, RUBY_UBF_IO, 0);
    zframe_destroy(&args.reply_to);
    return Qtrue;
}

#serviceString

Returns the service identifier this worker implements.

Examples

wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service")  =>  Majordomo::Worker
wk.service                                                   =>  "service"

Returns:

  • (String)


131
132
133
134
# File 'ext/majordomo/worker.c', line 131

static VALUE rb_majordomo_worker_service(VALUE obj){
    GetMajordomoWorker(obj);
    return worker->service;
}