Class: RailsPipeline::IronmqPullingSubscriber

Inherits:
Object
Includes:
Subscriber
Defined in:
lib/rails-pipeline/ironmq_pulling_subscriber.rb

Constant Summary

Constants included from Subscriber

Subscriber::Error, Subscriber::NoApiKeyError, Subscriber::WrongApiKeyError

Instance Attribute Summary

Instance Method Summary

Methods included from Subscriber

included, register, registered_handlers, target_class, target_handler

Constructor Details

#initialize(queue_name) ⇒ IronmqPullingSubscriber

Returns a new instance of IronmqPullingSubscriber.



# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 10

def initialize(queue_name)
    @queue_name = queue_name
    @subscription_status = false
end

Instance Attribute Details

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 8

def queue_name
  @queue_name
end

Instance Method Details

#activate_subscription ⇒ Object



# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 54

def activate_subscription
    @subscription_status = true
end

#active_subscription? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 50

def active_subscription?
    @subscription_status
end

#deactivate_subscription ⇒ Object



# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 58

def deactivate_subscription
    @subscription_status = false
end

#process_envelope(envelope, message, block) ⇒ Object



# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 62

def process_envelope(envelope, message, block)
    callback_status = block.call(envelope)

    if callback_status
        message.delete
    end
end
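
The return value of the block decides whether the message is acknowledged: a truthy result deletes it, while false or nil leaves it on the queue. A minimal sketch of that contract, assuming a hypothetical FakeMessage stand-in (not part of the gem) that models only #delete, and a free-standing copy of process_envelope:

```ruby
# Hypothetical stand-in for an IronMQ message; only #delete is modelled.
class FakeMessage
  attr_reader :deleted

  def initialize
    @deleted = false
  end

  def delete
    @deleted = true
  end
end

# Same logic as process_envelope above, copied as a free-standing method.
def process_envelope(envelope, message, block)
  message.delete if block.call(envelope)
end

# Runs one fake message through a callback that returns callback_result
# and reports whether the message ended up deleted.
def deleted_after?(callback_result)
  message = FakeMessage.new
  process_envelope({ id: 1 }, message, ->(_envelope) { callback_result })
  message.deleted
end

puts deleted_after?(true)   # callback reported success, message was deleted
puts deleted_after?(false)  # callback declined, message was left alone
```

In practice this means a subscriber block should end with an explicit true once its work is done; a block whose last expression happens to be nil never removes anything.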

#process_message(message, halt_on_error, block) ⇒ Object



# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 29

def process_message(message, halt_on_error, block)
    begin
        # A nil message or an empty JSON body is treated as an empty poll: stop polling.
        if message.nil? || JSON.parse(message.body).empty?
            deactivate_subscription
        else
            payload = parse_ironmq_payload(message.body)
            envelope = generate_envelope(payload)

            process_envelope(envelope, message, block)
        end
    rescue Exception => e
        # Optionally stop the polling loop, then log and re-raise to the caller.
        if halt_on_error
            deactivate_subscription
        end

        RailsPipeline.logger.error "A message could not be processed and was not removed from the queue."
        RailsPipeline.logger.error "The message: #{message.inspect}"
        raise e
    end
end
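
The first branch is what ends a subscription on an empty queue: a nil message, or a body that parses to an empty JSON object or array, deactivates the subscription. A minimal sketch of just that check, assuming a hypothetical StubMessage struct and a helper named drained? (neither exists in the gem):

```ruby
require 'json'

# Hypothetical stand-in for an IronMQ message; only #body is modelled.
StubMessage = Struct.new(:body)

# Same condition process_message uses before deactivating the subscription.
def drained?(message)
  message.nil? || JSON.parse(message.body).empty?
end

puts drained?(nil)                                   # no message was returned
puts drained?(StubMessage.new('{}'))                 # body is an empty JSON object
puts drained?(StubMessage.new('{"payload":"abc"}'))  # body carries data
```

A body that is not valid JSON raises inside the check, so it falls through to the rescue branch rather than counting as an empty poll.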

#pull_message(wait_time) {|queue.get(:wait => wait_time)| ... } ⇒ Object

The wait time here may need to be changed. I haven't seen rate-limit information for these calls, but didn't look very hard either.

Yields:

  • (queue.get(:wait => wait_time))


# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 74

def pull_message(wait_time)
    queue = _iron.queue(queue_name)
    yield queue.get(:wait => wait_time)
end
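
pull_message hands whatever the queue returns straight to the caller's block, including nil. The sketch below shows that yield pattern with a hypothetical in-memory StubQueue. It assumes an empty queue yields nil (suggested by the nil check in process_message, not confirmed here), and it passes the queue in as an argument instead of looking it up through _iron:

```ruby
# Hypothetical in-memory queue; only the get(:wait => n) call is modelled.
class StubQueue
  def initialize(messages)
    @messages = messages
  end

  # Returns the next message, or nil once the queue is empty.
  # The wait option is accepted but ignored in this stub.
  def get(_options = {})
    @messages.shift
  end
end

# Same shape as pull_message above, with the queue passed in explicitly.
def pull_message(queue, wait_time)
  yield queue.get(:wait => wait_time)
end

# Pulls until the queue yields nil and returns everything seen on the way.
def drain(queue)
  seen = []
  loop do
    message = pull_message(queue, 2) { |m| m }
    break if message.nil?
    seen << message
  end
  seen
end

p drain(StubQueue.new(%w[first second]))
```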

#start_subscription(params = {wait_time: 2, halt_on_error: true}, &block) ⇒ Object

Valid parameters at this time are:

  • wait_time - An integer indicating how long, in seconds, to long-poll on an empty queue.
  • halt_on_error - A boolean indicating whether to stop the queue subscription if an error occurs.



# File 'lib/rails-pipeline/ironmq_pulling_subscriber.rb', line 18

def start_subscription(params={wait_time: 2, halt_on_error: true}, &block)
    activate_subscription

    while active_subscription?
        pull_message(params[:wait_time]) do |message|
            process_message(message, params[:halt_on_error], block)
        end
    end
end
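
Because the defaults live in a single default hash argument, they apply only when the caller passes no hash at all. A partial hash such as wait_time: 5 replaces the whole default, leaving params[:halt_on_error] as nil, which is falsy. The sketch below illustrates this with two hypothetical helpers, params_as_written and params_merged; neither exists in the gem, and merging over defaults is only one possible alternative, not the library's behaviour:

```ruby
DEFAULTS = { wait_time: 2, halt_on_error: true }.freeze

# Mirrors the signature above: the default is used only when no hash is passed.
def params_as_written(params = { wait_time: 2, halt_on_error: true })
  params
end

# Hypothetical alternative: merge the caller's options over the defaults.
def params_merged(params = {})
  DEFAULTS.merge(params)
end

p params_as_written                 # both defaults present
p params_as_written(wait_time: 5)   # halt_on_error is missing
p params_merged(wait_time: 5)       # halt_on_error keeps its default
```

Until the signature changes, callers who override one option are safest passing both, for example wait_time: 5, halt_on_error: true.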