Class: GBDispatch::Queue

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async
Defined in:
lib/gb_dispatch/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • name (String)

    queue name, should be the same as is register in Celluloid



10
11
12
13
# File 'lib/gb_dispatch/queue.rb', line 10

def initialize(name)
  super()
  @name = name
end

Instance Attribute Details

#nameString (readonly)

Returns queue name.

Returns:

  • (String)

    queue name



7
8
9
# File 'lib/gb_dispatch/queue.rb', line 7

def name
  @name
end

Instance Method Details

#perform_after(time, block = nil) { ... } ⇒ Concurrent::ScheduledTask

Perform block after given period

Parameters:

  • time (Fixnum)
  • block (Proc) (defaults to: nil)

Yields:

  • if there is no block given it yield without param.

Returns:

  • (Concurrent::ScheduledTask)


57
58
59
60
61
62
63
64
# File 'lib/gb_dispatch/queue.rb', line 57

def perform_after(time, block=nil)
  task = Concurrent::ScheduledTask.new(time) do
    block = ->(){ yield } unless block
    self.async.perform_now block
  end
  task.execute
  task
end

#perform_now(block = nil) { ... } ⇒ Object, Exception

Perform given block

If used with rails it will wrap block with connection pool.

Parameters:

  • block (Proc) (defaults to: nil)

Yields:

  • if there is no block given it yield without param.

Returns:

  • (Object, Exception)

    returns value of executed block or exception if block execution failed.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/gb_dispatch/queue.rb', line 21

def perform_now(block=nil)
  Thread.current[:name] ||= name
  if defined?(Rails) && defined?(ActiveRecord::Base)
    require 'gb_dispatch/active_record_patch'
    thread_block = ->() do
      if Rails::VERSION::MAJOR < 5
        begin
          ActiveRecord::Base.connection_pool.force_new_connection do
            block ? block.call : yield
          end
        ensure
          ActiveRecord::Base.clear_active_connections!
        end
      else
        Rails.application.executor.wrap do
          ActiveRecord::Base.connection_pool.force_new_connection do
            block ? block.call : yield
          end
        end
      end
    end
  else
    thread_block = block ? block : ->() { yield }
  end
  begin
    Runner.execute thread_block, name: name
  rescue Exception => e
    return e
  end
end

#to_sObject



66
67
68
# File 'lib/gb_dispatch/queue.rb', line 66

def to_s
  self.name.to_s
end