Class: Pyper::Pipes::Cassandra::Reader

Inherits:
Struct
  • Object
Defined in:
lib/pyper/pipes/cassandra/reader.rb

Overview

A pipe for reading items from a single row in Cassandra.

Instance Attribute Details

#client ⇒ Object

Returns the value of attribute client

Returns:

  • (Object)

    the current value of client



# File 'lib/pyper/pipes/cassandra/reader.rb', line 6

def client
  @client
end

#options ⇒ Object

Returns the value of attribute options

Returns:

  • (Object)

    the current value of options



# File 'lib/pyper/pipes/cassandra/reader.rb', line 6

def options
  @options
end

#table ⇒ Object

Returns the value of attribute table

Returns:

  • (Object)

    the current value of table



# File 'lib/pyper/pipes/cassandra/reader.rb', line 6

def table
  @table
end

Instance Method Details

#pipe(arguments, status = {}) ⇒ Enumerator::Lazy<Hash>

Runs a select against the table and returns a lazy enumerator of the resulting rows.

Parameters:

  • arguments (Hash)

    Arguments passed to the Cassandra client's where clause. The option keys listed below are removed from the hash first.

  • status (Hash) (defaults to: {})

    A mutable status hash. After the query runs, :paging_state and :last_page are written to it.

Options Hash (arguments):

  • :limit (Integer)
  • :order (Array)

    A pair [clustering_column, :desc|:asc] determining how to order the results.

  • :paging_state (Object)
  • :columns

    Columns to select; passed as the second argument to client.select.

  • :page_size (Integer)
  • :consistency (Symbol)

    The consistency for the request. Must be one of Cassandra::CONSISTENCIES.
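The method body further down removes these option keys (and :columns) from arguments and hands whatever is left to where. The following is a rough sketch of that split, not Pyper code: split_arguments is a hypothetical helper, and :user_id, :bucket and :created_at are made-up column names.

```ruby
# Hypothetical helper, not part of Pyper: mirrors how #pipe separates
# query options from where-clause conditions.
def split_arguments(arguments)
  option_keys = [:limit, :page_size, :paging_state, :order, :columns, :consistency]
  args = arguments.dup # work on a copy so the caller's hash is left untouched
  extracted = option_keys.each_with_object({}) do |key, acc|
    acc[key] = args.delete(key) if args.key?(key)
  end
  [extracted, args]
end

# Made-up column names, for illustration only.
extracted, where_conditions = split_arguments({
  user_id: 42,
  bucket: "2015-01",
  limit: 10,
  order: [:created_at, :desc],
  consistency: :quorum
})
```

Here where_conditions keeps only :user_id and :bucket, while extracted holds :limit, :order and :consistency. Note that the sketch copies the hash, whereas pipe itself deletes the keys from the hash it is given.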

Returns:

  • (Enumerator::Lazy<Hash>)

    a lazy enumerator over the result rows
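Because the return value is an Enumerator::Lazy, downstream steps such as map only run for the rows that are actually consumed. A small plain-Ruby illustration, with a literal array standing in for the query result (an assumption; no Cassandra involved):

```ruby
# Stand-in for result.rows; the real rows come from the Cassandra driver.
rows = [{ id: 1 }, { id: 2 }, { id: 3 }]

mapped = [] # records which rows the map block actually touched
ids = rows.lazy.map do |row|
  mapped << row[:id]
  row[:id]
end.first(2)
```

Only the first two rows pass through the map block; the third is never touched because first(2) stops the lazy chain early.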



# File 'lib/pyper/pipes/cassandra/reader.rb', line 16

def pipe(arguments, status = {})
  # Pull the query options out of arguments (this mutates the caller's hash);
  # whatever remains is used as the where clause.
  limit = arguments.delete(:limit)
  page_size = arguments.delete(:page_size)
  paging_state = arguments.delete(:paging_state)
  order = arguments.delete(:order)
  columns = arguments.delete(:columns)
  consistency = arguments.delete(:consistency)

  # Per-call values override the reader's default execution options.
  opts = options.nil? ? {} : options.dup
  opts[:page_size] = page_size if page_size
  opts[:paging_state] = paging_state if paging_state
  opts[:consistency] = consistency if consistency

  # Build the select statement, adding limit and order only when given.
  query = client.select(table, columns).where(arguments)
  query = query.limit(limit) if limit
  query = query.order(order.first, order.last) if order

  result = query.execute(opts)

  # Report paging information back to the caller through status.
  status[:paging_state] = result.paging_state
  status[:last_page] = result.last_page?

  result.rows.lazy
end
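
Since pipe writes :paging_state and :last_page into status, a caller can walk through all pages by feeding the previous paging state back in as an argument. The sketch below illustrates that loop under several assumptions: FakeClient, FakeQuery and FakeResult are hypothetical in-memory stand-ins (not the real client), SketchReader is a condensed copy of the method above with an assumed attribute order of client, table, options, and an integer offset plays the role of the paging state, which is opaque in a real driver.

```ruby
# Hypothetical stand-ins for the Cassandra client. They implement only the
# calls that #pipe makes and ignore the where/limit/order conditions.
FakeResult = Struct.new(:rows, :paging_state) do
  def last_page?
    paging_state.nil?
  end
end

class FakeQuery
  def initialize(rows)
    @rows = rows
  end

  def where(_conditions)
    self
  end

  def limit(_count)
    self
  end

  def order(_column, _direction)
    self
  end

  # Serves :page_size rows starting at the offset kept in :paging_state.
  def execute(opts)
    offset = opts[:paging_state] || 0
    size = opts[:page_size] || @rows.size
    next_offset = offset + size
    next_state = next_offset < @rows.size ? next_offset : nil
    FakeResult.new(@rows[offset, size] || [], next_state)
  end
end

class FakeClient
  def initialize(rows)
    @rows = rows
  end

  def select(_table, _columns)
    FakeQuery.new(@rows)
  end
end

# Condensed copy of the reader so the sketch runs without the gem.
# The attribute order (client, table, options) is an assumption.
SketchReader = Struct.new(:client, :table, :options) do
  def pipe(arguments, status = {})
    limit = arguments.delete(:limit)
    order = arguments.delete(:order)
    columns = arguments.delete(:columns)

    opts = options.nil? ? {} : options.dup
    [:page_size, :paging_state, :consistency].each do |key|
      value = arguments.delete(key)
      opts[key] = value if value
    end

    query = client.select(table, columns).where(arguments)
    query = query.limit(limit) if limit
    query = query.order(order.first, order.last) if order
    result = query.execute(opts)

    status[:paging_state] = result.paging_state
    status[:last_page] = result.last_page?
    result.rows.lazy
  end
end

rows = (1..5).map { |i| { id: i } }
reader = SketchReader.new(FakeClient.new(rows), :events, { page_size: 2 })

pages = []
status = {}
loop do
  args = { user_id: 42 } # made-up where condition
  args[:paging_state] = status[:paging_state] if status[:paging_state]
  pages << reader.pipe(args, status).to_a
  break if status[:last_page]
end
```

A fresh args hash is built on every iteration because pipe deletes the option keys from the hash it receives. With five rows and a page size of two, the loop collects three pages before :last_page becomes true.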