Class: GBDispatch::Queue

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/gb_dispatch/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, thread_pool) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • name (String)

    queue name, should be the same as is register in Celluloid

  • thread_pool (Celluloid::Pool)

    pool of runners for executing code.



11
12
13
14
# File 'lib/gb_dispatch/queue.rb', line 11

def initialize(name, thread_pool)
  @name = name
  @thread_pool = thread_pool
end

Instance Attribute Details

#nameString (readonly)

Returns queue name.

Returns:

  • (String)

    queue name



7
8
9
# File 'lib/gb_dispatch/queue.rb', line 7

def name
  @name
end

Instance Method Details

#perform(block = nil) { ... } ⇒ Object, Exception

Perform given block

If used with rails it will wrap block with connection pool.

Parameters:

  • block (Proc) (defaults to: nil)

Yields:

  • if there is no block given it yield without param.

Returns:

  • (Object, Exception)

    returns value of executed block or exception if block execution failed.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/gb_dispatch/queue.rb', line 22

def perform(block=nil)
  Thread.current[:name] ||= name
  if defined?(Rails) && defined?(ActiveRecord::Base)
    thread_block = ->() do
      begin
        ActiveRecord::Base.connection_pool.with_connection do
          block ? block.call : yield
        end
      ensure
        ActiveRecord::Base.clear_active_connections!
      end
    end
  else
    thread_block = block ? block : ->() { yield }
  end
  exclusive do
    begin
      @thread_pool.execute thread_block, name: name
    rescue Exception => e
      return e
    end
  end
end

#perform_after(time, block = nil) { ... } ⇒ Object

Perform block after given period

Parameters:

  • time (Fixnum)
  • block (Proc) (defaults to: nil)

Yields:

  • if there is no block given it yield without param.



50
51
52
53
54
55
# File 'lib/gb_dispatch/queue.rb', line 50

def perform_after(time, block=nil)
  after(time) do
    block = ->(){ yield } unless block
    self.async.perform block
  end
end