Class: BlackStack::Pampa::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/pampa.rb

Overview

stub job class

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(h) ⇒ Job

setup dispatcher configuration here



442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/pampa.rb', line 442

# Builds a job from its descriptor hash.
#
# The descriptor is first checked with BlackStack::Pampa::Job.descriptor_errors;
# any reported problem aborts construction. Every key is then copied verbatim
# into the attribute of the same name (a missing key yields nil).
#
# @param h [Hash] job descriptor keyed by symbols (:name, :table, :field_id, ...)
# @raise [RuntimeError] when descriptor_errors reports one or more problems
def initialize(h)
  errors = BlackStack::Pampa::Job.descriptor_errors(h)
  raise "The job descriptor is not valid: #{errors.uniq.join(".\n")}" if errors.length > 0
  self.name = h[:name]
  self.table = h[:table]
  self.field_primary_key = h[:field_primary_key]
  self.field_id = h[:field_id]
  self.field_time = h[:field_time]
  self.field_times = h[:field_times]
  self.field_start_time = h[:field_start_time]
  self.field_end_time = h[:field_end_time]
  self.field_success = h[:field_success]
  self.field_error_description = h[:field_error_description]
  self.queue_size = h[:queue_size]
  self.max_job_duration_minutes = h[:max_job_duration_minutes]
  self.max_try_times = h[:max_try_times]

  # dispatching custom functions
  self.occupied_function = h[:occupied_function]
  self.allowing_function = h[:allowing_function]
  self.selecting_function = h[:selecting_function]
  self.relaunching_function = h[:relaunching_function]
  self.relauncher_function = h[:relauncher_function]
  # starter/finisher hooks are consulted by #start, #finish and #to_hash, so
  # they have to be taken from the descriptor like every other custom function
  self.starter_function = h[:starter_function]
  self.finisher_function = h[:finisher_function]
  self.processing_function = h[:processing_function]

  # reporting custom functions
  self.total_function = h[:total_function]
  self.completed_function = h[:completed_function]
  self.pending_function = h[:pending_function]
  self.failed_function = h[:failed_function]

  # elastic workers assignation
  self.max_pending_tasks = h[:max_pending_tasks]
  self.max_assigned_workers = h[:max_assigned_workers]
  self.filter_worker_id = h[:filter_worker_id]
end

Instance Attribute Details

#allowing_functionObject

Additional function to decide whether the worker can dispatch or not. Example: use this function when you want to decide based on the remaining credits of the client. It should return true or false. Keep it nil if you want it to always return true.



355
356
357
# File 'lib/pampa.rb', line 355

# Reader for the optional custom "allowing" callable; nil when none is configured.
# #allowing invokes it with (worker, self) and returns true itself when this is nil.
def allowing_function
  @allowing_function
end

#completed_functionObject

Returns the value of attribute completed_function.



383
384
385
# File 'lib/pampa.rb', line 383

# Reader for the optional custom reporting callable used by #completed.
# #completed calls it without arguments; when nil, #completed runs its own COUNT query.
def completed_function
  @completed_function
end

#failed_functionObject

Returns the value of attribute failed_function.



385
386
387
# File 'lib/pampa.rb', line 385

# Reader for the optional custom reporting callable used by #failed.
# #failed calls it without arguments; when nil, #failed runs its own COUNT query.
def failed_function
  @failed_function
end

#field_end_timeObject

Returns the value of attribute field_end_time.



333
334
335
# File 'lib/pampa.rb', line 333

# Name of the column that #finish stamps when a record ends without error.
# May be nil: #finish, #relaunch, #run_dispatch and #selecting_dataset skip it in that case.
def field_end_time
  @field_end_time
end

#field_error_descriptionObject

Returns the value of attribute field_error_description.



335
336
337
# File 'lib/pampa.rb', line 335

# Name of the column where #finish stores e.to_console when it is given an error.
def field_error_description
  @field_error_description
end

#field_idObject

Returns the value of attribute field_id.



329
330
331
# File 'lib/pampa.rb', line 329

# Name of the column holding the id of the worker a record is assigned to:
# #run_dispatch writes worker.id into it and #relaunch resets it to nil.
def field_id
  @field_id
end

#field_primary_keyObject

Returns the value of attribute field_primary_key.



328
329
330
# File 'lib/pampa.rb', line 328

# Name of the primary-key column; used in the WHERE clauses built by #update and #run_dispatch.
def field_primary_key
  @field_primary_key
end

#field_start_timeObject

Returns the value of attribute field_start_time.



332
333
334
# File 'lib/pampa.rb', line 332

# Name of the column that #start stamps with the current time.
# May be nil: #start, #relaunch, #run_dispatch and #occupied_slots check for that.
def field_start_time
  @field_start_time
end

#field_successObject

Returns the value of attribute field_success.



334
335
336
# File 'lib/pampa.rb', line 334

# Name of the boolean column that #finish sets (true when no error was given);
# the default #completed, #pending and #failed queries filter on it.
def field_success
  @field_success
end

#field_timeObject

Returns the value of attribute field_time.



330
331
332
# File 'lib/pampa.rb', line 330

# Name of the column that #run_dispatch stamps with the current time when it
# assigns records; #relaunch resets it to nil and #relaunching_dataset compares it
# against max_job_duration_minutes.
def field_time
  @field_time
end

#field_timesObject

Returns the value of attribute field_times.



331
332
333
# File 'lib/pampa.rb', line 331

# Name of the attempts-counter column incremented by #start.
# May be nil, in which case #selecting_dataset omits the max_try_times filter.
def field_times
  @field_times
end

#filter_worker_idObject

choose workers to assign tasks



394
395
396
# File 'lib/pampa.rb', line 394

# Worker filter taken from the descriptor's :filter_worker_id key.
# NOTE(review): how it is applied is not visible in this class; confirm against the dispatcher code.
def filter_worker_id
  @filter_worker_id
end

#finisher_functionObject

additional function to perform the update on a record to flag the finishing of the job by default this function will set the :field_end_time field with the current datetime keep this parameter nil if you want to use the default algorithm



373
374
375
# File 'lib/pampa.rb', line 373

# Reader for the optional custom callable that replaces the default logic of #finish.
# #finish invokes it with (o, e, self); when nil, #finish updates the record itself.
def finisher_function
  @finisher_function
end

#max_assigned_workersObject

Returns the value of attribute max_assigned_workers.



391
392
393
# File 'lib/pampa.rb', line 391

# Value taken from the descriptor's :max_assigned_workers key.
# NOTE(review): it is only stored and serialized by #to_hash here; its consumer is outside this class.
def max_assigned_workers
  @max_assigned_workers
end

#max_job_duration_minutesObject

Max number of minutes that a job should take to process. If :end_time is still nil x minutes after :start_time, the job is considered to have failed or been interrupted.



340
341
342
# File 'lib/pampa.rb', line 340

# Number of minutes used by #relaunching_dataset as the age threshold after
# which an assigned, unfinished record qualifies for relaunching.
def max_job_duration_minutes
  @max_job_duration_minutes
end

#max_pending_tasksObject

ELASTIC WORKERS ASSIGNATION

stretch assignation/unassignation of workers



390
391
392
# File 'lib/pampa.rb', line 390

# Value taken from the descriptor's :max_pending_tasks key.
# NOTE(review): it is only stored and serialized by #to_hash here; its consumer is outside this class.
def max_pending_tasks
  @max_pending_tasks
end

#max_try_timesObject

max number of times that a record can start to process & fail (:start_time field is not nil, but :end_time field is still nil after :max_job_duration_minutes)



343
344
345
# File 'lib/pampa.rb', line 343

# Attempt limit compared against the field_times counter by the default
# #pending and #failed queries and by #selecting_dataset.
def max_try_times
  @max_try_times
end

#nameObject

Returns the value of attribute name.



324
325
326
# File 'lib/pampa.rb', line 324

# Job name, taken from the descriptor's :name key.
def name
  @name
end

#occupied_functionObject

CUSTOM DISPATCHING FUNCTIONS

Additional function that returns an array of tasks pending to be processed by a worker. It should return an array. Keep it nil if you want to run the default function.



350
351
352
# File 'lib/pampa.rb', line 350

# Reader for the optional custom callable that replaces the default query of #occupied_slots.
# #occupied_slots invokes it with (worker, self); #available_slots calls .size on the result.
def occupied_function
  @occupied_function
end

#pending_functionObject

Returns the value of attribute pending_function.



384
385
386
# File 'lib/pampa.rb', line 384

# Reader for the optional custom reporting callable used by #pending.
# #pending calls it without arguments; when nil, #pending runs its own COUNT query.
def pending_function
  @pending_function
end

#processing_functionObject

Function to execute for each task.



375
376
377
# File 'lib/pampa.rb', line 375

# Callable taken from the descriptor's :processing_function key.
# NOTE(review): it is not invoked anywhere in this class; presumably the worker process calls it.
def processing_function
  @processing_function
end

#queue_sizeObject

max number of records assigned to a worker that have not started (:start_time field is nil)



337
338
339
# File 'lib/pampa.rb', line 337

# Per-worker capacity: #available_slots subtracts the occupied slots from this value.
def queue_size
  @queue_size
end

#relauncher_functionObject

additional function to perform the update on a record to retry keep this parameter nil if you want to use the default algorithm



365
366
367
# File 'lib/pampa.rb', line 365

# Reader for the optional custom callable that #run_relaunch uses instead of #relaunch.
# #run_relaunch invokes it with the record only.
def relauncher_function
  @relauncher_function
end

#relaunching_functionObject

additional function to choose the records to retry keep this parameter nil if you want to use the default algorithm



362
363
364
# File 'lib/pampa.rb', line 362

# Reader for the optional custom callable that replaces #relaunching_dataset.
# #relaunching invokes it with (n, self).
def relaunching_function
  @relaunching_function
end

#selecting_functionObject

additional function to choose the records to launch it should returns an array of IDs keep this parameter nil if you want to use the default algorithm



359
360
361
# File 'lib/pampa.rb', line 359

# Reader for the optional custom callable that replaces #selecting_dataset.
# #selecting invokes it with (n, self); #run_dispatch reads the primary key from each returned element.
def selecting_function
  @selecting_function
end

#starter_functionObject

additional function to perform the update on a record to flag the starting of the job by default this function will set the :field_start_time field with the current datetime, and it will increase the :field_times counter keep this parameter nil if you want to use the default algorithm



369
370
371
# File 'lib/pampa.rb', line 369

# Reader for the optional custom callable that replaces the default logic of #start.
# #start invokes it with (o, self); when nil, #start updates the record itself.
def starter_function
  @starter_function
end

#tableObject

database information :field_times, :field_start_time and :field_end_time maybe nil



327
328
329
# File 'lib/pampa.rb', line 327

# Name of the database table holding the job's records; it is interpolated
# into every SQL statement built by this class.
def table
  @table
end

#total_functionObject

CUSTOM REPORTING FUNCTIONS

Additional function that returns the total number of tasks. It should return an integer. Keep it nil if you want to run the default function.



382
383
384
# File 'lib/pampa.rb', line 382

# Reader for the optional custom reporting callable used by #total.
# #total calls it without arguments; when nil, #total runs its own COUNT query.
def total_function
  @total_function
end

Class Method Details

.descriptor_errors(h) ⇒ Object

return an array with the errors found in the description of the job



435
436
437
438
439
# File 'lib/pampa.rb', line 435

# Collects the problems found in a job descriptor.
#
# No validation rule is implemented yet, so the list is always empty.
#
# @param h [Hash] job descriptor to inspect (currently not examined)
# @return [Array] de-duplicated list of error messages
def self.descriptor_errors(h)
  # TODO: Code Me!
  problems = Array.new
  problems.uniq
end

Instance Method Details

#allowing(worker) ⇒ Object

Decides whether the worker can dispatch or not. Example: use this function when you want to decide based on the remaining credits of the client. Returns true when no allowing_function is configured; otherwise returns whatever that function returns.



505
506
507
508
509
510
511
512
# File 'lib/pampa.rb', line 505

# Tells whether the given worker may receive tasks of this job.
#
# @param worker [Object] worker being considered; handed to the custom function untouched
# @return [Object] true when no allowing_function is configured, otherwise the
#   value returned by allowing_function.call(worker, self)
def allowing(worker)
  checker = allowing_function
  return true if checker.nil?

  # TODO: validate that it returns true or false
  checker.call(worker, self)
end

#available_slots(worker) ⇒ Object

Returns the number of free slots in the processing queue of this worker.



492
493
494
495
496
497
498
499
500
# File 'lib/pampa.rb', line 492

# Computes how many more records can be assigned to the worker.
#
# @param worker [Object] worker whose queue is inspected via #occupied_slots
# @return [Integer] queue_size minus the occupied slots, never below 0
def available_slots(worker)
  taken = occupied_slots(worker).size
  capacity = queue_size
  # an over-full queue reports no room instead of a negative number
  return 0 if taken > capacity

  capacity - taken
end

#completedObject

Reporting method: completed. Returns the number of completed tasks. If the number of completed tasks is higher than `max_tasks_to_show`, then it returns `max_tasks_to_show`+. (Note: the default implementation shown below applies no such cap.)



684
685
686
687
688
689
690
691
692
693
694
695
696
# File 'lib/pampa.rb', line 684

# Reporting method: number of completed tasks.
#
# With no completed_function configured it counts the rows of the job table
# whose success column is true (NULL counts as false).
#
# @return [Integer, Object] the row count, or whatever completed_function returns
def completed
  custom = completed_function
  if !custom.nil?
    custom.call
  else
    sql = "
        SELECT COUNT(*) AS n
        FROM #{table.to_s} 
        WHERE COALESCE(#{field_success.to_s},false)=true
    "
    DB[sql].first[:n].to_i
  end
end

#failedObject

Reporting method: failed. Returns the number of failed tasks. If the number of failed tasks is higher than `max_tasks_to_show`, then it returns `max_tasks_to_show`+. (Note: the default implementation shown below applies no such cap.)



719
720
721
722
723
724
725
726
727
728
729
730
731
732
# File 'lib/pampa.rb', line 719

# Reporting method: number of failed tasks.
#
# With no failed_function configured it counts the rows that are not
# successful and whose attempts counter has reached max_try_times.
#
# @return [Integer, Object] the row count, or whatever failed_function returns
def failed
  custom = failed_function
  if !custom.nil?
    custom.call
  else
    sql = "
        SELECT COUNT(*) AS n
        FROM #{table.to_s} 
        WHERE COALESCE(#{field_success.to_s},false)=false
        AND COALESCE(#{field_times.to_s},0) >= #{max_try_times.to_i}
    "
    DB[sql].first[:n].to_i
  end
end

#finish(o, e = nil) ⇒ Object



590
591
592
593
594
595
596
597
598
599
# File 'lib/pampa.rb', line 590

# Flags a record as finished.
#
# When a finisher_function is configured it takes over completely and is
# called with (o, e, self). Otherwise the record hash is modified in place and
# persisted through #update:
# - the end-time column (if configured) is stamped only when there is no error;
# - the success column becomes true when e is nil, false otherwise;
# - the error-description column receives e.to_console when e is given.
#
# @param o [Hash] record being processed, keyed by column symbols
# @param e [Object, nil] error raised while processing, or nil on success
# @return [Object] result of #update or of the custom finisher
def finish(o, e=nil)
  hook = finisher_function
  return hook.call(o, e, self) unless hook.nil?

  succeeded = e.nil?
  if succeeded && !field_end_time.nil?
    # IMPORTANT: use DB location to get current time.
    o[field_end_time.to_sym] = DB["SELECT CAST('#{now}' AS TIMESTAMP) AS dt"].first[:dt]
  end
  o[field_success.to_sym] = succeeded
  o[field_error_description.to_sym] = e.to_console unless succeeded
  update(o)
end

#occupied_slots(worker) ⇒ Object

returns an array of tasks pending to be processed by the worker. it will select the records with :reservation_id == worker.id, and :start_time == nil



481
482
483
484
485
486
487
488
489
# File 'lib/pampa.rb', line 481

# Lists the records currently queued for the worker.
#
# With no occupied_function configured it selects the rows whose field_id
# column equals worker.id and, when a start-time column is configured, whose
# start time is still NULL.
#
# @param worker [Object] must respond to #id
# @return [Array, Object] matching rows, or whatever occupied_function returns
def occupied_slots(worker)
  custom = occupied_function
  # TODO: validate the value returned by the custom function (#available_slots calls .size on it)
  return custom.call(worker, self) unless custom.nil?

  conditions = { field_id.to_sym => worker.id }
  conditions[field_start_time.to_sym] = nil unless field_start_time.nil?
  DB[table.to_sym].where(conditions).all
end

#pendingObject

Reporting method: pending. Returns the number of pending tasks. If the number of pending tasks is higher than `max_tasks_to_show`, then it returns `max_tasks_to_show`+. (Note: the default implementation shown below applies no such cap.)



701
702
703
704
705
706
707
708
709
710
711
712
713
714
# File 'lib/pampa.rb', line 701

# Reporting method: number of pending tasks.
#
# With no pending_function configured it counts the rows that are not
# successful and whose attempts counter is still below max_try_times.
#
# @return [Integer, Object] the row count, or whatever pending_function returns
def pending
    custom = pending_function
    if !custom.nil?
      custom.call
    else
      sql = "
          SELECT COUNT(*) AS n
          FROM #{table.to_s} 
          WHERE COALESCE(#{field_success.to_s},false)=false
          AND COALESCE(#{field_times.to_s},0) < #{max_try_times.to_i}
      "
      DB[sql].first[:n].to_i
    end
end

#relaunch(o) ⇒ Object



572
573
574
575
576
577
578
# File 'lib/pampa.rb', line 572

# Releases a record so it can be dispatched again.
#
# Clears the worker-id and reservation-time columns, plus the start/end time
# columns when they are configured, then persists the record through #update.
#
# @param o [Hash] record to release, keyed by column symbols (modified in place)
# @return [Object] result of #update
def relaunch(o)
  # always-present columns
  [self.field_id, self.field_time].each { |column| o[column.to_sym] = nil }
  # optional columns: skipped when the job does not declare them
  [self.field_start_time, self.field_end_time].each do |column|
    o[column.to_sym] = nil unless column.nil?
  end
  self.update(o)
end

#relaunching(n) ⇒ Object

returns an array of failed tasks for restarting.



547
548
549
550
551
552
553
554
# File 'lib/pampa.rb', line 547

# Picks the records that should be relaunched.
#
# @param n [Integer] maximum number of records wanted
# @return [Object] result of #relaunching_dataset(n), or of
#   relaunching_function.call(n, self) when a custom function is configured
def relaunching(n)
  chooser = relaunching_function
  return relaunching_dataset(n) if chooser.nil?

  # TODO: validate that it returns an array of strings
  chooser.call(n, self)
end

#relaunching_dataset(n) ⇒ Object

returns an array of failed tasks for restarting.



533
534
535
536
537
538
539
540
541
542
543
544
# File 'lib/pampa.rb', line 533

# Default selection of records to relaunch.
#
# Selects up to n rows that are assigned to a worker (field_id not NULL),
# have no end time, and whose field_time is older than
# max_job_duration_minutes (a NULL field_time counts as 1900-01-01).
# The max_try_times condition is commented out inside the SQL, so it is not applied.
#
# @param n [Integer] value interpolated into the LIMIT clause
# @return [Array] rows returned by the query
def relaunching_dataset(n)
  stale_rows_sql = "
    SELECT * 
    FROM #{table.to_s} 
    WHERE COALESCE(#{field_time.to_s}, '1900-01-01') < CAST('#{now}' AS TIMESTAMP) - INTERVAL '#{max_job_duration_minutes.to_i} minutes' 
    AND #{field_id.to_s} IS NOT NULL 
    AND #{field_end_time.to_s} IS NULL
    --AND COALESCE(#{field_times.to_s},0) < #{max_try_times.to_i}
    LIMIT #{n}
  "
  DB[stale_rows_sql].all
end

#run_dispatch(worker) ⇒ Object

dispatch records returns the # of records dispatched



618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
# File 'lib/pampa.rb', line 618

# Assigns pending records to the worker, up to the worker's free slots.
#
# The candidates come from #selecting; they are assigned with a single UPDATE
# that writes worker.id into the field_id column, stamps field_time with the
# current time, and resets the start/end time columns when they are configured.
#
# @param worker [Object] must respond to #id
# @return [Integer] number of records dispatched (0 when there are no free
#   slots or no candidates)
def run_dispatch(worker)
  # get # of available slots
  n = self.available_slots(worker)
  
  # dispatching n pending records
  i = 0
  if n>0
    # primary keys of the records chosen for dispatching
    ids = self.selecting(n).map { |h| h[self.field_primary_key.to_sym] }

    i = ids.size

    if i>0
      # the statement is built piecewise because the start/end time columns are optional
      q = "
        UPDATE #{self.table.to_s}
        SET 
          #{self.field_id.to_s} = '#{worker.id}', 
      "

      if !self.field_start_time.nil?
        q += "
        #{self.field_start_time.to_s} = NULL,
        "
      end

      if !self.field_end_time.nil?
        q += "
        #{self.field_end_time.to_s} = NULL,
        "                  
      end

      # last assignment (no trailing comma) followed by the primary-key filter
      q += "
        #{self.field_time.to_s} = CAST('#{now}' AS TIMESTAMP)  
        WHERE #{self.field_primary_key.to_s} IN ('#{ids.join("','")}')
      "

      DB.execute(q)
    end # if i>0
  end # if n>0
  
  # number of records dispatched
  return i
end

#run_relaunchObject

relaunch records



602
603
604
605
606
607
608
609
610
611
612
613
614
# File 'lib/pampa.rb', line 602

# Relaunches the records selected by #relaunching.
#
# Each selected record is handed to relauncher_function (called with the
# record only) when one is configured, or to #relaunch otherwise. After every
# record the database connection is dropped and a GC cycle is forced.
#
# @param n [Integer] maximum number of records to relaunch in this pass.
#   Optional so existing no-argument callers keep working; the default is an
#   arbitrary large batch size.
def run_relaunch(n = 10_000)
  # #relaunching requires the batch size; calling it without one raised ArgumentError
  self.relaunching(n).each { |o|
    if self.relauncher_function.nil?
      self.relaunch(o)
    else
      self.relauncher_function.call(o)
    end
    # release resources
    DB.disconnect
    GC.start
  }
end

#selecting(n) ⇒ Object

returns an array of available tasks for dispatching.



523
524
525
526
527
528
529
530
# File 'lib/pampa.rb', line 523

# Picks the records available for dispatching.
#
# @param n [Integer] maximum number of records wanted
# @return [Object] result of #selecting_dataset(n), or of
#   selecting_function.call(n, self) when a custom function is configured
def selecting(n)
  chooser = selecting_function
  return selecting_dataset(n) if chooser.nil?

  # TODO: validate that it returns an array of strings
  chooser.call(n, self)
end

#selecting_dataset(n) ⇒ Object

returns an array of available tasks for dispatching.



515
516
517
518
519
520
# File 'lib/pampa.rb', line 515

# Default selection of records available for dispatching.
#
# Starts from the rows with no worker assigned (field_id NULL) and narrows them:
# - to rows without an end time, when an end-time column is configured;
# - to rows whose attempts counter (NULL counts as 0) is one of
#   0...max_try_times, when a counter column is configured.
#
# @param n [Integer] maximum number of rows to fetch
# @return [Array] the selected rows
def selecting_dataset(n)
  candidates = DB[table.to_sym].where(field_id.to_sym => nil)
  unless field_end_time.nil?
    candidates = candidates.filter(field_end_time.to_sym => nil)
  end
  unless field_times.nil?
    attempts = Sequel.function(:coalesce, field_times.to_sym, 0)
    candidates = candidates.filter(attempts => max_try_times.times.to_a)
  end
  candidates.limit(n).all
end

#start(o) ⇒ Object



580
581
582
583
584
585
586
587
588
# File 'lib/pampa.rb', line 580

# Flags a record as started.
#
# When a starter_function is configured it takes over completely and is called
# with (o, self). Otherwise the record hash is modified in place and persisted
# through #update: the start-time column is stamped with the current time and
# the attempts counter is incremented. Both columns are optional, so each is
# touched only when the job declares it.
#
# @param o [Hash] record being started, keyed by column symbols
# @return [Object] result of #update or of the custom starter
def start(o)
  if self.starter_function.nil?
    o[self.field_start_time.to_sym] = DB["SELECT CAST('#{now}' AS TIMESTAMP) AS dt"].first[:dt] if !self.field_start_time.nil? # IMPORTANT: use DB location to get current time.
    # the counter column may be nil; calling to_sym on it would raise NoMethodError
    o[self.field_times.to_sym] = o[self.field_times.to_sym].to_i + 1 if !self.field_times.nil?
    self.update(o)
  else
    self.starter_function.call(o, self)
  end
end

#to_hashObject

return a hash descriptor of the job



397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# File 'lib/pampa.rb', line 397

# Serializes the job back into a descriptor-shaped hash.
#
# Plain settings are copied as they are; every custom function is converted
# with #to_s (so a missing function becomes an empty string).
#
# @return [Hash] descriptor keyed by symbols
def to_hash()
  {
    name: name,
    table: table,
    field_primary_key: field_primary_key,
    field_id: field_id,
    field_time: field_time,
    field_times: field_times,
    field_start_time: field_start_time,
    field_end_time: field_end_time,
    field_success: field_success,
    field_error_description: field_error_description,
    queue_size: queue_size,
    max_job_duration_minutes: max_job_duration_minutes,
    max_try_times: max_try_times,

    # dispatching custom functions
    occupied_function: occupied_function.to_s,
    allowing_function: allowing_function.to_s,
    selecting_function: selecting_function.to_s,
    relaunching_function: relaunching_function.to_s,
    relauncher_function: relauncher_function.to_s,
    starter_function: starter_function.to_s,
    finisher_function: finisher_function.to_s,
    processing_function: processing_function.to_s,

    # elastic workers assignation
    max_pending_tasks: max_pending_tasks,
    max_assigned_workers: max_assigned_workers,
    filter_worker_id: filter_worker_id,

    # reporting custom functions
    total_function: total_function.to_s,
    completed_function: completed_function.to_s,
    pending_function: pending_function.to_s,
    failed_function: failed_function.to_s,
  }
end

#totalObject

Reporting method: total. Returns the total number of tasks. If the total number of tasks is higher than `max_tasks_to_show`, then it returns `max_tasks_to_show`+. (Note: the default implementation shown below applies no such cap.)



667
668
669
670
671
672
673
674
675
676
677
678
# File 'lib/pampa.rb', line 667

# Reporting method: total number of tasks.
#
# With no total_function configured it counts every row of the job table.
#
# @return [Integer, Object] the row count, or whatever total_function returns
def total
  custom = total_function
  if !custom.nil?
    custom.call
  else
    sql = "
        SELECT COUNT(*) AS n
        FROM #{table.to_s} 
    "
    DB[sql].first[:n].to_i
  end
end

#update(o) ⇒ Object



556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
# File 'lib/pampa.rb', line 556

# Persists the dispatching columns of a record with a single UPDATE.
#
# Always written: field_id, field_time, field_success and
# field_error_description. The attempts counter and the start/end time columns
# are optional in a job descriptor, so each is written only when the job
# declares it (calling to_sym on a nil column name would raise NoMethodError).
# nil values are written as NULL; the error description is escaped with #to_sql.
#
# @param o [Hash] record keyed by column symbols; must include the primary key
# @return [Object] result of DB.execute
def update(o)
  # renders a column value as a quoted SQL literal, or NULL when it is nil
  quoted = lambda do |column|
    value = o[column.to_sym]
    value.nil? ? 'NULL' : "'#{value.to_s}'"
  end

  assignments = []
  assignments << "#{self.field_id.to_s} = #{quoted.call(self.field_id)}"
  assignments << "#{self.field_time} = #{quoted.call(self.field_time)}"
  assignments << "#{self.field_times} = #{o[self.field_times.to_sym].to_i}" if !self.field_times.nil?
  assignments << "#{self.field_start_time} = #{quoted.call(self.field_start_time)}" if !self.field_start_time.nil?
  assignments << "#{self.field_end_time} = #{quoted.call(self.field_end_time)}" if !self.field_end_time.nil?

  success = o[self.field_success.to_sym]
  assignments << "#{self.field_success} = #{success.nil? ? 'NULL' : success.to_s}"

  description = o[self.field_error_description.to_sym]
  assignments << "#{self.field_error_description} = #{description.nil? ? 'NULL' : "'#{description.to_sql}'"}"

  q = "
    UPDATE #{self.table.to_s}
    SET
      #{assignments.join(",\n      ")}
    WHERE #{self.field_primary_key} = '#{o[self.field_primary_key.to_sym]}'
  "
  DB.execute(q)
end