Class: Marquise

Inherits:
Object
Includes:
FFI::DRY::ErrnoHelper
Defined in:
lib/marquise.rb,
lib/marquise/ffi.rb

Overview

A Vaultaire data point transport

Instances of `Marquise` send the data points they are told about to a Vaultaire data store. The class has a very simple interface that hides a lot of complexity.

Defined Under Namespace

Modules: FFI
Classes: Janitor


Constructor Details

#initialize(zmq_url, batch_period = 5) ⇒ Marquise

Create a new `Marquise` transport object

`zmq_url` is the URL of the ZeroMQ broker associated with the Vaultaire system you're dumping data into. `batch_period` is optional (defaults to `5`) and is the number of seconds between "flushes" of data points to ZeroMQ. It can be a floating-point number if you want sub-second flushes. Increasing `batch_period` improves performance, but also increases the chance of losing data points in the event of a spectacular failure.



# File 'lib/marquise.rb', line 43

def initialize(zmq_url, batch_period = 5)
	if ENV['LIBMARQUISE_ORIGIN'].nil?
		raise RuntimeError,
		      "LIBMARQUISE_ORIGIN env var not set -- Marquise will not work"
	end

	@consumer = Marquise::FFI.marquise_consumer_new(zmq_url, batch_period)

	if @consumer.nil?
		raise RuntimeError,
		      "libmarquise failed; check syslog (no, seriously)"
	end

	$stderr.puts "Consumer is #{@consumer.inspect}" if $DEBUG
	@connections = {}

	@janitor = Janitor.new(@consumer, @connections)
	ObjectSpace.define_finalizer(self, @janitor)
end

Class Method Details

.open(zmq_url, batch_period = 5) ⇒ Object

Open a Marquise consumer and (optionally) yield it to a block

With no associated block, `Marquise.open` is a synonym for `Marquise.new`. If the optional block is given, a newly created `Marquise` object is passed to the block and automatically closed when the block terminates; the value of the block is returned.



# File 'lib/marquise.rb', line 19

def self.open(zmq_url, batch_period = 5)
	m = Marquise.new(zmq_url, batch_period)
	rv = m
	
	if block_given?
		begin
			rv = yield m
		ensure
			m.close
		end
	end
	
	rv
end
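The block form follows the familiar Ruby open/ensure idiom. The sketch below reproduces that shape with a hypothetical `FakeTransport` stand-in (not part of Marquise), so it runs without libmarquise or a ZeroMQ broker. It only models the open/close behaviour, not any data transport.

```ruby
# Hypothetical stand-in for Marquise; it only tracks whether it was closed.
class FakeTransport
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  # Same shape as Marquise.open: return the object when no block is given,
  # otherwise yield it, always close it, and return the block's value.
  def self.open
    t = new
    return t unless block_given?

    begin
      yield t
    ensure
      t.close
    end
  end
end

seen = nil
result = FakeTransport.open do |t|
  seen = t
  :block_value
end

plain = FakeTransport.open
```

Because the close happens in an `ensure` clause, the transport is closed even if the block raises. Without a block, the caller remains responsible for calling `close`.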

Instance Method Details

#close ⇒ Object

Close a Marquise instance

This must be called when you are done with your Marquise instance, to avoid leaking memory and file descriptors.



# File 'lib/marquise.rb', line 149

def close
	@janitor.call
	ObjectSpace.undefine_finalizer(self)
	@connections = {}
	@consumer = nil
end
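The constructor and `close` together use a common finalizer pattern: a separate callable holds the resources to release, is registered with `ObjectSpace.define_finalizer`, and is unregistered again after an explicit close. The following is a minimal sketch of that pattern. `Cleaner` and `Owner` are hypothetical names, and the real `Janitor` implementation is not shown in this page, so its internals here are an assumption.

```ruby
# Hypothetical cleanup object, loosely modelled on the Janitor idea:
# it holds the resources to release, but no reference to the owner.
class Cleaner
  def initialize(resources, log)
    @resources = resources
    @log = log
  end

  # Finalizers are invoked with the owner's object id; an explicit
  # call passes nothing, so accept and ignore any arguments.
  def call(*_args)
    @resources.each { |r| @log << "released #{r}" }
    @resources.clear
  end
end

class Owner
  def initialize(log)
    @cleaner = Cleaner.new([:consumer, :connection], log)
    ObjectSpace.define_finalizer(self, @cleaner)
  end

  # Explicit close: release now, then drop the finalizer so the
  # cleanup is not attempted again at garbage collection.
  def close
    @cleaner.call
    ObjectSpace.undefine_finalizer(self)
  end
end

log = []
owner = Owner.new(log)
owner.close
```

Keeping the cleanup logic in a separate object matters because a finalizer that references the object it finalizes prevents that object from ever being collected.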

#connect ⇒ Object

Initialize a connection

You should rarely have to call this method yourself; Marquise will do it automatically for you when required.



# File 'lib/marquise.rb', line 161

def connect
	th = Thread.current
	
	return if @connections[th]
	
	@connections[th] = Marquise::FFI.marquise_connect(@consumer)
	
	if @connections[th].nil?
		raise RuntimeError.new("marquise_connect() failed... consult syslog (no, seriously)")
	end

	$stderr.puts "Created new connection #{@connections[th].inspect} for #{th.inspect}" if $DEBUG

	nil
end

#connection ⇒ Object

Get the connection pointer for the current thread



# File 'lib/marquise.rb', line 178

def connection
	self.connect
	
	@connections[Thread.current]
end
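`connect` and `connection` together give each thread its own lazily created connection, keyed by `Thread.current`. A self-contained sketch of that idea follows. `PerThreadPool` and its string "handles" are hypothetical, and the `Mutex` is an addition for this example rather than something the listing above uses.

```ruby
# Hypothetical pool: one lazily created handle per thread.
class PerThreadPool
  def initialize
    @handles = {}
    @lock = Mutex.new # assumption: added here; the listing above has no lock
  end

  # Return the calling thread's handle, creating it on first use.
  def handle
    th = Thread.current
    @lock.synchronize do
      @handles[th] ||= "handle-for-#{th.object_id}"
    end
  end

  def size
    @lock.synchronize { @handles.size }
  end
end

pool = PerThreadPool.new
main_handle  = pool.handle
same_again   = pool.handle
other_handle = Thread.new { pool.handle }.value
```

Repeated calls from one thread return the same handle, while a second thread gets its own. This is why callers rarely need to call `connect` directly.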

#tell(*args) ⇒ Object



# File 'lib/marquise.rb', line 63

def tell(*args)
	if @consumer.nil?
		raise IOError,
		      "Connection has been closed"
	end

	if connection.nil?
		raise RuntimeError,
		      "Connection is nil -- this is a bad thing"
	end

	val, ts, opts = parse_tell_opts(args)
			
	k, v, len = if opts.length == 0
		[nil, nil, 0]
	else
		[
		 Marquise::FFI.pointer_list_from_string_array(opts.keys),
		 Marquise::FFI.pointer_list_from_string_array(opts.values),
		 opts.length
		]
	end
		
	rv = if val.nil?
		Marquise::FFI.marquise_send_counter(
		                connection,
		                k,
		                v,
		                len,
		                (ts.to_f * 1_000_000_000).to_i
		              )
	elsif val.respond_to? :to_str and val.respond_to? :encoding
		s = val.to_str
		method = nil
		
		if s.encoding.to_s == 'ASCII-8BIT' or !s.force_encoding('UTF-8').valid_encoding?
			method = :marquise_send_binary
		else
			method = :marquise_send_text
			s = s.encode('UTF-8')
		end

		Marquise::FFI.send(
		                method,
		                connection,
		                k,
		                v,
		                len,
		                s,
		                s.length,
		                ts.to_f * 1_000_000_000
		              )
	elsif val.respond_to? :integer? and val.integer?
		if val < -(2**63)+1 or val > (2**63)-1
			raise ArgumentError,
			      "Integer out of range for Marquise#tell"
		end
		
		Marquise::FFI.marquise_send_int(
		                connection,
		                k,
		                v,
		                len,
		                val,
		                ts.to_f * 1_000_000_000
		              )
	elsif val.respond_to? :integer? and !val.integer?
		Marquise::FFI.marquise_send_real(
		                connection,
		                k,
		                v,
		                len,
		                val,
		                ts.to_f * 1_000_000_000
		              )
	end
	
	if rv == -1
		raise errno_exception
	end
end
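The listing above picks a libmarquise send function from the type of the value and converts the timestamp from seconds to nanoseconds. The sketch below restates just that decision logic as two hypothetical helpers, `classify` and `to_nanoseconds`, which are not part of Marquise. It makes no FFI calls, and it duplicates the string before calling `force_encoding` so the caller's string is left untouched, which is a deliberate difference from the listing.

```ruby
# Hypothetical helper: names the kind of data point a value would become,
# following the same order of checks as the listing above.
def classify(val)
  if val.nil?
    :counter
  elsif val.respond_to?(:to_str) && val.respond_to?(:encoding)
    s = val.to_str.dup # dup so force_encoding does not alter the caller's string
    if s.encoding == Encoding::BINARY || !s.force_encoding('UTF-8').valid_encoding?
      :binary
    else
      :text
    end
  elsif val.respond_to?(:integer?)
    val.integer? ? :int : :real
  else
    :unsupported
  end
end

# Seconds since the epoch (Integer, Float, or Time) as whole nanoseconds.
def to_nanoseconds(ts)
  (ts.to_f * 1_000_000_000).to_i
end

kinds = [nil, "hello", "\xFF\xFE".b, 42, 3.14, :symbol].map { |v| classify(v) }
```

Note that a value matching none of the branches falls through without sending anything in the listing above; the sketch labels that case `:unsupported` to make it visible.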