Class: Ons::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/ons/consumer.rb

Overview

the ONS Consumer

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(access_key, secret_key, consumer_id, options = {}) ⇒ Consumer

Create a new aliyun ONS Consumer instance.

Parameters:

  • access_key (String)

    the access key to aliyun ONS

  • secret_key (String)

    the secret key to aliyun ONS

  • consumer_id (String)

    the consumer ID

  • options (Hash{String, Symbol => String}) (defaults to: {})

Options Hash (options):

  • :namesrv_addr (String)

    the nameserver used to fetching ons_addr

  • :ons_addr (String)

    the ONS server address

  • :thread_num (String, Fixnum, Bignum)

    the consumer thread numbers



20
21
22
23
24
25
26
27
28
# File 'lib/ons/consumer.rb', line 20

def initialize(access_key, secret_key, consumer_id, options = {})
  @consumer = Internal::Consumer.new(access_key, secret_key, consumer_id, options)

  # register instance
  self.class.instances << self

  # start the global LMFAO event thread, so it could consume messages properly
  LMFAO.start_event_thread
end

Class Method Details

.instances<Consumer>

Get all the Consumer instances.

Returns:

  • (<Consumer>)

    all the Consumer instances



7
8
9
# File 'lib/ons/consumer.rb', line 7

def self.instances
  @instances ||= []
end

Instance Method Details

#shutdownvoid

Note:

this method should be called before program exit, otherwise it would case a memory leak.

This method returns an undefined value.

Shutdown the Consumer instance.

Please see also Ons.register_cleanup_hooks if you want call it automatically.



67
68
69
# File 'lib/ons/consumer.rb', line 67

def shutdown
  @consumer.shutdown
end

#startself

Note:

this method should be called after subscribe.

Note:

thie method will not block the thread, please see also Ons.loop_forever.

Start the Consumer instance.

Returns:

  • (self)

    returns itself



56
57
58
59
# File 'lib/ons/consumer.rb', line 56

def start
  @consumer.start
  self
end

#subscribe(topic, expression, handler = nil) ⇒ self

Subsribe a topic.

Examples:

subscribe tag :tagA under topic :TopicTestMQ

consumer.subscribe('TopicTestMQ', 'tagA') {}

subscribe tag :tagA and :tagB under topic :TopicTestMQ

consumer.subscribe('TopicTestMQ', 'tagA || tagB') {}

subscribe all tags under topic :TopicTestMQ

consumer.subscribe('TopicTestMQ', '*') {}

Parameters:

  • topic (String)

    the message topic

  • expression (String)

    the subsribe expression used to filter messages

  • handler (#call) (defaults to: nil)

    the handler which will handle the message

Returns:

  • (self)

    returns itself



45
46
47
48
# File 'lib/ons/consumer.rb', line 45

def subscribe(topic, expression, handler = nil)
  @consumer.subscribe(topic, expression, handler || Proc.new)
  self
end