Class: Core::ConnectorJob

Inherits:
Object
  • Object
show all
Defined in:
lib/core/connector_job.rb

Constant Summary collapse

DEFAULT_PAGE_SIZE =
100
STUCK_THRESHOLD =
60

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.delete_jobs(jobs) ⇒ Object



45
46
47
48
# File 'lib/core/connector_job.rb', line 45

def self.delete_jobs(jobs)
  query = { terms: { '_id': jobs.map(&:id) } }
  ElasticConnectorActions.delete_jobs_by_query(query)
end

.enqueue(_connector_id) ⇒ Object



68
69
70
# File 'lib/core/connector_job.rb', line 68

def self.enqueue(_connector_id)
  nil
end

.fetch_by_id(job_id) ⇒ Object



20
21
22
23
24
25
# File 'lib/core/connector_job.rb', line 20

def self.fetch_by_id(job_id)
  es_response = ElasticConnectorActions.get_job(job_id)
  return nil unless es_response[:found]

  new(es_response)
end

.orphaned_jobs(page_size = DEFAULT_PAGE_SIZE) ⇒ Object



39
40
41
42
43
# File 'lib/core/connector_job.rb', line 39

def self.orphaned_jobs(page_size = DEFAULT_PAGE_SIZE)
  connector_ids = ConnectorSettings.fetch_all_connectors.map(&:id)
  query = { bool: { must_not: { terms: { 'connector.id': connector_ids } } } }
  fetch_jobs_by_query(query, page_size)
end

.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/core/connector_job.rb', line 27

def self.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE)
  status_term = { status: Connectors::SyncStatus::PENDING_STATUSES }

  query = { bool: { must: [{ terms: status_term }] } }

  return fetch_jobs_by_query(query, page_size) if connectors_ids.empty?

  query[:bool][:must] << { terms: { 'connector.id' => connectors_ids } }

  fetch_jobs_by_query(query, page_size)
end

.stuck_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/core/connector_job.rb', line 50

def self.stuck_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE)
  connector_ids = if connector_id
                    [connector_id]
                  else
                    ConnectorSettings.fetch_native_connectors.map(&:id)
                  end
  query = {
    bool: {
      filter: [
          { terms: { 'connector.id': connector_ids } },
          { terms: { status: Connectors::SyncStatus::ACTIVE_STATUSES } },
          { range: { last_seen: { lte: "now-#{STUCK_THRESHOLD}s" } } }
      ]
    }
  }
  fetch_jobs_by_query(query, page_size)
end

Instance Method Details

#[](property_name) ⇒ Object



76
77
78
# File 'lib/core/connector_job.rb', line 76

def [](property_name)
  @elasticsearch_response[:_source][property_name]
end

#active?Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/core/connector_job.rb', line 108

def active?
  Connectors::SyncStatus::ACTIVE_STATUSES.include?(status)
end

#cancel!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object



167
168
169
# File 'lib/core/connector_job.rb', line 167

def cancel!(ingestion_stats = {},  = {})
  terminate!(Connectors::SyncStatus::CANCELED, nil, ingestion_stats, )
end

#canceled?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/core/connector_job.rb', line 100

def canceled?
  status == Connectors::SyncStatus::CANCELED
end

#canceling?Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/core/connector_job.rb', line 92

def canceling?
  status == Connectors::SyncStatus::CANCELING
end

#configurationObject



136
137
138
# File 'lib/core/connector_job.rb', line 136

def configuration
  connector_snapshot[:configuration]
end

#connectorObject



148
149
150
# File 'lib/core/connector_job.rb', line 148

def connector
  @connector ||= ConnectorSettings.fetch_by_id(connector_id)
end

#connector_idObject



120
121
122
# File 'lib/core/connector_job.rb', line 120

def connector_id
  connector_snapshot[:id]
end

#connector_snapshotObject



116
117
118
# File 'lib/core/connector_job.rb', line 116

def connector_snapshot
  self[:connector] || {}
end

#done!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object



159
160
161
# File 'lib/core/connector_job.rb', line 159

def done!(ingestion_stats = {},  = {})
  terminate!(Connectors::SyncStatus::COMPLETED, nil, ingestion_stats, )
end

#errorObject



80
81
82
# File 'lib/core/connector_job.rb', line 80

def error
  self[:error]
end

#error!(message, ingestion_stats = {}, connector_metadata = {}) ⇒ Object



163
164
165
# File 'lib/core/connector_job.rb', line 163

def error!(message, ingestion_stats = {},  = {})
  terminate!(Connectors::SyncStatus::ERROR, message, ingestion_stats, )
end

#es_sourceObject



191
192
193
# File 'lib/core/connector_job.rb', line 191

def es_source
  @elasticsearch_response[:_source]
end

#filteringObject



140
141
142
# File 'lib/core/connector_job.rb', line 140

def filtering
  connector_snapshot[:filtering]
end

#idObject



72
73
74
# File 'lib/core/connector_job.rb', line 72

def id
  @elasticsearch_response[:_id]
end

#in_progress?Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/core/connector_job.rb', line 88

def in_progress?
  status == Connectors::SyncStatus::IN_PROGRESS
end

#index_nameObject



124
125
126
# File 'lib/core/connector_job.rb', line 124

def index_name
  connector_snapshot[:index_name]
end

#languageObject



128
129
130
# File 'lib/core/connector_job.rb', line 128

def language
  connector_snapshot[:language]
end

#make_running!Object



177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/core/connector_job.rb', line 177

def make_running!
  with_concurrency_control do |es_doc, seq_no, primary_term|
    now = Time.now
    doc = {
      status: Connectors::SyncStatus::IN_PROGRESS,
      started_at: now,
      last_seen: now,
      worker_hostname: Socket.gethostname
    }

    ElasticConnectorActions.update_job_fields(es_doc[:_id], doc, seq_no, primary_term)
  end
end

#pending?Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/core/connector_job.rb', line 104

def pending?
  Connectors::SyncStatus::PENDING_STATUSES.include?(status)
end

#pipelineObject



144
145
146
# File 'lib/core/connector_job.rb', line 144

def pipeline
  connector_snapshot[:pipeline]
end

#service_typeObject



132
133
134
# File 'lib/core/connector_job.rb', line 132

def service_type
  connector_snapshot[:service_type]
end

#statusObject



84
85
86
# File 'lib/core/connector_job.rb', line 84

def status
  self[:status]
end

#suspended?Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/core/connector_job.rb', line 96

def suspended?
  status == Connectors::SyncStatus::SUSPENDED
end

#terminated?Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/core/connector_job.rb', line 112

def terminated?
  Connectors::SyncStatus::TERMINAL_STATUSES.include?(status)
end

#update_metadata(ingestion_stats = {}, connector_metadata = {}) ⇒ Object



152
153
154
155
156
157
# File 'lib/core/connector_job.rb', line 152

def (ingestion_stats = {},  = {})
  ingestion_stats ||= {}
  doc = { :last_seen => Time.now }.merge(ingestion_stats)
  doc[:metadata] =  if &.any?
  ElasticConnectorActions.update_job_fields(id, doc)
end

#with_concurrency_control {|response, , | ... } ⇒ Object

Yields:

  • (response, , )


171
172
173
174
175
# File 'lib/core/connector_job.rb', line 171

def with_concurrency_control
  response = ElasticConnectorActions.get_job(id)

  yield response, response['_seq_no'], response['_primary_term']
end