Class: Nsq::Producer
- Inherits:
-
ClientBase
- Object
- ClientBase
- Nsq::Producer
- Defined in:
- lib/nsq/producer.rb
Instance Attribute Summary collapse
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Attributes inherited from ClientBase
Instance Method Summary collapse
-
#deferred_write(delay, *raw_messages) ⇒ Object
Arg ‘delay’ in seconds.
- #deferred_write_to_topic(topic, delay, *raw_messages) ⇒ Object
-
#initialize(opts = {}) ⇒ Producer
constructor
A new instance of Producer.
- #write(*raw_messages) ⇒ Object
- #write_to_topic(topic, *raw_messages) ⇒ Object
Methods inherited from ClientBase
Methods included from AttributeLogger
Constructor Details
#initialize(opts = {}) ⇒ Producer
Returns a new instance of Producer.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/nsq/producer.rb', line 7 def initialize(opts = {}) @connections = {} @topic = opts[:topic] @discovery_interval = opts[:discovery_interval] || 60 @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @tls_v1 = opts[:tls_v1] nsqlookupds = [] if opts[:nsqlookupd] nsqlookupds = [opts[:nsqlookupd]].flatten discover_repeatedly( nsqlookupds: nsqlookupds, interval: @discovery_interval ) elsif opts[:nsqd] nsqds = [opts[:nsqd]].flatten nsqds.each{|d| add_connection(d)} else add_connection('127.0.0.1:4150') end end |
Instance Attribute Details
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
5 6 7 |
# File 'lib/nsq/producer.rb', line 5 def topic @topic end |
Instance Method Details
#deferred_write(delay, *raw_messages) ⇒ Object
Arg ‘delay’ in seconds
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/nsq/producer.rb', line 41 def deferred_write(delay, *) if !@topic raise 'No topic specified. Either specify a topic when instantiating the Producer or use write_to_topic.' end if delay < 0.0 raise "Delay can't be negative, use a positive float." end deferred_write_to_topic(@topic, delay, *) end |
#deferred_write_to_topic(topic, delay, *raw_messages) ⇒ Object
69 70 71 72 73 74 75 76 |
# File 'lib/nsq/producer.rb', line 69 def deferred_write_to_topic(topic, delay, *) raise ArgumentError, 'message not provided' if .empty? = .map(&:to_s) connection = connection_for_write .each do |msg| connection.dpub(topic, (delay * 1000).to_i, msg) end end |
#write(*raw_messages) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/nsq/producer.rb', line 32 def write(*) if !@topic raise 'No topic specified. Either specify a topic when instantiating the Producer or use write_to_topic.' end write_to_topic(@topic, *) end |
#write_to_topic(topic, *raw_messages) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/nsq/producer.rb', line 52 def write_to_topic(topic, *) # return error if message(s) not provided raise ArgumentError, 'message not provided' if .empty? # stringify the messages = .map(&:to_s) # get a suitable connection to write to connection = connection_for_write if .length > 1 connection.mpub(topic, ) else connection.pub(topic, .first) end end |