Class: NatsMessaging::NatsService

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_messaging/nats_service.rb

Instance Method Summary collapse

Constructor Details

#initialize(nats_url, stream_name: nil, subjects: []) ⇒ NatsService

Returns a new instance of NatsService.



6
7
8
9
10
11
12
13
14
# File 'lib/nats_messaging/nats_service.rb', line 6

# Connects to the NATS server and, when a stream name is given, enables
# JetStream and ensures the stream exists.
#
# @param nats_url [String] server URL; for a local server use
#   "nats://localhost:4222"
# @param stream_name [String, nil] JetStream stream to set up; when nil,
#   JetStream is not initialised and @js stays unset
# @param subjects [Array] subjects passed on to create_stream
def initialize(nats_url, stream_name: nil, subjects: [])
  # Registry of active subscriptions, keyed by subject.
  @subscriptions = {}
  @nats = NATS.connect(nats_url)
  return unless stream_name

  @js = @nats.jetstream
  create_stream(stream_name, subjects)
end

Instance Method Details

#create_stream(stream_name, subjects) ⇒ Object

Create the JetStream stream unless it already exists.



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/nats_messaging/nats_service.rb', line 17

# Ensures the JetStream stream exists: looks it up and creates it when the
# lookup reports NotFound. Any other failure, including a failure while
# creating the stream, is logged and swallowed (best effort).
#
# @param stream_name [String] name of the stream to look up / create
# @param subjects [Array] subjects bound to the stream when it is created
# @return [nil]
def create_stream(stream_name, subjects)
  begin
    @js.stream_info(stream_name)
    puts "NATS: Stream #{stream_name} already exists"
  rescue NATS::JetStream::Error::NotFound
    @js.add_stream(name: stream_name, subjects: subjects)
    puts "NATS: Stream #{stream_name} created"
  end
rescue => e
  # The outer rescue is deliberate: an exception raised inside a rescue
  # clause is not seen by sibling rescue clauses of the same begin, so
  # add_stream failures have to be caught one level up.
  puts "NATS: Failed to create stream: #{e.message}"
end

#listen_and_reply(subject, reply_message) ⇒ Object

Listen to a subject and reply with a message using MessagePack



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/nats_messaging/nats_service.rb', line 101

# Subscribes to +subject+ and hands every incoming message to process_reply
# together with +reply_message+. An existing subscription registered for the
# same subject is unsubscribed first, so only one responder is tracked per
# subject.
#
# If subscribing fails, the error is logged, nothing is registered for the
# subject and the method returns without claiming to be listening.
#
# @param subject [String] subject to listen on
# @param reply_message [Object] passed through to process_reply
# @return [nil]
def listen_and_reply(subject, reply_message)
  # Remove the old entry before unsubscribing so a failed re-subscribe can
  # never leave a stale or nil subscription in the registry.
  @subscriptions.delete(subject)&.unsubscribe

  begin
    puts "NATS: Suscribing to #{subject}"
    subscription = @nats.subscribe(subject) do |msg|
      puts "NATS: process reply?"
      process_reply(msg, subject, reply_message)
    end
  rescue => e
    puts "NATS: Error while subscribing to #{subject}: #{e.message}"
    return nil
  end

  @subscriptions[subject] = subscription
  puts "NATS: Stored subscription for #{subject}: #{@subscriptions[subject].inspect}"
  puts "NATS: Listening on #{subject} with reply message: #{reply_message}"
end

#process_received_message(msg, subject) ⇒ Object

Unpack a received MessagePack message, acknowledge it when the message supports `ack`, and return the decoded data.



71
72
73
74
75
76
77
78
79
80
# File 'lib/nats_messaging/nats_service.rb', line 71

# Decodes the MessagePack payload of +msg+, logs it, acknowledges the message
# when it responds to #ack, and returns the decoded payload.
#
# @param msg [#data] received message; #ack is called only if available
# @param subject [String] subject the message arrived on (used for logging)
# @return [Object, nil] decoded payload, or nil when an error was raised
#   (the error is logged, not re-raised)
def process_received_message(msg, subject)
  MessagePack.unpack(msg.data).tap do |payload|
    puts "NATS: Message received on #{subject}: #{payload}"
    msg.ack if msg.respond_to?(:ack)
  end
rescue => error
  puts "NATS: Error while processing message: #{error.message}"
end

#publish_message(subject, message) ⇒ Object

Publish a message to a subject using MessagePack



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/nats_messaging/nats_service.rb', line 30

# Serialises +message+ with MessagePack and publishes it to +subject+.
#
# With JetStream enabled (@js set) the message goes through JetStream and the
# returned ACK is logged; if JetStream reports NotFound the message is
# published over plain NATS instead. Without JetStream the message is
# published over plain NATS, which returns no ACK, so none is logged.
#
# Publish errors are logged and swallowed; errors raised by #to_msgpack are
# not rescued here.
#
# @param subject [String] subject to publish to
# @param message [#to_msgpack] payload
# @return [nil]
def publish_message(subject, message)
  packed_message = message.to_msgpack

  begin
    if @js
      begin
        ack = @js.publish(subject, packed_message)
        puts "NATS: Message sent to #{subject}: #{message}, ACK: #{ack.inspect}"
      rescue NATS::JetStream::Error::NotFound
        # A failure of this fallback publish is handled by the outer rescue
        # instead of escaping the method.
        @nats.publish(subject, packed_message)
        puts "NATS: Stream not found, falling back to regular NATS publish"
      end
    else
      @nats.publish(subject, packed_message)
      puts "NATS: Message sent to #{subject}: #{message}"
    end
  rescue => e
    puts "NATS: Unexpected error: #{e.message}"
  end
end

#send_request(subject, message) ⇒ Object

Send a request to a subject and wait for response using MessagePack



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/nats_messaging/nats_service.rb', line 83

# Sends +message+ (MessagePack-encoded) as a request on +subject+ and waits
# up to 5 seconds for the reply.
#
# @param subject [String] subject to send the request to
# @param message [#to_msgpack] request payload
# @return [Object, nil] the MessagePack-decoded reply, or nil when the request
#   timed out or any other error was raised (both cases are logged)
def send_request(subject, message)
  puts "NATS: Sending request to #{subject} with message: #{message}"
  begin
    reply = @nats.request(subject, message.to_msgpack, timeout: 5)
    MessagePack.unpack(reply.data).tap do |decoded|
      puts "NATS: Received reply: #{decoded}"
    end
  rescue NATS::IO::Timeout
    puts "NATS: Request timed out for subject: #{subject}"
    nil
  rescue => error
    puts "NATS: Unexpected error: #{error.message}"
    nil
  end
end

#subscribe_to_subject(subject, durable_name = "durable_name") ⇒ Object

Subscribe to a subject using MessagePack



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/nats_messaging/nats_service.rb', line 54

# Subscribes to +subject+ and routes every incoming message to
# process_received_message. Uses a JetStream subscription with the given
# durable name when JetStream is enabled (@js set), otherwise a plain NATS
# subscription.
#
# A subscription already registered for the same subject is unsubscribed
# first; otherwise it would stay active but no longer be reachable through
# the @subscriptions registry.
#
# @param subject [String] subject to subscribe to
# @param durable_name [String] durable name, used only on the JetStream path
# @return [Object] the new subscription, also stored under +subject+
def subscribe_to_subject(subject, durable_name = "durable_name")
  puts "NATS: Subscribing to #{subject}"

  # Drop the registry entry before unsubscribing so that a failing
  # re-subscribe cannot leave a dead subscription registered.
  @subscriptions.delete(subject)&.unsubscribe

  handler = proc { |msg| process_received_message(msg, subject) }

  @subscriptions[subject] =
    if @js
      @js.subscribe(subject, durable: durable_name, &handler)
    else
      @nats.subscribe(subject, &handler)
    end
end