Class: Marquise

Inherits:
Object
Includes:
FFI::DRY::ErrnoHelper
Defined in:
lib/marquise.rb,
lib/marquise/ffi.rb

Overview

A Vaultaire data point transport

Instances of `Marquise` send the data points they are told about to a Vaultaire data store. The class has a very simple interface that hides a lot of complexity.

Defined Under Namespace

Modules: FFI
Classes: Janitor

Constructor Details

#initialize(zmq_url, batch_period = 5) ⇒ Marquise

Create a new `Marquise` transport object

`zmq_url` is the URL of the ZeroMQ broker associated with the Vaultaire system you’re dumping data into. `batch_period` is optional (defaults to `5`) and is the number of seconds between “flushes” of data points to ZeroMQ. It can be a floating-point number if you want sub-second flushes. Increasing `batch_period` increases the possibility of losing data points in the event of a spectacular failure, but also improves performance.



# File 'lib/marquise.rb', line 43

def initialize(zmq_url, batch_period = 5)
	@consumer = Marquise::FFI.marquise_consumer_new(zmq_url, batch_period)
	
	if @consumer.nil?
		raise RuntimeError,
		      "libmarquise failed; check syslog (no, seriously)"
	end

	$stderr.puts "Consumer is #{@consumer.inspect}" if $DEBUG
	@connections = {}

	@janitor = Janitor.new(@consumer, @connections)
	ObjectSpace.define_finalizer(self, @janitor)
end
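
The trade-off described for `batch_period` can be made concrete with a little arithmetic. The helper below is a hypothetical illustration, not part of Marquise: it assumes that, in the worst case, everything sent since the last flush is lost, so the exposure is roughly the send rate multiplied by the batch period.

```ruby
# Hypothetical helper (not part of Marquise): a rough upper bound on the
# number of data points that have not yet been flushed, assuming a steady
# send rate and that a crash loses everything since the last flush.
def max_points_at_risk(points_per_second, batch_period = 5)
  (points_per_second * batch_period).ceil
end

max_points_at_risk(200)       # default 5-second batches => 1000
max_points_at_risk(200, 0.5)  # sub-second batches       => 100
```

Under that assumption, halving `batch_period` halves the exposure, at the cost of more frequent flushes.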

Class Method Details

.open(zmq_url, batch_period = 5) ⇒ Object

Open a Marquise consumer and (optionally) yield it to a block

With no associated block, `Marquise.open` is a synonym for `Marquise.new`. If the optional block is given, a newly created `Marquise` object is passed to the block and automatically closed when the block terminates, and the value of the block is returned.



# File 'lib/marquise.rb', line 19

def self.open(zmq_url, batch_period = 5)
	m = Marquise.new(zmq_url, batch_period)
	rv = m
	
	if block_given?
		begin
			rv = yield m
		ensure
			m.close
		end
	end
	
	rv
end
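
The block form of `open` follows the same pattern as `File.open`. The sketch below reproduces that pattern with a hypothetical stand-in class (`FakeTransport`, which is not part of Marquise) so that it runs without libmarquise or a ZeroMQ broker.

```ruby
# Hypothetical stand-in for Marquise; it only tracks whether it was closed.
class FakeTransport
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  # Same shape as Marquise.open: without a block, return the new object;
  # with a block, yield it, always close it, and return the block's value.
  def self.open
    t = new
    return t unless block_given?

    begin
      yield t
    ensure
      t.close
    end
  end
end

# Block form: the transport is closed and the block's value comes back.
first_seen = nil
result = FakeTransport.open { |t| first_seen = t; :block_value }

# The ensure clause also runs when the block raises.
failed_seen = nil
begin
  FakeTransport.open { |t| failed_seen = t; raise "boom" }
rescue RuntimeError
  # failed_seen has been closed by the time control reaches this point
end

# No block: the caller receives an open object and must close it later.
plain = FakeTransport.open
```

With the real class, the no-block form leaves the caller responsible for calling `close`.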

Instance Method Details

#close ⇒ Object

Close a Marquise instance

This must be called when you are done with your Marquise instance, to avoid leaking memory and file descriptors.



# File 'lib/marquise.rb', line 139

def close
	@janitor.call
	ObjectSpace.undefine_finalizer(self)
	@connections = {}
	@consumer = nil
end
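
The constructor and `close` together use a common Ruby cleanup pattern: a separate callable object is registered as a finalizer, and `close` runs it eagerly and then unregisters it. The sketch below illustrates the pattern with hypothetical classes (`SketchJanitor` and `SketchTransport`); the real `Marquise::Janitor` is not shown on this page and may differ.

```ruby
# Hypothetical cleanup object. It holds the resources to release but no
# reference to the owning object, otherwise the owner could never be
# garbage collected and the finalizer would never run.
class SketchJanitor
  attr_reader :calls

  def initialize(resources)
    @resources = resources
    @calls = 0
  end

  # The garbage collector passes the object id; an explicit close passes
  # nothing, so accept and ignore any arguments.
  def call(*)
    @calls += 1
    @resources.clear
  end
end

class SketchTransport
  attr_reader :janitor, :resources

  def initialize
    @resources = { handle: :open }
    @janitor = SketchJanitor.new(@resources)
    ObjectSpace.define_finalizer(self, @janitor)
  end

  # Release resources now and unregister the finalizer so the cleanup
  # does not run a second time at garbage collection.
  def close
    @janitor.call
    ObjectSpace.undefine_finalizer(self)
    nil
  end
end

t = SketchTransport.new
t.close
```

The finalizer is only a safety net; an explicit `close` releases resources at a predictable time.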

#connect ⇒ Object

Initialize a connection

You should rarely have to call this method yourself; Marquise will do it automatically for you when required.



# File 'lib/marquise.rb', line 151

def connect
	th = Thread.current
	
	return if @connections[th]
	
	@connections[th] = Marquise::FFI.marquise_connect(@consumer)
	
	if @connections[th].nil?
		raise RuntimeError.new("marquise_connect() failed... consult syslog (no, seriously)")
	end

	$stderr.puts "Created new connection #{@connections[th].inspect} for #{th.inspect}" if $DEBUG

	nil
end

#connection ⇒ Object

Get the connection pointer for the current thread



# File 'lib/marquise.rb', line 168

def connection
	self.connect
	
	@connections[Thread.current]
end
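
`connect` and `connection` keep one connection per thread by using `Thread.current` as a hash key. A minimal sketch of that idea follows, with a hypothetical `PerThreadCache` class and a counter standing in for `marquise_connect`. The `Mutex` is an addition of this sketch and does not appear in the source above.

```ruby
# Hypothetical per-thread cache: each thread lazily gets its own item,
# created by the factory block on first use.
class PerThreadCache
  def initialize(&factory)
    @factory = factory
    @items = {}
    @lock = Mutex.new # not in the original code; guards the shared hash
  end

  # Return the calling thread's item, creating it on first access.
  def get
    th = Thread.current
    @lock.synchronize { @items[th] ||= @factory.call }
  end
end

created = 0
cache = PerThreadCache.new { created += 1 }

main_first  = cache.get                       # creates item 1
main_second = cache.get                       # reuses item 1
other       = Thread.new { cache.get }.value  # creates item 2
```

Each thread pays the setup cost once, and no connection is shared between threads.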

#tell(*args) ⇒ Object



# File 'lib/marquise.rb', line 58

def tell(*args)
	if @consumer.nil?
		raise IOError,
		      "Connection has been closed"
	end

	val, ts, opts = parse_tell_opts(args)
			
	k, v, len = if opts.length == 0
		[nil, nil, 0]
	else
		[
		 Marquise::FFI.pointer_list_from_string_array(opts.keys),
		 Marquise::FFI.pointer_list_from_string_array(opts.values),
		 opts.length
		]
	end
		
	rv = if val.nil?
		Marquise::FFI.marquise_send_counter(
		                connection,
		                k,
		                v,
		                len,
		                ts.to_f * 1_000_000_000
		              )
	elsif val.respond_to? :to_str and val.respond_to? :encoding
		s = val.to_str
		method = nil
		
		if s.encoding.to_s == 'ASCII-8BIT' or !s.force_encoding('UTF-8').valid_encoding?
			method = :marquise_send_binary
		else
			method = :marquise_send_text
			s = s.encode('UTF-8')
		end

		Marquise::FFI.send(
		                method,
		                connection,
		                k,
		                v,
		                len,
		                s,
		                s.length,
		                ts.to_f * 1_000_000_000
		              )
	elsif val.respond_to? :integer? and val.integer?
		if val < -(2**63)+1 or val > (2**63)-1
			raise ArgumentError,
			      "Integer out of range for Marquise#tell"
		end
		
		Marquise::FFI.marquise_send_int(
		                connection,
		                k,
		                v,
		                len,
		                val,
		                ts.to_f * 1_000_000_000
		              )
	elsif val.respond_to? :integer? and !val.integer?
		Marquise::FFI.marquise_send_real(
		                connection,
		                k,
		                v,
		                len,
		                val,
		                ts.to_f * 1_000_000_000
		              )
	end
	
	if rv == -1
		raise errno_exception
	end
end
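
The body of `tell` chooses a libmarquise send function from the type of the value, and converts the timestamp from seconds to nanoseconds. The sketch below isolates that dispatch logic in plain Ruby so it can be run without the library. `classify_for_tell` and `to_nanoseconds` are hypothetical names introduced here; the branch order and range check mirror the source above, except that the string is duplicated first so the caller's value is not modified.

```ruby
# Hypothetical helper mirroring the branches of Marquise#tell. Returns a
# symbol naming the kind of data point that would be sent, or nil if the
# value matches no branch.
def classify_for_tell(val)
  if val.nil?
    :counter
  elsif val.respond_to?(:to_str) && val.respond_to?(:encoding)
    s = val.to_str.dup # work on a copy; force_encoding mutates its receiver
    if s.encoding == Encoding::BINARY || !s.force_encoding('UTF-8').valid_encoding?
      :binary
    else
      :text
    end
  elsif val.respond_to?(:integer?) && val.integer?
    # Same bounds as the source above
    if val < -(2**63) + 1 || val > (2**63) - 1
      raise ArgumentError, "Integer out of range for Marquise#tell"
    end
    :int
  elsif val.respond_to?(:integer?)
    :real
  end
end

# Timestamps are given in seconds and sent as nanoseconds.
def to_nanoseconds(ts)
  ts.to_f * 1_000_000_000
end

classify_for_tell(nil)       # a bare counter
classify_for_tell("hello")   # valid UTF-8 text
classify_for_tell("\xFF".b)  # binary string
classify_for_tell(42)        # integer
classify_for_tell(1.5)       # real number
```

Strings that are tagged as binary, or whose bytes are not valid UTF-8, are treated as binary data; all other strings are treated as text.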