Module: RobotMaster::Queue

Defined in:
lib/robot-master/queue.rb

Overview

Manages a workflow to enqueue jobs into a priority queue

Class Method Summary collapse

Class Method Details

.enqueue(step, druid, priority, opts = {}) ⇒ Hash

Adds the given item to the priority queue for this step

Job names for the given step are converted like so:

  • `dor:assemblyWF:jp2-create` into `Robots::DorRepo::Assembly::Jp2Create`

  • `dor:etdSubmitWF:binder-transfer` into `Robots:DorRepo::EtdSubmit::BinderTransfer`

Parameters:

  • step (String)

    fully qualified name

  • druid (String)
  • priority (Symbol)

    see `priority_class`

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :repo_suffix (String)

    suffix to append to the Repo component of the step name

Returns:

  • (Hash)

    returns the `:queue` name and `klass` name enqueued


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/robot-master/queue.rb', line 50

def self.enqueue(step, druid, priority, opts = {})
  Workflow.assert_qualified(step)
  
  # generate the specific priority queue name
  queue = queue_name(step, priority)
  
  # generate the robot job class name
  opts[:repo_suffix] ||= 'Repo'
  r, w, s = Workflow.parse_qualified(step)
  klass = [
    'Robots',
    r.camelcase + opts[:repo_suffix], # 'Dor' conflicts with dor-services
    w.sub('WF', '').camelcase,
    s.sub('-', '_').camelcase
  ].join('::')
  
  # perform the enqueue to Resque
  ROBOT_LOG.debug { "enqueue_to: #{queue} #{klass} #{druid}" }
  Resque.enqueue_to(queue.to_sym, klass, druid)
  
  { :queue => queue, :klass => klass }
end

.needs_work?(step, priority = :default, threshold = 100) ⇒ Boolean

Returns true if the queue for the step is “empty”

Parameters:

  • step (String)

    a fully qualified name

  • priority (Symbol, Integer) (defaults to: :default)
  • threshold (Integer) (defaults to: 100)

    The number of items below which the queue is considered empty

Returns:

  • (Boolean)

    true if the queue for the step is “empty”


29
30
31
32
33
34
35
# File 'lib/robot-master/queue.rb', line 29

def self.needs_work?(step, priority = :default, threshold = 100)
  Workflow.assert_qualified(step)
  queue = queue_name(step, priority)
  n = Resque.size(queue)
  ROBOT_LOG.debug { "queue size=#{n} #{queue}"}
  (n < threshold)
end

.queue_name(step, priority = :default) ⇒ String

Generate the queue name from step and priority

Examples:

queue_name('dor:assemblyWF:jp2-create')
=> 'dor_assemblyWF_jp2-create_default'
queue_name('dor:assemblyWF:jp2-create', 100)
=> 'dor_assemblyWF_jp2-create_high'

Parameters:

  • step (String)

    fully qualified name

  • priority (Symbol | Integer) (defaults to: :default)

Returns:

  • (String)

    the queue name


14
15
16
17
18
19
20
21
22
23
# File 'lib/robot-master/queue.rb', line 14

def self.queue_name(step, priority = :default)
  Workflow.assert_qualified(step)
  unless priority.is_a?(Integer) or Priority::PRIORITIES.include?(priority)
    raise ArgumentError, "Unknown priority: #{priority}"
  end
  [ 
    Workflow.parse_qualified(step),
    priority.is_a?(Integer) ? Priority.priority_class(priority) : priority
  ].flatten.join('_')
end