Class: Client::PushConsumer
- Inherits:
-
Object
- Object
- Client::PushConsumer
show all
- Includes:
- Rocketmq::C
- Defined in:
- lib/rocketmq-client-ruby/client/push_consumer.rb
Overview
Constant Summary
Constants included
from Rocketmq::C
Rocketmq::C::ConsumeStatus, Rocketmq::C::MessageModel, Rocketmq::C::MessageProperty, Rocketmq::C::SendStatus, Rocketmq::C::Status, Rocketmq::C::TransactionStatus
Instance Method Summary
collapse
attach_function_maybe
Constructor Details
#initialize(group_id, orderly: false, message_model: MessageModel[:clustering]) ⇒ PushConsumer
Returns a new instance of PushConsumer.
8
9
10
11
12
13
14
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 8
def initialize(group_id, orderly: false, message_model: MessageModel[:clustering])
@orderly = orderly
@push_consumer = CreatePushConsumer(group_id)
@callback_refs = []
raise StandardError.new('Returned null pointer when create Producer') unless @push_consumer
set_message_model(message_model)
end
|
Instance Method Details
#get_group ⇒ Object
54
55
56
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 54
def get_group
GetPushConsumerGroupID(@push_consumer)
end
|
#set_group(group_id) ⇒ Object
58
59
60
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 58
def set_group(group_id)
SetPushConsumerGroupID(@push_consumer, group_id)
end
|
#set_instance_name(name) ⇒ Object
70
71
72
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 70
def set_instance_name(name)
SetPushConsumerInstanceName(@push_consumer, name)
end
|
#set_message_batch_max_size(max_size) ⇒ Object
66
67
68
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 66
def set_message_batch_max_size(max_size)
SetPushConsumerMessageBatchMaxSize(@push_consumer, max_size)
end
|
#set_message_model(model) ⇒ Object
16
17
18
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 16
def set_message_model(model)
SetPushConsumerMessageModel(@push_consumer, model)
end
|
#set_name_server_address(addr) ⇒ Object
20
21
22
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 20
def set_name_server_address(addr)
SetPushConsumerNameServerAddress(@push_consumer, addr)
end
|
#set_name_server_domain(domain) ⇒ Object
24
25
26
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 24
def set_name_server_domain(domain)
SetPushConsumerNameServerDomain(@push_consumer, domain)
end
|
#set_session_credentials(access_key, access_secret, channel) ⇒ Object
28
29
30
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 28
def set_session_credentials(access_key, access_secret, channel)
SetPushConsumerSessionCredentials(@push_consumer, access_key, access_secret, channel)
end
|
#set_thread_count(thread_count) ⇒ Object
62
63
64
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 62
def set_thread_count(thread_count)
SetPushConsumerThreadCount(@push_consumer, thread_count)
end
|
#shutdown ⇒ Object
78
79
80
81
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 78
def shutdown
unregister_callback(@push_consumer) if @callback_refs.length.positive?
ShutdownPushConsumer(@push_consumer)
end
|
#start ⇒ Object
74
75
76
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 74
def start
StartPushConsumer(@push_consumer)
end
|
#subscribe(topic, callback, expression: '*') ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/rocketmq-client-ruby/client/push_consumer.rb', line 32
def subscribe(topic, callback, expression: '*')
on_message =
FFI::Function.new(:int, i[pointer pointer]) do |_, msg|
exc = nil
begin
consume_result = callback.call(ReceivedMessage.new(msg))
if consume_result != ConsumeStatus[:consume_success] &&
consume_result != ConsumeStatus[:reconsume_later]
raise StandardError.new('Consume status error, please use enum \'ConsumeStatus\' as response')
end
return consume_result
rescue => ex
exc = ex
return ConsumeStatus[:reconsume_later]
ensure
raise exc if exc
end
end
Subscribe(@push_consumer, topic, expression)
register_callback(on_message)
end
|