Class: NatsMessaging::NatsService
- Inherits: Object
  - Object
    - NatsMessaging::NatsService
- Defined in: lib/nats_messaging/nats_service.rb
Instance Method Summary
- #create_stream(stream_name, subjects) ⇒ Object
  Create a stream.
- #initialize(nats_url, stream_name: nil, subjects: []) ⇒ NatsService (constructor)
  A new instance of NatsService.
- #listen_and_reply(subject, reply_message) ⇒ Object
  Listen to a subject and reply with a message using MessagePack.
- #process_received_message(msg, subject) ⇒ Object
  Process received message.
- #publish_message(subject, message) ⇒ Object
  Publish a message to a subject using MessagePack.
- #send_request(subject, message) ⇒ Object
  Send a request to a subject and wait for response using MessagePack.
- #subscribe_to_subject(subject, durable_name = "durable_name") ⇒ Object
  Subscribe to a subject using MessagePack.
Constructor Details
#initialize(nats_url, stream_name: nil, subjects: []) ⇒ NatsService
Returns a new instance of NatsService. Connects to nats_url; when stream_name is given, it also enables JetStream and creates the stream.

    # File 'lib/nats_messaging/nats_service.rb', line 6

    def initialize(nats_url, stream_name: nil, subjects: [])
      # To run locally, use "nats://localhost:4222" as nats_url
      @nats = NATS.connect(nats_url)
      if stream_name
        @js = @nats.jetstream
        create_stream(stream_name, subjects)
      end
      @subscriptions = {} # Store active subscriptions
    end
Instance Method Details
#create_stream(stream_name, subjects) ⇒ Object
Create a stream if it does not already exist. The stream is looked up with stream_info and added when JetStream reports NotFound; any other error is logged and swallowed.

    # File 'lib/nats_messaging/nats_service.rb', line 17

    def create_stream(stream_name, subjects)
      begin
        stream = @js.stream_info(stream_name)
        puts "NATS: Stream #{stream_name} already exists"
      rescue NATS::JetStream::Error::NotFound
        @js.add_stream(name: stream_name, subjects: subjects)
        puts "NATS: Stream #{stream_name} created"
      rescue => e
        puts "NATS: Failed to create stream: #{e.message}"
      end
    end
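The look-up-then-create flow in create_stream makes stream creation safe to repeat. The sketch below reproduces only that control flow against an in-memory stand-in so it runs without a NATS server. `FakeJetStream`, `StreamNotFound`, and `ensure_stream` are hypothetical names for illustration, not part of nats-pure or of this class.

```ruby
# Hypothetical stand-ins so the sketch runs without a NATS server.
class StreamNotFound < StandardError; end

class FakeJetStream
  attr_reader :streams

  def initialize
    @streams = {}
  end

  # Raises when the stream is unknown, playing the role of the NotFound error.
  def stream_info(name)
    @streams.fetch(name) { raise StreamNotFound, "stream #{name} not found" }
  end

  def add_stream(name:, subjects:)
    @streams[name] = { name: name, subjects: subjects }
  end
end

# Same shape as create_stream: look the stream up first, create it only if missing.
def ensure_stream(js, stream_name, subjects)
  js.stream_info(stream_name)
  :exists
rescue StreamNotFound
  js.add_stream(name: stream_name, subjects: subjects)
  :created
end

js = FakeJetStream.new
puts ensure_stream(js, "orders", ["orders.*"])
puts ensure_stream(js, "orders", ["orders.*"])
```

The first call takes the rescue branch and creates the stream; the second finds it and leaves it untouched.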
#listen_and_reply(subject, reply_message) ⇒ Object
Listen to a subject and reply with a message using MessagePack. Any existing subscription for the same subject is cancelled first, so only one responder per subject stays active.

    # File 'lib/nats_messaging/nats_service.rb', line 101

    def listen_and_reply(subject, reply_message)
      # Cancel active subscription if it already exists for this subject
      if @subscriptions[subject]
        @subscriptions[subject].unsubscribe
      end

      begin
        # Create a new subscription
        puts "NATS: Subscribing to #{subject}"
        subscription = @nats.subscribe(subject) do |msg|
          puts "NATS: process reply?"
          process_reply(msg, subject, reply_message)
        end
      rescue => e
        puts "NATS: Error while replying: #{e.message}"
      end

      # Store the new subscription in the hash
      @subscriptions[subject] = subscription
      puts "NATS: Stored subscription for #{subject}: #{@subscriptions[subject].inspect}"
      puts "NATS: Listening on #{subject} with reply message: #{reply_message}"
    end
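The cancel-then-replace handling of @subscriptions above can be isolated as a small registry. This is a minimal sketch under stated assumptions: `FakeSubscription` and `SubscriptionRegistry` are hypothetical classes, and the only behavior borrowed from the source is that a stored subscription responds to `unsubscribe`.

```ruby
# Hypothetical subscription object; the source only relies on #unsubscribe.
class FakeSubscription
  attr_reader :subject

  def initialize(subject)
    @subject = subject
    @active = true
  end

  def unsubscribe
    @active = false
  end

  def active?
    @active
  end
end

# Keeps at most one live subscription per subject, as listen_and_reply does.
class SubscriptionRegistry
  def initialize
    @subscriptions = {}
  end

  def replace(subject)
    # Cancel the previous subscription for this subject, if any.
    @subscriptions[subject]&.unsubscribe
    @subscriptions[subject] = FakeSubscription.new(subject)
  end

  def [](subject)
    @subscriptions[subject]
  end
end

registry = SubscriptionRegistry.new
first = registry.replace("greetings")
second = registry.replace("greetings")
puts "first active?  #{first.active?}"
puts "second active? #{second.active?}"
```

Calling `replace` twice for the same subject leaves the first subscription cancelled and the second one stored, which avoids two responders answering the same request.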
#process_received_message(msg, subject) ⇒ Object
Process a received message: unpack the MessagePack payload, log it, acknowledge the message when it supports ack, and return the unpacked data. On error the exception is logged and the method returns nil.

    # File 'lib/nats_messaging/nats_service.rb', line 71

    def process_received_message(msg, subject)
      begin
        unpacked_data = MessagePack.unpack(msg.data)
        puts "NATS: Message received on #{subject}: #{unpacked_data}"
        msg.ack if msg.respond_to?(:ack)
        unpacked_data
      rescue => e
        puts "NATS: Error while processing message: #{e.message}"
      end
    end
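The `msg.ack if msg.respond_to?(:ack)` guard lets one handler serve both message kinds. The sketch below shows only that guard and leaves MessagePack decoding out; `CoreMessage`, `AckableMessage`, and `handle` are hypothetical names for illustration.

```ruby
# Hypothetical message types: only the second one can be acknowledged.
CoreMessage = Struct.new(:data)

class AckableMessage
  attr_reader :data, :acked

  def initialize(data)
    @data = data
    @acked = false
  end

  def ack
    @acked = true
  end
end

# Acknowledge only when the message supports it, then hand back the payload.
def handle(msg)
  msg.ack if msg.respond_to?(:ack)
  msg.data
end

core = CoreMessage.new("hello")
stream = AckableMessage.new("world")
puts handle(core)
puts handle(stream)
puts "acked: #{stream.acked}"
```

Checking `respond_to?` rather than the message class keeps the handler independent of which subscription type delivered the message.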
#publish_message(subject, message) ⇒ Object
Publish a message to a subject using MessagePack. When JetStream is enabled the message is sent with @js.publish; if no stream matches the subject, it falls back to a plain NATS publish.

    # File 'lib/nats_messaging/nats_service.rb', line 30

    def publish_message(subject, message)
      packed_message = message.to_msgpack # Serialize to MessagePack

      if @js
        begin
          ack = @js.publish(subject, packed_message)
          puts "NATS: Message sent to #{subject}: #{message}, ACK: #{ack.inspect}"
        rescue NATS::JetStream::Error::NotFound
          @nats.publish(subject, packed_message)
          puts "NATS: Stream not found, falling back to regular NATS publish"
        rescue => e
          puts "NATS: Unexpected error: #{e.message}"
        end
      else
        begin
          @nats.publish(subject, packed_message)
          # Core NATS publish returns no ack, so none is logged here
          puts "NATS: Message sent to #{subject}: #{message}"
        rescue => e
          puts "NATS: Unexpected error: #{e.message}"
        end
      end
    end
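The three publish paths above (JetStream, fallback after a missing stream, and core NATS only) can be exercised without a server. In this sketch `FakeCore`, `FakeStreamClient`, `StreamMissing`, and `publish_with_fallback` are hypothetical stand-ins, and the ack hash is invented for illustration rather than taken from the real client.

```ruby
# Hypothetical stand-ins; StreamMissing plays the role of the NotFound error.
class StreamMissing < StandardError; end

class FakeCore
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(subject, payload)
    @sent << [subject, payload]
  end
end

class FakeStreamClient
  def initialize(known_subjects)
    @known_subjects = known_subjects
  end

  # Returns an ack-like hash when a stream covers the subject, raises otherwise.
  def publish(subject, _payload)
    raise StreamMissing, "no stream for #{subject}" unless @known_subjects.include?(subject)

    { stream: "demo", seq: 1 }
  end
end

# Try the stream client first; fall back to core publish if no stream matches.
def publish_with_fallback(js, core, subject, payload)
  if js
    begin
      return [:jetstream, js.publish(subject, payload)]
    rescue StreamMissing
      core.publish(subject, payload)
      return [:fallback, nil]
    end
  end
  core.publish(subject, payload)
  [:core, nil]
end

core = FakeCore.new
js = FakeStreamClient.new(["orders.new"])
p publish_with_fallback(js, core, "orders.new", "a")
p publish_with_fallback(js, core, "other", "b")
p publish_with_fallback(nil, core, "other", "c")
```

Only the first path yields an acknowledgment. The fallback and core paths are fire-and-forget, so a caller that needs delivery confirmation should treat them differently.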
#send_request(subject, message) ⇒ Object
Send a request to a subject and wait up to 5 seconds for the response, using MessagePack. Returns the unpacked response, or nil on timeout or any other error.

    # File 'lib/nats_messaging/nats_service.rb', line 83

    def send_request(subject, message)
      puts "NATS: Sending request to #{subject} with message: #{message}"
      begin
        packed_message = message.to_msgpack # Serialize the message
        response = @nats.request(subject, packed_message, timeout: 5) # Timeout of 5 seconds
        unpacked_response = MessagePack.unpack(response.data) # Deserialize the response
        puts "NATS: Received reply: #{unpacked_response}"
        unpacked_response
      rescue NATS::IO::Timeout
        puts "NATS: Request timed out for subject: #{subject}"
        nil
      rescue => e
        puts "NATS: Unexpected error: #{e.message}"
        nil
      end
    end
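Because send_request returns nil on a timeout instead of raising, callers need a nil check. The sketch below imitates that contract with Ruby's standard Timeout module in place of the NATS client's own timeout handling; `SlowResponder` and `request_or_nil` are hypothetical names, and the short timeout values are chosen only to keep the example fast.

```ruby
require "timeout"

# Hypothetical responder: sleeps for `delay` seconds before answering.
class SlowResponder
  def initialize(delay)
    @delay = delay
  end

  def request(_subject, payload)
    sleep(@delay)
    "echo: #{payload}"
  end
end

# Returns the reply, or nil when no reply arrives within `timeout` seconds.
def request_or_nil(client, subject, payload, timeout: 0.2)
  Timeout.timeout(timeout) { client.request(subject, payload) }
rescue Timeout::Error
  nil
end

fast = request_or_nil(SlowResponder.new(0.0), "svc.echo", "ping")
slow = request_or_nil(SlowResponder.new(1.0), "svc.echo", "ping")

# Callers branch on nil rather than rescuing an exception.
[fast, slow].each do |reply|
  puts reply.nil? ? "no reply in time" : "got #{reply}"
end
```

A nil return cannot distinguish a timeout from another failure, so code that needs to tell them apart has to rely on the log output or wrap the call differently.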
#subscribe_to_subject(subject, durable_name = "durable_name") ⇒ Object
Subscribe to a subject using MessagePack. Uses a durable JetStream consumer when JetStream is enabled, otherwise a plain NATS subscription; each message is passed to process_received_message. The subscription is stored under its subject.

    # File 'lib/nats_messaging/nats_service.rb', line 54

    def subscribe_to_subject(subject, durable_name = "durable_name")
      puts "NATS: Subscribing to #{subject}"
      if @js
        subscription = @js.subscribe(subject, durable: durable_name) do |msg|
          process_received_message(msg, subject)
        end
      else
        subscription = @nats.subscribe(subject) do |msg|
          process_received_message(msg, subject)
        end
      end

      @subscriptions[subject] = subscription
    end