Class: CouchdbToSql::Changes

Inherits:
Object
  • Object
show all
Defined in:
lib/couchdb_to_sql/changes.rb

Overview

rubocop:disable ClassLength

Constant Summary collapse

COUCHDB_HEARTBEAT =
30
INACTIVITY_TIMEOUT =
70
RECONNECT_TIMEOUT =
15

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = '', &block) ⇒ Changes

Start a new Changes instance by connecting to the provided CouchDB to see if the database exists.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/couchdb_to_sql/changes.rb', line 15

def initialize(opts = '', &block)
  raise 'Block required for changes!' unless block_given?

  @schemas  = {}
  @handlers = []
  @source   = CouchRest.database(opts)
  @http     = HTTPClient.new
  @http.debug_dev = STDOUT if ENV.key?('DEBUG')
  @skip_seqs = Set.new

  log_info 'Connected to CouchDB'

  @ember_pouch_mode = false
  @fail_on_unhandled_document = false
  @upsert_mode = false

  # Prepare the definitions
  @dsl_mode = true
  instance_eval(&block)
  @dsl_mode = false
end

Instance Attribute Details

#handlersObject (readonly)

Returns the value of attribute handlers.



9
10
11
# File 'lib/couchdb_to_sql/changes.rb', line 9

def handlers
  @handlers
end

#highest_sequenceObject

Returns the value of attribute highest_sequence.



11
12
13
# File 'lib/couchdb_to_sql/changes.rb', line 11

def highest_sequence
  @highest_sequence
end

#schemasObject (readonly)

Returns the value of attribute schemas.



9
10
11
# File 'lib/couchdb_to_sql/changes.rb', line 9

def schemas
  @schemas
end

#sourceObject (readonly)

Returns the value of attribute source.



9
10
11
# File 'lib/couchdb_to_sql/changes.rb', line 9

def source
  @source
end

Instance Method Details

#database(opts = nil) ⇒ Object

Note:

Dual-purpose method, accepts configuration of database

or returns a previous definition.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/couchdb_to_sql/changes.rb', line 79

def database(opts = nil)
  if opts
    @database ||= begin
      Sequel.connect(opts).tap { |conn|
        next unless ENV.key?('SEQUEL_LOG_LEVEL')

        conn.logger = LoggingLibrary::LoggerFactory.create(self.class.name).tap { |l|
          l.level = ENV['SEQUEL_LOG_LEVEL'].to_s.downcase.to_sym
        }
      }
    end
    find_or_create_sequence_number
  end
  @database
end

#document(filter = {}, &block) ⇒ Object



95
96
97
# File 'lib/couchdb_to_sql/changes.rb', line 95

def document(filter = {}, &block)
  @handlers << DocumentHandler.new(self, filter, &block)
end

#ember_pouch_modeObject

Note:

Dual-purpose method, accepts configuration of setting or returns a previous definition.

Sets the ‘ember_pouch_mode` flag. In `ember-pouch` mode, all the data fields are expected to reside within a `data` node in the document. More information on `ember-pouch` can be found [here](github.com/nolanlawson/ember-pouch).



44
45
46
47
48
49
50
# File 'lib/couchdb_to_sql/changes.rb', line 44

def ember_pouch_mode
  if @dsl_mode
    @ember_pouch_mode ||= true
  else
    @ember_pouch_mode
  end
end

#fail_on_unhandled_documentObject

Note:

Dual-purpose method, accepts configuration of setting or returns a previous definition.

Sets the “fail on unhandled document” flag, which will turn log errors into runtime exceptions if an unhandled document is encountered.



69
70
71
72
73
74
75
# File 'lib/couchdb_to_sql/changes.rb', line 69

def fail_on_unhandled_document
  if @dsl_mode
    @fail_on_unhandled_document ||= true
  else
    @fail_on_unhandled_document
  end
end

#log_debug(message) ⇒ Object



118
119
120
# File 'lib/couchdb_to_sql/changes.rb', line 118

def log_debug(message)
  logger.debug "#{source.name}: #{message}"
end

#log_error(message) ⇒ Object



126
127
128
# File 'lib/couchdb_to_sql/changes.rb', line 126

def log_error(message)
  logger.error "#{source.name}: #{message}"
end

#log_info(message) ⇒ Object



122
123
124
# File 'lib/couchdb_to_sql/changes.rb', line 122

def log_info(message)
  logger.info "#{source.name}: #{message}"
end

#schema(name) ⇒ Object

END DSL



107
108
109
# File 'lib/couchdb_to_sql/changes.rb', line 107

def schema(name)
  @schemas[name.to_sym] ||= Schema.new(database, name)
end

#skip_seqs_file(file_path) ⇒ Object



99
100
101
102
103
# File 'lib/couchdb_to_sql/changes.rb', line 99

def skip_seqs_file(file_path)
  file_contents = File.read(file_path)
  seqs = JSON.parse(file_contents)
  @skip_seqs |= Set.new(seqs)
end

#startObject

Start listening to the CouchDB changes feed. By this stage we should have a sequence id so we know where to start from and all the filters should have been prepared.



114
115
116
# File 'lib/couchdb_to_sql/changes.rb', line 114

def start
  perform_request
end

#upsert_modeObject

Note:

Dual-purpose method, accepts configuration of setting or returns a previous definition.

Sets the ‘upsert_mode` flag. When running in upsert mode, Sequel’s insert_conflict mode is being used. More information about that can be found [here](sequel.jeremyevans.net/rdoc/files/doc/postgresql_rdoc.html#label-INSERT+ON+CONFLICT+Support)



57
58
59
60
61
62
63
# File 'lib/couchdb_to_sql/changes.rb', line 57

def upsert_mode
  if @dsl_mode
    @upsert_mode ||= true
  else
    @upsert_mode
  end
end