Class: PubSubModelSync::MessagePublisher
- Defined in: lib/pub_sub_model_sync/message_publisher.rb
Defined Under Namespace
Classes: MissingPublisher
Class Attribute Summary
- .current_transaction ⇒ Object
  Returns the value of attribute current_transaction.
Class Method Summary
- .connector ⇒ Object
- .connector_publish(payload) ⇒ Object
- .init_transaction(key, settings = {}) ⇒ Transaction
  Starts a new transaction.
- .publish(payload, &block) ⇒ Object
  Like .publish!, but does not raise on failure; the error is passed to config.on_error_publish instead.
- .publish!(payload, &block) ⇒ Object
  Publishes the payload to pubsub and re-raises any error that occurs.
- .publish_data(klass, data, action, headers: {}) ⇒ Object
  Publishes a class-level notification via pubsub.
- .publish_model(model, action, settings = {}) ⇒ Object
- .transaction(key, settings = {}, &block) ⇒ Object
  Groups all payloads published inside the block under the same ordering_key, so that subscribers process them in the order they were published.
Methods inherited from Base
Class Attribute Details
.current_transaction ⇒ Object
Returns the value of attribute current_transaction.
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 7
def current_transaction
  @current_transaction
end
Class Method Details
.connector ⇒ Object
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 9
def connector
  @connector ||= PubSubModelSync::Connector.new
end
.connector_publish(payload) ⇒ Object
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 96
def connector_publish(payload)
  log("Publishing message #{[payload.uuid]}...") if config.debug
  connector.publish(payload)
  log("Published message: #{[payload]}")
  config.on_after_publish.call(payload)
end
.init_transaction(key, settings = {}) ⇒ Transaction
Starts a new transaction. If a transaction is already open, the new one is nested under it; otherwise it becomes the current transaction.
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 39
def init_transaction(key, settings = {})
  new_transaction = PubSubModelSync::Transaction.new(key, **settings)
  if current_transaction
    current_transaction.add_transaction(new_transaction)
  else
    self.current_transaction = new_transaction
  end
  new_transaction
end
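The nesting branch in init_transaction can be tried in isolation with a minimal sketch. `NestedTransaction` and `NestingDemo` below are hypothetical stand-ins, not part of the gem; only the branching logic is taken from the method above, and the real PubSubModelSync::Transaction may keep track of nested transactions differently.

```ruby
# Hypothetical stand-in for PubSubModelSync::Transaction; only the
# nesting behaviour is modelled here.
class NestedTransaction
  attr_reader :key, :children

  def initialize(key)
    @key = key
    @children = []
  end

  def add_transaction(transaction)
    @children << transaction
  end
end

# Hypothetical stand-in for the publisher's class-level state.
module NestingDemo
  class << self
    attr_accessor :current_transaction

    # Same branching as init_transaction: nest when a transaction is
    # already open, otherwise become the current one.
    def init_transaction(key)
      new_transaction = NestedTransaction.new(key)
      if current_transaction
        current_transaction.add_transaction(new_transaction)
      else
        self.current_transaction = new_transaction
      end
      new_transaction
    end
  end
end

outer = NestingDemo.init_transaction("user/1")
inner = NestingDemo.init_transaction("user/1/posts")

# The outer transaction stays current; the inner one hangs below it.
puts NestingDemo.current_transaction.equal?(outer)
puts outer.children.include?(inner)
```

Note that the method always returns the newly created transaction, even when it is nested, so the caller can finish or roll back exactly the transaction it opened.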
.publish(payload, &block) ⇒ Object
Like .publish!, but does not raise on failure; the error is passed to config.on_error_publish instead.
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 90
def publish(payload, &block)
  publish!(payload, &block)
rescue => e
  config.on_error_publish.call(e, { payload: payload })
end
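The difference between the two variants is easiest to see side by side. The following is a minimal sketch under stated assumptions: `SafePublishDemo` and its `on_error_publish` accessor are hypothetical stand-ins for the publisher and for config.on_error_publish, and the failure condition (an empty payload) is invented purely for illustration.

```ruby
module SafePublishDemo
  class << self
    # Hypothetical hook standing in for config.on_error_publish.
    attr_accessor :on_error_publish

    # Raising variant: the error reaches the caller.
    def publish!(payload)
      raise ArgumentError, "payload is empty" if payload.empty?

      payload
    end

    # Non-raising variant: the error is handed to the hook instead.
    def publish(payload)
      publish!(payload)
    rescue => e
      on_error_publish.call(e, { payload: payload })
    end
  end
end

errors = []
SafePublishDemo.on_error_publish = ->(error, info) { errors << [error.message, info] }

# The error is routed to the hook; nothing is raised here.
SafePublishDemo.publish({})

# The same failure propagates when the bang variant is used.
raised = nil
begin
  SafePublishDemo.publish!({})
rescue ArgumentError => e
  raised = e.message
end
```

One consequence of this design is that, on failure, the non-raising variant returns whatever the error hook returns rather than the payload.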
.publish!(payload, &block) ⇒ Object
Publishes the payload to pubsub. If a transaction is open, the payload is added to it instead of being sent to the connector directly. Any error is reported via print_error and then re-raised.
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 76
def publish!(payload, &block)
  add_transaction_headers(payload)
  return unless ensure_publish(payload)

  current_transaction ? current_transaction.add_payload(payload) : connector_publish(payload)
  block&.call
  payload
rescue => e
  print_error(e, payload)
  raise
end
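The routing decision in the ternary above can be illustrated with a small self-contained sketch. `BufferingTransaction` and `RoutingDemo` are hypothetical names, and the `delivered` array stands in for the connector; the real method also sets transaction headers and runs ensure_publish, which are omitted here.

```ruby
# Hypothetical transaction that only collects payloads.
class BufferingTransaction
  attr_reader :payloads

  def initialize
    @payloads = []
  end

  def add_payload(payload)
    @payloads << payload
  end
end

module RoutingDemo
  class << self
    attr_accessor :current_transaction

    # Stand-in for the connector: records what was sent directly.
    def delivered
      @delivered ||= []
    end

    def publish!(payload)
      if current_transaction
        current_transaction.add_payload(payload) # buffered for later
      else
        delivered << payload                     # sent right away
      end
      payload
    end
  end
end

RoutingDemo.publish!("a") # no transaction open: delivered directly
RoutingDemo.current_transaction = BufferingTransaction.new
RoutingDemo.publish!("b") # transaction open: buffered instead

p RoutingDemo.delivered
p RoutingDemo.current_transaction.payloads
```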
.publish_data(klass, data, action, headers: {}) ⇒ Object
Publishes a class-level notification via pubsub.
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 52
def publish_data(klass, data, action, headers: {})
  info = { klass: klass.to_s, action: action.to_sym, mode: :klass }
  log("Building payload for: #{info.inspect}") if config.debug
  payload = PubSubModelSync::Payload.new(data, info, headers)
  define_transaction_key(payload)
  publish(payload)
end
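Because publish_data normalizes its arguments with `to_s` and `to_sym`, the class can apparently be passed either as a constant or as a string, and the action either as a string or as a symbol. The sketch below isolates that first line; `build_info` is a hypothetical helper written for this illustration, not a method of the gem.

```ruby
# Hypothetical helper mirroring the first line of publish_data.
def build_info(klass, action)
  { klass: klass.to_s, action: action.to_sym, mode: :klass }
end

info_from_class = build_info(String, "greeting")
info_from_name = build_info("String", :greeting)

p info_from_class
# Both call styles produce the same info hash.
puts info_from_class == info_from_name
```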
.publish_model(model, action, settings = {}) ⇒ Object
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 63
def publish_model(model, action, settings = {})
  log("Building payload for: #{[model, action].inspect}") if config.debug
  payload = PubSubModelSync::PayloadBuilder.new(model, action, settings).call
  define_transaction_key(payload)
  transaction(payload.headers[:ordering_key]) do # catch and group all :ps_before_publish syncs
    publish(payload) { model.ps_after_publish(action, payload) } if ensure_model_publish(model, action, payload)
  end
end
.transaction(key, settings = {}, &block) ⇒ Object
Groups all payloads published inside the block under the same ordering_key, so that subscribers
process them in the order they were published. Grouping by ordering_key lets us run
multiple workers in our Pub/Sub service(s) and still guarantee that related payloads are
processed in the correct order, despite the multiple threads. This works because
Pub/Sub services always deliver messages with the same `ordering_key` to the same
worker/thread.
# File 'lib/pub_sub_model_sync/message_publisher.rb', line 25
def transaction(key, settings = {}, &block)
  t = init_transaction(key, settings)
  block.call
  t.finish
rescue
  t.rollback
  raise
ensure
  t.clean_publisher
end
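The rescue/ensure structure of the method above determines which transaction hooks run on success and on failure. The following sketch records that sequence; `LifecycleTransaction` and `run_transaction` are hypothetical stand-ins, and what finish, rollback, and clean_publisher actually do inside the gem is not modelled.

```ruby
# Hypothetical transaction that records which hooks were called.
class LifecycleTransaction
  attr_reader :events

  def initialize
    @events = []
  end

  def finish
    @events << :finish
  end

  def rollback
    @events << :rollback
  end

  def clean_publisher
    @events << :clean_publisher
  end
end

# Same control flow as .transaction: finish on success, rollback and
# re-raise on error, and clean up in both cases.
def run_transaction(transaction)
  yield
  transaction.finish
rescue
  transaction.rollback
  raise
ensure
  transaction.clean_publisher
end

ok = LifecycleTransaction.new
run_transaction(ok) { :work }

failed = LifecycleTransaction.new
begin
  run_transaction(failed) { raise "boom" }
rescue RuntimeError
  # The error still reaches the caller after the rollback.
end

p ok.events
p failed.events
```

Since the rescue clause also covers the call to finish, an error raised while finishing would trigger a rollback as well.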