Class: EventStoreClient::Connection

Inherits:
Object
Defined in:
lib/event_store_client/connection.rb

Instance Method Summary

Instance Method Details

#consume_feed(stream, subscription) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/event_store_client/connection.rb', line 34

# Fetches a feed for the given stream/subscription through the client, builds
# an EventStoreClient::Event from every feed entry, runs each one through
# mapper.deserialize, and acknowledges the batch via the feed's 'ackAll' link.
#
# @param stream passed through unchanged to client.consume_feed
# @param subscription passed through unchanged to client.consume_feed
# @return [Array] the results of mapper.deserialize, one per feed entry;
#   an empty array when the response body is nil or blank
def consume_feed(stream, subscription)
  response = client.consume_feed(stream, subscription)
  # Same guard as #read: a nil or blank body cannot be parsed as JSON.
  return [] unless response.body&.present?

  body = JSON.parse(response.body)
  ack_link = (body['links'] || []).find { |link| link['relation'] == 'ackAll' }
  ack_uri = ack_link['uri'] if ack_link

  events = (body['entries'] || []).map do |entry|
    event = EventStoreClient::Event.new(
      id: entry['eventId'],
      type: entry['eventType'],
      data: entry['data'] || '{}',
      # Entry metadata is only used when the entry flags it via 'isMetaData'.
      metadata: entry['isMetaData'] ? entry['metaData'] : '{}'
    )
    mapper.deserialize(event)
  end

  # Without an 'ackAll' link there is nothing to acknowledge.
  client.ack(ack_uri) if ack_uri
  events
end

#delete_stream(stream) ⇒ Object



28
# File 'lib/event_store_client/connection.rb', line 28

# Placeholder with no implementation: the argument is ignored, nothing is
# called, and nil is returned.
#
# @param stream unused
# @return [nil]
def delete_stream(stream)
  nil
end

#publish(stream:, events:, expected_version: nil) ⇒ Object



5
6
7
8
9
10
11
# File 'lib/event_store_client/connection.rb', line 5

# Runs every event through mapper.serialize, hands the serialized batch to
# client.append_to_stream, and returns that serialized batch.
#
# @param stream passed through unchanged to client.append_to_stream
# @param events [Enumerable] events to serialize (must respond to #map)
# @param expected_version forwarded as the expected_version: keyword
# @return [Array] the serialized events, in input order
def publish(stream:, events:, expected_version: nil)
  events.map { |item| mapper.serialize(item) }.tap do |payload|
    client.append_to_stream(stream, payload, expected_version: expected_version)
  end
end

#read(stream, direction: 'forward') ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/event_store_client/connection.rb', line 13

# Reads a stream through the client, builds an EventStoreClient::Event from
# every entry in the JSON response, and runs each one through
# mapper.deserialize.
#
# @param stream passed through unchanged to client.read
# @param direction forwarded as the direction: keyword (default 'forward')
# @param start forwarded as the start: keyword (default 0, the value that
#   was previously hard-coded)
# @return [Array] the results of mapper.deserialize, one per entry; an empty
#   array when the body is nil/blank or carries no 'entries'
def read(stream, direction: 'forward', start: 0)
  response = client.read(stream, start: start, direction: direction)
  return [] unless response.body&.present?

  # A response without an 'entries' key is treated as an empty result.
  entries = JSON.parse(response.body)['entries'] || []
  entries.map do |entry|
    event = EventStoreClient::Event.new(
      id: entry['eventId'],
      type: entry['eventType'],
      data: entry['data'],
      metadata: entry['metaData']
    )
    mapper.deserialize(event)
  end
end

#subscribe(stream, name:) ⇒ Object



30
31
32
# File 'lib/event_store_client/connection.rb', line 30

# Delegates to client.subscribe_to_stream with the stream and the
# subscription name, returning whatever the client returns.
#
# @param stream passed through unchanged to the client
# @param name passed as the second positional argument to the client
# @return the return value of client.subscribe_to_stream
def subscribe(stream, name:)
  backend = client
  backend.subscribe_to_stream(stream, name)
end