Class: PubSubModelSync::MessagePublisher

Inherits:
Base
  • Object
show all
Defined in:
lib/pub_sub_model_sync/message_publisher.rb

Defined Under Namespace

Classes: MissingPublisher

Class Attribute Summary collapse

Class Method Summary collapse

Methods inherited from Base

config, debug?, log

Class Attribute Details

.current_transactionObject

Returns the value of attribute current_transaction.



7
8
9
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 7

def current_transaction
  @current_transaction
end

Class Method Details

.connectorObject



9
10
11
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 9

def connector
  @connector ||= PubSubModelSync::Connector.new
end

.connector_publish(payload) ⇒ Object



96
97
98
99
100
101
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 96

def connector_publish(payload)
  log("Publishing message #{[payload.uuid]}...") if config.debug
  connector.publish(payload)
  log("Published message: #{[payload]}")
  config.on_after_publish.call(payload)
end

.init_transaction(key, settings = {}) ⇒ Transaction

Starts a new transaction

Parameters:

  • key (String, Nil)

Returns:



39
40
41
42
43
44
45
46
47
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 39

def init_transaction(key, settings = {})
  new_transaction = PubSubModelSync::Transaction.new(key, **settings)
  if current_transaction
    current_transaction.add_transaction(new_transaction)
  else
    self.current_transaction = new_transaction
  end
  new_transaction
end

.publish(payload, &block) ⇒ Object

Similar to :publish! method but ignores the error if failed

Returns:

  • Payload



90
91
92
93
94
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 90

def publish(payload, &block)
  publish!(payload, &block)
rescue => e
  config.on_error_publish.call(e, { payload: payload })
end

.publish!(payload, &block) ⇒ Object

Publishes payload to pubsub Raises error if exist

Parameters:

Returns:

  • Payload



76
77
78
79
80
81
82
83
84
85
86
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 76

def publish!(payload, &block)
  add_transaction_headers(payload)
  return unless ensure_publish(payload)

  current_transaction ? current_transaction.add_payload(payload) : connector_publish(payload)
  block&.call
  payload
rescue => e
  print_error(e, payload)
  raise
end

.publish_data(klass, data, action, headers: {}) ⇒ Object

Publishes a class level notification via pubsub

Returns:

  • Payload



52
53
54
55
56
57
58
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 52

def publish_data(klass, data, action, headers: {})
  info = { klass: klass.to_s, action: action.to_sym, mode: :klass }
  log("Building payload for: #{info.inspect}") if config.debug
  payload = PubSubModelSync::Payload.new(data, info, headers)
  define_transaction_key(payload)
  publish(payload)
end

.publish_model(model, action, settings = {}) ⇒ Object

Parameters:



63
64
65
66
67
68
69
70
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 63

def publish_model(model, action, settings = {})
  log("Building payload for: #{[model, action].inspect}") if config.debug
  payload = PubSubModelSync::PayloadBuilder.new(model, action, settings).call
  define_transaction_key(payload)
  transaction(payload.headers[:ordering_key]) do # catch and group all :ps_before_publish syncs
    publish(payload) { model.ps_after_publish(action, payload) } if ensure_model_publish(model, action, payload)
  end
end

.transaction(key, settings = {}, &block) ⇒ Object

Permits to group all payloads with the same ordering_key and be processed in the same order

they are published by the subscribers. Grouping by ordering_key allows us to enable
multiple workers in our Pub/Sub service(s), and still guarantee that related payloads will
be processed in the correct order, despite of the multiple threads. This thanks to the fact
that Pub/Sub services will always send messages with the same `ordering_key` into the same
worker/thread.

Parameters:

  • key (String|Nil)
  • settings (Hash<:headers, :max_buffer>) (defaults to: {})

    @option headers [Hash] Headers to be merged for each payload inside this transaction @option max_buffer [Integer] Deprecated

  • block (Yield)

    block to be executed

See Also:

  • Transaction.new(...)


25
26
27
28
29
30
31
32
33
34
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 25

def transaction(key, settings = {}, &block)
  t = init_transaction(key, settings)
  block.call
  t.finish
rescue
  t.rollback
  raise
ensure
  t.clean_publisher
end