Class: Akasha::Checkpoint::MetadataCheckpoint

Inherits:
Object
Defined in:
lib/akasha/checkpoint/metadata_checkpoint.rb

Overview

Stores stream position in stream metadata.


Constructor Details

#initialize(stream, interval: 1) ⇒ MetadataCheckpoint

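Creates a new checkpoint that stores the position in `stream` every `interval` events. `interval` must be greater than zero, because `#ack` uses it as a modulus. With an `interval` above 1, events acknowledged after the last stored position may be delivered again following a restart, so event listeners should be idempotent.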

Raises:

  • (UnsupportedStorageError)


# File 'lib/akasha/checkpoint/metadata_checkpoint.rb', line 7

def initialize(stream, interval: 1)
  @stream = stream
  @interval = interval
  return if @stream.respond_to?(:metadata) && @stream.respond_to?(:metadata=)
  raise UnsupportedStorageError, "Storage does not support checkpoints: #{stream.class}"
end
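The constructor's guard is a plain duck-typing check: any object that responds to both `metadata` and `metadata=` is accepted. A minimal sketch of that check in isolation follows; `InMemoryStream`, `ensure_checkpointable!` and the locally defined `UnsupportedStorageError` are hypothetical stand-ins for illustration, not part of Akasha.

```ruby
# Hypothetical stand-in for the error class raised by the constructor.
class UnsupportedStorageError < StandardError; end

# Hypothetical stream-like object exposing readable and writable metadata.
class InMemoryStream
  attr_accessor :metadata

  def initialize
    @metadata = {}
  end
end

# Mirrors the guard in the constructor: accept anything that can both
# read and write metadata, reject everything else.
def ensure_checkpointable!(stream)
  return stream if stream.respond_to?(:metadata) && stream.respond_to?(:metadata=)

  raise UnsupportedStorageError, "Storage does not support checkpoints: #{stream.class}"
end

ensure_checkpointable!(InMemoryStream.new) # accepted

begin
  ensure_checkpointable!(Object.new)
rescue UnsupportedStorageError => e
  puts e.message # prints "Storage does not support checkpoints: Object"
end
```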

Instance Method Details

#ack(position) ⇒ Object

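Returns the next position (`position + 1`) and stores it in the stream metadata whenever it is a multiple of the configured `interval`. Raises `CheckpointStreamNotFoundError` if the write fails with HTTP status 404, meaning the stream does not exist.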



# File 'lib/akasha/checkpoint/metadata_checkpoint.rb', line 22

def ack(position)
  @next_position = position + 1
  if (@next_position % @interval).zero?
    # TODO: Race condition; use optimistic concurrency.
    @stream.metadata = @stream.metadata.merge(next_position: @next_position)
  end
  @next_position
rescue Akasha::HttpClientError => e
  raise if e.status_code != 404
  raise CheckpointStreamNotFoundError, "Stream cannot be checkpointed; it does not exist: #{@stream.name}"
end
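To make the interval arithmetic concrete, the sketch below replays the same modulo check against an in-memory object and records which positions are actually written. `RecordingStream` and the free-standing `ack` helper are hypothetical and exist only for this illustration; error handling is omitted.

```ruby
# Hypothetical in-memory stream that records every metadata write.
class RecordingStream
  attr_reader :metadata, :writes

  def initialize
    @metadata = {}
    @writes = []
  end

  def metadata=(value)
    @writes << value[:next_position]
    @metadata = value
  end
end

# Same arithmetic as #ack: always return position + 1, but only persist
# it when it is a multiple of the interval.
def ack(stream, position, interval)
  next_position = position + 1
  if (next_position % interval).zero?
    stream.metadata = stream.metadata.merge(next_position: next_position)
  end
  next_position
end

stream = RecordingStream.new
returned = (0..6).map { |position| ack(stream, position, 3) }

p returned        # prints [1, 2, 3, 4, 5, 6, 7]
p stream.writes   # prints [3, 6]
```

With an interval of 3, acknowledging position 6 returns 7, but the stored value is still 6. A consumer restarting at that point would see position 6 again, which is why listeners should tolerate redelivery.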

#latest ⇒ Object

Returns the most recently stored next position.



# File 'lib/akasha/checkpoint/metadata_checkpoint.rb', line 16

def latest
  @next_position ||= (read_position || 0)
end
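The `||=` memoization means the stored position is read at most once per checkpoint instance, with 0 as the fallback when nothing has been stored. The sketch below illustrates that behavior. `SketchCheckpoint` is hypothetical, and its `read_position` is an assumption: the real private method is not shown on this page, so here it simply looks up `:next_position` in a hash.

```ruby
# Hypothetical stand-in that counts how often the stored position is read.
class SketchCheckpoint
  attr_reader :reads

  def initialize(stored)
    @stored = stored
    @reads = 0
  end

  # Same memoization as #latest: read once, fall back to 0.
  def latest
    @next_position ||= (read_position || 0)
  end

  private

  # Assumption: the position lives under :next_position in a metadata hash.
  def read_position
    @reads += 1
    @stored[:next_position]
  end
end

fresh = SketchCheckpoint.new({})
p fresh.latest    # prints 0 (nothing stored yet)

resumed = SketchCheckpoint.new({ next_position: 42 })
p resumed.latest  # prints 42
p resumed.latest  # prints 42 (served from @next_position)
p resumed.reads   # prints 1
```

Because 0 is truthy in Ruby, the fallback value is memoized as well, so a checkpoint with no stored position does not re-read the metadata on every call.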