Class: Client::TransactionMQProducer

Inherits:
Producer
  • Object
show all
Includes:
Rocketmq::C
Defined in:
lib/rocketmq-client-ruby/client/transaction_mq_producer.rb

Overview

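TransactionMQProducer class: a producer for transactional messages. It runs a local-transaction callback when a message is sent and answers transaction status checks through a checker callback.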

Constant Summary

Constants included from Rocketmq::C

Rocketmq::C::ConsumeStatus, Rocketmq::C::MessageModel, Rocketmq::C::MessageProperty, Rocketmq::C::SendStatus, Rocketmq::C::Status, Rocketmq::C::TransactionStatus

Instance Method Summary collapse

Methods included from Rocketmq::C

attach_function_maybe

Methods inherited from Producer

#destroy, #send_oneway, #send_orderly_with_sharding_key, #send_sync, #set_compress_level, #set_group, #set_instance_name, #set_max_message_size, #set_name_server_domain, #set_session_credentials, #set_timeout, #shutdown, #start

Constructor Details

#initialize(group_id, checker_callback, user_args: nil, timeout: nil, compress_level: nil, max_message_size: nil) ⇒ TransactionMQProducer

Returns a new instance of TransactionMQProducer.

Raises:

  • (StandardError)


7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/rocketmq-client-ruby/client/transaction_mq_producer.rb', line 7

# Creates the native transaction producer and registers the callback that
# answers transaction status checks.
#
# @param group_id [String] producer group handed to CreateTransactionProducer
# @param checker_callback [#call] called with a ReceivedMessage; must return
#   TransactionStatus[:unknown], TransactionStatus[:commit] or
#   TransactionStatus[:rollback]
# @param user_args [Object, nil] forwarded unchanged to CreateTransactionProducer
# @param timeout [Integer, nil] applied through set_timeout when positive
# @param compress_level [Integer, nil] applied through set_compress_level when given
# @param max_message_size [Integer, nil] applied through set_max_message_size when positive
# @raise [StandardError] when no usable native producer handle is returned
def initialize(group_id, checker_callback, user_args: nil, timeout: nil, compress_level: nil, max_message_size: nil)
  # super(group_id, timeout, compress_level, max_message_size)
  # Holds references to the FFI callbacks (presumably so they are not garbage
  # collected while native code can still invoke them).
  @callback_refs = []

  on_check =
    FFI::Function.new(:int, %i[pointer pointer pointer]) do |_, msg_ptr, _|
      # The value of the block is the callback result. `return` must not be
      # used here: a block has proc semantics, and this one runs after
      # #initialize has finished, so `return` would raise LocalJumpError.
      #
      # NOTE(review): a failing checker or an invalid status raises out of the
      # callback instead of being mapped to TransactionStatus[:unknown] --
      # confirm this is the desired contract towards native code.
      check_result = checker_callback.call(ReceivedMessage.new(msg_ptr))
      if check_result != TransactionStatus[:unknown] &&
         check_result != TransactionStatus[:commit] &&
         check_result != TransactionStatus[:rollback]
        raise StandardError, 'Check transaction status error, please use enum \'TransactionStatus\' as response'
      end
      check_result
    end

  @callback_refs << on_check
  @producer = CreateTransactionProducer(group_id, on_check, user_args)
  # A null FFI pointer is a truthy Ruby object, so truthiness alone cannot
  # detect a failed creation; also ask the handle itself when it supports it.
  if !@producer || (@producer.respond_to?(:null?) && @producer.null?)
    raise StandardError, 'Returned null pointer when create transaction producer'
  end

  set_timeout(timeout) if timeout.to_i.positive?
  set_compress_level(compress_level) if compress_level
  set_max_message_size(max_message_size) if max_message_size.to_i.positive?
end

Instance Method Details

#send_message_in_transaction(message, local_execute, user_args: nil) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rocketmq-client-ruby/client/transaction_mq_producer.rb', line 44

# Sends +message+ as a transactional message, running +local_execute+ as the
# local transaction from inside the native send call.
#
# @param message [#raw] message whose +raw+ value is passed to SendMessageTransaction
# @param local_execute [#call] called with a ReceivedMessage; must return
#   TransactionStatus[:unknown], TransactionStatus[:commit] or
#   TransactionStatus[:rollback]
# @param user_args [Object, nil] forwarded unchanged to SendMessageTransaction
# @return [Reponse] wrapper around the SendResult filled in by the native call
def send_message_in_transaction(message, local_execute, user_args: nil)
  on_local_exec =
    FFI::Function.new(:int, %i[pointer pointer pointer]) do |_, msg_ptr, _|
      # The value of the block is the callback result. `return` must not be
      # used here: a block has proc semantics, so `return` would jump out of
      # #send_message_in_transaction itself (through the native call) and the
      # method would hand back the raw status instead of a response.
      #
      # NOTE(review): a failing local transaction or an invalid status raises
      # out of the callback instead of being mapped to
      # TransactionStatus[:unknown] -- confirm this is the desired contract.
      execute_result = local_execute.call(ReceivedMessage.new(msg_ptr))
      if execute_result != TransactionStatus[:unknown] &&
         execute_result != TransactionStatus[:commit] &&
         execute_result != TransactionStatus[:rollback]
        raise StandardError, 'Local transaction status error, please use enum \'TransactionStatus\' as response'
      end
      execute_result
    end

  # Keep the callback referenced for the duration of the native call only.
  @callback_refs << on_local_exec
  c_result = SendResult.new
  begin
    SendMessageTransaction(@producer, message.raw, on_local_exec, user_args, c_result.to_ptr)
  ensure
    @callback_refs.delete(on_local_exec)
  end
  Reponse.new(c_result)
end

#set_name_server_address(addr) ⇒ Object



40
41
42
# File 'lib/rocketmq-client-ruby/client/transaction_mq_producer.rb', line 40

# Hands the name server address to the native producer held in @producer.
#
# @param addr [String] name server address (presumably "host:port" -- confirm
#   against the native binding)
# @return [Object] whatever SetProducerNameServerAddress returns
def set_name_server_address(addr)
  native_producer = @producer
  SetProducerNameServerAddress(native_producer, addr)
end