Class: Core::ConnectorJob

Inherits:
Object
  • Object
show all
Defined in:
lib/core/connector_job.rb

Constant Summary collapse

DEFAULT_PAGE_SIZE =
100
IDLE_THRESHOLD =
60

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.delete_jobs(jobs) ⇒ Object



44
45
46
47
# File 'lib/core/connector_job.rb', line 44

# Deletes the given jobs with a single delete-by-query call that matches on their ids.
#
# @param jobs [Enumerable] objects responding to +id+
# @return whatever ElasticConnectorActions.delete_jobs_by_query returns
def self.delete_jobs(jobs)
  job_ids = jobs.map { |job| job.id }
  ElasticConnectorActions.delete_jobs_by_query({ terms: { _id: job_ids } })
end

.enqueue(_connector_id) ⇒ Object



67
68
69
# File 'lib/core/connector_job.rb', line 67

# Placeholder: accepts a connector id, ignores it and returns nil.
#
# @param _connector_id unused
# @return [nil]
def self.enqueue(_connector_id)
  # Intentionally empty: an empty method body evaluates to nil.
end

.fetch_by_id(job_id) ⇒ Object



20
21
22
23
24
25
# File 'lib/core/connector_job.rb', line 20

# Looks a job up by id and wraps the response in a new instance.
#
# @param job_id id passed to ElasticConnectorActions.get_job
# @return a new instance built from the response, or nil when the
#   response's :found entry is falsy
def self.fetch_by_id(job_id)
  job_doc = ElasticConnectorActions.get_job(job_id)
  job_doc[:found] ? new(job_doc) : nil
end

.idle_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/core/connector_job.rb', line 49

# Fetches jobs in an active status whose last_seen is at least IDLE_THRESHOLD
# seconds in the past.
#
# @param connector_id restricts the search to this connector; when nil/false the
#   ids of ConnectorSettings.fetch_native_connectors are used instead
# @param page_size forwarded to fetch_jobs_by_query
# @return whatever fetch_jobs_by_query returns
def self.idle_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE)
  connector_ids = connector_id ? [connector_id] : ConnectorSettings.fetch_native_connectors.map(&:id)

  # All three conditions are filter clauses, so every one of them must match.
  filters = [
    { terms: { 'connector.id': connector_ids } },
    { terms: { status: Connectors::SyncStatus::ACTIVE_STATUSES } },
    { range: { last_seen: { lte: "now-#{IDLE_THRESHOLD}s" } } }
  ]

  fetch_jobs_by_query({ bool: { filter: filters } }, page_size)
end

.orphaned_jobs(connector_ids = [], page_size = DEFAULT_PAGE_SIZE) ⇒ Object



39
40
41
42
# File 'lib/core/connector_job.rb', line 39

# Fetches jobs whose connector.id is NOT among the given connector ids.
#
# @param connector_ids [Array] connector ids to exclude from the search
# @param page_size forwarded to fetch_jobs_by_query
# @return whatever fetch_jobs_by_query returns
def self.orphaned_jobs(connector_ids = [], page_size = DEFAULT_PAGE_SIZE)
  known_connectors = { terms: { 'connector.id': connector_ids } }
  fetch_jobs_by_query({ bool: { must_not: known_connectors } }, page_size)
end

.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/core/connector_job.rb', line 27

# Fetches jobs whose status is one of Connectors::SyncStatus::PENDING_STATUSES,
# optionally narrowed to a set of connectors.
#
# @param connectors_ids [Array] when non-empty, only jobs of these connectors match
# @param page_size forwarded to fetch_jobs_by_query
# @return whatever fetch_jobs_by_query returns
def self.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE)
  clauses = [{ terms: { status: Connectors::SyncStatus::PENDING_STATUSES } }]

  # The connector clause is only added when there is something to restrict by.
  clauses << { terms: { 'connector.id' => connectors_ids } } unless connectors_ids.empty?

  fetch_jobs_by_query({ bool: { must: clauses } }, page_size)
end

Instance Method Details

#[](property_name) ⇒ Object



75
76
77
# File 'lib/core/connector_job.rb', line 75

# Reads a single property from the :_source section of the stored response.
#
# @param property_name key looked up inside :_source
# @return the value stored under that key
def [](property_name)
  source_doc = @elasticsearch_response[:_source]
  source_doc[property_name]
end

#active? ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/core/connector_job.rb', line 107

# Tells whether the job's status is one of Connectors::SyncStatus::ACTIVE_STATUSES.
#
# @return [Boolean]
def active?
  active_statuses = Connectors::SyncStatus::ACTIVE_STATUSES
  active_statuses.include?(status)
end

#cancel!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object



178
179
180
# File 'lib/core/connector_job.rb', line 178

# Terminates the job with the CANCELED status and no error message.
#
# @param ingestion_stats [Hash] forwarded unchanged to terminate!
# @param connector_metadata [Hash] forwarded unchanged to terminate!
# @return whatever terminate! returns
def cancel!(ingestion_stats = {}, connector_metadata = {})
  terminate!(Connectors::SyncStatus::CANCELED, nil, ingestion_stats, connector_metadata)
end

#canceled? ⇒ Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/core/connector_job.rb', line 99

# Tells whether the job's status equals Connectors::SyncStatus::CANCELED.
#
# @return [Boolean]
def canceled?
  current_status = status
  current_status == Connectors::SyncStatus::CANCELED
end

#canceling? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/core/connector_job.rb', line 91

# Tells whether the job's status equals Connectors::SyncStatus::CANCELING.
#
# @return [Boolean]
def canceling?
  current_status = status
  current_status == Connectors::SyncStatus::CANCELING
end

#configuration ⇒ Object



135
136
137
# File 'lib/core/connector_job.rb', line 135

# Returns the :configuration entry of the connector snapshot stored on this job.
def configuration
  snapshot = connector_snapshot
  snapshot[:configuration]
end

#connector ⇒ Object



159
160
161
# File 'lib/core/connector_job.rb', line 159

# Returns the connector settings for this job's connector id.
# A truthy result is cached in @connector, so later calls reuse it without
# calling ConnectorSettings.fetch_by_id again.
def connector
  return @connector if @connector

  @connector = ConnectorSettings.fetch_by_id(connector_id)
end

#connector_id ⇒ Object



119
120
121
# File 'lib/core/connector_job.rb', line 119

# Returns the :id entry of the connector snapshot stored on this job.
def connector_id
  snapshot = connector_snapshot
  snapshot[:id]
end

#connector_snapshot ⇒ Object



115
116
117
# File 'lib/core/connector_job.rb', line 115

# Returns the job's :connector property, falling back to an empty Hash when
# that property is nil or false.
def connector_snapshot
  snapshot = self[:connector]
  snapshot || {}
end

#done!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object



170
171
172
# File 'lib/core/connector_job.rb', line 170

# Terminates the job with the COMPLETED status and no error message.
#
# @param ingestion_stats [Hash] forwarded unchanged to terminate!
# @param connector_metadata [Hash] forwarded unchanged to terminate!
# @return whatever terminate! returns
def done!(ingestion_stats = {}, connector_metadata = {})
  terminate!(Connectors::SyncStatus::COMPLETED, nil, ingestion_stats, connector_metadata)
end

#error ⇒ Object



79
80
81
# File 'lib/core/connector_job.rb', line 79

# Returns the job's :error property.
def error
  error_details = self[:error]
  error_details
end

#error!(message, ingestion_stats = {}, connector_metadata = {}) ⇒ Object



174
175
176
# File 'lib/core/connector_job.rb', line 174

# Terminates the job with the ERROR status and the given message.
#
# @param message error message forwarded to terminate!
# @param ingestion_stats [Hash] forwarded unchanged to terminate!
# @param connector_metadata [Hash] forwarded unchanged to terminate!
# @return whatever terminate! returns
def error!(message, ingestion_stats = {}, connector_metadata = {})
  terminate!(Connectors::SyncStatus::ERROR, message, ingestion_stats, connector_metadata)
end

#es_source ⇒ Object



202
203
204
# File 'lib/core/connector_job.rb', line 202

# Returns the whole :_source section of the stored response.
def es_source
  raw_response = @elasticsearch_response
  raw_response[:_source]
end

#extract_binary_content? ⇒ Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/core/connector_job.rb', line 147

# Returns the :extract_binary_content entry of the pipeline settings.
# The stored value is returned as-is, so it is nil when the key is absent.
def extract_binary_content?
  pipeline_settings = pipeline
  pipeline_settings[:extract_binary_content]
end

#filtering ⇒ Object



139
140
141
# File 'lib/core/connector_job.rb', line 139

# Returns the :filtering entry of the connector snapshot stored on this job.
def filtering
  snapshot = connector_snapshot
  snapshot[:filtering]
end

#id ⇒ Object



71
72
73
# File 'lib/core/connector_job.rb', line 71

# Returns the :_id entry of the stored response.
def id
  raw_response = @elasticsearch_response
  raw_response[:_id]
end

#in_progress? ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/core/connector_job.rb', line 87

# Tells whether the job's status equals Connectors::SyncStatus::IN_PROGRESS.
#
# @return [Boolean]
def in_progress?
  current_status = status
  current_status == Connectors::SyncStatus::IN_PROGRESS
end

#index_name ⇒ Object



123
124
125
# File 'lib/core/connector_job.rb', line 123

# Returns the :index_name entry of the connector snapshot stored on this job.
def index_name
  snapshot = connector_snapshot
  snapshot[:index_name]
end

#language ⇒ Object



127
128
129
# File 'lib/core/connector_job.rb', line 127

# Returns the :language entry of the connector snapshot stored on this job.
def language
  snapshot = connector_snapshot
  snapshot[:language]
end

#make_running! ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/core/connector_job.rb', line 188

# Marks the job as in progress.
#
# Inside with_concurrency_control, writes the IN_PROGRESS status, a shared
# started_at/last_seen timestamp and this machine's hostname, passing the
# yielded seq_no and primary_term on to ElasticConnectorActions.update_job_fields.
#
# @return whatever with_concurrency_control returns
def make_running!
  with_concurrency_control do |es_doc, seq_no, primary_term|
    # One timestamp is taken so started_at and last_seen are exactly equal.
    started = Time.now

    running_fields = {}
    running_fields[:status] = Connectors::SyncStatus::IN_PROGRESS
    running_fields[:started_at] = started
    running_fields[:last_seen] = started
    running_fields[:worker_hostname] = Socket.gethostname

    ElasticConnectorActions.update_job_fields(es_doc[:_id], running_fields, seq_no, primary_term)
  end
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/core/connector_job.rb', line 103

# Tells whether the job's status is one of Connectors::SyncStatus::PENDING_STATUSES.
#
# @return [Boolean]
def pending?
  pending_statuses = Connectors::SyncStatus::PENDING_STATUSES
  pending_statuses.include?(status)
end

#pipeline ⇒ Object



143
144
145
# File 'lib/core/connector_job.rb', line 143

# Returns the :pipeline entry of the connector snapshot, falling back to an
# empty Hash when that entry is nil or false.
def pipeline
  pipeline_settings = connector_snapshot[:pipeline]
  pipeline_settings || {}
end

#reduce_whitespace? ⇒ Boolean

Returns:

  • (Boolean)


151
152
153
# File 'lib/core/connector_job.rb', line 151

# Returns the :reduce_whitespace entry of the pipeline settings.
# The stored value is returned as-is, so it is nil when the key is absent.
def reduce_whitespace?
  pipeline_settings = pipeline
  pipeline_settings[:reduce_whitespace]
end

#run_ml_inference? ⇒ Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/core/connector_job.rb', line 155

# Returns the :run_ml_inference entry of the pipeline settings.
# The stored value is returned as-is, so it is nil when the key is absent.
def run_ml_inference?
  pipeline_settings = pipeline
  pipeline_settings[:run_ml_inference]
end

#service_type ⇒ Object



131
132
133
# File 'lib/core/connector_job.rb', line 131

# Returns the :service_type entry of the connector snapshot stored on this job.
def service_type
  snapshot = connector_snapshot
  snapshot[:service_type]
end

#status ⇒ Object



83
84
85
# File 'lib/core/connector_job.rb', line 83

# Returns the job's :status property.
def status
  current_status = self[:status]
  current_status
end

#suspended? ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/core/connector_job.rb', line 95

# Tells whether the job's status equals Connectors::SyncStatus::SUSPENDED.
#
# @return [Boolean]
def suspended?
  current_status = status
  current_status == Connectors::SyncStatus::SUSPENDED
end

#terminated? ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/core/connector_job.rb', line 111

# Tells whether the job's status is one of Connectors::SyncStatus::TERMINAL_STATUSES.
#
# @return [Boolean]
def terminated?
  terminal_statuses = Connectors::SyncStatus::TERMINAL_STATUSES
  terminal_statuses.include?(status)
end

#update_metadata(ingestion_stats = {}, connector_metadata = {}) ⇒ Object



163
164
165
166
167
168
# File 'lib/core/connector_job.rb', line 163

# Refreshes the job document: stamps last_seen with the current time, merges
# the ingestion stats into the update and optionally stores connector metadata.
#
# @param ingestion_stats [Hash, nil] merged into the update document at the top
#   level (a :last_seen key in it would override the timestamp); nil is treated
#   as an empty Hash
# @param connector_metadata [Hash, nil] written under :metadata only when it is
#   non-nil and non-empty
# @return whatever ElasticConnectorActions.update_job_fields returns
def update_metadata(ingestion_stats = {}, connector_metadata = {})
  ingestion_stats ||= {}
  doc = { :last_seen => Time.now }.merge(ingestion_stats)
  # Skip the :metadata field entirely for nil or empty metadata.
  doc[:metadata] = connector_metadata if connector_metadata&.any?
  ElasticConnectorActions.update_job_fields(id, doc)
end

#with_concurrency_control {|response, response['_seq_no'], response['_primary_term']| ... } ⇒ Object

Yields:

  • (response, response['_seq_no'], response['_primary_term'])


182
183
184
185
186
# File 'lib/core/connector_job.rb', line 182

# Re-reads this job and yields the fresh response together with its
# '_seq_no' and '_primary_term' values.
#
# NOTE(review): other methods read the response with symbol keys (:_id,
# :_source, :found) while this one uses string keys — presumably the response
# supports both; confirm against ElasticConnectorActions.get_job.
#
# @yield [response, seq_no, primary_term]
# @return the value of the block
def with_concurrency_control
  job_doc = ElasticConnectorActions.get_job(id)
  seq_no = job_doc['_seq_no']
  primary_term = job_doc['_primary_term']

  yield job_doc, seq_no, primary_term
end