Class: Client::PushConsumer

Inherits:
Object
  • Object
show all
Includes:
Rocketmq::C
Defined in:
lib/rocketmq-client-ruby/client/push_consumer.rb

Overview

PushConsumer

Constant Summary

Constants included from Rocketmq::C

Rocketmq::C::ConsumeStatus, Rocketmq::C::MessageModel, Rocketmq::C::MessageProperty, Rocketmq::C::SendStatus, Rocketmq::C::Status, Rocketmq::C::TransactionStatus

Instance Method Summary collapse

Methods included from Rocketmq::C

attach_function_maybe

Constructor Details

#initialize(group_id, orderly: false, message_model: MessageModel[:clustering]) ⇒ PushConsumer

Returns a new instance of PushConsumer.

Raises:

  • (StandardError)


8
9
10
11
12
13
14
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 8

def initialize(group_id, orderly: false, message_model: MessageModel[:clustering])
  @orderly = orderly
  @push_consumer = CreatePushConsumer(group_id)
  @callback_refs = []
  raise StandardError.new('Returned null pointer when create Producer') unless @push_consumer
  set_message_model(message_model)
end

Instance Method Details

#get_groupObject



54
55
56
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 54

def get_group
  GetPushConsumerGroupID(@push_consumer)
end

#set_group(group_id) ⇒ Object



58
59
60
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 58

def set_group(group_id)
  SetPushConsumerGroupID(@push_consumer, group_id)
end

#set_instance_name(name) ⇒ Object



70
71
72
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 70

def set_instance_name(name)
  SetPushConsumerInstanceName(@push_consumer, name)
end

#set_message_batch_max_size(max_size) ⇒ Object



66
67
68
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 66

def set_message_batch_max_size(max_size)
  SetPushConsumerMessageBatchMaxSize(@push_consumer, max_size)
end

#set_message_model(model) ⇒ Object



16
17
18
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 16

def set_message_model(model)
  SetPushConsumerMessageModel(@push_consumer, model)
end

#set_name_server_address(addr) ⇒ Object



20
21
22
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 20

def set_name_server_address(addr)
  SetPushConsumerNameServerAddress(@push_consumer, addr)
end

#set_name_server_domain(domain) ⇒ Object



24
25
26
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 24

def set_name_server_domain(domain)
  SetPushConsumerNameServerDomain(@push_consumer, domain)
end

#set_session_credentials(access_key, access_secret, channel) ⇒ Object



28
29
30
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 28

def set_session_credentials(access_key, access_secret, channel)
  SetPushConsumerSessionCredentials(@push_consumer, access_key, access_secret, channel)
end

#set_thread_count(thread_count) ⇒ Object



62
63
64
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 62

def set_thread_count(thread_count)
  SetPushConsumerThreadCount(@push_consumer, thread_count)
end

#shutdownObject



78
79
80
81
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 78

def shutdown
  unregister_callback(@push_consumer) if @callback_refs.length.positive?
  ShutdownPushConsumer(@push_consumer)
end

#startObject



74
75
76
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 74

def start
  StartPushConsumer(@push_consumer)
end

#subscribe(topic, callback, expression: '*') ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 32

def subscribe(topic, callback, expression: '*')
  on_message =
    FFI::Function.new(:int, i[pointer pointer]) do |_, msg|
    exc = nil
    begin
      consume_result = callback.call(ReceivedMessage.new(msg))
      if consume_result != ConsumeStatus[:consume_success] &&
         consume_result != ConsumeStatus[:reconsume_later]
        raise StandardError.new('Consume status error, please use enum \'ConsumeStatus\' as response')
      end
      return consume_result
    rescue => ex
      exc = ex
      return ConsumeStatus[:reconsume_later]
    ensure
      raise exc if exc
    end
  end
  Subscribe(@push_consumer, topic, expression)
  register_callback(on_message)
end