Class: Mixpanel::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/mixpanel-ruby/consumer.rb

Overview

A Consumer recieves messages from a Mixpanel::Tracker, and sends them elsewhere- probably to Mixpanel’s analytics services, but can also enqueue them for later processing, log them to a file, or do whatever else you might find useful.

You can provide your own consumer to your Mixpanel::Trackers, either by passing in an argument with a #send method when you construct the tracker, or just passing a block to Mixpanel::Tracker.new

tracker = Mixpanel::Tracker.new(MY_TOKEN) do |type, message|
    # type will be one of :event, :profile_update or :import
    @kestrel.set(ANALYTICS_QUEUE, [type, message].to_json)
end

You can also instantiate the library consumers yourself, and use them wherever you would like. For example, the working that consumes the above queue might work like this:

mixpanel = Mixpanel::Consumer
while true
    message_json = @kestrel.get(ANALYTICS_QUEUE)
    mixpanel.send(*JSON.load(message_json))
end

Mixpanel::Consumer is the default consumer. It sends each message, as the message is recieved, directly to Mixpanel.

Instance Method Summary collapse

Constructor Details

#initialize(events_endpoint = nil, update_endpoint = nil, import_endpoint = nil) ⇒ Consumer

Create a Mixpanel::Consumer. If you provide endpoint arguments, they will be used instead of the default Mixpanel endpoints. This can be useful for proxying, debugging, or if you prefer not to use SSL for your events.



64
65
66
67
68
# File 'lib/mixpanel-ruby/consumer.rb', line 64

def initialize(events_endpoint=nil, update_endpoint=nil, import_endpoint=nil)
  @events_endpoint = events_endpoint || 'https://api.mixpanel.com/track'
  @update_endpoint = update_endpoint || 'https://api.mixpanel.com/engage'
  @import_endpoint = import_endpoint || 'https://api.mixpanel.com/import'
end

Instance Method Details

#generate_tracking_url(endpoint, form_data) ⇒ Object

Generate_tracking_url takes an endpoint HTTP or HTTPS url, and a Hash of data to post to that url. It should return a string for the tracking url:

pixel_tracking_url



136
137
138
139
140
141
# File 'lib/mixpanel-ruby/consumer.rb', line 136

def generate_tracking_url(endpoint, form_data)
  uri = URI(endpoint)
  request = Net::HTTP::Get.new(uri.request_uri)
  request.set_form_data(form_data)
  "#{endpoint}?#{request.body}"
end

#request(endpoint, form_data) ⇒ Object

Request takes an endpoint HTTP or HTTPS url, and a Hash of data to post to that url. It should return a pair of

response code, response body

as the result of the response. Response code should be nil if the request never receives a response for some reason.



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/mixpanel-ruby/consumer.rb', line 116

def request(endpoint, form_data)
  uri = URI(endpoint)
  request = Net::HTTP::Post.new(uri.request_uri)
  request.set_form_data(form_data)

  client = Net::HTTP.new(uri.host, uri.port)
  client.use_ssl = true
  Mixpanel.with_http(client)

  response = client.request(request)
  [response.code, response.body]
end

#send(type, message, as_pixel = false) ⇒ Object

Send the given string message to Mixpanel. Type should be one of :event, :profile_update or :import, which will determine the endpoint.

Mixpanel::Consumer#send sends messages to Mixpanel immediately on each call. To reduce the overall bandwidth you use when communicating with Mixpanel, you can also use Mixpanel::BufferedConsumer



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/mixpanel-ruby/consumer.rb', line 76

def send(type, message, as_pixel = false)
  type = type.to_sym
  endpoint = {
    :event => @events_endpoint,
    :profile_update => @update_endpoint,
    :import => @import_endpoint
  }[type]

  decoded_message = JSON.load(message)
  api_key = decoded_message["api_key"]
  data = Base64.encode64(decoded_message["data"].to_json).gsub("\n", '')

  form_data = {"data" => data, "verbose" => 1}
  form_data.merge!("api_key" => api_key) if api_key

  if as_pixel
    form_data.merge!("img" => 1)
    return generate_tracking_url(endpoint, form_data)
  end

  response_code, response_body = request(endpoint, form_data)

  succeeded = nil
  if response_code.to_i == 200
    result = JSON.load(response_body) rescue {}
    succeeded = result['status'] == 1
  end

  if ! succeeded
    raise ConnectionError.new("Could not write to Mixpanel, server responded with #{response_code} returning: '#{response_body}'")
  end
end