Class: Greenfinch::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/greenfinch-ruby/consumer.rb

Overview

A Consumer receives messages from a Greenfinch::Tracker, and sends them elsewhere- probably to Greenfinch’s analytics services, but can also enqueue them for later processing, log them to a file, or do whatever else you might find useful.

You can provide your own consumer to your Greenfinch::Trackers, either by passing in an argument with a #send! method when you construct the tracker, or just passing a block to Greenfinch::Tracker.new

tracker = Greenfinch::Tracker.new(YOUR_GREENFINCH_TOKEN) do |type, message|
    # type will be one of :event, :profile_update or :import
    @kestrel.set(ANALYTICS_QUEUE, [type, message].to_json)
end

You can also instantiate the library consumers yourself, and use them wherever you would like. For example, the working that consumes the above queue might work like this:

greenfinch = Greenfinch::Consumer
while true
    message_json = @kestrel.get(ANALYTICS_QUEUE)
    greenfinch.send!(*JSON.load(message_json))
end

Greenfinch::Consumer is the default consumer. It sends each message, as the message is recieved, directly to Greenfinch.

Instance Method Summary collapse

Constructor Details

#initialize(events_endpoint = nil, update_endpoint = nil, groups_endpoint = nil, import_endpoint = nil) ⇒ Consumer

Create a Greenfinch::Consumer. If you provide endpoint arguments, they will be used instead of the default Greenfinch endpoints. This can be useful for proxying, debugging, or if you prefer not to use SSL for your events.



61
62
63
64
65
66
67
68
69
70
# File 'lib/greenfinch-ruby/consumer.rb', line 61

def initialize(events_endpoint=nil,
               update_endpoint=nil,
               groups_endpoint=nil,
               import_endpoint=nil)

  @events_endpoint = events_endpoint || 'https://event.kcd.partners/api/publish'
  @update_endpoint = update_endpoint || 'https://api.greenfinch.com/engage'
  @groups_endpoint = groups_endpoint || 'https://api.greenfinch.com/groups'
  @import_endpoint = import_endpoint || 'https://api.greenfinch.com/import'
end

Instance Method Details

#request(endpoint, form_data, jwt_token = nil) ⇒ Object

Request takes an endpoint HTTP or HTTPS url, and a Hash of data to post to that url. It should return a pair of

response code, response body

as the result of the response. Response code should be nil if the request never receives a response for some reason.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/greenfinch-ruby/consumer.rb', line 126

def request(endpoint, form_data, jwt_token=nil)
  uri = URI(endpoint)

  headers = {
    "jwt" => jwt_token,
    "content-type" => "application/json",
    "label" => "ruby"
  }

  client = Net::HTTP.new(uri.host, uri.port)
  client.use_ssl = true
  client.open_timeout = 10
  client.continue_timeout = 10
  client.read_timeout = 10
  client.ssl_timeout = 10

  Greenfinch.with_http(client)

  response = client.request_post uri.request_uri, form_data.to_json, headers

  [response.code, response.body]
end

#send(type, message) ⇒ Object

This method was deprecated in release 2.0.0, please use send! instead



114
115
116
117
# File 'lib/greenfinch-ruby/consumer.rb', line 114

def send(type, message)
    warn '[DEPRECATION] send has been deprecated as of release 2.0.0, please use send! instead'
    send!(type, message)
end

#send!(type, message) ⇒ Object

Send the given string message to Greenfinch. Type should be one of :event, :profile_update or :import, which will determine the endpoint.

Greenfinch::Consumer#send! sends messages to Greenfinch immediately on each call. To reduce the overall bandwidth you use when communicating with Greenfinch, you can also use Greenfinch::BufferedConsumer



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/greenfinch-ruby/consumer.rb', line 78

def send!(type, message)
  type = type.to_sym
  endpoint = {
    :event => @events_endpoint,
    :profile_update => @update_endpoint,
    :group_update => @groups_endpoint,
    :import => @import_endpoint,
  }[type]

  decoded_message = JSON.load(message)
  jwt_token = decoded_message["jwt_token"]
  service_name = decoded_message["service_name"]

  endpoint = "#{endpoint}/#{service_name}"

  form_data = decoded_message["data"]

  begin
    response_code, response_body = request(endpoint, form_data, jwt_token)
  rescue => e
    raise ConnectionError.new("Could not connect to Greenfinch, with error \"#{e.message}\".")
  end

  result = {}
  if response_code.to_i == 200
    begin
      result = JSON.parse(response_body.to_s)
    rescue JSON::JSONError
      raise ServerError.new("Could not interpret Greenfinch server response: '#{response_body}'")
    end
  else
    raise ServerError.new("Could not write to Greenfinch, server responded with #{response_code} returning: '#{response_body}'")
  end
end