Class: Philotic::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/philotic/subscriber.rb

Direct Known Subclasses

Consumer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Subscriber

Returns a new instance of Subscriber.



10
11
12
# File 'lib/philotic/subscriber.rb', line 10

# Builds a subscriber around the given connection.
#
# @param connection [Object] stored as-is in +@connection+; the other methods
#   of this class call +channel+, +config+, +logger+, +exchange+ and
#   +connect!+ on it (presumably a Philotic connection -- confirm against callers)
def initialize(connection)
  @connection = connection
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



8
9
10
# File 'lib/philotic/subscriber.rb', line 8

# Reader for the connection handed to the constructor.
#
# @return [Object] the value currently held in +@connection+
def connection
  @connection
end

Instance Method Details

#acknowledge(message, up_to_and_including = false) ⇒ Object



79
80
81
# File 'lib/philotic/subscriber.rb', line 79

# Acknowledges a delivered message on the connection's channel.
#
# @param message [#delivery_tag] the message to acknowledge
# @param up_to_and_including [Boolean] forwarded unchanged as the second
#   argument of the channel's +acknowledge+ call
# @return [Object] whatever the channel's +acknowledge+ returns
def acknowledge(message, up_to_and_including = false)
  channel = connection.channel
  channel.acknowledge(message.delivery_tag, up_to_and_including)
end

#config ⇒ Object



18
19
20
# File 'lib/philotic/subscriber.rb', line 18

# Convenience accessor that delegates to the connection.
#
# @return [Object] whatever +connection.config+ returns
def config
  connection.config
end

#endure ⇒ Object



93
94
95
# File 'lib/philotic/subscriber.rb', line 93

# Puts the calling thread to sleep via +Thread.stop+; it resumes only when
# another thread wakes it.
# NOTE(review): presumably called after subscribing so the process stays
# alive while messages are handled on other threads -- confirm against callers.
def endure
  Thread.stop
end

#get_subscription_settings(subscription, subscribe_options) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/philotic/subscriber.rb', line 52

# Normalizes the arguments of #subscribe into the settings used to declare a
# queue, bind it and consume from it.
#
# Two call shapes are accepted:
#
# * +subscription+ is a String or Symbol: it becomes the queue name, the
#   named-queue defaults apply, overrides are read from
#   +subscribe_options[:queue_options]+ and no binding arguments are produced
#   (+:arguments+ is nil).
# * anything else is treated as a Hash: +:queue_name+ (default +''+),
#   +:queue_options+ and +:subscribe_options+ are read from it, and the
#   binding arguments are +subscription[:arguments]+ or, failing that, the
#   subscription hash itself.
#
# Neither the +Philotic::DEFAULT_*_QUEUE_OPTIONS+ constants nor the hashes
# passed in are modified; the returned +:queue_options+ and +:arguments+ are
# fresh copies.
#
# @param subscription [String, Symbol, Hash] queue name or subscription hash
# @param subscribe_options [Hash] options for the eventual queue subscribe call
# @return [Hash] with the keys +:queue_name+, +:queue_options+, +:arguments+
#   and +:subscribe_options+
def get_subscription_settings(subscription, subscribe_options)
  if [Symbol, String].include? subscription.class
    queue_name      = subscription
    # For a named queue the second argument doubles as the place to look for
    # :queue_options overrides.
    subscription    = subscribe_options
    default_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS
    arguments       = nil
  else
    queue_name        = subscription[:queue_name] || ''
    default_options   = Philotic::DEFAULT_ANONYMOUS_QUEUE_OPTIONS
    subscribe_options = subscribe_options.merge(subscription[:subscribe_options]) if subscription[:subscribe_options]

    # Work on a copy so the caller's hash is left untouched.
    arguments = (subscription[:arguments] || subscription).dup
    # Default the match mode, but do not add a second, conflicting entry when
    # the caller already chose one under the symbol key.
    arguments['x-match'] ||= 'all' unless arguments.key?(:'x-match')
  end

  # Non-destructive merge: the defaults are shared constants and must not
  # accumulate per-subscription overrides.
  queue_options = default_options.merge(subscription[:queue_options] || {})

  queue_options[:auto_delete] ||= true if queue_name == ''

  {
    queue_name:        queue_name,
    queue_options:     queue_options,
    arguments:         arguments,
    subscribe_options: subscribe_options,
  }
end

#initialize_queue(subscription_settings) ⇒ Object



45
46
47
48
49
50
# File 'lib/philotic/subscriber.rb', line 45

# Declares the queue described by the settings on the connection's channel
# and, when binding arguments are present, binds it to the connection's
# exchange.
#
# @param subscription_settings [Hash] reads +:queue_name+, +:queue_options+
#   and +:arguments+
# @return [Object] the queue object returned by the channel
def initialize_queue(subscription_settings)
  channel = connection.channel
  queue   = channel.queue(subscription_settings[:queue_name], subscription_settings[:queue_options])

  bind_args = subscription_settings[:arguments]
  if bind_args
    queue.bind(connection.exchange, arguments: bind_args)
  end

  queue
end

#logger ⇒ Object



14
15
16
# File 'lib/philotic/subscriber.rb', line 14

# Convenience accessor that delegates to the connection.
#
# @return [Object] whatever +connection.logger+ returns
def logger
  connection.logger
end

#reject(message, requeue = true) ⇒ Object



83
84
85
# File 'lib/philotic/subscriber.rb', line 83

# Rejects a delivered message on the connection's channel.
#
# @param message [#delivery_tag] the message to reject
# @param requeue [Boolean] forwarded unchanged as the second argument of the
#   channel's +reject+ call
# @return [Object] whatever the channel's +reject+ returns
def reject(message, requeue = true)
  channel = connection.channel
  channel.reject(message.delivery_tag, requeue)
end

#subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/philotic/subscriber.rb', line 33

# Connects, applies the configured prefetch count to the channel, prepares
# the queue for the given subscription and starts consuming from it.
#
# @param subscription [String, Symbol, Hash] passed to #get_subscription_settings
# @param subscribe_options [Hash] passed to #get_subscription_settings
# @yield [message] handler wrapped by #subscription_callback
# @return [Object] whatever the queue's +subscribe+ call returns
def subscribe(subscription = {}, subscribe_options = Philotic::DEFAULT_SUBSCRIBE_OPTIONS, &block)
  connection.connect!
  connection.channel.prefetch(connection.config.prefetch_count)

  settings = get_subscription_settings(subscription, subscribe_options)
  queue    = initialize_queue(settings)
  handler  = subscription_callback(&block)

  queue.subscribe(settings[:subscribe_options], &handler)
end

#subscribe_to_any(options = {}) ⇒ Object



87
88
89
90
91
# File 'lib/philotic/subscriber.rb', line 87

# Subscribes with the +:'x-match'+ option forced to +:any+.
#
# Does nothing and returns nil when no block is given.
#
# The block is captured through an explicit +&block+ parameter: the earlier
# block-less +Proc.new+ form warns on Ruby 2.7 and raises ArgumentError on
# Ruby 3.0 and later.
#
# @param options [Hash] subscription hash; it is not modified (a merged copy
#   is passed to #subscribe)
# @yield [message] handler forwarded to #subscribe
# @return [Object, nil] the result of #subscribe, or nil without a block
def subscribe_to_any(options = {}, &block)
  return unless block

  subscribe(options.merge(:'x-match' => :any), &block)
end

#subscription_callback(&block) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/philotic/subscriber.rb', line 22

# Wraps a message handler in the three-argument lambda that #subscribe hands
# to the queue's +subscribe+ call.
#
# For every delivery the lambda deserializes the payload, builds a message
# from an anonymous subclass of +Philotic::Message+, attaches the delivery
# info and runs the handler with +instance_exec+, so +self+ inside the
# handler is this subscriber.
#
# @yield [message] handler invoked once per delivery
# @return [Proc] a lambda taking +(delivery_info, metadata, payload)+
def subscription_callback(&block)
  lambda do |delivery_info, metadata, payload|
    hash_payload = Philotic::Serialization::Serializer.load(payload, metadata)

    # A fresh anonymous subclass is created for each delivery.
    message               = Class.new(Philotic::Message).new(metadata[:headers], hash_payload)
    message.delivery_info = delivery_info

    instance_exec(message, &block)
  end
end