Class: Analysand::ChangeWatcher

Inherits:
Object
  • Object
show all
Includes:
ConnectionTesting, Celluloid::IO, Celluloid::Logger, Rack::Utils
Defined in:
lib/analysand/change_watcher.rb

Overview

A Celluloid::IO actor that watches the changes feed of a CouchDB database. When a change is received, it passes the change to a #process method.

ChangeWatchers monitor changes using continuous mode and set up a heartbeat to fire approximately every 10 seconds.

ChangeWatchers begin watching for changes as soon as they are initialized. To send a shutdown message:

a.stop

The watcher will terminate on the next heartbeat.

Failure modes

ChangeWatcher deals with the following failures in the following ways:

  • If Errno::ECONNREFUSED is raised whilst connecting to CouchDB, it will retry the connection in 30 seconds.

  • If the connection to CouchDB’s changes feed is abruptly terminated, it dies.

  • If an exception is raised during HTTP or JSON parsing, it dies.

Situations where the actor dies should be handled by a supervisor.

Example usage

class Accumulator < Analysand::ChangeWatcher
  attr_accessor :results

  def initialize(database)
    super(database)

    self.results = []
  end

  def process(change)
    results << change

    # Once a ChangeWatcher has successfully processed a change, it
    # SHOULD invoke #change_processed.
    change_processed(change)
  end
end

a = Accumulator.new('http://localhost:5984/mydb')

# or with supervision:
a = Accumulator.supervise('http://localhost:5984/mydb')

Defined Under Namespace

Classes: Waiter

Constant Summary collapse

QUANTUM =

Read at most this many bytes off the socket at a time.

4096

Instance Method Summary collapse

Methods included from ConnectionTesting

#test_http_connection, #wait_for_http_service

Constructor Details

#initialize(database) ⇒ ChangeWatcher

Checks services. If all services pass muster, enters a read loop.

The database parameter may be either a URL-as-string or a Analysand::Database.

If overriding the initializer, you MUST call super.



81
82
83
84
85
86
87
88
89
# File 'lib/analysand/change_watcher.rb', line 81

def initialize(database)
  @db = database
  @waiting = {}
  @http_parser = ::Http::Parser.new(self)
  @json_parser = Yajl::Parser.new
  @json_parser.on_parse_complete = lambda { |doc| process(doc) }

  async.start
end

Instance Method Details

#change_processed(change) ⇒ Object

Notify waiters.



221
222
223
# File 'lib/analysand/change_watcher.rb', line 221

def change_processed(change)
  @waiting.delete(change['id'])
end

#changes_feed_uriObject

The URI of the changes feed. This URI incorporates any changes made by customize_query.



93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/analysand/change_watcher.rb', line 93

def changes_feed_uri
  query = {
    'feed' => 'continuous',
    'heartbeat' => '10000'
  }

  customize_query(query)

  uri = (@db.respond_to?(:uri) ? @db.uri : URI(@db)).dup
  uri.path += '/_changes'
  uri.query = build_query(query)
  uri
end

#connectObject



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/analysand/change_watcher.rb', line 245

def connect
  req = prepare_request
  uri = changes_feed_uri

  info "#{self.class} connecting to #{req.path}"

  @socket = TCPSocket.new(uri.host, uri.port)

  # Make the request.
  data = [
    "GET #{req.path} HTTP/1.1"
  ]

  req.each_header { |k, v| data << "#{k}: #{v}" }

  @socket.write(data.join("\r\n"))
  @socket.write("\r\n\r\n")
end

#connection_okObject

The connection_ok method is called before connecting to the changes feed. By default, it checks that there’s an HTTP service listening on the changes feed.

If the method returns true, then we connect to the changes feed and begin processing. If it returns false, a warning message is logged and the connection check will be retried in 30 seconds.

This method can be overridden if you need to check additional services. When you override the method, make sure that you don’t discard the return value of the original definition:

# Wrong
def connection_ok
  super
  ...
end

# Right
def connection_ok
  ok = super

  ok && my_other_test
end


131
132
133
134
135
# File 'lib/analysand/change_watcher.rb', line 131

def connection_ok
  test_http_connection(changes_feed_uri) do |req|
    customize_request(req)
  end
end

#customize_query(query) ⇒ Object

Can be used to set query parameters. query is a Hash. The query hash has two default parameters:

| Key | Value | | feed | continuous | | heartbeat | 10000 |

It is NOT RECOMMENDED that they be changed.

By default, this does nothing. Provide behavior in a subclass.



177
178
# File 'lib/analysand/change_watcher.rb', line 177

def customize_query(query)
end

#customize_request(req) ⇒ Object

Can be used to add headers. req is a Net::HTTP::Get instance.

By default, this does nothing. Provide behavior in a subclass.



184
185
# File 'lib/analysand/change_watcher.rb', line 184

def customize_request(req)
end

#disconnectObject



266
267
268
# File 'lib/analysand/change_watcher.rb', line 266

def disconnect
  @socket.close if @socket && !@socket.closed?
end

#on_body(chunk) ⇒ Object

Http::Parser callback.



239
240
241
# File 'lib/analysand/change_watcher.rb', line 239

def on_body(chunk)
  @json_parser << chunk
end

#on_headers_complete(parser) ⇒ Object

Http::Parser callback.



229
230
231
232
233
# File 'lib/analysand/change_watcher.rb', line 229

def on_headers_complete(parser)
  status = @http_parser.status_code.to_i

  raise "Request failed: expected status 200, got #{status}" unless status == 200
end

#prepare_requestObject



274
275
276
277
278
# File 'lib/analysand/change_watcher.rb', line 274

def prepare_request
  Net::HTTP::Get.new(changes_feed_uri.to_s).tap do |req|
    customize_request(req)
  end
end

#process(change) ⇒ Object

This method should implement your change-processing logic.

change is a Hash containing keys id, seq, and changes. See [0] for more information.

By default, this does nothing. Provide behavior in a subclass.

[0]: guide.couchdb.org/draft/notifications.html#continuous



196
197
# File 'lib/analysand/change_watcher.rb', line 196

def process(change)
end

#startObject



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/analysand/change_watcher.rb', line 137

def start
  return if @started

  @started = true

  while !connection_ok
    error "Some services used by #{self.class.name} did not check out ok; will retry in 30 seconds"
    sleep 30
  end

  connect

  info "#{self.class} entering read loop"

  @running = true

  while @running
    @http_parser << @socket.readpartial(QUANTUM)
  end

  # Once we're done, close things up.
  @started = false
  @socket.close
end

#stopObject



162
163
164
# File 'lib/analysand/change_watcher.rb', line 162

def stop
  @running = false
end

#waiter_for(id) ⇒ Object

Returns an object that can be used to block a thread until a document with the given ID has been processed.

Intended for testing.



208
209
210
211
212
213
214
215
216
217
# File 'lib/analysand/change_watcher.rb', line 208

def waiter_for(id)
  @waiting[id] = true

  Waiter.new do
    loop do
      break true if !@waiting[id]
      sleep 0.1
    end
  end
end