Class: ActiveJob::GoogleCloudPubsub::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/activejob-google_cloud_pubsub/worker.rb

Constant Summary collapse

# Duration constant built with `10.minutes`; none of the methods shown on this page reference it.
# NOTE(review): presumably an upper bound for a Pub/Sub acknowledgement deadline — confirm in worker.rb.
MAX_DEADLINE =
10.minutes

Instance Method Summary collapse

Constructor Details

#initialize(queue: 'default', min_threads: 0, max_threads: Concurrent.processor_count, pubsub: Google::Cloud::Pubsub.new, logger: Logger.new($stdout)) ⇒ Worker

Returns a new instance of Worker.



16
17
18
19
20
21
22
# File 'lib/activejob-google_cloud_pubsub/worker.rb', line 16

# Builds a worker and stores its collaborators; no connection or thread is
# created here beyond evaluating the keyword defaults.
#
# @param queue [String] queue name, later handed to `@pubsub.subscription_for`
# @param min_threads [Integer] lower bound passed to the thread pool built in #run
# @param max_threads [Integer] upper bound passed to the thread pool built in #run
# @param pubsub [Object] Pub/Sub client; must respond to `subscription_for`
# @param logger [Object, nil] receives `info`/`error` calls; `nil` silences logging
#   (every logging call in this class goes through `&.`)
def initialize(
  queue: 'default',
  min_threads: 0,
  max_threads: Concurrent.processor_count,
  pubsub: Google::Cloud::Pubsub.new,
  logger: Logger.new($stdout)
)
  @queue_name = queue

  # Thread-pool bounds travel together, so assign them as a pair.
  @min_threads, @max_threads = min_threads, max_threads

  @pubsub = pubsub
  @logger = logger
end

Instance Method Details

#ensure_subscription ⇒ Object



48
49
50
51
52
# File 'lib/activejob-google_cloud_pubsub/worker.rb', line 48

# Looks up the subscription for this worker's queue and discards the result.
#
# NOTE(review): `subscription_for` is defined outside this view; presumably it
# creates the topic/subscription when missing — confirm against the extension.
#
# @return [nil] always, so callers never see the subscription object
def ensure_subscription
  @pubsub.subscription_for(@queue_name)
  nil
end

#run ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/activejob-google_cloud_pubsub/worker.rb', line 24

# Listens on the queue's subscription and hands every received message to
# `process` on a thread pool.
#
# When the pool refuses the work (`Concurrent::RejectedExecutionError`), the
# message is instead delayed by 10 seconds from a second promise that is not
# bound to the pool, so it can be redelivered later. Errors raised inside
# either promise are sent to the logger rather than propagated.
#
# NOTE(review): `max_queue: -1` looks deliberate — it appears to make the pool
# reject work instead of queueing it once all threads are busy; confirm
# against the concurrent-ruby version in use.
#
# @return [Object] whatever the subscription's `listen` call returns
def run
  pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: @min_threads,
    max_threads: @max_threads,
    max_queue: -1
  )

  # Shared failure handler for both promises below.
  report_error = proc { |error| @logger&.error error }

  subscription = @pubsub.subscription_for(@queue_name)

  subscription.listen do |message|
    @logger&.info "Message(#{message.message_id}) was received."

    begin
      job = Concurrent::Promise.execute(args: message, executor: pool) { |msg| process msg }
      job.rescue(&report_error)
    rescue Concurrent::RejectedExecutionError
      # Pool is saturated: push the message back instead of dropping it.
      postponement = Concurrent::Promise.execute(args: message) do |msg|
        msg.delay! 10.seconds.to_i

        @logger&.info "Message(#{msg.message_id}) was rescheduled after 10 seconds because the thread pool is full."
      end
      postponement.rescue(&report_error)
    end
  end
end