Class: Marquise
- Inherits:
-
Object
- Object
- Marquise
- Includes:
- FFI::DRY::ErrnoHelper
- Defined in:
- lib/marquise.rb,
lib/marquise/ffi.rb
Overview
A Vaultaire data point transport
Instances of ‘Marquise` send data points that they are told about to a Vaultaire data store. It has a very simple interface that hides a lot of complexity.
Defined Under Namespace
Class Method Summary collapse
-
.open(zmq_url, batch_period = 5) ⇒ Object
Open a Marquise consumer and (optionally) yield it to a block.
Instance Method Summary collapse
-
#close ⇒ Object
Close a Marquise instance.
-
#connect ⇒ Object
:stopdoc: Initialize a connection.
-
#connection ⇒ Object
Get the connection pointer for the current thread.
-
#initialize(zmq_url, batch_period = 5) ⇒ Marquise
constructor
Create a new ‘Marquise` transport object.
- #tell(*args) ⇒ Object
Constructor Details
#initialize(zmq_url, batch_period = 5) ⇒ Marquise
Create a new ‘Marquise` transport object
‘zmq_url` is the URL to your ZeroMQ broker associated with the Vaultaire system you’re dumping data into. ‘batch_period` is optional (defaults to `5`) is the number of seconds between “flushes” of data points to ZeroMQ. It can be a floating-point number if you wish to have sub-second flushes. Increasing the `batch_period` increases the possibility of losing data points in the event of a spectacular failure, but also improves performance.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/marquise.rb', line 43 def initialize(zmq_url, batch_period = 5) @consumer = Marquise::FFI.marquise_consumer_new(zmq_url, batch_period) if @consumer.nil? raise RuntimeError, "libmarquise failed; check syslog (no, seriously)" end $stderr.puts "Consumer is #{@consumer.inspect}" if $DEBUG @connections = {} @janitor = Janitor.new(@consumer, @connections) ObjectSpace.define_finalizer(self, @janitor) end |
Class Method Details
.open(zmq_url, batch_period = 5) ⇒ Object
Open a Marquise consumer and (optionally) yield it to a block
With no associated block, ‘Marquise.open` is a synonym for `Marquise.new`. If the optional block is given, a newly created `Marquise` object will be passed to the block, and will then be automatically closed when the block terminates, and the value of the block returned.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/marquise.rb', line 19 def self.open(zmq_url, batch_period = 5) m = Marquise.new(zmq_url, batch_period) rv = m if block_given? begin rv = yield m ensure m.close end end rv end |
Instance Method Details
#close ⇒ Object
Close a Marquise instance
This must be called when you are done with your Marquise instance, to avoid leaving memory and file descriptors.
139 140 141 142 143 144 |
# File 'lib/marquise.rb', line 139 def close @janitor.call ObjectSpace.undefine_finalizer(self) @connections = {} @consumer = nil end |
#connect ⇒ Object
:stopdoc: Initialize a connection
You should rarely have to call this method yourself; Marquise will do it automatically for you when required.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/marquise.rb', line 151 def connect th = Thread.current return if @connections[th] @connections[th] = Marquise::FFI.marquise_connect(@consumer) if @connections[th].nil? raise RuntimeError.new("marquise_connect() failed... consult syslog (no, seriously)") end $stderr.puts "Created new connection #{@connections[th].inspect} for #{th.inspect}" if $DEBUG nil end |
#connection ⇒ Object
Get the connection pointer for the current thread
168 169 170 171 172 |
# File 'lib/marquise.rb', line 168 def connection self.connect @connections[Thread.current] end |
#tell(*args) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/marquise.rb', line 58 def tell(*args) if @consumer.nil? raise IOError, "Connection has been closed" end val, ts, opts = parse_tell_opts(args) k, v, len = if opts.length == 0 [nil, nil, 0] else [ Marquise::FFI.pointer_list_from_string_array(opts.keys), Marquise::FFI.pointer_list_from_string_array(opts.values), opts.length ] end rv = if val.nil? Marquise::FFI.marquise_send_counter( connection, k, v, len, ts.to_f * 1_000_000_000 ) elsif val.respond_to? :to_str and val.respond_to? :encoding s = val.to_str method = nil if s.encoding.to_s == 'ASCII-8BIT' or !s.force_encoding('UTF-8').valid_encoding? method = :marquise_send_binary else method = :marquise_send_text s = s.encode('UTF-8') end Marquise::FFI.send( method, connection, k, v, len, s, s.length, ts.to_f * 1_000_000_000 ) elsif val.respond_to? :integer? and val.integer? if val < -(2**63)+1 or val > (2**63)-1 raise ArgumentError, "Integer out of range for Marquise#tell" end Marquise::FFI.marquise_send_int( connection, k, v, len, val, ts.to_f * 1_000_000_000 ) elsif val.respond_to? :integer? and !val.integer? Marquise::FFI.marquise_send_real( connection, k, v, len, val, ts.to_f * 1_000_000_000 ) end if rv == -1 raise errno_exception end end |