Module: Emque::Producing::Message
- Defined in:
- lib/emque/producing/message/message.rb
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- InvalidMessageError =
Class.new(StandardError)
- MessagesNotSentError =
Class.new(StandardError)
Class Method Summary collapse
Instance Method Summary collapse
- #add_metadata ⇒ Object
- #ignored_exceptions ⇒ Object
- #invalid_attributes ⇒ Object
- #message_type ⇒ Object
- #publish(publisher = nil) ⇒ Object
- #raise_on_failure? ⇒ Boolean
- #to_json ⇒ Object
- #topic ⇒ Object
- #valid? ⇒ Boolean
Class Method Details
.included(base) ⇒ Object
75 76 77 78 79 |
# File 'lib/emque/producing/message/message.rb', line 75 def self.included(base) base.extend(ClassMethods) base.send(:include, Virtus.value_object) base.send(:attribute, :partition_key, String, :default => nil, :required => false) end |
Instance Method Details
#add_metadata ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/emque/producing/message/message.rb', line 81 def { :metadata => { :host => host_name, :app => app_name, :topic => topic, :created_at => formatted_time, :uuid => uuid, :type => , :partition_key => partition_key } }.merge(public_attributes) end |
#ignored_exceptions ⇒ Object
108 109 110 |
# File 'lib/emque/producing/message/message.rb', line 108 def ignored_exceptions self.class.read_ignored_exceptions end |
#invalid_attributes ⇒ Object
116 117 118 119 120 121 122 |
# File 'lib/emque/producing/message/message.rb', line 116 def invalid_attributes invalid_attrs = self.class.attribute_set.inject([]) do |attrs, attr| attrs << attr.name if attr.required? && self.attributes.fetch(attr.name).nil? attrs end Array(invalid_attrs) - self.class.private_attrs end |
#message_type ⇒ Object
100 101 102 |
# File 'lib/emque/producing/message/message.rb', line 100 def self.class. end |
#publish(publisher = nil) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/emque/producing/message/message.rb', line 129 def publish(publisher=nil) publisher ||= Emque::Producing.publisher log "publishing...", true if valid? log "valid...", true if Emque::Producing.configuration. = process_middleware(to_json) sent = publisher.publish(topic, , , partition_key, raise_on_failure?) log "sent #{sent}" raise MessagesNotSentError.new unless sent end else log "failed...", true raise InvalidMessageError.new() end rescue *ignored_exceptions => error if raise_on_failure? raise else log "failed ignoring exception... #{error}", true end end |
#raise_on_failure? ⇒ Boolean
104 105 106 |
# File 'lib/emque/producing/message/message.rb', line 104 def raise_on_failure? self.class.read_raise_on_failure end |
#to_json ⇒ Object
124 125 126 127 |
# File 'lib/emque/producing/message/message.rb', line 124 def to_json data = self. Oj.dump(data, :mode => :compat) end |
#topic ⇒ Object
96 97 98 |
# File 'lib/emque/producing/message/message.rb', line 96 def topic self.class.read_topic end |
#valid? ⇒ Boolean
112 113 114 |
# File 'lib/emque/producing/message/message.rb', line 112 def valid? invalid_attributes.empty? && topic && end |