Module: RailsPipeline::Subscriber::InstanceMethods

Defined in:
lib/rails-pipeline/subscriber.rb

Instance Method Summary collapse

Instance Method Details

#_api_keysObject



168
169
170
# File 'lib/rails-pipeline/subscriber.rb', line 168

def _api_keys
  return ENV.fetch('PIPELINE_API_KEYS', "").split(',')
end

#_version(payload) ⇒ Object



163
164
165
166
# File 'lib/rails-pipeline/subscriber.rb', line 163

def _version(payload)
  _, version = payload.class.name.split('_', 2)
  return RailsPipeline::PipelineVersion.new(version)
end

#handle_envelope(envelope) ⇒ Object

Take an EncryptedMessage envelope, and decrypt the cipher text, then get the protobuf object out of it



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/rails-pipeline/subscriber.rb', line 51

def handle_envelope(envelope)
  if ENV.has_key?("DISABLE_RAILS_PIPELINE") || ENV.has_key?("DISABLE_RAILS_PIPELINE_PROCESSING")
    RailsPipeline.logger.debug "Skipping incoming pipeline messages (disabled by env vars)"
    return
  end
  verify_api_key(envelope)
  payload_str = self.class.decrypt(envelope)

  # Find the registered minor version & its related handler to parse and
  # process this message.
  clazz = registered_class_on_same_major_version(envelope.type_info)

  if clazz.nil?
    # No compatible version of this message is registered for this app.
    RailsPipeline.logger.info "Dropping unclaimed message #{envelope.type_info} (no compatible version registered)."
    return
  end

  # Parse and handle the payload.
  payload = clazz.parse(payload_str)
  handle_payload(payload, envelope)
end

#handle_payload(payload, envelope) ⇒ Object

Take a protobuf object (payload) and forward it to the appropriate handler/method/proc



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/rails-pipeline/subscriber.rb', line 76

def handle_payload(payload, envelope)
  version = _version(payload)
  clazz = target_class(payload)
  handler_class = target_handler(payload)
  event_type = envelope.event_type
  method = most_suitable_handler_method_name(version, clazz)

  if clazz.is_a?(Class)
    if handler_class
      # If a built in handler_class is registered, then just use it
      handler_class.new(payload, target_class: clazz, envelope: envelope).handle_payload
    elsif method
      # Target class had a from_pipeline method, so just call it and move on
      target = clazz.send(method, payload, event_type)
    else
      RailsPipeline.logger.info "No handler set, dropping message #{payload.class.name}"
    end
    return target
  elsif clazz.is_a?(Proc)
    return clazz.call(payload)
  end
end

#most_suitable_handler_method_name(version, receiver_class) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/rails-pipeline/subscriber.rb', line 127

def most_suitable_handler_method_name(version, receiver_class)
  # Returns the closest lower implemented method in target_class for the given version
  cached_method = self.class.handler_method_cache[version]
  if cached_method
    return cached_method
  end
  available_methods = receiver_class.methods.grep(%r{^from_pipeline_#{version.major}})
    .reject { |method_name| method_name.to_s.split('_').last.to_i > version.minor }
    .sort
    .reverse

  # cache handler method for this version
  self.class.handler_method_cache[version] = available_methods.first
  return available_methods.first
end

#registered_class_on_same_major_version(payload_class_name) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/rails-pipeline/subscriber.rb', line 99

def registered_class_on_same_major_version(payload_class_name)
  if Subscriber.registered_handlers.has_key?(payload_class_name)
    # The version we've been given has been registered. Return that.
    return Object.const_get(payload_class_name)
  end

  # Lops off the minor version from the payload class name so
  # we can find the registered message type with the same major version.
  class_name_with_major_version =
    payload_class_name
    .split("_", 3)[0, 2]
    .join("_")

  # Look for message types with the same major version.
  available_classes = Subscriber.registered_handlers
    .keys
    .map(&:to_s)
    .select { |class_name| class_name.start_with?(class_name_with_major_version) }

  if available_classes.empty?
    # No message types with the same major version.
    return nil
  else
    # There's a message type with the same major version.
    return Object.const_get(available_classes.first)
  end
end

#target_class(payload) ⇒ Object



155
156
157
# File 'lib/rails-pipeline/subscriber.rb', line 155

def target_class(payload)
  RailsPipeline::Subscriber.target_class(payload.class)
end

#target_handler(payload) ⇒ Object



159
160
161
# File 'lib/rails-pipeline/subscriber.rb', line 159

def target_handler(payload)
  RailsPipeline::Subscriber.target_handler(payload.class)
end

#verify_api_key(envelope) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
# File 'lib/rails-pipeline/subscriber.rb', line 143

def verify_api_key(envelope)
  if envelope.api_key.present?
    if _api_keys.include?(envelope.api_key)
      return true
    else
      raise WrongApiKeyError.new
    end
  else
    raise NoApiKeyError.new
  end
end