Class: Jlog::Reader

Inherits:
Jlog
  • Object
show all
Defined in:
ext/jlog/jlog.c

Constant Summary

Constants inherited from Jlog

VERSION

Instance Method Summary collapse

Methods inherited from Jlog

#add_subscriber, #close, #destroy, #inspect, #list_subscribers, #raw_size, #remove_subscriber

Constructor Details

#initialize(*args) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'ext/jlog/jlog.c', line 58

/*
 * Jlog#initialize(path, options = O_CREAT, size = nil)
 *
 * Binds the jlog_obj wrapped by the receiver to the log located at `path`.
 *
 *   path    - required; converted with StringValuePtr and handed to jlog_new.
 *   options - optional integer flag word. Defaults to O_CREAT when nil.
 *             O_CREAT: run jlog_ctx_init, then reopen a fresh context.
 *             O_EXCL (with O_CREAT): raise if the log already exists.
 *   size    - optional; accepted by rb_scan_args but not used by this function.
 *
 * Returns the receiver (the parameter is named `klass`, but it is passed to
 * Data_Get_Struct as the wrapped instance).
 *
 * Raises eJlog when the wrapped struct is missing, when jlog_new fails, when
 * the log already exists and O_EXCL was requested, or (through rJlog_raise)
 * when jlog_ctx_init fails for any reason other than JLOG_ERR_CREATE_EXISTS.
 */
VALUE rJlog_initialize(int argc, VALUE* argv, VALUE klass) 
{
   int options;
   Jlog jo;
   jlog_id zeroed = {0,0};
   VALUE path;
   VALUE optarg;
   VALUE size;

   // one mandatory argument (path) and two optional ones (options, size)
   rb_scan_args(argc, argv, "12", &path, &optarg, &size);

   if(NIL_P(optarg)) {
      options = O_CREAT;
   } else {
      options = (int)NUM2INT(optarg);
   }

   // size is parsed only so that three-argument calls keep working
   (void)size;

   Data_Get_Struct(klass, jlog_obj, jo);

   // must be checked before the first dereference of jo below
   if(!jo) { 
      rb_raise(eJlog, "jo wasn't initialized.");
   }

   jo->ctx = jlog_new(StringValuePtr(path));
   jo->path = strdup(StringValuePtr(path));
   jo->auto_checkpoint = 0;
   jo->start = zeroed;
   jo->prev = zeroed;
   jo->last = zeroed;
   jo->end = zeroed;


   if(!jo->ctx) {
      // NOTE(review): rJlog_free is defined elsewhere; confirm it is safe to
      // call here while the Ruby object still wraps jo.
      rJlog_free(jo);
      rb_raise(eJlog, "jlog_new(%s) failed", StringValuePtr(path));
   }

   if(options & O_CREAT) {
      if(jlog_ctx_init(jo->ctx) != 0) {
         if(jlog_ctx_err(jo->ctx) == JLOG_ERR_CREATE_EXISTS) {
            // an existing log is only an error when O_EXCL was requested
            if(options & O_EXCL) {
               rJlog_free(jo);
               rb_raise(eJlog, "file already exists: %s", StringValuePtr(path));
            }
         }else {
            rJlog_raise(jo, "Error initializing jlog");
         }
      }
      // the context used for init is closed and replaced by a fresh one
      jlog_ctx_close(jo->ctx);
      jo->ctx = jlog_new(StringValuePtr(path));
      if(!jo->ctx) {
         rJlog_free(jo);
         rb_raise(eJlog, "jlog_new(%s) failed after successful init", StringValuePtr(path));
      }
      rJlog_populate_subscribers(klass);
   } 

   return klass;
}

Instance Method Details

#auto_checkpoint(*args) ⇒ Object



533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
# File 'ext/jlog/jlog.c', line 533

/*
 * Jlog::Reader#auto_checkpoint(*args)
 *
 * Gets or sets the auto_checkpoint flag stored on the wrapped jlog_obj.
 *
 * With no arguments the current value is returned unchanged. With one or more
 * arguments the first is converted to a C int and stored; any further
 * arguments are ignored.
 *
 * Returns the (possibly updated) flag as a Fixnum.
 * Raises eJlog if the receiver has no wrapped struct or no jlog context.
 * NUM2INT raises on an argument that is not convertible to a C int.
 */
VALUE rJlog_R_auto_checkpoint(int argc, VALUE *argv, VALUE self)
{
   Jlog jo;

   Data_Get_Struct(self, jlog_obj, jo);

   if(!jo || !jo->ctx) {
      rb_raise(eJlog, "Invalid jlog context");
   }

   if(argc > 0) {
      // NUM2INT type-checks the argument; FIX2INT would blindly reinterpret
      // any VALUE (nil, a String, a Bignum) as a tagged integer.
      int ac = NUM2INT(argv[0]);
      jo->auto_checkpoint = ac;
   }

   return INT2FIX(jo->auto_checkpoint);
}

#checkpoint ⇒ Object



508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
# File 'ext/jlog/jlog.c', line 508

/*
 * Jlog::Reader#checkpoint
 *
 * If a message id has been recorded in jo->last (i.e. it differs from the
 * all-zero id), passes that id to jlog_ctx_read_checkpoint and then clears
 * last/start/end so that the next read fetches the interval again.
 * When jo->last is still zero nothing is done.
 *
 * Returns Qtrue.
 * Raises eJlog if the receiver has no wrapped struct or no jlog context, and
 * (through rJlog_raise) if jlog_ctx_read_checkpoint reports failure.
 */
VALUE rJlog_R_checkpoint(VALUE self)
{
   jlog_id epoch = { 0, 0 };
   Jlog_Reader jo;

   Data_Get_Struct(self, jlog_obj, jo);

   if(!jo || !jo->ctx) {
      rb_raise(eJlog, "Invalid jlog context");
   }

   if(memcmp(&jo->last, &epoch, sizeof(jlog_id)))
   {
      // Report a failed checkpoint the same way the read methods do, instead
      // of returning true regardless of the outcome. The cached ids are left
      // untouched on failure so the checkpoint can be retried.
      if(jlog_ctx_read_checkpoint(jo->ctx, &jo->last) != 0)
         rJlog_raise(jo, "checkpoint failed");

      // re-read the interval
      jo->last = epoch;
      jo->start = epoch;
      jo->end = epoch;
   }

   return Qtrue;
}

#open(subscriber) ⇒ Object



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'ext/jlog/jlog.c', line 307

/*
 * Jlog::Reader#open(subscriber)
 *
 * Opens the wrapped jlog context for reading as `subscriber` by calling
 * jlog_ctx_open_reader with the subscriber's string pointer.
 *
 * Returns Qtrue on success.
 * Raises eJlog if the receiver has no wrapped struct or no jlog context, and
 * (through rJlog_raise) if jlog_ctx_open_reader returns non-zero.
 */
VALUE rJlog_R_open(VALUE self, VALUE subscriber)
{
   Jlog_Reader reader;
   int status;

   Data_Get_Struct(self, jlog_obj, reader);

   if(!(reader && reader->ctx)) {
      rb_raise(eJlog, "Invalid jlog context");
   }

   status = jlog_ctx_open_reader(reader->ctx, StringValuePtr(subscriber));
   if(status != 0) {
      rJlog_raise(reader, "jlog_ctx_open_reader failed");
   }

   return Qtrue;
}

#read ⇒ Object



328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'ext/jlog/jlog.c', line 328

/*
 * Jlog::Reader#read
 *
 * Reads one message from the wrapped jlog context and returns its payload as
 * a Ruby String.
 *
 * The reader keeps four ids on the wrapped struct: start/end (the interval
 * most recently fetched with jlog_ctx_read_interval), last (the id of the
 * message returned by the previous call) and prev (the value last held before
 * that). An all-zero id means "unset".
 *
 * Returns nil when the interval is empty, when the interval read fails with
 * JLOG_ERR_FILE_OPEN, or when the cached interval has been exhausted; in each
 * case start/end are cleared so the next call fetches a new interval.
 *
 * Raises eJlog if the receiver has no wrapped struct or no jlog context, and
 * (through rJlog_raise) when the interval read, message read, or automatic
 * checkpoint fails.
 */
VALUE rJlog_R_read(VALUE self)
{
   const jlog_id epoch = {0, 0};
   jlog_id cur = {0, 0};
   jlog_message message;
   int cnt;
   Jlog_Reader jo;

   Data_Get_Struct(self, jlog_obj, jo);

   if(!jo || !jo->ctx) {
      rb_raise(eJlog, "Invalid jlog context");
   }

   // If start is unset, or the previous read flagged an error, read the interval
   if(jo->error || !memcmp(&jo->start, &epoch, sizeof(jlog_id)))
   {
      jo->error = 0;
      cnt = jlog_ctx_read_interval(jo->ctx, &jo->start, &jo->end);
      if(cnt == 0 || (cnt == -1 && jlog_ctx_err(jo->ctx) == JLOG_ERR_FILE_OPEN)) {
         jo->start = epoch;
         jo->end = epoch;
         return Qnil;
      }
      else if(cnt == -1)
         rJlog_raise(jo, "jlog_ctx_read_interval_failed");
   }

   // If last is unset, start at the beginning
   if(!memcmp(&jo->last, &epoch, sizeof(jlog_id))) {
      cur = jo->start;
   } else {
      // if we've already read the end, return; otherwise advance
      cur = jo->last;
      if(!memcmp(&jo->prev, &jo->end, sizeof(jlog_id))) {
         jo->start = epoch;
         jo->end = epoch;
         return Qnil;
      }
      jlog_ctx_advance_id(jo->ctx, &jo->last, &cur, &jo->end);
      // cur still equal to last after the call is treated as end of interval
      if(!memcmp(&jo->last, &cur, sizeof(jlog_id))) {
            jo->start = epoch;
            jo->end = epoch;
            return Qnil;
      }
   }

   if(jlog_ctx_read_message(jo->ctx, &cur, &message) != 0) {
      if(jlog_ctx_err(jo->ctx) == JLOG_ERR_FILE_OPEN) {
         jo->error = 1;
         rJlog_raise(jo, "jlog_ctx_read_message failed");
         return Qnil;
      }

      // read failed; raise error but recover if read is retried
      jo->error = 1;
      rJlog_raise(jo, "read failed");
   }

   if(jo->auto_checkpoint) {
      if(jlog_ctx_read_checkpoint(jo->ctx, &cur) != 0)
         rJlog_raise(jo, "checkpoint failed");

      // must reread the interval after a checkpoint
      jo->last = epoch;
      jo->prev = epoch;
      jo->start = epoch;
      jo->end = epoch;
   } else {
      // update last
      jo->prev = jo->last;
      jo->last = cur;
   }

   // The payload is length-delimited (mess_len), not NUL-terminated, so build
   // the String from the explicit length rather than relying on strlen.
   return rb_str_new((const char *)message.mess, (long)message.mess_len);
}

#read_message ⇒ Object



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
# File 'ext/jlog/jlog.c', line 405

/*
 * Jlog::Reader#read_message
 *
 * Reads one message from the wrapped jlog context and returns a Hash with two
 * entries: message_sym => payload String, and timestamp_sym => Float built
 * from the header's tv_sec plus tv_usec / 1e6.
 * (message_sym / timestamp_sym are defined elsewhere in the file.)
 *
 * Id bookkeeping (start/end/last/prev on the wrapped struct, all-zero meaning
 * "unset") follows the same steps as rJlog_R_read.
 *
 * Returns nil when the interval is empty, when the interval read fails with
 * JLOG_ERR_FILE_OPEN, or when the cached interval has been exhausted; in each
 * case start/end are cleared so the next call fetches a new interval.
 *
 * Raises eJlog if the receiver has no wrapped struct or no jlog context, and
 * (through rJlog_raise) when the interval read, message read, or automatic
 * checkpoint fails.
 */
VALUE rJlog_R_read_message(VALUE self)
{
   const jlog_id epoch = {0, 0};
   jlog_id cur = {0, 0};
   jlog_message message;
   int cnt;
   double ts;
   Jlog_Reader jo;
   VALUE message_hash;

   Data_Get_Struct(self, jlog_obj, jo);

   if(!jo || !jo->ctx) {
      rb_raise(eJlog, "Invalid jlog context");
   }

   // If start is unset, or the previous read flagged an error, read the interval
   if(jo->error || !memcmp(&jo->start, &epoch, sizeof(jlog_id)))
   {
      jo->error = 0;
      cnt = jlog_ctx_read_interval(jo->ctx, &jo->start, &jo->end);
      if(cnt == 0 || (cnt == -1 && jlog_ctx_err(jo->ctx) == JLOG_ERR_FILE_OPEN)) {
         jo->start = epoch;
         jo->end = epoch;
         return Qnil;
      }
      else if(cnt == -1)
         rJlog_raise(jo, "jlog_ctx_read_interval_failed");
   }

   // If last is unset, start at the beginning
   if(!memcmp(&jo->last, &epoch, sizeof(jlog_id))) {
      cur = jo->start;
   } else {
      // if we've already read the end, return; otherwise advance
      cur = jo->last;
      if(!memcmp(&jo->prev, &jo->end, sizeof(jlog_id))) {
         jo->start = epoch;
         jo->end = epoch;
         return Qnil;
      }
      jlog_ctx_advance_id(jo->ctx, &jo->last, &cur, &jo->end);
      // cur still equal to last after the call is treated as end of interval
      if(!memcmp(&jo->last, &cur, sizeof(jlog_id))) {
            jo->start = epoch;
            jo->end = epoch;
            return Qnil;
      }
   }

   if(jlog_ctx_read_message(jo->ctx, &cur, &message) != 0) {
      if(jlog_ctx_err(jo->ctx) == JLOG_ERR_FILE_OPEN) {
         jo->error = 1;
         rJlog_raise(jo, "jlog_ctx_read_message failed");
         return Qnil;
      }

      // read failed; raise error but recover if read is retried
      jo->error = 1;
      rJlog_raise(jo, "read failed");
   }

   if(jo->auto_checkpoint) {
      if(jlog_ctx_read_checkpoint(jo->ctx, &cur) != 0)
         rJlog_raise(jo, "checkpoint failed");

      // must reread the interval after a checkpoint
      jo->last = epoch;
      jo->prev = epoch;
      jo->start = epoch;
      jo->end = epoch;
   } else {
      // update last
      jo->prev = jo->last;
      jo->last = cur;
   }

   // seconds plus fractional microseconds as a single floating-point value
   ts = message.header->tv_sec+(message.header->tv_usec/1000000.0);

   message_hash = rb_hash_new();
   // The payload is length-delimited (mess_len), not NUL-terminated, so build
   // the String from the explicit length rather than relying on strlen.
   rb_hash_aset(message_hash, message_sym,
                rb_str_new((const char *)message.mess, (long)message.mess_len));
   rb_hash_aset(message_hash, timestamp_sym, rb_float_new(ts));

   
   return message_hash;
}

#rewind ⇒ Object



492
493
494
495
496
497
498
499
500
501
502
503
504
505
# File 'ext/jlog/jlog.c', line 492

/*
 * Jlog::Reader#rewind
 *
 * Copies the saved `prev` id over `last` on the wrapped struct, undoing the
 * most recent update of `last`.
 *
 * Returns Qtrue.
 * Raises eJlog if the receiver has no wrapped struct or no jlog context.
 */
VALUE rJlog_R_rewind(VALUE self)
{
   Jlog_Reader reader;

   Data_Get_Struct(self, jlog_obj, reader);

   if(!(reader && reader->ctx)) {
      rb_raise(eJlog, "Invalid jlog context");
   }

   // step the read position back to the previously recorded id
   reader->last = reader->prev;

   return Qtrue;
}