Class: Protobuf::Nats::JNats

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/jnats.rb

Defined Under Namespace

Classes: Message

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JNats

Returns a new instance of JNats.



28
29
30
31
32
33
34
35
# File 'lib/protobuf/nats/jnats.rb', line 28

# Sets up the default state of a client that has not connected yet.
#
# Every lifecycle callback starts out as a no-op lambda so it can be invoked
# unconditionally; the error callback is the only one that takes an argument.
def initialize
  # Registry of async subscription callbacks, keyed by subscription id
  # (see #subscribe), plus the mutex used when the registry is written.
  @subz_mutex = ::Mutex.new
  @subz_cbs = {}

  @on_error_cb = ->(_error) {}
  @on_reconnect_cb = -> {}
  @on_disconnect_cb = -> {}
  @on_close_cb = -> {}
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



16
17
18
# File 'lib/protobuf/nats/jnats.rb', line 16

# Reader for the @connection instance variable.
#
# The value is assigned by #connect and reset to nil by #close, so this
# returns nil when no connection is currently held.
def connection
  @connection
end

Instance Method Details

#close ⇒ Object

Do not depend on #close for a graceful disconnect.



72
73
74
75
76
77
78
79
# File 'lib/protobuf/nats/jnats.rb', line 72

# Closes the connection and tears down the supervisor and consumer workers.
#
# Safe to call before #connect or more than once: a missing connection is
# skipped instead of raising NoMethodError on nil.
#
# Returns nil.
def close
  @connection.close if @connection
  @connection = nil

  # Worker teardown is deliberately best-effort: a worker that was never
  # started (nil) or that fails to die must not prevent the rest of the cleanup.
  @supervisor.kill rescue nil
  @supervisor = nil
  @consumer.kill rescue nil
  # Drop the consumer reference as well; previously @supervisor was reset
  # twice and @consumer kept pointing at the killed worker.
  @consumer = nil
end

#connect(options = {}) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/protobuf/nats/jnats.rb', line 37

# Configures a jnats ConnectionFactory from +options+, opens the connection
# and starts the background workers.
#
# Options read here:
#   :servers                  - one URI string or a list; defaults to
#                               "nats://localhost:4222"
#   :max_reconnect_attempts   - forwarded to setMaxReconnect as-is
#   :disable_reconnect_buffer - when truthy, reconnect buffer size is set to 1
#   :uses_tls                 - when truthy, an SSL context is built from
#                               +options+ and installed on the factory
#
# Returns the newly created connection (also stored in @connection).
def connect(options = {})
  server_list = options[:servers] || ["nats://localhost:4222"]
  server_uris = [server_list].flatten.map do |uri_string|
    java.net.URI.new(uri_string)
  end

  factory = ::Java::IoNatsClient::ConnectionFactory.new
  factory.setServers(server_uris)
  factory.setMaxReconnect(options[:max_reconnect_attempts])

  # A tiny pending buffer makes publishing fail fast while disconnected so
  # the caller can retry, rather than queueing messages.
  factory.setReconnectBufSize(1) if options[:disable_reconnect_buffer]

  # Route the factory's connection events to the Ruby callbacks held by this
  # instance. The ivars are read at event time, so callbacks registered after
  # #connect are still honoured.
  factory.setDisconnectedCallback { |_event| @on_disconnect_cb.call }
  factory.setReconnectedCallback { |_event| @on_reconnect_cb.call }
  factory.setClosedCallback { |_event| @on_close_cb.call }
  factory.setExceptionHandler { |error| @on_error_cb.call(error) }

  # TLS is opt-in; the context is built by a helper defined outside this view.
  if options[:uses_tls]
    tls_context = create_ssl_context(options)
    factory.setSecure(true)
    factory.setSSLContext(tls_context)
  end

  @connection = factory.createConnection

  # Shared message channel used by async subscriptions (see #subscribe),
  # drained by the consumer started below. The helper name is spelled as it
  # is defined elsewhere in the file.
  @work_queue = @connection.createMsgChannel
  spwan_supervisor_and_consumer

  @connection
end

#flush(timeout_sec = 0.5) ⇒ Object



81
82
83
# File 'lib/protobuf/nats/jnats.rb', line 81

# Flushes the underlying connection.
#
# timeout_sec - timeout in seconds (default 0.5); it is multiplied by 1000
#               and handed to the connection's flush as milliseconds.
#
# Returns whatever the connection's flush returns.
def flush(timeout_sec = 0.5)
  timeout_ms = timeout_sec * 1000
  @connection.flush(timeout_ms)
end

#new_inbox ⇒ Object



133
134
135
# File 'lib/protobuf/nats/jnats.rb', line 133

# Generates a fresh inbox subject: the "_INBOX." prefix followed by 13 random
# bytes rendered as 26 hex characters.
def new_inbox
  token = ::SecureRandom.hex(13)
  "_INBOX." + token
end

#next_message(sub, timeout_sec) ⇒ Object



85
86
87
88
89
# File 'lib/protobuf/nats/jnats.rb', line 85

# Waits for the next message on a subscription.
#
# sub         - subscription object responding to nextMessage
# timeout_sec - timeout in seconds; passed on as milliseconds
#
# Returns the received message wrapped in a Message, or nil when nextMessage
# yields nothing.
def next_message(sub, timeout_sec)
  received = sub.nextMessage(timeout_sec * 1000)
  received ? Message.new(received) : nil
end

#on_close(&cb) ⇒ Object



149
150
151
# File 'lib/protobuf/nats/jnats.rb', line 149

# Registers the block to run when the connection reports it was closed
# (invoked with no arguments by the closed callback wired in #connect).
#
# Calling without a block resets the handler to a no-op instead of storing
# nil, which would make the closed callback fail on nil.call.
#
# Returns the handler now in effect.
def on_close(&cb)
  @on_close_cb = cb || lambda {}
end

#on_disconnect(&cb) ⇒ Object



141
142
143
# File 'lib/protobuf/nats/jnats.rb', line 141

# Registers the block to run when the connection reports a disconnect
# (invoked with no arguments by the disconnected callback wired in #connect).
#
# Calling without a block resets the handler to a no-op instead of storing
# nil, which would make the disconnected callback fail on nil.call.
#
# Returns the handler now in effect.
def on_disconnect(&cb)
  @on_disconnect_cb = cb || lambda {}
end

#on_error(&cb) ⇒ Object



145
146
147
# File 'lib/protobuf/nats/jnats.rb', line 145

# Registers the block to run when the connection reports an error; the block
# receives the error as its single argument (see the exception handler wired
# in #connect).
#
# Calling without a block resets the handler to a no-op instead of storing
# nil, which would make the exception handler fail on nil.call.
#
# Returns the handler now in effect.
def on_error(&cb)
  @on_error_cb = cb || lambda { |_error| }
end

#on_reconnect(&cb) ⇒ Object



137
138
139
# File 'lib/protobuf/nats/jnats.rb', line 137

# Registers the block to run when the connection reports a reconnect
# (invoked with no arguments by the reconnected callback wired in #connect).
#
# Calling without a block resets the handler to a no-op instead of storing
# nil, which would make the reconnected callback fail on nil.call.
#
# Returns the handler now in effect.
def on_reconnect(&cb)
  @on_reconnect_cb = cb || lambda {}
end

#publish(subject, data, mailbox = nil) ⇒ Object



91
92
93
94
# File 'lib/protobuf/nats/jnats.rb', line 91

# Publishes +data+ on +subject+ through the connection.
#
# subject - subject to publish to
# data    - payload; must respond to to_java_bytes
# mailbox - optional reply subject (default nil)
#
# Returns whatever the connection's publish returns.
def publish(subject, data, mailbox = nil)
  # Trailing flag asks the connection to flush right away; per the original
  # author's note this may not be strictly required.
  force_flush = true
  @connection.publish(subject, mailbox, data.to_java_bytes, force_flush)
end

#subscribe(subject, options = {}, &block) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/protobuf/nats/jnats.rb', line 96

# Subscribes to +subject+.
#
# subject - subject to subscribe to
# options - :queue names the queue group handed to the connection;
#           :max, when present, is passed to autoUnsubscribe
# block   - optional handler; stored under the subscription id so it can be
#           looked up when messages arrive
#
# Returns the subscription object created by the connection.
def subscribe(subject, options = {}, &block)
  queue_group = options[:queue]
  message_limit = options[:max]

  # Async subscriptions (block given) share the instance-wide work queue,
  # because java nats uses a cached thread pool with one thread per async
  # subscription. Sync subscriptions get a channel of their own so their
  # messages are not processed asynchronously.
  channel = block ? @work_queue : @connection.createMsgChannel
  sub = @connection.subscribe(subject, queue_group, nil, channel)

  # The lock is held only while the callback is being recorded.
  @subz_mutex.synchronize { @subz_cbs[sub.getSid] = block } if block

  # Auto-unsubscribe once the requested number of messages has been seen.
  sub.autoUnsubscribe(message_limit) if message_limit

  sub
end

#unsubscribe(sub) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/protobuf/nats/jnats.rb', line 119

# Unsubscribes +sub+ and forgets any async callback stored for it.
#
# sub - subscription returned by #subscribe; nil is ignored
#
# Returns nil for a nil +sub+, otherwise the result of the subscription's
# unsubscribe call.
def unsubscribe(sub)
  return if sub.nil?

  # Drop the async callback if one was registered; the lock is only taken
  # when there is actually an entry to remove.
  @subz_mutex.synchronize { @subz_cbs.delete(sub.getSid) } if @subz_cbs[sub.getSid]

  # Passing true tells the subscription to ignore an invalid connection.
  sub.unsubscribe(true)
end