Class: Philotic::Subscriber
- Inherits:
-
Object
- Object
- Philotic::Subscriber
- Defined in:
- lib/philotic/subscriber.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
Instance Method Summary collapse
- #acknowledge(message, up_to_and_including = false) ⇒ Object
- #config ⇒ Object
- #endure ⇒ Object
- #get_subscription_settings(subscription, subscribe_options) ⇒ Object
-
#initialize(connection) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #initialize_queue(subscription_settings) ⇒ Object
- #logger ⇒ Object
- #reject(message, requeue = true) ⇒ Object
- #subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block) ⇒ Object
- #subscribe_to_any(options = {}) ⇒ Object
- #subscription_callback(&block) ⇒ Object
Constructor Details
#initialize(connection) ⇒ Subscriber
Returns a new instance of Subscriber.
9 10 11 |
# File 'lib/philotic/subscriber.rb', line 9

# Builds a subscriber around an existing connection object.
#
# @param connection [Object] stored as-is in @connection; the other methods
#   of this class call #connect!, #config, #logger, #channel and #exchange on it
def initialize(connection)
  @connection = connection
end
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
7 8 9 |
# File 'lib/philotic/subscriber.rb', line 7

# Reader for the connection handed to #initialize.
#
# @return [Object] the value of @connection (nil if it was never assigned)
def connection
  @connection
end
Instance Method Details
#acknowledge(message, up_to_and_including = false) ⇒ Object
78 79 80 |
# File 'lib/philotic/subscriber.rb', line 78

# Acknowledges a received message on the connection's channel.
#
# @param message [#delivery_tag] the message to acknowledge; only its
#   delivery tag is used
# @param up_to_and_including [Boolean] passed through unchanged as the second
#   argument of channel.acknowledge (presumably "ack everything up to this
#   tag" -- confirm against the channel implementation)
# @return [Object] whatever connection.channel.acknowledge returns
def acknowledge(message, up_to_and_including = false)
  connection.channel.acknowledge(message.delivery_tag, up_to_and_including)
end
#config ⇒ Object
17 18 19 |
# File 'lib/philotic/subscriber.rb', line 17

# Convenience accessor that delegates to the connection.
#
# @return [Object] the result of connection.config
def config
  connection.config
end
#endure ⇒ Object
92 93 94 95 96 |
# File 'lib/philotic/subscriber.rb', line 92

# Keeps the calling thread alive indefinitely, handing control to other
# threads via Thread.pass on every iteration. Never returns normally.
#
# @return [void]
def endure
  loop { Thread.pass }
end
#get_subscription_settings(subscription, subscribe_options) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/philotic/subscriber.rb', line 51

# Normalises the arguments of #subscribe into one settings hash.
#
# Two call shapes are handled:
# * +subscription+ is a Symbol or String: it is the queue name, the defaults
#   come from Philotic::DEFAULT_NAMED_QUEUE_OPTIONS, per-call queue options are
#   read from subscribe_options[:queue_options], and no binding arguments are
#   produced (:arguments is nil).
# * otherwise +subscription+ is treated as a Hash: the queue name is
#   subscription[:queue_name] (default ''), the defaults come from
#   Philotic::DEFAULT_ANONYMOUS_QUEUE_OPTIONS, and the binding arguments are
#   subscription[:arguments] or, failing that, the subscription hash itself.
#
# @param subscription [Symbol, String, Hash]
# @param subscribe_options [Hash]
# @return [Hash] with keys :queue_name, :queue_options, :arguments and
#   :subscribe_options
def get_subscription_settings(subscription, subscribe_options)
  if [Symbol, String].include? subscription.class
    queue_name    = subscription
    # From here on the per-call overrides are looked up in subscribe_options.
    subscription  = subscribe_options
    queue_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS
    arguments     = nil
  else
    queue_name    = subscription[:queue_name] || ''
    queue_options = Philotic::DEFAULT_ANONYMOUS_QUEUE_OPTIONS
    if subscription[:subscribe_options]
      subscribe_options = subscribe_options.merge(subscription[:subscribe_options])
    end
    arguments = subscription[:arguments] || subscription
    # Note: this writes the default into the caller's hash.
    arguments['x-match'] ||= 'all'
  end

  # Merge into a fresh hash: a destructive merge! here would write per-call
  # overrides into the shared DEFAULT_* constant (or raise if it is frozen).
  queue_options = queue_options.merge(subscription[:queue_options] || {})
  queue_options[:auto_delete] ||= true if queue_name == ''

  {
    queue_name:        queue_name,
    queue_options:     queue_options,
    arguments:         arguments,
    subscribe_options: subscribe_options,
  }
end
#initialize_queue(subscription_settings) ⇒ Object
44 45 46 47 48 49 |
# File 'lib/philotic/subscriber.rb', line 44

# Declares the queue described by +subscription_settings+ on the connection's
# channel and, when binding arguments are present, binds it to the
# connection's exchange.
#
# @param subscription_settings [Hash] reads :queue_name, :queue_options and
#   :arguments (the shape returned by #get_subscription_settings)
# @return [Object] the queue returned by connection.channel.queue
def initialize_queue(subscription_settings)
  queue = connection.channel.queue(
    subscription_settings[:queue_name],
    subscription_settings[:queue_options]
  )

  # No arguments means no binding is made at all.
  binding_arguments = subscription_settings[:arguments]
  queue.bind(connection.exchange, arguments: binding_arguments) if binding_arguments

  queue
end
#logger ⇒ Object
13 14 15 |
# File 'lib/philotic/subscriber.rb', line 13

# Convenience accessor that delegates to the connection.
#
# @return [Object] the result of connection.logger
def logger
  connection.logger
end
#reject(message, requeue = true) ⇒ Object
82 83 84 |
# File 'lib/philotic/subscriber.rb', line 82

# Rejects a received message on the connection's channel.
#
# @param message [#delivery_tag] the message to reject; only its delivery tag
#   is used
# @param requeue [Boolean] passed through unchanged as the second argument of
#   channel.reject
# @return [Object] whatever connection.channel.reject returns
def reject(message, requeue = true)
  connection.channel.reject(message.delivery_tag, requeue)
end
#subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/philotic/subscriber.rb', line 32

# Connects, sets the channel prefetch from the connection's config, declares
# (and possibly binds) the queue, and registers +block+ as its consumer.
#
# @param subscription [Symbol, String, Hash] queue name or subscription hash;
#   interpreted by #get_subscription_settings
# @param subscribe_options [Hash] options ultimately handed to queue.subscribe
# @yield [message] invoked through #subscription_callback for each delivery
# @return [Object] whatever queue.subscribe returns
def subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block)
  connection.connect!
  connection.channel.prefetch(connection.config.prefetch_count)

  subscription_settings = get_subscription_settings(subscription, subscribe_options)
  queue                 = initialize_queue(subscription_settings)

  queue.subscribe(subscription_settings[:subscribe_options], &subscription_callback(&block))
end
#subscribe_to_any(options = {}) ⇒ Object
86 87 88 89 90 |
# File 'lib/philotic/subscriber.rb', line 86

# Subscribes with :'x-match' => :any merged into +options+. Does nothing and
# returns nil when no block is given.
#
# The block is captured explicitly: the block-less `Proc.new` form this method
# used to rely on raises ArgumentError on Ruby 3.0 and later.
#
# NOTE(review): the key merged here is the Symbol :'x-match', while
# #get_subscription_settings defaults the String key 'x-match' -- confirm the
# two do not both end up in the binding arguments.
#
# @param options [Hash] subscription hash; not modified (merge returns a copy)
# @yield [message] forwarded to #subscribe
# @return [Object, nil] the result of #subscribe, or nil without a block
def subscribe_to_any(options = {}, &block)
  return unless block

  subscribe(options.merge(:'x-match' => :any), &block)
end
#subscription_callback(&block) ⇒ Object
21 22 23 24 25 26 27 28 29 30 |
# File 'lib/philotic/subscriber.rb', line 21

# Wraps +block+ in the three-argument lambda handed to queue.subscribe.
#
# For each delivery the lambda parses +payload+ as JSON, builds a
# Philotic::Message from metadata[:headers] and the parsed payload, attaches
# the delivery info to it, and runs +block+ with that message via
# instance_exec, so +self+ inside the block is this subscriber.
#
# @yield [message] the Philotic::Message built for the delivery
# @return [Proc] a lambda taking (delivery_info, metadata, payload)
# @raise [JSON::ParserError] from the lambda when the payload is not valid JSON
def subscription_callback(&block)
  lambda do |delivery_info, metadata, payload|
    hash_payload = JSON.parse(payload)

    message               = Philotic::Message.new(metadata[:headers], hash_payload)
    message.delivery_info = delivery_info

    instance_exec(message, &block)
  end
end