Class: Arsenicum::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/arsenicum/queue.rb

Direct Known Subclasses

Sqs

Defined Under Namespace

Classes: Sqs

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
12
13
# File 'lib/arsenicum/queue.rb', line 8

# Sets up the queue: stores its name, then builds the router and the broker.
#
# NOTE: +options+ is mutated — the :worker_count and :router_class entries are
# removed from the hash that the caller passed in.
#
# @param name [Object] identifier stored as-is and exposed through #name
# @param options [Hash] configuration; :worker_count is forwarded to the
#   broker and :router_class is handed to +build_router+ (defined outside
#   this view — presumably it instantiates the router; verify)
def initialize(name, options)
  @name = name
  @worker_count = options.delete(:worker_count)

  router_class = options.delete(:router_class)
  @router = build_router(router_class)

  @broker = Arsenicum::Core::Broker.new(worker_count: worker_count, router: router)
end

Instance Attribute Details

#broker ⇒ Object (readonly)

Returns the value of attribute broker.



6
7
8
# File 'lib/arsenicum/queue.rb', line 6

# Read-only accessor for the broker created in the constructor.
#
# @return [Arsenicum::Core::Broker] the value stored in @broker
def broker
  @broker
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/arsenicum/queue.rb', line 5

# Read-only accessor for the queue name given to the constructor.
#
# @return [Object] the value stored in @name
def name
  @name
end

#router ⇒ Object (readonly)

Returns the value of attribute router.



5
6
7
# File 'lib/arsenicum/queue.rb', line 5

# Read-only accessor for the router that the constructor obtained from
# +build_router+.
#
# @return [Object] the value stored in @router
def router
  @router
end

#worker_count ⇒ Object (readonly)

Returns the value of attribute worker_count.



5
6
7
# File 'lib/arsenicum/queue.rb', line 5

# Read-only accessor for the :worker_count option taken from the
# constructor's options.
#
# @return [Object] the value stored in @worker_count (presumably an Integer,
#   or nil when the option was not supplied — verify against callers)
def worker_count
  @worker_count
end

Instance Method Details

#handle_failure(e, original_message) ⇒ Object



50
51
52
# File 'lib/arsenicum/queue.rb', line 50

# Failure hook. #start calls it directly when +pick+ raises, and also hands it
# to +broker.delegate+ wrapped in a lambda (presumably invoked when processing
# of a message fails — confirm in the broker).
#
# This base implementation is intentionally empty and returns nil.
#
# @param e [Exception] the error that was raised
# @param original_message [Object, nil] the second value returned by +pick+;
#   nil when +pick+ itself raised
# @return [nil]
def handle_failure(e, original_message)
  # TODO: implement correctly in your derived classes.
end

#handle_success(original_message) ⇒ Object



46
47
48
# File 'lib/arsenicum/queue.rb', line 46

# Success hook. #start hands it to +broker.delegate+ wrapped in a lambda
# (presumably invoked once a message has been processed — confirm in the
# broker).
#
# This base implementation is intentionally empty and returns nil.
#
# @param original_message [Object] the second value returned by +pick+
# @return [nil]
def handle_success(original_message)
  # TODO: implement correctly in your derived classes.
end

#register(task) ⇒ Object



42
43
44
# File 'lib/arsenicum/queue.rb', line 42

# Stores a task in the broker, keyed by the task's own id.
#
# @param task [#id] the task to register
# @return [Object] the task that was passed in
def register(task)
  task_id = task.id
  broker[task_id] = task
end

#start ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/arsenicum/queue.rb', line 15

# Runs the broker, then polls for messages in an endless loop and hands each
# one to the broker.
#
# Every iteration calls +pick+ (not defined in this view; expected to return
# a +[message, original_message]+ pair, or nil when nothing is available):
#
# * if +pick+ raises, #handle_failure is called with the error and +nil+,
#   then the loop pauses for +poll_interval+ before retrying, so a
#   persistently failing source does not turn into a busy loop;
# * if no message was picked, the loop pauses for +poll_interval+;
# * otherwise the message is delegated to the broker together with success
#   and failure callbacks bound to that message's +original_message+.
#
# @param poll_interval [Numeric] seconds to sleep after an empty or failed
#   pick (defaults to the previously hard-coded 0.5)
# @return [Object] does not return under normal operation; the loop only ends
#   when an exception escapes it (Kernel#loop swallows StopIteration)
def start(poll_interval: 0.5)
  Arsenicum::Logger.info { "[queue]Queue #{name} is now starting" }
  broker.run
  Arsenicum::Logger.info { "[queue]Queue #{name} start-up completed" }

  loop do
    begin
      message, original_message = pick
    rescue => e
      # The assignment above never completed, so there is no original message
      # to report; pass nil explicitly instead of an always-nil local.
      handle_failure e, nil
      # Back off before retrying rather than spinning on a failing source.
      sleep(poll_interval)
      next
    end

    unless message
      sleep(poll_interval)
      next
    end

    Arsenicum::Logger.info { "Queue picked. message: #{message.inspect}" }
    broker.delegate message,
                    -> { handle_success(original_message) },
                    ->(e) { handle_failure(e, original_message) }
  end
end

#start_async ⇒ Object



54
55
56
# File 'lib/arsenicum/queue.rb', line 54

# Runs #start on a newly created thread so the caller is not blocked by the
# polling loop.
#
# @return [Thread] the thread executing #start
def start_async
  Thread.new do
    start
  end
end

#stop ⇒ Object



38
39
40
# File 'lib/arsenicum/queue.rb', line 38

# Stops the broker by delegating to +broker.stop+.
#
# NOTE(review): nothing here signals the polling loop in #start, which has no
# visible exit condition — confirm whether callers must also end that thread.
#
# @return [Object] whatever +broker.stop+ returns
def stop
  broker.stop
end