Class: EventStoreClient::HTTP::Client

Inherits:
Object
  • Object
show all
Includes:
Configuration
Defined in:
lib/event_store_client/adapters/http/client.rb

Instance Method Summary collapse

Methods included from Configuration

#config

Instance Method Details

#append_to_stream(stream_name, events, options: {}) ⇒ Object

Appends given events to the stream

Parameters:

  • stream_name (String)

    name of the stream to append events to

  • events (Array<EventStoreClient::DeserializedEvent>)

    list of events to publish

Returns:

  • Dry::Monads::Result::Success or Dry::Monads::Result::Failure



16
17
18
19
20
# File 'lib/event_store_client/adapters/http/client.rb', line 16

# Appends the given events to a stream by delegating to the
# Commands::Streams::Append command built on the current connection.
#
# @param stream_name [String] name of the stream to append events to
# @param events [Array] events to publish
# @param options [Hash] extra options forwarded untouched to the command
# @return [Object] whatever the Append command's #call returns
def append_to_stream(stream_name, events, options: {})
  append_command = Commands::Streams::Append.new(connection)
  append_command.call(stream_name, events, options: options)
end

#delete_stream(stream_name, options: {}) ⇒ Object

Softly deletes the given stream

Parameters:

  • stream_name (String)

    name of the stream to delete

  • options (Hash) (defaults to: {})

    additional options to the request

Returns:

  • Dry::Monads::Result::Success or Dry::Monads::Result::Failure



27
28
29
30
31
# File 'lib/event_store_client/adapters/http/client.rb', line 27

# Soft-deletes a stream by delegating to the Commands::Streams::Delete
# command built on the current connection.
#
# @param stream_name [String] name of the stream to delete
# @param options [Hash] extra options forwarded untouched to the command
# @return [Object] whatever the Delete command's #call returns
def delete_stream(stream_name, options: {})
  delete_command = Commands::Streams::Delete.new(connection)
  delete_command.call(stream_name, options: options)
end

#link_to(stream_name, events, options: {}) ⇒ Object

Links given events with the given stream

Parameters:

  • stream_name (String)

    name of the stream to link events to

  • events (Array<EventStoreClient::DeserializedEvent>)

    a list of events to link

  • expected_version (Integer)

    expected number of events in the stream

Returns:

  • Dry::Monads::Result::Success or Dry::Monads::Result::Failure



100
101
102
103
104
# File 'lib/event_store_client/adapters/http/client.rb', line 100

# Links the given events to a stream by delegating to the
# Commands::Streams::LinkTo command built on the current connection.
#
# @param stream_name [String] name of the stream to link events to
# @param events [Array] events to link
# @param options [Hash] extra options forwarded untouched to the command
# @return [Object] whatever the LinkTo command's #call returns
def link_to(stream_name, events, options: {})
  link_command = Commands::Streams::LinkTo.new(connection)
  link_command.call(stream_name, events, options: options)
end

#listen(subscription, options: {}) ⇒ Object

Runs the persistent subscription indefinitely

Parameters:

Returns:

    • Nothing, it is a blocking operation, yields the given block with event instead



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/event_store_client/adapters/http/client.rb', line 111

# Polls the subscription feed forever, yielding each consumed event to the
# caller's block (when one is given). This call never returns on its own.
#
# Any StandardError raised while consuming is passed to
# config.error_handler (if one is configured) and the loop carries on.
#
# @param subscription [Object] subscription handed to consume_feed
# @param options [Hash] :interval - seconds to sleep between polls (default 5)
# @yield [event] each event produced by consume_feed
def listen(subscription, options: {})
  loop do
    begin
      consume_feed(subscription) { |event| yield event if block_given? }
    rescue StandardError => error
      handler = config.error_handler
      handler.call(error) unless handler.nil?
    end

    # wait for events to be processed
    pause = options[:interval] || 5
    sleep(pause)
  end
end

#read(stream_name, options: {}) ⇒ Object

Reads a page of events from the given stream

Parameters:

  • stream_name (String)

    name of the stream to read events from

  • options (Hash) (defaults to: {})

    additional options to the request

Returns:

  • Dry::Monads::Result::Success with returned events or Dry::Monads::Result::Failure



49
50
51
# File 'lib/event_store_client/adapters/http/client.rb', line 49

# Reads one page of events from a stream by delegating to the
# Commands::Streams::Read command built on the current connection.
#
# @param stream_name [String] name of the stream to read events from
# @param options [Hash] extra options forwarded untouched to the command
# @return [Object] whatever the Read command's #call returns
def read(stream_name, options: {})
  read_command = Commands::Streams::Read.new(connection)
  read_command.call(stream_name, options: options)
end

#read_all_from_stream(stream_name, options: {}) ⇒ Object

Reads all events from the given stream

Parameters:

  • stream_name (String)

    name of the stream to read events from

  • options (Hash) (defaults to: {})

    additional options to the request

Returns:

  • Dry::Monads::Result::Success with returned events or Dry::Monads::Result::Failure



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/event_store_client/adapters/http/client.rb', line 58

# Reads every event from a stream by requesting consecutive pages via #read
# until an empty page is returned.
#
# A failed page request is retried; the method gives up after three
# consecutive failures. A successful page resets the failure counter.
#
# @param stream_name [String] name of the stream to read events from
# @param options [Hash] forwarded to #read; :start (default 0) is the
#   position of the first page and :count (default 20) is the page size
# @return Success(events) with all collected events, or
#   Failure(:connection_failed) after three consecutive failed reads
def read_all_from_stream(stream_name, options: {})
  start = options[:start] || 0
  count = options[:count] || 20
  events = []
  failed_requests_count = 0

  while failed_requests_count < 3
    res = read(stream_name, options: options.merge(start: start, count: count))
    if res.failure?
      failed_requests_count += 1
      next
    end

    # Unwrap once per page; an empty page marks the end of the stream.
    page = res.value!
    break if page.empty?

    # Append in place rather than rebuilding the accumulator on every page.
    events.concat(page)
    failed_requests_count = 0
    start += count
  end
  return Failure(:connection_failed) if failed_requests_count >= 3

  Success(events)
end

#subscribe_to_stream(subscription, options: {}) ⇒ Object

Creates the subscription for the given stream

Parameters:

Returns:

  • Dry::Monads::Result::Success or Dry::Monads::Result::Failure



85
86
87
88
89
90
91
92
# File 'lib/event_store_client/adapters/http/client.rb', line 85

# Creates a persistent subscription: first joins the subscription's observed
# streams under the subscription name, then delegates to the
# Commands::PersistentSubscriptions::Create command on the current connection.
#
# @param subscription [Object] must respond to #name, #stream and
#   #observed_streams
# @param options [Hash] extra options forwarded untouched to the command
# @return [Object] whatever the Create command's #call returns
def subscribe_to_stream(subscription, options: {})
  join_streams(subscription.name, subscription.observed_streams)

  create_command = Commands::PersistentSubscriptions::Create.new(connection)
  create_command.call(subscription.stream, subscription.name, options: options)
end

#tombstone_stream(stream_name, options: {}) ⇒ Object

Completely removes the given stream

Parameters:

  • stream_name (String)

    name of the stream to delete

  • options (Hash) (defaults to: {})

    additional options to the request

Returns:

  • Dry::Monads::Result::Success or Dry::Monads::Result::Failure



38
39
40
41
42
# File 'lib/event_store_client/adapters/http/client.rb', line 38

# Completely removes a stream by delegating to the
# Commands::Streams::Tombstone command built on the current connection.
#
# @param stream_name [String] name of the stream to remove
# @param options [Hash] extra options forwarded untouched to the command
# @return [Object] whatever the Tombstone command's #call returns
def tombstone_stream(stream_name, options: {})
  tombstone_command = Commands::Streams::Tombstone.new(connection)
  tombstone_command.call(stream_name, options: options)
end