Class: Agoo::Upgraded
- Inherits:
-
Object
- Object
- Agoo::Upgraded
- Defined in:
- ext/agoo/upgraded.c
Instance Method Summary collapse
-
#close ⇒ Object
call-seq: close().
- #env ⇒ Object
-
#open? ⇒ Boolean
call-seq: open?().
-
#pending ⇒ Object
call-seq: pending().
-
#protocol ⇒ Object
call-seq: protocol().
-
#publish ⇒ Object
Use the publish from the Agoo module.
-
#subscribe(subject) ⇒ Object
call-seq: subscribe(subject).
-
#unsubscribe(*args) ⇒ Object
call-seq: unsubscribe(subject=nil).
-
#write(msg) ⇒ Object
call-seq: write(msg).
Instance Method Details
#close ⇒ Object
call-seq: close()
Closes the connections associated with the handler.
256 257 258 259 260 261 262 263 264 265 |
# File 'ext/agoo/upgraded.c', line 256
static VALUE
up_close(VALUE self) {
Upgraded up = get_upgraded(self);
if (NULL != up) {
atomic_fetch_add(&up->pending, 1);
queue_push(&the_server.pub_queue, pub_close(up));
}
return Qnil;
}
|
#env ⇒ Object
378 379 380 381 382 383 384 385 386 |
# File 'ext/agoo/upgraded.c', line 378
static VALUE
env(VALUE self) {
Upgraded up = get_upgraded(self);
if (NULL != up) {
return up->env;
}
return Qnil;
}
|
#open? ⇒ Boolean
call-seq: open?()
Returns true if the connection is open and false otherwise.
292 293 294 295 296 297 298 299 300 301 302 |
# File 'ext/agoo/upgraded.c', line 292
static VALUE
up_open(VALUE self) {
Upgraded up = get_upgraded(self);
int pending = -1;
if (NULL != up) {
pending = atomic_load(&up->pending);
atomic_fetch_sub(&up->ref_cnt, 1);
}
return 0 <= pending ? Qtrue : Qfalse;
}
|
#pending ⇒ Object
call-seq: pending()
Returns the number of pending WebSocket or SSE writes. If the connection is closed then -1 is returned.
274 275 276 277 278 279 280 281 282 283 284 |
# File 'ext/agoo/upgraded.c', line 274
static VALUE
pending(VALUE self) {
Upgraded up = get_upgraded(self);
int pending = -1;
if (NULL != up) {
pending = atomic_load(&up->pending);
atomic_fetch_sub(&up->ref_cnt, 1);
}
return INT2NUM(pending);
}
|
#protocol ⇒ Object
call-seq: protocol()
Returns the protocol of the upgraded connection as either :websocket or :sse. If not longer connected nil is returned.
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'ext/agoo/upgraded.c', line 311
static VALUE
protocol(VALUE self) {
VALUE pro = Qnil;
if (the_server.active) {
Upgraded up;
pthread_mutex_lock(&the_server.up_lock);
if (NULL != (up = DATA_PTR(self)) && NULL != up->con) {
switch (up->con->kind) {
case CON_WS:
pro = websocket_sym;
break;
case CON_SSE:
pro = sse_sym;
break;
default:
break;
}
}
pthread_mutex_unlock(&the_server.up_lock);
}
return pro;
}
|
#publish ⇒ Object
Use the publish from the Agoo module.
376 |
# File 'ext/agoo/upgraded.c', line 376
extern VALUE ragoo_publish(VALUE self, VALUE subject, VALUE message);
|
#subscribe(subject) ⇒ Object
call-seq: subscribe(subject)
Subscribes to messages published on the specified subject. The subject is a dot delimited string that can include a ‘*’ character as a wild card that matches any set of characters. The ‘>’ character matches all remaining characters. Examples: people.fred.log, people.*.log, people.fred.>
Symbols can also be used as can any other object that responds to #to_s.
212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'ext/agoo/upgraded.c', line 212
static VALUE
up_subscribe(VALUE self, VALUE subject) {
Upgraded up;
int slen;
const char *subj = extract_subject(subject, &slen);
if (NULL != (up = get_upgraded(self))) {
atomic_fetch_add(&up->pending, 1);
queue_push(&the_server.pub_queue, pub_subscribe(up, subj, slen));
}
return Qnil;
}
|
#unsubscribe(*args) ⇒ Object
call-seq: unsubscribe(subject=nil)
Unsubscribes to messages on the provided subject. If the subject is nil then all subscriptions for the object are removed.
Symbols can also be used as can any other object that responds to #to_s.
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'ext/agoo/upgraded.c', line 234
static VALUE
up_unsubscribe(int argc, VALUE *argv, VALUE self) {
Upgraded up;
const char *subject = NULL;
int slen = 0;
if (0 < argc) {
subject = extract_subject(argv[0], &slen);
}
if (NULL != (up = get_upgraded(self))) {
atomic_fetch_add(&up->pending, 1);
queue_push(&the_server.pub_queue, pub_unsubscribe(up, subject, slen));
}
return Qnil;
}
|
#write(msg) ⇒ Object
call-seq: write(msg)
Writes a message to the WebSocket or SSE connection. Returns true if the message has been queued and false otherwise. A closed connection or too many pending messages could cause a value of false to be returned.
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'ext/agoo/upgraded.c', line 171
static VALUE
up_write(VALUE self, VALUE msg) {
Upgraded up = get_upgraded(self);
Pub p;
if (NULL == up) {
return Qfalse;
}
if (0 < the_server.max_push_pending && the_server.max_push_pending <= atomic_load(&up->pending)) {
atomic_fetch_sub(&up->ref_cnt, 1);
// Too many pending messages.
return Qfalse;
}
if (T_STRING == rb_type(msg)) {
if (RB_ENCODING_IS_ASCII8BIT(msg)) {
p = pub_write(up, StringValuePtr(msg), RSTRING_LEN(msg), true);
} else {
p = pub_write(up, StringValuePtr(msg), RSTRING_LEN(msg), false);
}
} else {
volatile VALUE rs = rb_funcall(msg, to_s_id, 0);
p = pub_write(up, StringValuePtr(rs), RSTRING_LEN(rs), false);
}
atomic_fetch_add(&up->pending, 1);
queue_push(&the_server.pub_queue, p);
return Qtrue;
}
|