Class: Client::Producer

Inherits:
Object
  • Object
show all
Includes:
Rocketmq::C
Defined in:
lib/rocketmq-client-ruby/client/producer.rb

Overview

Producer module

Direct Known Subclasses

TransactionMQProducer

Constant Summary

Constants included from Rocketmq::C

Rocketmq::C::ConsumeStatus, Rocketmq::C::MessageModel, Rocketmq::C::MessageProperty, Rocketmq::C::SendStatus, Rocketmq::C::Status, Rocketmq::C::TransactionStatus

Instance Method Summary collapse

Methods included from Rocketmq::C

attach_function_maybe

Constructor Details

#initialize(group_id, orderly: false, timeout: nil, compress_level: nil, max_message_size: nil) ⇒ Producer

Returns a new instance of Producer.

Raises:

  • (StandardError)


8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 8

# Creates the native producer handle and applies the optional settings.
#
# @param group_id producer group identifier handed to the native factory
# @param orderly [Boolean] when truthy, CreateOrderlyProducer is used
#   instead of CreateProducer
# @param timeout [#to_i, nil] forwarded to #set_timeout only when
#   `timeout.to_i` is positive
# @param compress_level [Object, nil] forwarded to #set_compress_level
#   whenever it is truthy
# @param max_message_size [#to_i, nil] forwarded to #set_max_message_size
#   only when `max_message_size.to_i` is positive
# @raise [StandardError] when the factory yields nil/false or a handle that
#   reports itself as NULL via `null?`
def initialize(group_id, orderly: false, timeout: nil, compress_level: nil, max_message_size: nil)
  producer_factory = orderly ? :CreateOrderlyProducer : :CreateProducer

  @producer = send(producer_factory, group_id)
  @callback_refs = []

  # A plain truthiness test misses NULL pointer *objects* (presumably what an
  # FFI-bound factory returns on failure -- confirm against Rocketmq::C), so
  # also ask the handle itself when it responds to #null?.
  if !@producer || (@producer.respond_to?(:null?) && @producer.null?)
    raise StandardError, 'Returned null pointer when create Producer'
  end

  set_timeout(timeout) if timeout.to_i.positive?
  set_compress_level(compress_level) if compress_level
  set_max_message_size(max_message_size) if max_message_size.to_i.positive?
end

Instance Method Details

#destroy ⇒ Object



81
82
83
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 81

# Releases the native producer handle held in @producer.
#
# Uses the producer destructor (DestroyProducer); the push-consumer
# destructor must not be handed a producer handle.
#
# @return [Object] whatever DestroyProducer returns
def destroy
  DestroyProducer(@producer)
end

#send_oneway(msg) ⇒ Object



55
56
57
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 55

# Sends a message without collecting a send result.
#
# The producer handle and the message's raw native object are passed, in line
# with the other send methods of this class.
#
# @param msg [#raw] message wrapper exposing the native message via #raw
# @return [Object] whatever SendMessageOneway returns
def send_oneway(msg)
  SendMessageOneway(@producer, msg.raw)
end

#send_orderly_with_sharding_key(msg, sharding_key) ⇒ Object



59
60
61
62
63
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 59

# Sends a message through SendMessageOrderlyByShardingKey and wraps the
# populated native result.
#
# @param msg [#raw] message wrapper exposing the native message via #raw
# @param sharding_key [Object] key forwarded unchanged to the native call
# @return [Response] wrapper around the SendResult filled in by the call
def send_orderly_with_sharding_key(msg, sharding_key)
  native_result = SendResult.new.tap do |result|
    SendMessageOrderlyByShardingKey(@producer, msg.raw, sharding_key, result.to_ptr)
  end
  Response.new(native_result)
end

#send_sync(msg) ⇒ Object



49
50
51
52
53
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 49

# Sends a message through SendMessageSync and wraps the populated native
# result.
#
# @param msg [#raw] message wrapper exposing the native message via #raw
# @return [Response] wrapper around the SendResult filled in by the call
def send_sync(msg)
  native_result = SendResult.new.tap do |result|
    SendMessageSync(@producer, msg.raw, result.to_ptr)
  end
  Response.new(native_result)
end

#set_compress_level(compress_level) ⇒ Object



25
26
27
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 25

# Forwards the compression level to SetProducerCompressLevel for this
# producer's handle.
#
# @param compress_level [Object] value passed through unchanged
# @return [Object] whatever SetProducerCompressLevel returns
def set_compress_level(compress_level)
  handle = @producer
  SetProducerCompressLevel(handle, compress_level)
end

#set_group(group_name) ⇒ Object



65
66
67
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 65

# Forwards the group name to SetProducerGroupName for this producer's handle.
#
# @param group_name [Object] value passed through unchanged
# @return [Object] whatever SetProducerGroupName returns
def set_group(group_name)
  handle = @producer
  SetProducerGroupName(handle, group_name)
end

#set_instance_name(instance_name) ⇒ Object



69
70
71
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 69

# Forwards the instance name to SetProducerInstanceName for this producer's
# handle.
#
# @param instance_name [Object] value passed through unchanged
# @return [Object] whatever SetProducerInstanceName returns
def set_instance_name(instance_name)
  handle = @producer
  SetProducerInstanceName(handle, instance_name)
end

#set_max_message_size(max_message_size) ⇒ Object



29
30
31
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 29

# Forwards the maximum message size to SetProducerMaxMessageSize for this
# producer's handle.
#
# @param max_message_size [Object] value passed through unchanged
# @return [Object] whatever SetProducerMaxMessageSize returns
def set_max_message_size(max_message_size)
  handle = @producer
  SetProducerMaxMessageSize(handle, max_message_size)
end

#set_name_server_address(addr) ⇒ Object



41
42
43
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 41

# Forwards the name server address to SetProducerNameServerAddress for this
# producer's handle.
#
# @param addr [Object] value passed through unchanged
# @return [Object] whatever SetProducerNameServerAddress returns
def set_name_server_address(addr)
  handle = @producer
  SetProducerNameServerAddress(handle, addr)
end

#set_name_server_domain(domain) ⇒ Object



37
38
39
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 37

# Forwards the name server domain to SetProducerNameServerDomain for this
# producer's handle.
#
# @param domain [Object] value passed through unchanged
# @return [Object] whatever SetProducerNameServerDomain returns
def set_name_server_domain(domain)
  handle = @producer
  SetProducerNameServerDomain(handle, domain)
end

#set_session_credentials(access_key, access_secret, channel) ⇒ Object



45
46
47
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 45

# Forwards the three credential values to SetProducerSessionCredentials for
# this producer's handle, in the order given.
#
# @param access_key [Object] passed through unchanged
# @param access_secret [Object] passed through unchanged
# @param channel [Object] passed through unchanged
# @return [Object] whatever SetProducerSessionCredentials returns
def set_session_credentials(access_key, access_secret, channel)
  credentials = [access_key, access_secret, channel]
  SetProducerSessionCredentials(@producer, *credentials)
end

#set_timeout(timeout) ⇒ Object



33
34
35
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 33

# Forwards the send timeout to SetProducerSendMsgTimeout for this producer's
# handle.
#
# @param timeout [Object] value passed through unchanged
#   (unit is defined by the native API -- not visible here)
# @return [Object] whatever SetProducerSendMsgTimeout returns
def set_timeout(timeout)
  handle = @producer
  SetProducerSendMsgTimeout(handle, timeout)
end

#shutdown ⇒ Object



77
78
79
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 77

# Invokes ShutdownProducer on this producer's handle.
#
# @return [Object] whatever ShutdownProducer returns
def shutdown
  handle = @producer
  ShutdownProducer(handle)
end

#start ⇒ Object



73
74
75
# File 'lib/rocketmq-client-ruby/client/producer.rb', line 73

# Invokes StartProducer on this producer's handle.
#
# @return [Object] whatever StartProducer returns
def start
  handle = @producer
  StartProducer(handle)
end