Class: Statsrb

Inherits:
Object
  • Object
show all
Defined in:
lib/statsrb.rb,
ext/statsrb/statsrb.c

Overview

Author:

  • Kevin Hankens

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Object

Class constructor, sets up an instance variable.



722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
# File 'ext/statsrb/statsrb.c', line 722

/**
 * Class constructor: seeds the instance variables of a new object and
 * returns the object itself.
 *
 * Sets @split_file_dir to "/tmp", @flush_count to 9, and stores the symbols
 * :ts, :ns and :v in @key_ts, @key_ns and @key_v.
 */
static VALUE statsrb_constructor(VALUE self) {
  // Defaults for the public attributes.
  rb_iv_set(self, "@split_file_dir", rb_str_new("/tmp", 4));
  rb_iv_set(self, "@flush_count", INT2NUM(9));

  // Internal symbols for :ts, :ns and :v, interned once per instance.
  rb_iv_set(self, "@key_ts", rb_str_intern(rb_str_new2("ts")));
  rb_iv_set(self, "@key_ns", rb_str_intern(rb_str_new2("ns")));
  rb_iv_set(self, "@key_v", rb_str_intern(rb_str_new2("v")));

  return self;
}

Instance Attribute Details

#flush_count ⇒ Object

When used with a rack server, the max count of internal events.

#split_file_dir ⇒ Object

The directory that files are written to when splitting by namespace. @see #split_write

Instance Method Details

#call(env) ⇒ Object

Returns a rack-compatible response.

Parameters:

  • env (Hash)


543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
# File 'ext/statsrb/statsrb.c', line 543

/**
 * Returns the next '/'-delimited segment of a NUL-terminated request path
 * without modifying the path.
 *
 * Leading separators are skipped. On return *cursor points just past the
 * segment and *length holds its size in bytes.
 *
 * @return pointer to the first byte of the segment (NOT NUL-terminated), or
 *         NULL when no segment is left.
 */
static const char *statsrb_rack_path_segment(const char **cursor, size_t *length) {
  // Skip any run of separators, the same way strtok() would.
  const char *start = *cursor + strspn(*cursor, "/");

  *length = strcspn(start, "/");
  *cursor = start + *length;
  return *length > 0 ? start : NULL;
}

/**
 * Returns a rack-compatible response: a [status, headers, body] array.
 *
 * The first segment of PATH_INFO selects the action:
 *  - "put"/"PUT": stores one event taken from the query string parameters
 *    "name" (required), "value" (default 0) and "time" (default now). Once
 *    the internal length reaches @flush_count the data is sorted, written to
 *    @split_file_dir with mode "a+", and cleared.
 *  - "get"/"GET": the second path segment is the namespace (default "data").
 *    Events are read from the namespace's file in @split_file_dir, filtered by
 *    the query string parameters "limit" (default 100), "start", "end" and
 *    "recent", and emitted as JSON, wrapped in "jsoncallback(...)" when that
 *    parameter is given.
 *  - anything else: status 404 with an empty body.
 *
 * @param env The rack environment Hash; PATH_INFO and QUERY_STRING are read.
 * @return Array [status, headers, body]
 */
static VALUE statsrb_rack_call(VALUE self, VALUE env) {
  VALUE response = rb_ary_new();
  VALUE headers = rb_hash_new();
  VALUE body = rb_ary_new();
  int status = 200;

  rb_hash_aset(headers, rb_str_new2("Content-Type"), rb_str_new2("text/json"));

  // Work out the action and namespace from the path. The path is only read,
  // never tokenized in place, so the string stored in env stays intact.
  // @TODO consider moving the request method to the proper REQUEST_METHOD
  int is_put = 0;
  int is_get = 0;
  VALUE path_ns = Qnil;
  VALUE path_info = rb_hash_aref(env, rb_str_new2("PATH_INFO"));
  if (path_info != Qnil) {
    const char *cursor = RSTRING_PTR(path_info);
    size_t method_len = 0;

    // Remove the leading slash (but never step past the end of an empty path).
    if (*cursor != '\0') {
      cursor++;
    }

    const char *method = statsrb_rack_path_segment(&cursor, &method_len);
    if (method != NULL && method_len == 3) {
      is_put = strncmp(method, "put", 3) == 0 || strncmp(method, "PUT", 3) == 0;
      is_get = strncmp(method, "get", 3) == 0 || strncmp(method, "GET", 3) == 0;
    }

    if (is_get) {
      size_t ns_len = 0;
      const char *ns = statsrb_rack_path_segment(&cursor, &ns_len);
      // Copy the segment into a Ruby string right away; fall back to "data".
      path_ns = (ns != NULL) ? rb_str_new(ns, (long)ns_len) : rb_str_new2("data");
    }
  }

  // Parse the query string; a missing QUERY_STRING is treated as no parameters.
  VALUE raw_qs = rb_hash_aref(env, rb_str_new2("QUERY_STRING"));
  VALUE query_string = (raw_qs != Qnil) ? statsrb_parse_qs(RSTRING_PTR(raw_qs)) : rb_hash_new();

  if (is_put) {
    long int statsrb_ts;
    long int statsrb_v = 0;

    // Get the timestamp, default to now.
    VALUE statsrb_ts_qs = rb_hash_aref(query_string, rb_str_new("time", 4));
    if (statsrb_ts_qs != Qnil) {
      statsrb_ts = atoi(RSTRING_PTR(statsrb_ts_qs));
    }
    else {
      statsrb_ts = (long int)time(NULL);
    }

    // Get the namespace; without one there is nothing to store.
    VALUE statsrb_ns = rb_hash_aref(query_string, rb_str_new("name", 4));
    if (statsrb_ns != Qnil) {
      // Get the value.
      VALUE statsrb_v_qs = rb_hash_aref(query_string, rb_str_new("value", 5));
      if (statsrb_v_qs != Qnil) {
        statsrb_v = atoi(RSTRING_PTR(statsrb_v_qs));
      }

      statsrb_data_push_event(self, RSTRING_PTR(statsrb_ns), statsrb_ts, statsrb_v);

      int data_length = NUM2INT(statsrb_length(self));

      rb_ary_push(body, rb_obj_as_string(INT2NUM(data_length)));

      // Flush to disk once enough events have accumulated.
      if (data_length >= NUM2INT(rb_iv_get(self, "@flush_count"))) {
        statsrb_sort(self);
        statsrb_split_write(self, rb_iv_get(self, "@split_file_dir"), rb_str_new2("a+"));
        statsrb_data_clear_events(self);
      }

      rb_ary_push(body, statsrb_ns);
    }
  }
  else if (is_get) {
    VALUE statsrb_ns = path_ns;
    long int query_limit, query_start, query_end;

    VALUE jsoncallback = rb_hash_aref(query_string, rb_str_new("jsoncallback", 12));
    if (jsoncallback != Qnil) {
      rb_ary_push(body, rb_str_plus(jsoncallback, rb_str_new("(", 1)));
    }

    // @TODO move this to a to_json method.
    // Built as a Ruby string so a namespace of any length fits; a fixed-size
    // C buffer here could be overflowed by a long request path.
    // NOTE(review): the namespace is emitted without JSON escaping.
    VALUE json_start = rb_str_plus(rb_str_new("{\"", 2), statsrb_ns);
    rb_ary_push(body, rb_str_plus(json_start, rb_str_new("\":[", 3)));

    // Get the query limit.
    query_limit = 100;
    VALUE query_limit_qs = rb_hash_aref(query_string, rb_str_new("limit", 5));
    if (query_limit_qs != Qnil) {
      query_limit = atoi(RSTRING_PTR(query_limit_qs));
    }

    // Get the query start.
    query_start = 0;
    VALUE query_start_qs = rb_hash_aref(query_string, rb_str_new("start", 5));
    if (query_start_qs != Qnil) {
      query_start = atoi(RSTRING_PTR(query_start_qs));
    }

    // Get the query end.
    query_end = 0;
    VALUE query_end_qs = rb_hash_aref(query_string, rb_str_new("end", 3));
    if (query_end_qs != Qnil) {
      query_end = atoi(RSTRING_PTR(query_end_qs));
    }

    // Get the past N seconds of data.
    // @TODO the query method fails if we query for data newer than the last entry.
    VALUE query_recent = rb_hash_aref(query_string, rb_str_new("recent", 6));
    if (query_recent != Qnil) {
      query_end = (long int)time(NULL);
      long int history = atoi(RSTRING_PTR(query_recent));
      query_start = query_end - history;
    }

    // Create a new Statsrb object to query from.
    VALUE klass = rb_obj_class(self);
    VALUE tmp = rb_class_new_instance(0, NULL, klass);

    // The PUT branch flushes through statsrb_split_write, which separates the
    // directory and the namespace with a slash; build the read path the same
    // way so the file that was written is the file that is read.
    VALUE logdir = rb_iv_get(self, "@split_file_dir");
    const char *dirpath = RSTRING_PTR(logdir);
    size_t dir_len = strlen(dirpath);
    if (dir_len > 0 && dirpath[dir_len - 1] != '/') {
      logdir = rb_str_plus(logdir, rb_str_new2("/"));
    }

    statsrb_read(tmp, rb_str_plus(logdir, statsrb_ns), statsrb_ns, INT2NUM(query_limit), INT2NUM(query_start), INT2NUM(query_end));
    statsrb_sort(tmp);

    int i, data_length = NUM2INT(statsrb_length(tmp));
    StatsrbInternal *internal = statsrb_get_internal(tmp);

    // Emit one [timestamp,"namespace",value] triple per event.
    for (i = 0; i < data_length; i++) {
      rb_ary_push(body, rb_str_new("[", 1));
      rb_ary_push(body, rb_obj_as_string(INT2NUM(internal->event_list[i].timestamp)));
      rb_ary_push(body, rb_str_new(",\"", 2));
      rb_ary_push(body, rb_str_new2(internal->ns_list[internal->event_list[i].ns_index].namespace));
      rb_ary_push(body, rb_str_new("\",", 2));
      rb_ary_push(body, rb_obj_as_string(INT2NUM(internal->event_list[i].value)));
      rb_ary_push(body, rb_str_new("]", 1));

      if (i < data_length - 1) {
        rb_ary_push(body, rb_str_new(",", 1));
      }
      rb_ary_push(body, rb_str_new("\n", 1));
    }
    statsrb_data_clear_events(tmp);

    rb_ary_push(body, rb_str_new("]}", 2));
    if (jsoncallback != Qnil) {
      rb_ary_push(body, rb_str_new(")", 1));
    }
  }
  else {
    // Unknown or missing action.
    status = 404;
  }

  rb_ary_push(response, INT2NUM(status));
  rb_ary_push(response, headers);
  rb_ary_push(response, body);

  return response;
}

#clear ⇒ Object

#get(query_ns, query_limit, query_start, query_end) ⇒ Array

Retrieves internal data based on specified filters.

Parameters:

  • namespace (String)
  • limit (Number)
  • start_time (Number)
  • end_time (Number)

Returns:

  • (Array)

    An array of data event hashes.



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'ext/statsrb/statsrb.c', line 293

/**
 * Retrieves internal data based on specified filters.
 *
 * @param query_ns    Namespace to select (String); must match a stored
 *                    namespace exactly.
 * @param query_limit Maximum number of events to return; 0 or less means no
 *                    limit.
 * @param query_start Earliest timestamp to include; 0 disables the check.
 * @param query_end   Latest timestamp to include; 0 disables the check.
 * @return Array of event hashes built by statsrb_create_rb_event_hash; empty
 *         when the namespace is unknown.
 */
static VALUE statsrb_get(VALUE self, VALUE query_ns, VALUE query_limit, VALUE query_start, VALUE query_end) {
  // @TODO maybe it would be sane to make a new statsrb object and then just have
  // methods to dump everything to ary, json, etc.
  StatsrbInternal *internal = statsrb_get_internal(self);
  VALUE filtered_data = rb_ary_new();

  int limit = NUM2INT(query_limit);
  int qstart = NUM2INT(query_start);
  int qend = NUM2INT(query_end);

  // Raises TypeError for a non-String argument instead of reading a C pointer
  // out of an arbitrary object.
  const char *wanted_ns = StringValueCStr(query_ns);

  int i;
  int found = -1;

  // Locate the namespace index by comparing the C strings directly.
  for (i = 0; i < internal->ns_count; i++) {
    if (strcmp(wanted_ns, internal->ns_list[i].namespace) == 0) {
      found = i;
    }
  }

  // Return right away if the namespace doesn't exist.
  if (found == -1) {
    return filtered_data;
  }

  // One Ruby string for the namespace, shared by every returned event.
  VALUE rb_ns = rb_str_new2(internal->ns_list[found].namespace);
  int filtered_count = 0;

  // Iterate through the in-memory data to find matches.
  for (i = 0; i < internal->event_count; i++) {
    if (found == internal->event_list[i].ns_index
        && (qstart == 0 || internal->event_list[i].timestamp >= qstart)
        && (qend == 0 || internal->event_list[i].timestamp <= qend)) {

      VALUE statsrb_event = statsrb_create_rb_event_hash(
        self,
        INT2NUM(internal->event_list[i].timestamp),
        rb_ns,
        INT2NUM(internal->event_list[i].value)
      );

      rb_ary_push(filtered_data, statsrb_event);
      filtered_count++;
    }

    if (limit > 0 && filtered_count == limit) {
      break;
    }
  }

  return filtered_data;
}

#length ⇒ Numeric

Returns the length of the internal storage.

Returns:

  • (Numeric)

    The count of items in the internal storage.



102
103
104
105
106
107
108
109
# File 'ext/statsrb/statsrb.c', line 102

/**
 * Returns the length of the internal storage.
 *
 * @return Numeric count of events currently held by this object.
 */
static VALUE statsrb_length(VALUE self) {
  StatsrbInternal *storage = statsrb_get_internal(self);

  // A zero count is already zero, so the stored value is reported as is.
  return INT2NUM(storage->event_count);
}

#load_test ⇒ Object

#push(timestamp, namespace, value) ⇒ Statsrb

Pushes a stat onto the statsrb object.

Parameters:

  • timestamp (Number)
  • namespace (String)
  • value (Number)

Returns:

  • (Statsrb)

    A reference to the object.



277
278
279
280
281
282
283
# File 'ext/statsrb/statsrb.c', line 277

/**
 * Pushes a stat onto the statsrb object.
 *
 * @param timestamp Number, converted with NUM2INT.
 * @param namespace String naming the stat; a non-String argument raises
 *                  TypeError.
 * @param value     Number, converted with NUM2INT.
 * @return A reference to the object.
 */
static VALUE statsrb_push(VALUE self, VALUE timestamp, VALUE namespace, VALUE value) {
  int ts = NUM2INT(timestamp);
  int v = NUM2INT(value);

  // StringValueCStr validates the argument before handing out a C pointer;
  // RSTRING_PTR on an unchecked object would read arbitrary memory.
  const char *ns = StringValueCStr(namespace);

  statsrb_data_push_event(self, ns, ts, v);
  return self;
}

#query(logfile, query_ns, query_limit, query_start, query_end) ⇒ Statsrb

Locates data from a specified file and loads into internal memory.

Parameters:

  • filepath (String)
  • namespace (String)
  • limit (Number)
  • start_time (Number)
  • end_time (Number)

Returns:

  • (Statsrb)

    A reference to the object.



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'ext/statsrb/statsrb.c', line 365

/**
 * Locates data from a specified file and loads into internal memory.
 *
 * Each line is expected to hold "timestamp<TAB>namespace<TAB>value". Lines
 * that contain query_ns as a substring and whose timestamp passes the start
 * and end filters are pushed onto this object; malformed lines are skipped.
 *
 * @param logfile     Path of the file to read (String).
 * @param query_ns    Namespace to look for (String).
 * @param query_limit Maximum number of events to load; 0 or less loads nothing.
 * @param query_start Earliest timestamp to include; 0 disables the check.
 * @param query_end   Latest timestamp to include; 0 disables the check.
 * @return A reference to the object.
 */
static VALUE statsrb_read(VALUE self, VALUE logfile, VALUE query_ns, VALUE query_limit, VALUE query_start, VALUE query_end) {
  // Stack buffers: nothing to free on the early return or if a Ruby call raises.
  char line[512];
  char tmp_ns[256];

  // Convert the Ruby numbers into C ints.
  int limit = NUM2INT(query_limit);
  int qstart = NUM2INT(query_start);
  int qend = NUM2INT(query_end);

  // Raises TypeError for non-String arguments instead of crashing.
  const char *filepath = StringValueCStr(logfile);
  const char *query_ns_char = StringValueCStr(query_ns);

  FILE *file = fopen(filepath, "r");
  if (file == NULL) {
    fprintf(stderr, "File error: could not open file %s for reading.", filepath);
    return self;
  }

  int count = 0;

  while (count < limit && fgets(line, sizeof line, file) != NULL) {
    // Strip the trailing newline, and only a newline: the last line of a file
    // may not have one, and its final digit must not be cut off.
    size_t len = strlen(line);
    if (len > 0 && line[len - 1] == '\n') {
      line[len - 1] = '\0';
    }

    // Only rows that mention the namespace are worth splitting.
    // @TODO this should really query the namespace exactly instead of just relying on strstr.
    if (line[0] == '\0' || strstr(line, query_ns_char) == NULL) {
      continue;
    }

    // @TODO this should something more robust than atoi.
    const char *ts_field = strtok(line, "\t");
    if (ts_field == NULL) {
      continue;
    }

    int tmp_ts = atoi(ts_field);
    if (tmp_ts == 0
        || (qstart != 0 && tmp_ts < qstart)
        || (qend != 0 && tmp_ts > qend)) {
      continue;
    }

    // An empty delimiter set makes strtok return the remainder of the line.
    const char *ns_field = strtok(NULL, "\t");
    const char *v_field = (ns_field != NULL) ? strtok(NULL, "") : NULL;
    if (v_field == NULL) {
      // Malformed row: namespace or value column is missing.
      continue;
    }

    // @TODO this should probably use the actual namespace if we do wildcard queries.
    // Bounded copy: an over-long namespace is truncated rather than overflowing.
    snprintf(tmp_ns, sizeof tmp_ns, "%s", ns_field);

    statsrb_data_push_event(self, tmp_ns, tmp_ts, atoi(v_field));
    count++;
  }

  fclose(file);

  return self;
}

#read(logfile, query_ns, query_limit, query_start, query_end) ⇒ Statsrb

Locates data from a specified file and loads into internal memory.

Parameters:

  • filepath (String)
  • namespace (String)
  • limit (Number)
  • start_time (Number)
  • end_time (Number)

Returns:

  • (Statsrb)

    A reference to the object.



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'ext/statsrb/statsrb.c', line 365

/**
 * Locates data from a specified file and loads into internal memory.
 *
 * Each line is expected to hold "timestamp<TAB>namespace<TAB>value". Lines
 * that contain query_ns as a substring and whose timestamp passes the start
 * and end filters are pushed onto this object; malformed lines are skipped.
 *
 * @param logfile     Path of the file to read (String).
 * @param query_ns    Namespace to look for (String).
 * @param query_limit Maximum number of events to load; 0 or less loads nothing.
 * @param query_start Earliest timestamp to include; 0 disables the check.
 * @param query_end   Latest timestamp to include; 0 disables the check.
 * @return A reference to the object.
 */
static VALUE statsrb_read(VALUE self, VALUE logfile, VALUE query_ns, VALUE query_limit, VALUE query_start, VALUE query_end) {
  // Stack buffers: nothing to free on the early return or if a Ruby call raises.
  char line[512];
  char tmp_ns[256];

  // Convert the Ruby numbers into C ints.
  int limit = NUM2INT(query_limit);
  int qstart = NUM2INT(query_start);
  int qend = NUM2INT(query_end);

  // Raises TypeError for non-String arguments instead of crashing.
  const char *filepath = StringValueCStr(logfile);
  const char *query_ns_char = StringValueCStr(query_ns);

  FILE *file = fopen(filepath, "r");
  if (file == NULL) {
    fprintf(stderr, "File error: could not open file %s for reading.", filepath);
    return self;
  }

  int count = 0;

  while (count < limit && fgets(line, sizeof line, file) != NULL) {
    // Strip the trailing newline, and only a newline: the last line of a file
    // may not have one, and its final digit must not be cut off.
    size_t len = strlen(line);
    if (len > 0 && line[len - 1] == '\n') {
      line[len - 1] = '\0';
    }

    // Only rows that mention the namespace are worth splitting.
    // @TODO this should really query the namespace exactly instead of just relying on strstr.
    if (line[0] == '\0' || strstr(line, query_ns_char) == NULL) {
      continue;
    }

    // @TODO this should something more robust than atoi.
    const char *ts_field = strtok(line, "\t");
    if (ts_field == NULL) {
      continue;
    }

    int tmp_ts = atoi(ts_field);
    if (tmp_ts == 0
        || (qstart != 0 && tmp_ts < qstart)
        || (qend != 0 && tmp_ts > qend)) {
      continue;
    }

    // An empty delimiter set makes strtok return the remainder of the line.
    const char *ns_field = strtok(NULL, "\t");
    const char *v_field = (ns_field != NULL) ? strtok(NULL, "") : NULL;
    if (v_field == NULL) {
      // Malformed row: namespace or value column is missing.
      continue;
    }

    // @TODO this should probably use the actual namespace if we do wildcard queries.
    // Bounded copy: an over-long namespace is truncated rather than overflowing.
    snprintf(tmp_ns, sizeof tmp_ns, "%s", ns_field);

    statsrb_data_push_event(self, tmp_ns, tmp_ts, atoi(v_field));
    count++;
  }

  fclose(file);

  return self;
}

#sort ⇒ Statsrb

Sorts internal data using a quicksort algorithm based on the hash element’s timestamp.

Returns:

  • (Statsrb)

    A reference to the object.



165
166
167
168
169
170
171
172
# File 'ext/statsrb/statsrb.c', line 165

/**
 * Sorts the internal event list by handing it to time_sort() over the index
 * range 0 .. event_count - 1.
 *
 * @return A reference to the object.
 */
static VALUE statsrb_sort(VALUE self) {
  StatsrbInternal *storage = statsrb_get_internal(self);

  // Nothing to order when no events are stored.
  if (!(storage->event_count > 0)) {
    return self;
  }

  time_sort(0, storage->event_count - 1, storage->event_list);
  return self;
}

#split_write(logdir, mode) ⇒ Statsrb

Writes the in-memory data to separate files based on namespace.

Parameters:

  • filepath (String)
  • filemode (String)

Returns:

  • (Statsrb)

    A reference to the object.



473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
# File 'ext/statsrb/statsrb.c', line 473

/**
 * Writes the in-memory data to separate files based on namespace.
 *
 * For every known namespace the matching events are copied into a scratch
 * object of the same class, written to "<logdir>/<namespace>" through
 * statsrb_write, and the scratch object is cleared again.
 *
 * @param logdir Directory to write into (String). A trailing slash is added
 *               when missing; an empty string is used as is.
 * @param mode   File mode passed on to statsrb_write (String).
 * @return A reference to the object.
 */
static VALUE statsrb_split_write(VALUE self, VALUE logdir, VALUE mode) {
  StatsrbInternal *internal = statsrb_get_internal(self);
  int i, ii;

  // Scratch object that holds one namespace's events at a time.
  VALUE klass = rb_obj_class(self);
  VALUE tmp = rb_class_new_instance(0, NULL, klass);

  // If there is no trailing slash on the log dir, add one. This is done once
  // up front, and the length is checked so an empty string is never indexed
  // at -1. StringValueCStr raises TypeError for a non-String directory.
  const char *dirpath = StringValueCStr(logdir);
  size_t dir_len = strlen(dirpath);
  if (dir_len > 0 && dirpath[dir_len - 1] != '/') {
    logdir = rb_str_plus(logdir, rb_str_new2("/"));
  }

  for (i = 0; i < internal->ns_count; i++) {
    // Collect every event recorded under this namespace's name.
    for (ii = 0; ii < internal->event_count; ii++) {
      if (strcmp(internal->ns_list[i].namespace, internal->ns_list[internal->event_list[ii].ns_index].namespace) == 0) {
        statsrb_data_push_event(tmp,
          internal->ns_list[internal->event_list[ii].ns_index].namespace,
          internal->event_list[ii].timestamp,
          internal->event_list[ii].value);
      }
    }

    VALUE filename = rb_str_new2(internal->ns_list[i].namespace);
    statsrb_write(tmp, rb_str_plus(logdir, filename), mode);
    statsrb_data_clear_events(tmp);
  }

  return self;
}

#write(logfile, mode) ⇒ Statsrb

Writes the in-memory data to a specified file.

Parameters:

  • filepath (String)
  • filemode (String)

Returns:

  • (Statsrb)

    A reference to the object.



438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'ext/statsrb/statsrb.c', line 438

/**
 * Writes the in-memory data to a specified file.
 *
 * One line is written per event, as "timestamp<TAB>namespace<TAB>value".
 * Failures to open or to finish writing the file are reported on stderr.
 *
 * @param logfile Path of the file to write (String).
 * @param mode    fopen() mode string, e.g. "a+" (String).
 * @return A reference to the object.
 */
static VALUE statsrb_write(VALUE self, VALUE logfile, VALUE mode) {
  // Raises TypeError for non-String arguments instead of crashing.
  const char *filepath = StringValueCStr(logfile);
  const char *filemode = StringValueCStr(mode);

  StatsrbInternal *internal = statsrb_get_internal(self);
  int i;

  FILE *file = fopen(filepath, filemode);
  if (file == NULL) {
    fprintf(stderr, "File error: could not open file %s mode %s.", filepath, filemode);
    return self;
  }

  // Iterate through the internal data, writing as we go.
  for (i = 0; i < internal->event_count; i++) {
    // @TODO make sure that these values are not empty before writing.
    fprintf(file,
            "%d\t%s\t%d\n",
            internal->event_list[i].timestamp,
            internal->ns_list[internal->event_list[i].ns_index].namespace,
            internal->event_list[i].value
    );
  }

  // fclose flushes buffered output, so a failure here means data was lost.
  if (fclose(file) != 0) {
    fprintf(stderr, "File error: could not finish writing file %s.", filepath);
  }

  return self;
}