Class: RubyEventStore::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/client.rb

Defined Under Namespace

Classes: Within

Instance Method Summary collapse

Constructor Details

#initialize(repository:, mapper: Mappers::Default.new, subscriptions: Subscriptions.new, dispatcher: Dispatcher.new, clock: default_clock, correlation_id_generator: default_correlation_id_generator) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/ruby_event_store/client.rb', line 7

def initialize(repository:,
               mapper: Mappers::Default.new,
               subscriptions: Subscriptions.new,
               dispatcher: Dispatcher.new,
               clock: default_clock,
               correlation_id_generator: default_correlation_id_generator)


  @repository     = repository
  @mapper         = mapper
  @subscriptions  = subscriptions
  @broker         = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
  @clock          = clock
  @metadata       = Concurrent::ThreadLocalVar.new
  @correlation_id_generator = correlation_id_generator
end

Instance Method Details

#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists new event(s) without notifying any subscribed handlers

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)


50
51
52
53
54
55
56
57
# File 'lib/ruby_event_store/client.rb', line 50

def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  append_records_to_stream(
    transform((events)),
    stream_name: stream_name,
    expected_version: expected_version
  )
  self
end

#delete_stream(stream_name) ⇒ self

Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.

Parameters:

  • stream_name (String)

    name of the stream to be cleared.

Returns:

  • (self)


77
78
79
80
# File 'lib/ruby_event_store/client.rb', line 77

def delete_stream(stream_name)
  repository.delete_stream(Stream.new(stream_name))
  self
end

#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event

Deserialize event which was serialized for async event handlers Read more

Returns:

  • (Event)

    deserialized event



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/ruby_event_store/client.rb', line 240

def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
  extract_timestamp = lambda do |m|
    (m[:timestamp] || Time.parse(m.fetch('timestamp'))).iso8601
  end

  mapper.record_to_event(
    SerializedRecord.new(
      event_type: event_type,
      event_id:   event_id,
      data:       data,
      metadata:   ,
      timestamp:  timestamp || timestamp_ = extract_timestamp[serializer.load()],
      valid_at:   valid_at  || timestamp_,
    ).deserialize(serializer)
  )
end

#inspectObject



299
300
301
# File 'lib/ruby_event_store/client.rb', line 299

def inspect
  "#<#{self.class}:0x#{__id__.to_s(16)}>"
end

Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.

Parameters:

  • event_ids (String, Array<String>)

    ids of events

  • stream_name (String)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)


66
67
68
69
# File 'lib/ruby_event_store/client.rb', line 66

def link(event_ids, stream_name:, expected_version: :any)
  repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version))
  self
end

#metadataHash

Read additional metadata which will be added for published events Read more

Returns:

  • (Hash)


261
262
263
# File 'lib/ruby_event_store/client.rb', line 261

def 
  @metadata.value || EMPTY_HASH
end

#overwrite(events_or_event) ⇒ self

Overwrite existing event(s) with the same ID.

Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. Read more

Examples:

Add data and metadata to existing events


events = event_store.read.limit(10).to_a
events.each do |ev|
  ev.data[:tenant_id] = 1
  ev.[:server_id] = "eu-west-2"
end
event_store.overwrite(events)

Change event type


events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
  NewType.new(
    event_id: ev.event_id,
    data: ev.data,
    metadata: ev.,
  )
end
event_store.overwrite(events)

Parameters:

  • events (Array<Event>, Event)

    event(s) to serialize and overwrite again

Returns:

  • (self)


294
295
296
297
# File 'lib/ruby_event_store/client.rb', line 294

def overwrite(events_or_event)
  repository.update_messages(transform(Array(events_or_event)))
  self
end

#publish(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists events and notifies subscribed handlers about them

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/ruby_event_store/client.rb', line 31

def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = (events)
  records         = transform(enriched_events)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  enriched_events.zip(records) do |event, record|
    (
      correlation_id: event..fetch(:correlation_id),
      causation_id:   event.event_id,
    ) do
      broker.(event, record)
    end
  end
  self
end

#readSpecification

Starts building a query specification for reading events. / More info.

Returns:



86
87
88
# File 'lib/ruby_event_store/client.rb', line 86

def read
  Specification.new(SpecificationReader.new(repository, mapper))
end

#streams_of(event_id) ⇒ Array<Stream>

Gets list of streams where event is stored or linked

Returns:

  • (Array<Stream>)

    where event is stored or linked



93
94
95
# File 'lib/ruby_event_store/client.rb', line 93

def streams_of(event_id)
  repository.streams_of(event_id)
end

#subscribe(subscriber, to: ) ⇒ Proc #subscribe(to: , &subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for published events of provided type.

Overloads:

  • #subscribe(subscriber, to: ) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>) (defaults to: )

      types of events to subscribe

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe(to: , &subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>) (defaults to: )

      types of events to subscribe

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)


109
110
111
112
113
# File 'lib/ruby_event_store/client.rb', line 109

def subscribe(subscriber = nil, to:, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  subscriber ||= proc
  broker.add_subscription(subscriber, to)
end

#subscribe_to_all_events(subscriber) ⇒ Proc #subscribe_to_all_events(&subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for all published events

Overloads:

  • #subscribe_to_all_events(subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe_to_all_events(&subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)


125
126
127
128
# File 'lib/ruby_event_store/client.rb', line 125

def subscribe_to_all_events(subscriber = nil, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  broker.add_global_subscription(subscriber || proc)
end

#subscribers_for(event_class) ⇒ Array<Object, Class>

Get list of handlers subscribed to an event

Parameters:

  • to (Class, String)

    type of events to get list of sybscribed handlers

Returns:

  • (Array<Object, Class>)


134
135
136
# File 'lib/ruby_event_store/client.rb', line 134

def subscribers_for(event_class)
  subscriptions.all_for(event_type_resolver.call(event_class))
end

#with_metadata(metadata, &block) ⇒ Object

Set additional metadata for all events published within the provided block Read more

Parameters:

  • metadata (Hash)

    metadata to set for events

  • block (Proc)

    block of code during which the metadata will be added

Returns:

  • (Object)

    last value returned by the provided block



228
229
230
231
232
233
234
# File 'lib/ruby_event_store/client.rb', line 228

def (, &block)
   = ()
  self. = .merge()
  block.call if block_given?
ensure
  self. = 
end

#within(&block) ⇒ Within

Use for starting temporary subscriptions. Read more

Parameters:

  • block (Proc)

    block of code during which the temporary subscriptions will be active

Returns:

  • (Within)

    builder object which collects temporary subscriptions

Raises:

  • (ArgumentError)


217
218
219
220
# File 'lib/ruby_event_store/client.rb', line 217

def within(&block)
  raise ArgumentError if block.nil?
  Within.new(block, broker)
end