Class: Connectors::MongoDB::Connector

Inherits:
Base::Connector show all
Defined in:
lib/connectors/mongodb/connector.rb

Constant Summary collapse

ALLOWED_TOP_LEVEL_FILTER_KEYS =
%w[find aggregate]
AGGREGATE =
'aggregate'
FIND =
'find'
PAGE_SIZE =
100

Instance Attribute Summary

Attributes inherited from Base::Connector

#advanced_filter_config, #rules

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base::Connector

configurable_fields_indifferent_access, #do_health_check!, #is_healthy?, kibana_features, #metadata, simple_rules_validators, validate_filtering, #yield_with_handling_tolerable_errors

Constructor Details

#initialize(configuration: {}, job_description: nil) ⇒ Connector

Returns a new instance of Connector.



69
70
71
72
73
74
75
76
77
78
# File 'lib/connectors/mongodb/connector.rb', line 69

# Builds a connector and caches the MongoDB connection settings on the instance.
#
# Calls the parent initializer with the same arguments, then copies the :value
# entry of every connection field out of @configuration into an instance
# variable of the same name.
# NOTE(review): @configuration is not assigned in this method; presumably the
# parent initializer sets it from the configuration argument - confirm.
#
# @param configuration [Hash] connector configuration, passed on to the parent initializer
# @param job_description [Object, nil] passed on to the parent initializer
def initialize(configuration: {}, job_description: nil)
  super

  connection_fields = %i[host database collection user password direct_connection]
  @host, @database, @collection, @user, @password, @direct_connection =
    connection_fields.map { |field| @configuration.dig(field, :value) }
end

Class Method Details

.advanced_snippet_validators ⇒ Object



59
60
61
# File 'lib/connectors/mongodb/connector.rb', line 59

# Names the validator used for MongoDB advanced snippets.
#
# @return [Object] the MongoAdvancedSnippetAgainstSchemaValidator constant
#   (NOTE(review): presumably a class that checks a snippet against a schema - confirm)
def self.advanced_snippet_validators
  MongoAdvancedSnippetAgainstSchemaValidator
end

.configurable_fields ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/connectors/mongodb/connector.rb', line 36

# Lists the configuration fields of the MongoDB connector.
#
# Every field maps to a hash holding a single :label entry. The fields are
# returned in the order host, user, password, database, collection,
# direct_connection, and a new hash is built on every call.
#
# @return [Hash{Symbol => Hash{Symbol => String}}] field name => { :label => text }
def self.configurable_fields
  field_labels = [
    [:host, 'Server Hostname'],
    [:user, 'Username'],
    [:password, 'Password'],
    [:database, 'Database'],
    [:collection, 'Collection'],
    [:direct_connection, 'Direct connection? (true/false)']
  ]

  field_labels.each_with_object({}) do |(field, label), fields|
    fields[field] = { :label => label }
  end
end

.display_name ⇒ Object



32
33
34
# File 'lib/connectors/mongodb/connector.rb', line 32

# Name of this connector.
# NOTE(review): presumably the human-readable name shown to users - confirm against callers.
#
# @return [String] the literal 'MongoDB'
def self.display_name
  'MongoDB'
end

.filter_transformers ⇒ Object



63
64
65
66
67
# File 'lib/connectors/mongodb/connector.rb', line 63

# Maps filtering transformation targets to the transformers registered for them.
#
# Only the advanced-snippet target has an entry; its list holds
# MongoAdvancedSnippetSnakeCaseTransformer.
#
# @return [Hash] transformation target => array of transformers
def self.filter_transformers
  advanced_snippet_target = Core::Filtering::Transform::TransformationTarget::ADVANCED_SNIPPET

  { advanced_snippet_target => [MongoAdvancedSnippetSnakeCaseTransformer] }
end

.service_type ⇒ Object



28
29
30
# File 'lib/connectors/mongodb/connector.rb', line 28

# Service type of this connector.
# NOTE(review): presumably the machine-readable identifier used to look the connector up - confirm.
#
# @return [String] the literal 'mongodb'
def self.service_type
  'mongodb'
end

Instance Method Details

#yield_documents ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/connectors/mongodb/connector.rb', line 80

# Streams the documents of the configured collection to the caller's block.
#
# create_db_cursor_on_collection decides how documents are retrieved: it
# returns the cursor type (FIND or AGGREGATE) together with a
# [cursor, options] pair.
#
# * FIND: the cursor is read page by page (PAGE_SIZE documents per request).
#   options may carry :skip (offset of the first page) and :limit (upper
#   bound on the number of documents yielded overall).
# * AGGREGATE: the cursor is iterated as-is, without paging.
#
# Every document is passed through serialize and yielded inside
# yield_with_handling_tolerable_errors.
#
# @yield [document] the serialized document
# @raise [RuntimeError] if the cursor type is neither FIND nor AGGREGATE
def yield_documents
  with_client do |client|
    # We do paging using skip().limit() here to make Ruby recycle the memory for each page pulled from the server after it's not needed any more.
    # This gives us more control over memory usage (PAGE_SIZE can be lowered to decrease the maximum memory consumption).
    # It's done because .find.each was observed to keep the whole result set in memory during the sync, and sometimes
    # (not 100% sure) the memory for these objects was never recycled at all.
    cursor_type, cursor_with_options = create_db_cursor_on_collection(client[@collection])
    cursor, options = cursor_with_options

    case cursor_type
    when FIND
      skip = 0
      found_overall = 0

      # Without a limit from filtering the sync is unbounded: found_overall only ever grows and can never reach infinity.
      overall_limit = Float::INFINITY

      if options.present?
        # there could be a skip parameter defined for filtering
        skip = options.fetch(:skip, skip)
        # there could be a limit parameter defined for filtering -> used as the overall limit (not the page limit, which exists for memory optimization)
        overall_limit = options.fetch(:limit, overall_limit)
      end

      overall_limit_reached = false

      loop do
        found_in_page = 0

        Utility::Logger.info("Requesting #{PAGE_SIZE} documents from MongoDB (Starting at #{skip})")
        view = cursor.skip(skip).limit(PAGE_SIZE)
        view.each do |document|
          # Count the document as soon as the server returned it.
          # NOTE(review): yield_with_handling_tolerable_errors is inherited and presumably rescues tolerable errors;
          # counting inside it would make a page whose documents all failed look empty and end the sync early.
          found_in_page += 1

          yield_with_handling_tolerable_errors do
            yield serialize(document)
            # Only documents that were yielded without an error count towards the overall limit.
            found_overall += 1
            overall_limit_reached = found_overall >= overall_limit
          end
          break if overall_limit_reached
        end

        # An empty page means the previous page was the last one.
        break if found_in_page.zero? || overall_limit_reached

        skip += PAGE_SIZE
      end
    when AGGREGATE
      cursor.each do |document|
        yield_with_handling_tolerable_errors do
          yield serialize(document)
        end
      end
    else
      raise "Unknown retrieval function #{cursor_type} for MongoDB."
    end
  end
end