Class: Philotic::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/philotic/subscriber.rb

Direct Known Subclasses

Consumer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Subscriber

Returns a new instance of Subscriber.



9
10
11
# File 'lib/philotic/subscriber.rb', line 9

def initialize(connection)
  @connection = connection
end

Instance Attribute Details

#connectionObject

Returns the value of attribute connection.



7
8
9
# File 'lib/philotic/subscriber.rb', line 7

def connection
  @connection
end

Instance Method Details

#acknowledge(message, up_to_and_including = false) ⇒ Object



78
79
80
# File 'lib/philotic/subscriber.rb', line 78

def acknowledge(message, up_to_and_including=false)
  connection.channel.acknowledge(message.delivery_tag, up_to_and_including)
end

#configObject



17
18
19
# File 'lib/philotic/subscriber.rb', line 17

def config
  connection.config
end

#endureObject



92
93
94
95
96
# File 'lib/philotic/subscriber.rb', line 92

def endure
  while true
    Thread.pass
  end
end

#get_subscription_settings(subscription, subscribe_options) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/philotic/subscriber.rb', line 51

def get_subscription_settings(subscription, subscribe_options)

  if [Symbol, String].include? subscription.class
    queue_name    = subscription
    subscription  = subscribe_options
    queue_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS

  else
    queue_name           = subscription[:queue_name] || ''
    queue_options        = Philotic::DEFAULT_ANONYMOUS_QUEUE_OPTIONS
    subscribe_options    = subscribe_options.merge(subscription[:subscribe_options]) if subscription[:subscribe_options]
    arguments            = subscription[:arguments] || subscription
    arguments['x-match'] ||= 'all'
  end

  queue_options.merge!(subscription[:queue_options] || {})

  queue_options[:auto_delete] ||= true if queue_name == ''

  {
      queue_name:        queue_name,
      queue_options:     queue_options,
      arguments:         arguments,
      subscribe_options: subscribe_options,
  }
end

#initialize_queue(subscription_settings) ⇒ Object



44
45
46
47
48
49
# File 'lib/philotic/subscriber.rb', line 44

def initialize_queue(subscription_settings)
  queue = connection.channel.queue(subscription_settings[:queue_name], subscription_settings[:queue_options])

  queue.bind(connection.exchange, arguments: subscription_settings[:arguments]) if subscription_settings[:arguments]
  queue
end

#loggerObject



13
14
15
# File 'lib/philotic/subscriber.rb', line 13

def logger
  connection.logger
end

#reject(message, requeue = true) ⇒ Object



82
83
84
# File 'lib/philotic/subscriber.rb', line 82

def reject(message, requeue=true)
  connection.channel.reject(message.delivery_tag, requeue)
end

#subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/philotic/subscriber.rb', line 32

def subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block)
  connection.connect!
  connection.channel.prefetch(connection.config.prefetch_count)

  subscription_settings = get_subscription_settings subscription, subscribe_options

  queue = initialize_queue(subscription_settings)

  queue.subscribe(subscription_settings[:subscribe_options], &subscription_callback(&block))

end

#subscribe_to_any(options = {}) ⇒ Object



86
87
88
89
90
# File 'lib/philotic/subscriber.rb', line 86

def subscribe_to_any(options = {})
  if block_given?
    subscribe(options.merge(:'x-match' => :any), &Proc.new)
  end
end

#subscription_callback(&block) ⇒ Object



21
22
23
24
25
26
27
28
29
30
# File 'lib/philotic/subscriber.rb', line 21

def subscription_callback(&block)
  lambda do |delivery_info, , payload|
    hash_payload = JSON.parse payload

    message = Philotic::Message.new([:headers], hash_payload)
    message.delivery_info = delivery_info

    instance_exec(message, &block)
  end
end