Class: Client::TransactionMQProducer
- Includes:
- Rocketmq::C
- Defined in:
- lib/rocketmq-client-ruby/client/transaction_mq_producer.rb
Overview
TransactionMQProducer module
Constant Summary
Constants included from Rocketmq::C
Rocketmq::C::ConsumeStatus, Rocketmq::C::MessageModel, Rocketmq::C::MessageProperty, Rocketmq::C::SendStatus, Rocketmq::C::Status, Rocketmq::C::TransactionStatus
Instance Method Summary collapse
-
#initialize(group_id, checker_callback, user_args: nil, timeout: nil, compress_level: nil, max_message_size: nil) ⇒ TransactionMQProducer
constructor
A new instance of TransactionMQProducer.
- #send_message_in_transaction(message, local_execute, user_args: nil) ⇒ Object
- #set_name_server_address(addr) ⇒ Object
Methods included from Rocketmq::C
Methods inherited from Producer
#destroy, #send_oneway, #send_orderly_with_sharding_key, #send_sync, #set_compress_level, #set_group, #set_instance_name, #set_max_message_size, #set_name_server_domain, #set_session_credentials, #set_timeout, #shutdown, #start
Constructor Details
#initialize(group_id, checker_callback, user_args: nil, timeout: nil, compress_level: nil, max_message_size: nil) ⇒ TransactionMQProducer
Returns a new instance of TransactionMQProducer.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/rocketmq-client-ruby/client/transaction_mq_producer.rb', line 7 def initialize(group_id, checker_callback, user_args: nil, timeout: nil, compress_level: nil, max_message_size: nil) # super(group_id, timeout, compress_level, max_message_size) @callback_refs = [] on_check = FFI::Function.new(:int, %i[pointer pointer pointer]) do |_, msg_ptr, _| exc = nil begin msg = ReceivedMessage.new(msg_ptr) check_result = checker_callback.call(msg) if check_result != TransactionStatus[:unknown] && check_result != TransactionStatus[:commit] && check_result != TransactionStatus[:rollback] raise StandardError.new('Check transaction status error, please use enum \'TransactionStatus\' as response') end return check_result rescue => ex exc = ex return TransactionStatus[:unknown] ensure raise exc if exc end end @callback_refs << on_check @producer = CreateTransactionProducer(group_id, on_check, user_args) raise StandardError.new('Returned null pointer when create transaction producer') unless @producer set_timeout(timeout) if timeout.to_i.positive? set_compress_level(compress_level) if compress_level () if .to_i.positive? end |
Instance Method Details
#send_message_in_transaction(message, local_execute, user_args: nil) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/rocketmq-client-ruby/client/transaction_mq_producer.rb', line 44 def (, local_execute, user_args: nil) on_local_exec = FFI::Function.new(:int, %i[pointer pointer pointer]) do |_, msg_ptr, user_args| exc = nil begin msg = ReceivedMessage.new(msg_ptr) execute_result = local_execute.call(msg) if execute_result != TransactionStatus[:unknown] && execute_result != TransactionStatus[:commit] && execute_result != TransactionStatus[:rollback] raise StandardError.new('Local transaction status error, please use enum \'TransactionStatus\' as response') end return execute_result rescue => ex exc = ex return TransactionStatus[:unknown] ensure raise exc if exc end end @callback_refs << on_local_exec c_result = SendResult.new begin SendMessageTransaction(@producer, .raw, on_local_exec, user_args, c_result.to_ptr) ensure @callback_refs.delete(on_local_exec) end Reponse.new(c_result) end |
#set_name_server_address(addr) ⇒ Object
40 41 42 |
# File 'lib/rocketmq-client-ruby/client/transaction_mq_producer.rb', line 40 def set_name_server_address(addr) SetProducerNameServerAddress(@producer, addr) end |