Module: RailsPipeline::Subscriber::InstanceMethods
- Defined in:
- lib/rails-pipeline/subscriber.rb
Instance Method Summary
- #_api_keys ⇒ Object
- #_version(payload) ⇒ Object
- #handle_envelope(envelope) ⇒ Object
Take an EncryptedMessage envelope, and decrypt the cipher text, then get the protobuf object out of it.
- #handle_payload(payload, envelope) ⇒ Object
Take a protobuf object (payload) and forward it to the appropriate handler/method/proc.
- #most_suitable_handler_method_name(version, receiver_class) ⇒ Object
- #registered_class_on_same_major_version(payload_class_name) ⇒ Object
- #target_class(payload) ⇒ Object
- #target_handler(payload) ⇒ Object
- #verify_api_key(envelope) ⇒ Object
Instance Method Details
#_api_keys ⇒ Object
168 169 170 |
# File 'lib/rails-pipeline/subscriber.rb', line 168
# Returns the API keys configured for this subscriber.
#
# Keys are read from the comma-separated PIPELINE_API_KEYS environment
# variable. Whitespace around each key is ignored and empty entries are
# dropped, so a value such as "abc, def,," yields ["abc", "def"] instead of
# keys with stray spaces that could never equal an incoming key.
#
# @return [Array<String>] the configured keys; empty when the variable is
#   unset or blank
def _api_keys
  ENV.fetch('PIPELINE_API_KEYS', '')
     .split(',')
     .map(&:strip)
     .reject(&:empty?)
end
#_version(payload) ⇒ Object
163 164 165 166 |
# File 'lib/rails-pipeline/subscriber.rb', line 163
# Builds a PipelineVersion for a payload from the payload's class name.
#
# The class name is split at its first underscore; the part after that
# underscore (nil when there is no underscore) is handed to
# RailsPipeline::PipelineVersion.new.
#
# @param payload [Object] message whose class name carries the version
# @return [RailsPipeline::PipelineVersion]
def _version(payload)
  name_parts = payload.class.name.split('_', 2)
  version_suffix = name_parts[1]
  RailsPipeline::PipelineVersion.new(version_suffix)
end
#handle_envelope(envelope) ⇒ Object
Take an EncryptedMessage envelope, and decrypt the cipher text, then get the protobuf object out of it
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/rails-pipeline/subscriber.rb', line 51
# Verifies, decrypts, parses and dispatches an incoming envelope.
#
# Processing is skipped entirely when either DISABLE_RAILS_PIPELINE or
# DISABLE_RAILS_PIPELINE_PROCESSING is present in the environment.
# Otherwise the API key is verified, the envelope is decrypted, and the
# message class registered for the envelope's type (or another version
# with the same major version) parses the decrypted string. Envelopes with
# no compatible registered class are logged and dropped.
#
# @param envelope [Object] envelope exposing type_info (also passed to
#   verify_api_key, self.class.decrypt and handle_payload)
# @return [Object, nil] result of handle_payload, or nil when skipped or
#   dropped
def handle_envelope(envelope)
  processing_disabled = ENV.key?("DISABLE_RAILS_PIPELINE") ||
                        ENV.key?("DISABLE_RAILS_PIPELINE_PROCESSING")
  if processing_disabled
    RailsPipeline.logger.debug "Skipping incoming pipeline messages (disabled by env vars)"
    return
  end

  verify_api_key(envelope)
  decrypted_payload = self.class.decrypt(envelope)

  # Pick the registered message class able to parse this envelope's type.
  message_class = registered_class_on_same_major_version(envelope.type_info)
  if message_class.nil?
    # Nothing registered in this app is compatible with the message.
    RailsPipeline.logger.info "Dropping unclaimed message #{envelope.type_info} (no compatible version registered)."
    return
  end

  parsed_payload = message_class.parse(decrypted_payload)
  handle_payload(parsed_payload, envelope)
end
#handle_payload(payload, envelope) ⇒ Object
Take a protobuf object (payload) and forward it to the appropriate handler/method/proc
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/rails-pipeline/subscriber.rb', line 76
# Forwards a parsed payload to whatever is registered for its class.
#
# Dispatch rules:
# * registered target is a Proc: call it with the payload and return the
#   result;
# * registered target is a Class with a handler class: instantiate the
#   handler and invoke its handle_payload (nil is returned);
# * registered target is a Class with a matching versioned method: call
#   that method with the payload and the envelope's event type and return
#   its result;
# * registered target is a Class with neither: log and return nil;
# * anything else: return nil.
#
# @param payload [Object] parsed message
# @param envelope [Object] envelope exposing event_type
# @return [Object, nil]
def handle_payload(payload, envelope)
  payload_version = _version(payload)
  receiver = target_class(payload)
  custom_handler = target_handler(payload)
  event = envelope.event_type
  handler_method = most_suitable_handler_method_name(payload_version, receiver)

  unless receiver.is_a?(Class)
    return receiver.is_a?(Proc) ? receiver.call(payload) : nil
  end

  if custom_handler
    # A dedicated handler class is registered; it takes precedence.
    custom_handler.new(payload, target_class: receiver, envelope: envelope).handle_payload
    return nil
  end

  unless handler_method
    RailsPipeline.logger.info "No handler set, dropping message #{payload.class.name}"
    return nil
  end

  # The receiver implements a versioned handler method; delegate to it.
  receiver.send(handler_method, payload, event)
end
#most_suitable_handler_method_name(version, receiver_class) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/rails-pipeline/subscriber.rb', line 127
# Finds the handler method on receiver_class that best fits a version.
#
# Candidates are methods named from_pipeline_<major>_<minor> whose major
# equals version.major exactly and whose minor does not exceed
# version.minor. The candidate with the highest minor wins.
#
# Fixes over the previous implementation:
# * the cache is keyed by receiver class and version, so two receivers
#   handling the same version no longer share one cached method name;
# * the major version is matched exactly (major 1 no longer matches
#   from_pipeline_10_0);
# * minors are compared numerically (1_10 now outranks 1_2).
#
# @param version [#major, #minor] version of the incoming message
# @param receiver_class [Object] object whose methods are searched
# @return [Symbol, nil] the chosen method name, or nil when none fits
def most_suitable_handler_method_name(version, receiver_class)
  cache = self.class.handler_method_cache
  cache_key = [receiver_class, version]
  cached_method = cache[cache_key]
  return cached_method if cached_method

  handler_pattern = /\Afrom_pipeline_#{Regexp.escape(version.major.to_s)}_(\d+)\z/

  # Map each compatible handler method to its numeric minor version.
  minors_by_method = {}
  receiver_class.methods.each do |method_name|
    match = handler_pattern.match(method_name.to_s)
    next unless match

    minor = match[1].to_i
    next if minor > version.minor

    minors_by_method[method_name] = minor
  end

  best_method, = minors_by_method.max_by { |_name, minor| minor }

  # Remember the result for this receiver/version pair.
  cache[cache_key] = best_method
  best_method
end
#registered_class_on_same_major_version(payload_class_name) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/rails-pipeline/subscriber.rb', line 99
# Resolves the message class to use for an incoming payload class name.
#
# When the exact name is registered, that class is returned. Otherwise the
# first registered name sharing the same "<Name>_<major>" prefix is used.
# The prefix is compared as a whole segment pair, so "User_1_0" is
# compatible with "User_1_2" but not with "User_10_0" (a plain
# start_with? check used to treat major 1 and major 10 as the same).
#
# @param payload_class_name [String] class name such as "User_1_0"
# @return [Object, nil] the constant for the compatible registered name,
#   or nil when nothing with the same major version is registered
def registered_class_on_same_major_version(payload_class_name)
  handlers = Subscriber.registered_handlers

  # The exact version we've been given is registered: use it directly.
  return Object.const_get(payload_class_name) if handlers.has_key?(payload_class_name)

  # Reduce a name to "<Name>_<major>" by lopping off the minor version.
  major_version_name = lambda do |class_name|
    class_name.to_s.split("_", 3)[0, 2].join("_")
  end

  wanted = major_version_name.call(payload_class_name)
  compatible_name = handlers.keys.map(&:to_s).find do |class_name|
    major_version_name.call(class_name) == wanted
  end

  compatible_name && Object.const_get(compatible_name)
end
#target_class(payload) ⇒ Object
155 156 157 |
# File 'lib/rails-pipeline/subscriber.rb', line 155
# Looks up the target registered for the payload's class by delegating to
# RailsPipeline::Subscriber.target_class.
#
# @param payload [Object] parsed message
# @return [Object] whatever RailsPipeline::Subscriber.target_class returns
def target_class(payload)
  payload_type = payload.class
  RailsPipeline::Subscriber.target_class(payload_type)
end
#target_handler(payload) ⇒ Object
159 160 161 |
# File 'lib/rails-pipeline/subscriber.rb', line 159
# Looks up the handler registered for the payload's class by delegating to
# RailsPipeline::Subscriber.target_handler.
#
# @param payload [Object] parsed message
# @return [Object] whatever RailsPipeline::Subscriber.target_handler returns
def target_handler(payload)
  payload_type = payload.class
  RailsPipeline::Subscriber.target_handler(payload_type)
end
#verify_api_key(envelope) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/rails-pipeline/subscriber.rb', line 143
# Checks that the envelope carries one of the configured API keys.
#
# @param envelope [Object] envelope exposing api_key
# @return [true] when the key is present and listed in _api_keys
# @raise [NoApiKeyError] when the envelope has no (or a blank) API key
# @raise [WrongApiKeyError] when the key is not among the configured keys
def verify_api_key(envelope)
  raise NoApiKeyError.new unless envelope.api_key.present?
  raise WrongApiKeyError.new unless _api_keys.include?(envelope.api_key)

  true
end