Class: LogStash::Inputs::CouchDBChanges

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/couchdb_changes.rb

Overview

This CouchDB input allows you to automatically stream events from the CouchDB guide.couchdb.org/draft/notifications.html[_changes] URI. Moreover, any “future” changes will automatically be streamed as well making it easy to synchronize your CouchDB data with any target destination

### Upsert and delete You can use event metadata to allow for document deletion. All non-delete operations are treated as upserts

### Starting at a Specific Sequence The CouchDB input stores the last sequence number value in location defined by ‘sequence_path`. You can use this fact to start or resume the stream at a particular sequence.

Defined Under Namespace

Modules: SequenceDB

Constant Summary collapse

FEED =

Declare these constants here.

'continuous'
INCLUDEDOCS =
'true'

Instance Method Summary collapse

Instance Method Details

#registerObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/logstash/inputs/couchdb_changes.rb', line 97

def register
  require "logstash/util/buftok"
  if @sequence_path.nil?
    if ENV["HOME"].nil?
      @logger.error("No HOME environment variable set, I don't know where " \
                    "to keep track of the files I'm watching. Either set " \
                    "HOME in your environment, or set sequence_path in " \
                    "in your Logstash config.")
      raise ArgumentError
    end
    default_dir = ENV["HOME"]
    @sequence_path = File.join(default_dir, ".couchdb_seq")

    @logger.info("No sequence_path set, generating one...",
                 :sequence_path => @sequence_path)
  end

  @sequencedb   = SequenceDB::File.new(@sequence_path)
  @path         = '/' + @db + '/_changes'

  @scheme = @secure ? 'https' : 'http'

  if !@initial_sequence.nil?
    @logger.info("initial_sequence is set, writing to filesystem ...",
                 :initial_sequence => @initial_sequence, :sequence_path => @sequence_path)
    @sequencedb.write(@initial_sequence)
    @sequence = @initial_sequence
  else
    @logger.info("No initial_sequence set, reading from filesystem ...",
                 :sequence_path => @sequence_path)
    @sequence = @sequencedb.read
  end

end

#run(queue) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/logstash/inputs/couchdb_changes.rb', line 150

def run(queue)
  buffer = FileWatch::BufferedTokenizer.new
  @logger.info("Connecting to CouchDB _changes stream at:", :host => @host.to_s, :port => @port.to_s, :db => @db)
  uri = build_uri
  @logger.info("Using service uri :", :uri => uri)
  until stop?
    begin
      Net::HTTP.start(@host, @port, :use_ssl => (@secure == true), :ca_file => @ca_file) do |http|

        request = Net::HTTP::Get.new(uri.request_uri)
        request.basic_auth(@username, @password.value) if @username && @password
        http.request request do |response|
          raise ArgumentError, :message => "Server error!", :response_code => response.code if response.code >= "500"
          raise ArgumentError, :message => "Authentication error!", :response_code => response.code if response.code == "401"
          raise ArgumentError, :message => "Database not found!", :response_code => response.code if response.code == "404"
          raise ArgumentError, :message => "Request error!", :response_code => response.code if response.code >= "400"
          response.read_body do |chunk|
            buffer.extract(chunk).each do |changes|
              # Put a "stop" check here. If we stop here, anything we've read, but
              # not written, will be read again since the @sequence change won't
              # have been written to the file, ensuring that it will pick up where
              # it left off.
              break if stop?
              # If no changes come since the last heartbeat period, a blank line is
              # sent as a sort of keep-alive.  We should ignore those.
              next if changes.chomp.empty?
              if event = build_event(changes)
                @logger.debug("event", :event => event.) if @logger.debug?
                decorate(event)
                queue << event
                @sequence = event['@metadata']['seq']
                @sequencedb.write(@sequence.to_s)
              end
            end
          end
        end
      end
    rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Errno::EHOSTUNREACH, Errno::ECONNREFUSED,
      Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError, SocketError => e
      @logger.error("Connection problem encountered: Retrying connection in " + @reconnect_delay.to_s + " seconds...", :error => e.to_s, :host => @host.to_s, :port => @port.to_s, :db => @db)
      retry if reconnect?
    rescue Errno::EBADF => e
      @logger.error("Unable to connect: Bad file descriptor: ", :error => e.to_s)
      retry if reconnect?
    rescue ArgumentError => e
      @logger.error("Unable to connect to database", :db => @db, :error => e.to_s)
      retry if reconnect?
    end
  end
end