Class: PowerTrack::Stream

Inherits:
Object
Includes:
API, VoidLogger::LoggerMixin
Defined in:
lib/powertrack/streaming/stream.rb

Overview

A PowerTrack stream to be used for both updating the rules and collecting new messages.

Constant Summary

FEATURE_URL_FORMAT =

The format of the URLs to connect to the various stream services

{
  # [ hostname, account, source, mode, label, feature ]
  v1: "https://%s.gnip.com/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze,
  # [ hostname, feature, account, source, label, sub-feature ]
  v2: "https://gnip-%s.twitter.com/%s/powertrack/accounts/%s/publishers/%s/%s%s.json".freeze
}.freeze
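
As an illustration of how the six v1 slots are filled, here is a minimal sketch. The helper name `build_v1_url`, the empty default for `feature`, and every argument value (`stream`, `acme`, `twitter`, `prod`) are hypothetical; only the format string and the slot order come from the constant above.

```ruby
# Local copy of the v1 format string from FEATURE_URL_FORMAT.
V1_URL_FORMAT = "https://%s.gnip.com/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze

# Fills the slots in the documented order:
# [ hostname, account, source, mode, label, feature ]
# Hypothetical helper; not part of the PowerTrack gem.
def build_v1_url(hostname, account, source, mode, label, feature = '')
  format(V1_URL_FORMAT, hostname, account, source, mode, label, feature)
end

puts build_v1_url('stream', 'acme', 'twitter', 'streams', 'prod')
```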
DEFAULT_CONNECTION_TIMEOUT =

The default timeout on a connection to PowerTrack. Can be overridden per call.

30
DEFAULT_INACTIVITY_TIMEOUT =

The default timeout for inactivity on a connection to PowerTrack. Can be overridden per call.

50
DEFAULT_STREAM_OPTIONS =

The default options for using the stream.

{
  # enable PowerTrack v2 API (using v1 by default)
  v2: false,
  # override the default connection timeout
  connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
  # override the default inactivity timeout
  inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
  # use a client id if you want to leverage the Backfill feature in v1
  client_id: nil,
  # enable the replay mode to get activities over the last 5 days
  # see http://support.gnip.com/apis/replay/api_reference.html
  replay: false
}.freeze
DEFAULT_OK_RESPONSE_STATUS =
200
HEARTBEAT_MESSAGE_PATTERN =

The patterns used to identify the various types of messages received from GNIP; everything else is an activity.

/\A\s*\z/.freeze
SYSTEM_MESSAGE_PATTERN =
/\A\s*\{\s*"(info|warn|error)":/mi.freeze
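
The two patterns can be applied in sequence to sort incoming messages into three kinds. The sketch below copies the regular expressions locally; the method name `classify_message` and the sample payloads are assumptions made for illustration.

```ruby
# Local copies of the two patterns documented above.
HEARTBEAT_PATTERN = /\A\s*\z/
SYSTEM_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi

# Hypothetical helper: returns :heartbeat, :system or :activity.
def classify_message(message)
  case message
  when HEARTBEAT_PATTERN then :heartbeat # blank or whitespace-only payload
  when SYSTEM_PATTERN then :system       # JSON object starting with info/warn/error
  else :activity                         # everything else
  end
end

p classify_message("\r\n")
p classify_message('{"error": {"message": "sample"}}')
p classify_message('{"id": "sample-activity"}')
```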
REPLAY_TIMESTAMP_FORMAT =

The format used to send UTC timestamps in Replay mode

'%Y%m%d%H%M'.freeze
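
For reference, this is what the format produces with Ruby's standard `Time#strftime`. The date is arbitrary and the constant is copied locally under a different name.

```ruby
# Local copy of REPLAY_TIMESTAMP_FORMAT.
REPLAY_FORMAT = '%Y%m%d%H%M'.freeze

# Year, month, day, hour and minute, with no separators and no seconds.
from = Time.utc(2015, 3, 1, 12, 30)
replay_from = from.strftime(REPLAY_FORMAT)
puts replay_from # prints 201503011230
```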
DEFAULT_LIST_RULES_OPTIONS =
{
  compressed: true,
  objectify: true
}.freeze
DEFAULT_TRACK_OPTIONS =
{
  # receive GZip-compressed payloads?
  compressed: true,
  # max number of retries after a disconnection
  max_retries: 2,
  # advanced options to configure exponential backoff used for retries
  backoff: nil,
  # max number of seconds to wait for last message handlers to complete
  stop_timeout: 10,
  # pass message in raw form (JSON formatted string) instead of JSON-decoded
  # Ruby objects to message handlers
  raw: false,
  # the starting date from which the activities will be recovered (replay mode only)
  from: nil,
  # the ending date to which the activities will be recovered (replay mode only)
  to: nil,
  # specify a number of minutes to leverage the Backfill feature (v2 only)
  backfill_minutes: nil,
  # called for each message received, except heartbeats
  on_message: nil,
  # called for each activity received
  on_activity: nil,
  # called for each system message received
  on_system: nil,
  # called for each heartbeat received
  on_heartbeat: nil,
  # called periodically to detect if the tracker has to be closed
  close_now: nil
}.freeze
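
The handler comments above suggest a dispatch along the following lines. This is a sketch under assumptions, not the gem's code: the `dispatch` helper, the call order, and the handler arities (no argument for `on_heartbeat`, one for the others) are all hypothetical.

```ruby
# Hypothetical dispatcher: `type` is :heartbeat, :system or :activity.
def dispatch(type, message, handlers)
  if type == :heartbeat
    handlers[:on_heartbeat]&.call # heartbeats never reach on_message
    return
  end

  handlers[:on_message]&.call(message) # every non-heartbeat message
  specific = type == :system ? handlers[:on_system] : handlers[:on_activity]
  specific&.call(message) # nil handlers are skipped
end

seen = []
handlers = {
  on_message: ->(m) { seen << [:message, m] },
  on_activity: ->(m) { seen << [:activity, m] },
  on_heartbeat: -> { seen << [:heartbeat] }
}
dispatch(:heartbeat, "\r\n", handlers)
dispatch(:activity, '{"id":1}', handlers)
p seen
```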

Instance Attribute Summary

Instance Method Summary

Methods included from API

#add_rule, #delete_rule

Constructor Details

#initialize(username, password, account_name, data_source, label, options = nil) ⇒ Stream

Returns a new instance of Stream.



# File 'lib/powertrack/streaming/stream.rb', line 62

def initialize(username, password, account_name, data_source, label, options=nil)
  @username = username
  @password = password
  @account_name = account_name
  @data_source = data_source
  @label = label
  @options = DEFAULT_STREAM_OPTIONS.merge(options || {})
  @replay = !!@options[:replay]
  @client_id = @options[:client_id]
  @stream_mode = @replay ? 'replay' : 'streams'

  # force v1 if Replay activated
  @v2 = !@replay && !!@options[:v2]
end
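
The option handling in the constructor can be tried in isolation. The sketch below copies the defaults and mirrors the merge and the "replay forces v1" rule; the `resolve_stream_options` helper and its return shape are assumptions made for illustration.

```ruby
# Local copy of DEFAULT_STREAM_OPTIONS.
STREAM_DEFAULTS = {
  v2: false,
  connect_timeout: 30,
  inactivity_timeout: 50,
  client_id: nil,
  replay: false
}.freeze

# Hypothetical helper mirroring the constructor's option logic.
def resolve_stream_options(options = nil)
  opts = STREAM_DEFAULTS.merge(options || {})
  replay = !!opts[:replay]
  {
    replay: replay,
    stream_mode: replay ? 'replay' : 'streams',
    v2: !replay && !!opts[:v2], # Replay forces v1
    connect_timeout: opts[:connect_timeout]
  }
end

p resolve_stream_options(v2: true)
p resolve_stream_options(v2: true, replay: true)
```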

Instance Attribute Details

#account_name ⇒ Object (readonly)

Returns the value of attribute account_name.



# File 'lib/powertrack/streaming/stream.rb', line 60

def account_name
  @account_name
end

#data_source ⇒ Object (readonly)

Returns the value of attribute data_source.



# File 'lib/powertrack/streaming/stream.rb', line 60

def data_source
  @data_source
end

#label ⇒ Object (readonly)

Returns the value of attribute label.



# File 'lib/powertrack/streaming/stream.rb', line 60

def label
  @label
end

#username ⇒ Object (readonly)

Returns the value of attribute username.



# File 'lib/powertrack/streaming/stream.rb', line 60

def username
  @username
end

Instance Method Details

#add_rules(*rules) ⇒ Object

Adds many rules to your PowerTrack stream’s ruleset.

POST /rules

See support.gnip.com/apis/powertrack/api_reference.html#AddRules



# File 'lib/powertrack/streaming/stream.rb', line 82

def add_rules(*rules)
  # flatten the rules in case it was provided as an array
  make_rules_request(:post,
    body: MultiJson.encode('rules' => rules.flatten),
    ok: 201)
end
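
Because of the splat and the `flatten`, rules can be passed one by one, as an array, or as a mix of both. The sketch below shows the resulting request body; it swaps MultiJson for Ruby's standard `json` library and uses plain hashes in place of rule objects, both as assumptions to keep the example self-contained.

```ruby
require 'json'

# Hypothetical helper reproducing only the body construction.
def rules_body(*rules)
  JSON.generate('rules' => rules.flatten)
end

single = { 'value' => 'coffee' }
batch = [{ 'value' => 'tea', 'tag' => 'drinks' }]

# Both call styles end up in one flat "rules" array.
puts rules_body(single, batch)
```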

#delete_rules(*rules) ⇒ Object

Removes the specified rules from the stream.

DELETE /rules

See support.gnip.com/apis/powertrack/api_reference.html#DeleteRules



# File 'lib/powertrack/streaming/stream.rb', line 94

def delete_rules(*rules)
  # v2 does not use DELETE anymore
  delete_verb = @v2 ? :post : :delete
  # flatten the rules in case it was provided as an array
  delete_options = { body: MultiJson.encode('rules' => rules.flatten) }
  # v2 uses a query parameter
  delete_options[:query] = { '_method' => 'delete' } if @v2

  make_rules_request(delete_verb, delete_options)
end
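
The difference between the two API versions can be seen by building the request description without sending it. The `delete_request_for` helper and its return shape are hypothetical, and the standard `json` library stands in for MultiJson.

```ruby
require 'json'

# Hypothetical helper: describes the request instead of sending it.
def delete_request_for(rules, v2:)
  request = {
    verb: v2 ? :post : :delete, # v2 does not use DELETE
    body: JSON.generate('rules' => rules.flatten)
  }
  # v2 signals the deletion through a query parameter instead.
  request[:query] = { '_method' => 'delete' } if v2
  request
end

rules = [{ 'value' => 'coffee' }]
p delete_request_for(rules, v2: false)
p delete_request_for(rules, v2: true)
```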

#list_rules(options = nil) ⇒ Object

Retrieves all existing rules for a stream.

Returns an array of PowerTrack::Rule objects when the objectify option is set and the response contains a well-formed list of rules; otherwise returns the response as is.

GET /rules

See support.gnip.com/apis/powertrack/api_reference.html#ListRules



# File 'lib/powertrack/streaming/stream.rb', line 117

def list_rules(options=nil)
  options = DEFAULT_LIST_RULES_OPTIONS.merge(options || {})
  res = make_rules_request(:get, headers: gzip_compressed_header(options[:compressed]))

  # return Rule objects when required and feasible/appropriate
  if options[:objectify] &&
     res.is_a?(Hash) &&
     (rules = res['rules']).is_a?(Array) &&
     rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
    rules.map do |rule|
      PowerTrack::Rule.new(rule['value'], tag: rule['tag'], id: rule['id'])
    end
  else
    res
  end
end
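
The "when required and feasible" check can be exercised on its own. In this sketch a `Struct` named `SketchRule` stands in for PowerTrack::Rule, and `objectify_rules` is a hypothetical helper; only the validation conditions are taken from the method above.

```ruby
# Stand-in for PowerTrack::Rule (assumption, for illustration only).
SketchRule = Struct.new(:value, :tag, :id)

# Converts a decoded response into rule objects when it has the expected
# shape, and returns it untouched otherwise.
def objectify_rules(res)
  rules = res.is_a?(Hash) ? res['rules'] : nil
  well_formed = rules.is_a?(Array) &&
                rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
  return res unless well_formed

  rules.map { |rule| SketchRule.new(rule['value'], rule['tag'], rule['id']) }
end

good = { 'rules' => [{ 'value' => 'coffee', 'tag' => 'drinks', 'id' => 1 }] }
bad = { 'rules' => [{ 'tag' => 'no value here' }] }
p objectify_rules(good)
p objectify_rules(bad)
```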

#track(options = nil) ⇒ Object

Establishes a persistent connection to the PowerTrack data stream, through which the social data will be delivered.

Reconnects automatically after a disconnection, up to the max_retries limit.

GET /track/:stream

See support.gnip.com/apis/powertrack/api_reference.html#Stream



# File 'lib/powertrack/streaming/stream.rb', line 172

def track(options=nil)
  options = DEFAULT_TRACK_OPTIONS.merge(options || {})
  retrier = PowerTrack::Retrier.new(options[:max_retries])
  handle_api_response(*retrier.retry { track_once(options, retrier) })
end
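
To make the meaning of `max_retries` concrete, here is a generic bounded-retry sketch. It is not the implementation of PowerTrack::Retrier, whose internals are not shown here: the `with_retries` helper is hypothetical, it assumes failures surface as exceptions, and it omits the exponential backoff entirely.

```ruby
# Hypothetical helper: runs the block once, then retries it at most
# `max_retries` times if it raises. No waiting between attempts.
def with_retries(max_retries)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts <= max_retries
    raise # retries exhausted: propagate the last error
  end
end

calls = 0
begin
  with_retries(2) do
    calls += 1
    raise 'disconnected'
  end
rescue RuntimeError => e
  puts "gave up after #{calls} attempts: #{e.message}"
end
```

With `max_retries` set to 2, the block runs three times in total: the initial attempt plus two retries.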