Class: Steem::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/steem/stream.rb

Overview

Steem::Stream allows a live view of the STEEM blockchain.

Example streaming blocks:

stream = Steem::Stream.new

stream.blocks do |block, block_num|
  puts "#{block_num} :: #{block.witness}"
end

Example streaming transactions:

stream = Steem::Stream.new

stream.transactions do |trx, trx_id, block_num|
  puts "#{block_num} :: #{trx_id} :: operations: #{trx.operations.size}"
end

Example streaming operations:

stream = Steem::Stream.new

stream.operations do |op, trx_id, block_num|
  puts "#{block_num} :: #{trx_id} :: #{op.type}: #{op.value.to_json}"
end

Allows streaming of block headers, full blocks, transactions, operations and virtual operations.

Constant Summary collapse

BLOCK_INTERVAL =
3
MAX_BACKOFF_BLOCK_INTERVAL =
30
MAX_RETRY_COUNT =
10
VOP_TRX_ID =
('0' * 40).freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {mode: :irreversible}) ⇒ Stream

Returns a new instance of Stream.

Parameters:

  • options (Hash) (defaults to: {mode: :irreversible})

    additional options

Options Hash (options):

  • :database_api (Steem::DatabaseApi)
  • :block_api (Steem::BlockApi)
  • :account_history_api (Steem::AccountHistoryApi || Steem::CondenserApi)
  • :mode (Symbol)

    we have the choice between

    • :head the last block

    • :irreversible the block that is confirmed by 2/3 of all block producers and is thus irreversible!

  • :no_warn (Boolean)

    do not generate warnings



47
48
49
50
51
52
53
54
# File 'lib/steem/stream.rb', line 47

def initialize(options = {mode: :irreversible})
  @instance_options = options
  @database_api = options[:database_api] || Steem::DatabaseApi.new(options)
  @block_api = options[:block_api] || Steem::BlockApi.new(options)
  @account_history_api = options[:account_history_api]
  @mode = options[:mode] || :irreversible
  @no_warn = !!options[:no_warn]
end

Instance Attribute Details

#account_history_apiObject (readonly)

Returns the value of attribute account_history_api.



31
32
33
# File 'lib/steem/stream.rb', line 31

def 
  @account_history_api
end

#block_apiObject (readonly)

Returns the value of attribute block_api.



31
32
33
# File 'lib/steem/stream.rb', line 31

def block_api
  @block_api
end

#database_apiObject (readonly)

Returns the value of attribute database_api.



31
32
33
# File 'lib/steem/stream.rb', line 31

def database_api
  @database_api
end

#modeObject (readonly)

Returns the value of attribute mode.



31
32
33
# File 'lib/steem/stream.rb', line 31

def mode
  @mode
end

Instance Method Details

#block_headers(options = {}, &block) ⇒ Object

Use this method to stream block headers. This is quite a bit faster than requesting full blocks.

Parameters:

  • options (Hash) (defaults to: {})

    additional options

Options Hash (options):

  • :at_block_num (Integer)

    Starts the stream at the given block number. Default: nil.

  • :until_block_num (Integer)

    Ends the stream at the given block number. Default: nil.



74
75
76
# File 'lib/steem/stream.rb', line 74

def block_headers(options = {}, &block)
  block_objects(options.merge(object: :block_headers), block)
end

#block_numbers(options = {}, &block) ⇒ Object

Use this method to stream block numbers. This is significantly faster than requesting full blocks and even block headers. Basically, the only thing this method does is call Database#get_dynamic_global_properties at 3 second intervals.

Parameters:

  • options (Hash) (defaults to: {})

    additional options

Options Hash (options):

  • :at_block_num (Integer)

    Starts the stream at the given block number. Default: nil.

  • :until_block_num (Integer)

    Ends the stream at the given block number. Default: nil.



64
65
66
# File 'lib/steem/stream.rb', line 64

def block_numbers(options = {}, &block)
  block_objects(options.merge(object: :block_numbers), block)
end

#blocks(options = {}, &block) ⇒ Object

Use this method to stream full blocks.

Parameters:

  • options (Hash) (defaults to: {})

    additional options

Options Hash (options):

  • :at_block_num (Integer)

    Starts the stream at the given block number. Default: nil.

  • :until_block_num (Integer)

    Ends the stream at the given block number. Default: nil.



83
84
85
# File 'lib/steem/stream.rb', line 83

def blocks(options = {}, &block)
  block_objects(options.merge(object: :blocks), block)
end

#operations(*args, &block) ⇒ Object

Returns the latest operations from the blockchain.

stream = Steem::Stream.new
stream.operations do |op|
  puts op.to_json
end

If symbol are passed to ‘types` option, then only that operation is returned. Expected symbols are:






cancel_transfer_from_savings_operation

claim_reward_balance_operation
comment_operation
comment_options_operation
convert_operation
custom_operation
custom_json_operation
decline_voting_rights_operation
delegate_vesting_shares_operation
delete_comment_operation
escrow_approve_operation
escrow_dispute_operation
escrow_release_operation
escrow_transfer_operation
feed_publish_operation
limit_order_cancel_operation
limit_order_create_operation
limit_order_create2_operation
pow_operation
pow2_operation


set_withdraw_vesting_route_operation
transfer_operation
transfer_from_savings_operation
transfer_to_savings_operation
transfer_to_vesting_operation
vote_operation
withdraw_vesting_operation
witness_update_operation

For example, to stream only votes:

stream = Steem::Stream.new
stream.operations(types: :vote_operation) do |vote|
  puts vote.to_json
end

… Or …

stream = Steem::Stream.new
stream.operations(:vote_operation) do |vote|
  puts vote.to_json
end

You can also stream virtual operations:

stream = Steem::Stream.new
stream.operations(types: :author_reward_operation, only_virtual: true) do |vop|
  v = vop.value
  puts "#{v.author} got paid for #{v.permlink}: #{[v.sbd_payout, v.steem_payout, v.vesting_payout]}"
end

… or multiple virtual operation types;

stream = Steem::Stream.new
stream.operations(types: [:producer_reward_operation, :author_reward_operation], only_virtual: true) do |vop|
  puts vop.to_json
end

… or all types, including virtual operation types from the head block number:

stream = Steem::Stream.new(mode: :head)
stream.operations(include_virtual: true) do |op|
  puts op.to_json
end

Expected virtual operation types:

producer_reward_operation
author_reward_operation
curation_reward_operation
fill_convert_request_operation
fill_order_operation
fill_vesting_withdraw_operation
interest_operation
shutdown_witness_operation

Parameters:

  • args (Symbol || Array<Symbol> || Hash)

    the type(s) of operation or hash of expanded options, optional.

  • block

    the block to execute for each result. Yields: |op, trx_id, block_num|

Options Hash (*args):

  • :at_block_num (Integer)

    Starts the stream at the given block number. Default: nil.

  • :until_block_num (Integer)

    Ends the stream at the given block number. Default: nil.

  • :types (Symbol || Array<Symbol>)

    the type(s) of operation, optional.

  • :only_virtual (Boolean)

    Only stream virtual options. Setting this true will improve performance because the stream only needs block numbers to then retrieve virtual operations. Default: false.

  • :include_virtual (Boolean)

    Also stream virtual options. Setting this true will impact performance. Default: false.



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/steem/stream.rb', line 211

def operations(*args, &block)
  options = {}
  types = []
  only_virtual = false
  include_virtual = false
  last_block_num = nil
  
  case args.first
  when Hash
    options = args.first
    types = transform_types(options[:types])
    only_virtual = !!options[:only_virtual] || false
    include_virtual = !!options[:include_virtual] || only_virtual || false
  when Symbol, Array then types = transform_types(args)
  end
  
  if only_virtual
    block_numbers(options) do |block_num|
      get_virtual_ops(types, block_num, block)
    end
  else
    transactions(options) do |transaction, trx_id, block_num|
      transaction.operations.each do |op|
        yield op, trx_id, block_num if types.none? || types.include?(op.type)
        
        next unless last_block_num != block_num
        
        last_block_num = block_num
        
        get_virtual_ops(types, block_num, block) if include_virtual
      end
    end
  end
end

#transactions(options = {}, &block) ⇒ Object

Use this method to stream each transaction.

Parameters:

  • options (Hash) (defaults to: {})

    additional options

Options Hash (options):

  • :at_block_num (Integer)

    Starts the stream at the given block number. Default: nil.

  • :until_block_num (Integer)

    Ends the stream at the given block number. Default: nil.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/steem/stream.rb', line 92

def transactions(options = {}, &block)
  blocks(options) do |block, block_num|
    if block.nil?
      warn "Batch missing block_num: #{block_num}, retrying ..."
      
      block = block_api.get_block(block_num: block_num) do |result|
        result.block
      end
    end
    
    block.transactions.each_with_index do |transaction, index|
      trx_id = block.transaction_ids[index]
      
      yield transaction, trx_id, block_num
    end
  end
end