Class: TableSync::Publisher

Inherits:
BasePublisher show all
Defined in:
lib/table_sync/publisher.rb

Constant Summary collapse

DEBOUNCE_TIME =
1.minute

Constants inherited from BasePublisher

BasePublisher::BASE_SAFE_JSON_TYPES, BasePublisher::NOT_MAPPED

Instance Method Summary collapse

Constructor Details

#initialize(object_class, original_attributes, **opts) ⇒ Publisher

‘original_attributes’ are not published, they are used to resolve the routing key



7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/table_sync/publisher.rb', line 7

def initialize(object_class, original_attributes, **opts)
  @object_class = object_class.constantize
  @original_attributes = filter_safe_for_serialization(original_attributes.deep_symbolize_keys)
  @confirm = opts.fetch(:confirm, true)
  @debounce_time = opts[:debounce_time]&.seconds || DEBOUNCE_TIME

  if opts[:destroyed].nil?
    @state = opts.fetch(:state, :updated).to_sym
    validate_state
  else
    # TODO Legacy job support, remove
    @state = opts[:destroyed] ? :destroyed : :updated
  end
end

Instance Method Details

#publishObject



22
23
24
25
26
27
28
29
30
# File 'lib/table_sync/publisher.rb', line 22

def publish
  return enqueue_job if destroyed? || debounce_time.zero?

  sync_time = Rails.cache.read(cache_key) || current_time - debounce_time - 1.second
  return if sync_time > current_time

  next_sync_time = sync_time + debounce_time
  next_sync_time <= current_time ? enqueue_job : enqueue_job(next_sync_time)
end

#publish_nowObject



32
33
34
35
36
37
38
39
40
# File 'lib/table_sync/publisher.rb', line 32

def publish_now
  # Update request and object does not exist -> skip publishing
  return if !object && !destroyed?

  Rabbit.publish(params)
  model_naming = TableSync.orm.model_naming(object_class)
  TableSync::Instrument.notify table: model_naming.table, schema: model_naming.schema,
                               event: event, direction: :publish
end