Module: FFI::Nats::Core

Extended by:
Library
Defined in:
lib/ffi/nats/core.rb,
lib/ffi/nats/core/version.rb

Constant Summary collapse

NATS_STATUS =
enum [
  :NATS_OK, 0,                     #< Success
  :NATS_ERR,                       #< Generic error
  :NATS_PROTOCOL_ERROR,            #< Error when parsing a protocol message, or not getting the expected message.
  :NATS_IO_ERROR,                  #< IO Error (network communication).
  :NATS_LINE_TOO_LONG,             #< The protocol message read from the socket does not fit in the read buffer.
  :NATS_CONNECTION_CLOSED,         #< Operation on this connection failed because the connection is closed.
  :NATS_NO_SERVER,                 #< Unable to connect, the server could not be reached or is not running.
  :NATS_STALE_CONNECTION,          #< The server closed our connection because it did not receive PINGs at the expected interval.
  :NATS_SECURE_CONNECTION_WANTED,  #< The client is configured to use TLS, but the server is not.
  :NATS_SECURE_CONNECTION_REQUIRED,#< The server expects a TLS connection.
  :NATS_CONNECTION_DISCONNECTED,   #< The connection was disconnected. Depending on the configuration, the connection may reconnect.
  :NATS_CONNECTION_AUTH_FAILED,    #< The connection failed due to authentication error.
  :NATS_NOT_PERMITTED,             #< The action is not permitted.
  :NATS_NOT_FOUND,                 #< An action could not complete because something was not found. So far, this is an internal error.
  :NATS_ADDRESS_MISSING,           #< Incorrect URL. For instance no host specified in the URL.
  :NATS_INVALID_SUBJECT,           #< Invalid subject, for instance NULL or empty string.
  :NATS_INVALID_ARG,               #< An invalid argument is passed to a function. For instance passing NULL to an API that does not accept this value.
  :NATS_INVALID_SUBSCRIPTION,      #< The call to a subscription function fails because the subscription has previously been closed.
  :NATS_INVALID_TIMEOUT,           #< Timeout must be positive numbers.
  :NATS_ILLEGAL_STATE,             #< An unexpected state, for instance calling #natsSubscription_NextMsg() on an asynchronous subscriber.
  :NATS_SLOW_CONSUMER,             #< The maximum number of messages waiting to be delivered has been reached. Messages are dropped.
  :NATS_MAX_PAYLOAD,               #< Attempt to send a payload larger than the maximum allowed by the NATS Server.
  :NATS_MAX_DELIVERED_MSGS,        #< Attempt to receive more messages than allowed, for instance because of #natsSubscription_AutoUnsubscribe().
  :NATS_INSUFFICIENT_BUFFER,       #< A buffer is not large enough to accommodate the data.
  :NATS_NO_MEMORY,                 #< An operation could not complete because of insufficient memory.
  :NATS_SYS_ERROR,                 #< Some system function returned an error.
  :NATS_TIMEOUT,                   #< An operation timed-out. For instance #natsSubscription_NextMsg().
  :NATS_FAILED_TO_INITIALIZE,      #< The library failed to initialize.
  :NATS_NOT_INITIALIZED,           #< The library is not yet initialized.
  :NATS_SSL_ERROR                  #< An SSL error occurred when trying to establish a connection.
]
SubscribeCallback =
FFI::Function.new(:void, [:pointer, :pointer, :pointer, :pointer], :blocking => true) do |conn, sub, msg, closure|
  #queue_name = closure.read_string
  #queue_name = FFI::Nats::Core.natsMsg_GetSubject(msg)
  #queue_for_and_remove(queue_name) << FFI::Nats::Core.natsMsg_GetData(msg)

  #print "+"
  reply_to, _ = FFI::Nats::Core.natsMsg_GetReply(msg)
  FFI::Nats::Core.natsConnection_PublishString(conn, reply_to, "thanks")
  FFI::Nats::Core.natsConnection_Flush(conn)
  FFI::Nats::Core.natsMsg_Destroy(msg)
end
VERSION =
"0.3.0"

Class Method Summary collapse

Class Method Details

.run_subscribe(connection) ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/ffi/nats/core.rb', line 269

def self.run_subscribe(connection)
  subscription = FFI::MemoryPointer.new :pointer
  uuid = SecureRandom.uuid
  q = Queue.new
  #q = queue_for(uuid)

  #FFI::Nats::Core.natsConnection_Subscribe(subscription, connection, uuid, FFI::Nats::Core::SubscribeCallback, nil)
  subscribe(connection, subscription, uuid) do |conn, sub, msg, closure|
    print "+"
    data, _ = FFI::Nats::Core.natsMsg_GetData(msg)
    subject, _ = FFI::Nats::Core.natsMsg_GetSubject(msg)
    q << data
    FFI::Nats::Core.natsMsg_Destroy(msg)
    FFI::Nats::Core.natsSubscription_Unsubscribe(sub)
  end

  #FFI::Nats::Core.natsSubscription_AutoUnsubscribe(subscription.get_pointer(0), 1)
  sub = subscription.get_pointer(0)
  #FFI::Nats::Core.natsSubscription_AutoUnsubscribe(sub, 1)
  FFI::Nats::Core.natsConnection_PublishString(connection, uuid, "hello from the other side")
  #FFI::Nats::Core.natsConnection_Flush(connection)

  q.pop
  FFI::Nats::Core.natsSubscription_Destroy(sub)
end

.subscribe(connection, subscription, subject, &blk) ⇒ Object



261
262
263
264
265
266
267
# File 'lib/ffi/nats/core.rb', line 261

def self.subscribe(connection, subscription, subject, &blk)
  if blk.arity == 4
    FFI::Nats::Core.natsConnection_Subscribe(subscription, connection, subject, blk, nil)
  else
    raise "subscribe block arity must be 4 ... ish"
  end
end

.test_request_replyObject



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/ffi/nats/core.rb', line 317

def self.test_request_reply
  start = Time.now
  num_threads = 8
  publish_per_thread = 100_000
  threads = []
  subject = "hello"
  message = "world"
  reply = "thanks"
  message_size = message.size

  subscription = FFI::MemoryPointer.new :pointer
  opts_pointer = FFI::MemoryPointer.new :pointer
  conn_t = FFI::MemoryPointer.new :pointer

  FFI::Nats::Core.natsOptions_Create(opts_pointer)
  opts_pointer = opts_pointer.get_pointer(0)
  FFI::Nats::Core.natsOptions_SetURL(opts_pointer, "nats://localhost:4222")
  FFI::Nats::Core.natsOptions_UseGlobalMessageDelivery(opts_pointer, true)

  FFI::Nats::Core.natsConnection_Connect(conn_t, opts_pointer)
  conn_t = conn_t.get_pointer(0)
  FFI::Nats::Core.natsConnection_Subscribe(subscription, conn_t, subject, FFI::Nats::Core::SubscribeCallback, nil)
  FFI::Nats::Core.natsConnection_Flush(conn_t)

  num_threads.times do
    threads << Thread.new do
      options_pointer = FFI::MemoryPointer.new :pointer
      connection_pointer = FFI::MemoryPointer.new :pointer

      FFI::Nats::Core.natsOptions_Create(options_pointer)
      options_pointer = options_pointer.get_pointer(0)
      FFI::Nats::Core.natsOptions_SetURL(options_pointer, "nats://localhost:4222")

      FFI::Nats::Core.natsConnection_Connect(connection_pointer, options_pointer)
      connection_pointer = connection_pointer.get_pointer(0)

      publish_per_thread.times do
        FFI::MemoryPointer.new(:pointer) do |message_pointer|
          FFI::Nats::Core.natsConnection_RequestString(message_pointer, connection_pointer, subject, message, 1000)
          FFI::Nats::Core.natsMsg_Destroy(message_pointer.get_pointer(0))
        end
      end
    end
  end

  threads.map(&:join)

  FFI::Nats::Core.natsSubscription_Unsubscribe(subscription.get_pointer(0))
  FFI::Nats::Core.natsSubscription_Destroy(subscription.get_pointer(0))

  finish = Time.now
  time_diff = finish.to_i - start.to_i
  throughput = (num_threads * publish_per_thread)
  puts "    THREADS: \#{num_threads}\n    PUBLISH PER THREAD: \#{publish_per_thread}\n    START: \#{start}\n    FINISH: \#{finish}\n    PER SECOND: \#{time_diff == 0 ? throughput : throughput/time_diff}\n    FINISH\nend\n"

.test_subscribeObject



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/ffi/nats/core.rb', line 295

def self.test_subscribe
  threads = []

  1.times do
    threads << Thread.new do
      connection_pointer = FFI::MemoryPointer.new :pointer
      FFI::Nats::Core.natsConnection_ConnectTo(connection_pointer, "nats://localhost:4222")
      connection = connection_pointer.get_pointer(0)

      1_000.times do
        run_subscribe(connection)
      end

      FFI::Nats::Core.natsConnection_Flush(connection)
      FFI::Nats::Core.natsConnection_Close(connection)
      FFI::Nats::Core.natsConnection_Destroy(connection)
    end
  end

  threads.map(&:join)
end

.test_threadedObject



423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
# File 'lib/ffi/nats/core.rb', line 423

def self.test_threaded
  start = Time.now
  num_threads = 4
  publish_per_thread = 100_000
  threads = []
  subject = "hello"
  message = "world"
  message_size = message.size

  num_threads.times do
    threads << Thread.new do
      connection_pointer = nil

      if false
        connection_pointer = FFI::MemoryPointer.new :pointer
        FFI::Nats::Core.natsConnection_ConnectTo(connection_pointer, "nats://localhost:4222")
        connection_pointer = connection_pointer.get_pointer(0)
      else
        options_pointer = FFI::MemoryPointer.new :pointer
        connection_pointer = FFI::MemoryPointer.new :pointer

        FFI::Nats::Core.natsOptions_Create(options_pointer)
        options_pointer = options_pointer.get_pointer(0)
        FFI::Nats::Core.natsOptions_SetURL(options_pointer, "nats://0.0.0.0:4222")

        FFI::Nats::Core.natsConnection_Connect(connection_pointer, options_pointer)
        connection_pointer = connection_pointer.get_pointer(0)
      end

      publish_per_thread.times do
        FFI::Nats::Core.natsConnection_PublishString(connection_pointer, subject, message)
      end
    end
  end

  threads.map(&:join)
  finish = Time.now
  total_time = finish.to_i - start.to_i
  total_time = 1 if total_time.zero?
  puts "    THREADS: \#{num_threads}\n    PUBLISH PER THREAD: \#{publish_per_thread}\n    START: \#{start}\n    FINISH: \#{finish}\n    PER SECOND: \#{(num_threads * publish_per_thread)/total_time}\n    FINISH\nend\n"

.test_threaded_single_connectionObject



379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
# File 'lib/ffi/nats/core.rb', line 379

def self.test_threaded_single_connection
  start = Time.now
  num_threads = 8
  publish_per_thread = 500_000
  publishes = 0
  threads = []
  subject = "hello"
  message = "world"
  message_size = message.size

  options_pointer = FFI::MemoryPointer.new :pointer
  connection_pointer = FFI::MemoryPointer.new :pointer

  FFI::Nats::Core.natsOptions_Create(options_pointer)
  options_pointer = options_pointer.get_pointer(0)
  FFI::Nats::Core.natsOptions_SetURL(options_pointer, "nats://0.0.0.0:4222")

  FFI::Nats::Core.natsConnection_Connect(connection_pointer, options_pointer)
  connection_pointer = connection_pointer.get_pointer(0)

  num_threads.times do
    threads << Thread.new do
      publish_per_thread.times do
        status = FFI::Nats::Core.natsConnection_Publish(connection_pointer, subject, message, message.size)
        puts status unless NATS_STATUS[status] == NATS_STATUS[:NATS_OK]
      end
    end
  end

  threads.map(&:join)
  FFI::Nats::Core.natsConnection_Flush(connection_pointer)
  finish = Time.now
  total_time = finish.to_i - start.to_i
  total_time = 1 if total_time.zero?
  puts "    PUBLISHES: \#{publishes}\n    THREADS: \#{num_threads}\n    PUBLISH PER THREAD: \#{publish_per_thread}\n    START: \#{start}\n    FINISH: \#{finish}\n    PER SECOND: \#{(num_threads * publish_per_thread)/total_time}\n    FINISH\nend\n"