Class: Analysand::ChangeWatcher
- Inherits:
-
Object
- Object
- Analysand::ChangeWatcher
- Includes:
- ConnectionTesting, Celluloid::IO, Celluloid::Logger, Rack::Utils
- Defined in:
- lib/analysand/change_watcher.rb
Overview
A Celluloid::IO actor that watches the changes feed of a CouchDB database. When a change is received, it passes the change to a #process method.
ChangeWatchers monitor changes using continuous mode and set up a heartbeat to fire approximately every 10 seconds.
ChangeWatchers begin watching for changes as soon as they are initialized. To send a shutdown message:
a.stop
The watcher will terminate on the next heartbeat.
Failure modes
ChangeWatcher deals with the following failures in the following ways:
-
If Errno::ECONNREFUSED is raised whilst connecting to CouchDB, it will retry the connection in 30 seconds.
-
If the connection to CouchDB’s changes feed is abruptly terminated, it dies.
-
If an exception is raised during HTTP or JSON parsing, it dies.
Situations where the actor dies should be handled by a supervisor.
Example usage
class Accumulator < Analysand::ChangeWatcher
attr_accessor :results
def initialize(database)
super(database)
self.results = []
end
def process(change)
results << change
# Once a ChangeWatcher has successfully processed a change, it
# SHOULD invoke #change_processed.
change_processed(change)
end
end
a = Accumulator.new('http://localhost:5984/mydb')
# or with supervision:
a = Accumulator.supervise('http://localhost:5984/mydb')
Defined Under Namespace
Classes: Waiter
Constant Summary collapse
- QUANTUM =
Read at most this many bytes off the socket at a time.
4096
Instance Method Summary collapse
-
#change_processed(change) ⇒ Object
Notify waiters.
-
#changes_feed_uri ⇒ Object
The URI of the changes feed.
- #connect ⇒ Object
-
#connection_ok ⇒ Object
The connection_ok method is called before connecting to the changes feed.
-
#customize_query(query) ⇒ Object
Can be used to set query parameters.
-
#customize_request(req) ⇒ Object
Can be used to add headers.
- #disconnect ⇒ Object
-
#initialize(database) ⇒ ChangeWatcher
constructor
Checks services.
-
#on_body(chunk) ⇒ Object
Http::Parser callback.
-
#on_headers_complete(parser) ⇒ Object
Http::Parser callback.
- #prepare_request ⇒ Object
-
#process(change) ⇒ Object
This method should implement your change-processing logic.
- #start ⇒ Object
- #stop ⇒ Object
-
#waiter_for(id) ⇒ Object
Returns an object that can be used to block a thread until a document with the given ID has been processed.
Methods included from ConnectionTesting
#test_http_connection, #wait_for_http_service
Constructor Details
#initialize(database) ⇒ ChangeWatcher
Checks services. If all services pass muster, enters a read loop.
The database parameter may be either a URL-as-string or a Analysand::Database.
If overriding the initializer, you MUST call super.
81 82 83 84 85 86 87 88 89 |
# File 'lib/analysand/change_watcher.rb', line 81 def initialize(database) @db = database @waiting = {} @http_parser = ::Http::Parser.new(self) @json_parser = Yajl::Parser.new @json_parser.on_parse_complete = lambda { |doc| process(doc) } async.start end |
Instance Method Details
#change_processed(change) ⇒ Object
Notify waiters.
221 222 223 |
# File 'lib/analysand/change_watcher.rb', line 221 def change_processed(change) @waiting.delete(change['id']) end |
#changes_feed_uri ⇒ Object
The URI of the changes feed. This URI incorporates any changes made by customize_query.
93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/analysand/change_watcher.rb', line 93 def changes_feed_uri query = { 'feed' => 'continuous', 'heartbeat' => '10000' } customize_query(query) uri = (@db.respond_to?(:uri) ? @db.uri : URI(@db)).dup uri.path += '/_changes' uri.query = build_query(query) uri end |
#connect ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/analysand/change_watcher.rb', line 245 def connect req = prepare_request uri = changes_feed_uri info "#{self.class} connecting to #{req.path}" @socket = TCPSocket.new(uri.host, uri.port) # Make the request. data = [ "GET #{req.path} HTTP/1.1" ] req.each_header { |k, v| data << "#{k}: #{v}" } @socket.write(data.join("\r\n")) @socket.write("\r\n\r\n") end |
#connection_ok ⇒ Object
The connection_ok method is called before connecting to the changes feed. By default, it checks that there’s an HTTP service listening on the changes feed.
If the method returns true, then we connect to the changes feed and begin processing. If it returns false, a warning message is logged and the connection check will be retried in 30 seconds.
This method can be overridden if you need to check additional services. When you override the method, make sure that you don’t discard the return value of the original definition:
# Wrong
def connection_ok
super
...
end
# Right
def connection_ok
ok = super
ok && my_other_test
end
131 132 133 134 135 |
# File 'lib/analysand/change_watcher.rb', line 131 def connection_ok test_http_connection(changes_feed_uri) do |req| customize_request(req) end end |
#customize_query(query) ⇒ Object
Can be used to set query parameters. query is a Hash. The query hash has two default parameters:
| Key | Value | | feed | continuous | | heartbeat | 10000 |
It is NOT RECOMMENDED that they be changed.
By default, this does nothing. Provide behavior in a subclass.
177 178 |
# File 'lib/analysand/change_watcher.rb', line 177 def customize_query(query) end |
#customize_request(req) ⇒ Object
Can be used to add headers. req is a Net::HTTP::Get instance.
By default, this does nothing. Provide behavior in a subclass.
184 185 |
# File 'lib/analysand/change_watcher.rb', line 184 def customize_request(req) end |
#disconnect ⇒ Object
266 267 268 |
# File 'lib/analysand/change_watcher.rb', line 266 def disconnect @socket.close if @socket && !@socket.closed? end |
#on_body(chunk) ⇒ Object
Http::Parser callback.
239 240 241 |
# File 'lib/analysand/change_watcher.rb', line 239 def on_body(chunk) @json_parser << chunk end |
#on_headers_complete(parser) ⇒ Object
Http::Parser callback.
229 230 231 232 233 |
# File 'lib/analysand/change_watcher.rb', line 229 def on_headers_complete(parser) status = @http_parser.status_code.to_i raise "Request failed: expected status 200, got #{status}" unless status == 200 end |
#prepare_request ⇒ Object
274 275 276 277 278 |
# File 'lib/analysand/change_watcher.rb', line 274 def prepare_request Net::HTTP::Get.new(changes_feed_uri.to_s).tap do |req| customize_request(req) end end |
#process(change) ⇒ Object
This method should implement your change-processing logic.
change is a Hash containing keys id, seq, and changes. See [0] for more information.
By default, this does nothing. Provide behavior in a subclass.
196 197 |
# File 'lib/analysand/change_watcher.rb', line 196 def process(change) end |
#start ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/analysand/change_watcher.rb', line 137 def start return if @started @started = true while !connection_ok error "Some services used by #{self.class.name} did not check out ok; will retry in 30 seconds" sleep 30 end connect info "#{self.class} entering read loop" @running = true while @running @http_parser << @socket.readpartial(QUANTUM) end # Once we're done, close things up. @started = false @socket.close end |
#stop ⇒ Object
162 163 164 |
# File 'lib/analysand/change_watcher.rb', line 162 def stop @running = false end |
#waiter_for(id) ⇒ Object
Returns an object that can be used to block a thread until a document with the given ID has been processed.
Intended for testing.
208 209 210 211 212 213 214 215 216 217 |
# File 'lib/analysand/change_watcher.rb', line 208 def waiter_for(id) @waiting[id] = true Waiter.new do loop do break true if !@waiting[id] sleep 0.1 end end end |