Class: Core::ConnectorJob
- Inherits:
-
Object
- Object
- Core::ConnectorJob
- Defined in:
- lib/core/connector_job.rb
Constant Summary collapse
- DEFAULT_PAGE_SIZE =
100
- STUCK_THRESHOLD =
60
Class Method Summary collapse
- .delete_jobs(jobs) ⇒ Object
- .enqueue(_connector_id) ⇒ Object
- .fetch_by_id(job_id) ⇒ Object
- .orphaned_jobs(page_size = DEFAULT_PAGE_SIZE) ⇒ Object
- .pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE) ⇒ Object
- .stuck_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE) ⇒ Object
Instance Method Summary collapse
- #[](property_name) ⇒ Object
- #active? ⇒ Boolean
- #cancel!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #canceled? ⇒ Boolean
- #canceling? ⇒ Boolean
- #configuration ⇒ Object
- #connector ⇒ Object
- #connector_id ⇒ Object
- #connector_snapshot ⇒ Object
- #done!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #error ⇒ Object
- #error!(message, ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #es_source ⇒ Object
- #filtering ⇒ Object
- #id ⇒ Object
- #in_progress? ⇒ Boolean
- #index_name ⇒ Object
- #language ⇒ Object
- #make_running! ⇒ Object
- #pending? ⇒ Boolean
- #pipeline ⇒ Object
- #service_type ⇒ Object
- #status ⇒ Object
- #suspended? ⇒ Boolean
- #terminated? ⇒ Boolean
- #update_metadata(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #with_concurrency_control {|response, seq_no, primary_term| ... } ⇒ Object
Class Method Details
.delete_jobs(jobs) ⇒ Object
45 46 47 48 |
# File 'lib/core/connector_job.rb', line 45
# Deletes the given jobs with one delete-by-query that matches their ids.
#
# @param jobs [Enumerable] objects responding to +id+
# @return [Object] the result of ElasticConnectorActions.delete_jobs_by_query
def self.delete_jobs(jobs)
  job_ids = jobs.map(&:id)
  ElasticConnectorActions.delete_jobs_by_query({ terms: { '_id': job_ids } })
end
.enqueue(_connector_id) ⇒ Object
68 69 70 |
# File 'lib/core/connector_job.rb', line 68
# No-op: the connector id is accepted but unused, and nil is returned.
#
# @param _connector_id [Object] ignored
# @return [nil]
def self.enqueue(_connector_id); end
.fetch_by_id(job_id) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/core/connector_job.rb', line 20
# Looks a job up by id and wraps the response in a new instance.
#
# @param job_id [Object] id passed to ElasticConnectorActions.get_job
# @return [Object, nil] a new instance built from the response, or nil when
#   the response's +:found+ entry is falsy
def self.fetch_by_id(job_id)
  job_doc = ElasticConnectorActions.get_job(job_id)
  new(job_doc) if job_doc[:found]
end
.orphaned_jobs(page_size = DEFAULT_PAGE_SIZE) ⇒ Object
39 40 41 42 43 |
# File 'lib/core/connector_job.rb', line 39
# Fetches jobs whose 'connector.id' matches none of the ids returned by
# ConnectorSettings.fetch_all_connectors.
#
# @param page_size [Integer] forwarded to fetch_jobs_by_query
# @return [Object] the result of fetch_jobs_by_query
def self.orphaned_jobs(page_size = DEFAULT_PAGE_SIZE)
  known_ids = ConnectorSettings.fetch_all_connectors.map(&:id)
  known_connector = { terms: { 'connector.id': known_ids } }
  fetch_jobs_by_query({ bool: { must_not: known_connector } }, page_size)
end
.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/core/connector_job.rb', line 27
# Fetches jobs whose status is one of Connectors::SyncStatus::PENDING_STATUSES,
# optionally restricted to a set of connectors.
#
# @param connectors_ids [Array] when non-empty, adds a 'connector.id' terms
#   clause to the query; when empty, no connector restriction is applied
# @param page_size [Integer] forwarded to fetch_jobs_by_query
# @return [Object] the result of fetch_jobs_by_query
def self.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE)
  clauses = [{ terms: { status: Connectors::SyncStatus::PENDING_STATUSES } }]
  clauses << { terms: { 'connector.id' => connectors_ids } } unless connectors_ids.empty?
  fetch_jobs_by_query({ bool: { must: clauses } }, page_size)
end
.stuck_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/core/connector_job.rb', line 50
# Fetches jobs that have an active status but whose +last_seen+ is at least
# STUCK_THRESHOLD seconds in the past.
#
# @param connector_id [Object, nil] restricts the search to this connector;
#   when nil, the ids from ConnectorSettings.fetch_native_connectors are used
# @param page_size [Integer] forwarded to fetch_jobs_by_query
# @return [Object] the result of fetch_jobs_by_query
def self.stuck_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE)
  connector_ids = connector_id ? [connector_id] : ConnectorSettings.fetch_native_connectors.map(&:id)
  filters = [
    { terms: { 'connector.id': connector_ids } },
    { terms: { status: Connectors::SyncStatus::ACTIVE_STATUSES } },
    { range: { last_seen: { lte: "now-#{STUCK_THRESHOLD}s" } } }
  ]
  fetch_jobs_by_query({ bool: { filter: filters } }, page_size)
end
Instance Method Details
#[](property_name) ⇒ Object
76 77 78 |
# File 'lib/core/connector_job.rb', line 76
# Reads one property from the +:_source+ part of the stored Elasticsearch
# response.
#
# @param property_name [Object] key to look up in +:_source+
# @return [Object] the value stored under that key
def [](property_name)
  source_doc = @elasticsearch_response[:_source]
  source_doc[property_name]
end
#active? ⇒ Boolean
108 109 110 |
# File 'lib/core/connector_job.rb', line 108
# @return [Boolean] whether the job's status is listed in
#   Connectors::SyncStatus::ACTIVE_STATUSES
def active?
  active_statuses = Connectors::SyncStatus::ACTIVE_STATUSES
  active_statuses.include?(status)
end
#cancel!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
167 168 169 |
# File 'lib/core/connector_job.rb', line 167
# Terminates the job with the CANCELED status and no error message.
#
# @param ingestion_stats [Hash] forwarded to terminate!
# @param connector_metadata [Hash] forwarded to terminate!
# @return [Object] the result of terminate!
def cancel!(ingestion_stats = {}, connector_metadata = {})
  terminate!(Connectors::SyncStatus::CANCELED, nil, ingestion_stats, connector_metadata)
end
#canceled? ⇒ Boolean
100 101 102 |
# File 'lib/core/connector_job.rb', line 100
# @return [Boolean] whether the job's status equals
#   Connectors::SyncStatus::CANCELED
def canceled?
  current_status = status
  current_status == Connectors::SyncStatus::CANCELED
end
#canceling? ⇒ Boolean
92 93 94 |
# File 'lib/core/connector_job.rb', line 92
# @return [Boolean] whether the job's status equals
#   Connectors::SyncStatus::CANCELING
def canceling?
  current_status = status
  current_status == Connectors::SyncStatus::CANCELING
end
#configuration ⇒ Object
136 137 138 |
# File 'lib/core/connector_job.rb', line 136
# @return [Object] the +:configuration+ entry of the connector snapshot
def configuration
  snapshot = connector_snapshot
  snapshot[:configuration]
end
#connector ⇒ Object
148 149 150 |
# File 'lib/core/connector_job.rb', line 148
# Returns the connector for this job, fetching it through
# ConnectorSettings.fetch_by_id and memoizing any truthy result in @connector.
#
# @return [Object, nil] the result of ConnectorSettings.fetch_by_id
def connector
  return @connector if @connector

  @connector = ConnectorSettings.fetch_by_id(connector_id)
end
#connector_id ⇒ Object
120 121 122 |
# File 'lib/core/connector_job.rb', line 120
# @return [Object] the +:id+ entry of the connector snapshot
def connector_id
  snapshot = connector_snapshot
  snapshot[:id]
end
#connector_snapshot ⇒ Object
116 117 118 |
# File 'lib/core/connector_job.rb', line 116
# @return [Object] the job's +:connector+ property, or an empty Hash when
#   that property is nil or false
def connector_snapshot
  snapshot = self[:connector]
  snapshot || {}
end
#done!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
159 160 161 |
# File 'lib/core/connector_job.rb', line 159
# Terminates the job with the COMPLETED status and no error message.
#
# @param ingestion_stats [Hash] forwarded to terminate!
# @param connector_metadata [Hash] forwarded to terminate!
# @return [Object] the result of terminate!
def done!(ingestion_stats = {}, connector_metadata = {})
  terminate!(Connectors::SyncStatus::COMPLETED, nil, ingestion_stats, connector_metadata)
end
#error ⇒ Object
80 81 82 |
# File 'lib/core/connector_job.rb', line 80
# @return [Object] the job's +:error+ property
def error
  self[:error]
end
#error!(message, ingestion_stats = {}, connector_metadata = {}) ⇒ Object
163 164 165 |
# File 'lib/core/connector_job.rb', line 163
# Terminates the job with the ERROR status and the given message.
#
# @param message [Object] error message forwarded to terminate!
# @param ingestion_stats [Hash] forwarded to terminate!
# @param connector_metadata [Hash] forwarded to terminate!
# @return [Object] the result of terminate!
def error!(message, ingestion_stats = {}, connector_metadata = {})
  terminate!(Connectors::SyncStatus::ERROR, message, ingestion_stats, connector_metadata)
end
#es_source ⇒ Object
191 192 193 |
# File 'lib/core/connector_job.rb', line 191
# @return [Object] the +:_source+ part of the stored Elasticsearch response
def es_source
  response = @elasticsearch_response
  response[:_source]
end
#filtering ⇒ Object
140 141 142 |
# File 'lib/core/connector_job.rb', line 140
# @return [Object] the +:filtering+ entry of the connector snapshot
def filtering
  snapshot = connector_snapshot
  snapshot[:filtering]
end
#id ⇒ Object
72 73 74 |
# File 'lib/core/connector_job.rb', line 72
# @return [Object] the +:_id+ entry of the stored Elasticsearch response
def id
  response = @elasticsearch_response
  response[:_id]
end
#in_progress? ⇒ Boolean
88 89 90 |
# File 'lib/core/connector_job.rb', line 88
# @return [Boolean] whether the job's status equals
#   Connectors::SyncStatus::IN_PROGRESS
def in_progress?
  current_status = status
  current_status == Connectors::SyncStatus::IN_PROGRESS
end
#index_name ⇒ Object
124 125 126 |
# File 'lib/core/connector_job.rb', line 124
# @return [Object] the +:index_name+ entry of the connector snapshot
def index_name
  snapshot = connector_snapshot
  snapshot[:index_name]
end
#language ⇒ Object
128 129 130 |
# File 'lib/core/connector_job.rb', line 128
# @return [Object] the +:language+ entry of the connector snapshot
def language
  snapshot = connector_snapshot
  snapshot[:language]
end
#make_running! ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/core/connector_job.rb', line 177
# Marks the job as in progress. Inside with_concurrency_control it writes the
# IN_PROGRESS status, a shared timestamp for +started_at+ and +last_seen+, and
# this machine's hostname, passing the yielded seq_no and primary_term on to
# ElasticConnectorActions.update_job_fields.
#
# @return [Object] the result of with_concurrency_control
def make_running!
  with_concurrency_control do |es_doc, seq_no, primary_term|
    started = Time.now
    fields = {
      status: Connectors::SyncStatus::IN_PROGRESS,
      started_at: started,
      last_seen: started,
      worker_hostname: Socket.gethostname
    }
    ElasticConnectorActions.update_job_fields(es_doc[:_id], fields, seq_no, primary_term)
  end
end
#pending? ⇒ Boolean
104 105 106 |
# File 'lib/core/connector_job.rb', line 104
# @return [Boolean] whether the job's status is listed in
#   Connectors::SyncStatus::PENDING_STATUSES
def pending?
  pending_statuses = Connectors::SyncStatus::PENDING_STATUSES
  pending_statuses.include?(status)
end
#pipeline ⇒ Object
144 145 146 |
# File 'lib/core/connector_job.rb', line 144
# @return [Object] the +:pipeline+ entry of the connector snapshot
def pipeline
  snapshot = connector_snapshot
  snapshot[:pipeline]
end
#service_type ⇒ Object
132 133 134 |
# File 'lib/core/connector_job.rb', line 132
# @return [Object] the +:service_type+ entry of the connector snapshot
def service_type
  snapshot = connector_snapshot
  snapshot[:service_type]
end
#status ⇒ Object
84 85 86 |
# File 'lib/core/connector_job.rb', line 84
# @return [Object] the job's +:status+ property
def status
  self[:status]
end
#suspended? ⇒ Boolean
96 97 98 |
# File 'lib/core/connector_job.rb', line 96
# @return [Boolean] whether the job's status equals
#   Connectors::SyncStatus::SUSPENDED
def suspended?
  current_status = status
  current_status == Connectors::SyncStatus::SUSPENDED
end
#terminated? ⇒ Boolean
112 113 114 |
# File 'lib/core/connector_job.rb', line 112
# @return [Boolean] whether the job's status is listed in
#   Connectors::SyncStatus::TERMINAL_STATUSES
def terminated?
  terminal_statuses = Connectors::SyncStatus::TERMINAL_STATUSES
  terminal_statuses.include?(status)
end
#update_metadata(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
152 153 154 155 156 157 |
# File 'lib/core/connector_job.rb', line 152
# Refreshes the job's +last_seen+ timestamp and stores the supplied stats and
# metadata through ElasticConnectorActions.update_job_fields.
#
# @param ingestion_stats [Hash, nil] merged into the update document; nil is
#   treated as an empty Hash. A +:last_seen+ key here overrides the timestamp.
# @param connector_metadata [Hash, nil] written under +:metadata+ only when it
#   is non-nil and +any?+ is true
# @return [Object] the result of ElasticConnectorActions.update_job_fields
def update_metadata(ingestion_stats = {}, connector_metadata = {})
  ingestion_stats ||= {}
  doc = { last_seen: Time.now }.merge(ingestion_stats)
  doc[:metadata] = connector_metadata if connector_metadata&.any?
  ElasticConnectorActions.update_job_fields(id, doc)
end
#with_concurrency_control {|response, seq_no, primary_term| ... } ⇒ Object
171 172 173 174 175 |
# File 'lib/core/connector_job.rb', line 171
# Re-reads this job via ElasticConnectorActions.get_job and yields the
# response together with its '_seq_no' and '_primary_term' entries.
#
# @yieldparam response [Object] the response from get_job
# @yieldparam seq_no [Object] the response's '_seq_no' entry
# @yieldparam primary_term [Object] the response's '_primary_term' entry
# @return [Object] the block's return value
def with_concurrency_control
  job_doc = ElasticConnectorActions.get_job(id)
  seq_no = job_doc['_seq_no']
  primary_term = job_doc['_primary_term']

  yield job_doc, seq_no, primary_term
end