Class: RubyEventStore::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/client.rb

Defined Under Namespace

Classes: Within

Instance Method Summary collapse

Constructor Details

#initialize(repository:, mapper: Mappers::Default.new, subscriptions: Subscriptions.new, dispatcher: Dispatcher.new, clock: default_clock, correlation_id_generator: default_correlation_id_generator) ⇒ Client

Returns a new instance of Client.


7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/ruby_event_store/client.rb', line 7

def initialize(repository:,
               mapper: Mappers::Default.new,
               subscriptions: Subscriptions.new,
               dispatcher: Dispatcher.new,
               clock: default_clock,
               correlation_id_generator: default_correlation_id_generator)


  @repository     = repository
  @mapper         = mapper
  @subscriptions  = subscriptions
  @broker         = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
  @clock          = clock
  @metadata       = Concurrent::ThreadLocalVar.new
  @correlation_id_generator = correlation_id_generator
end

Instance Method Details

#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists new event(s) without notifying any subscribed handlers

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)

50
51
52
53
54
55
56
57
# File 'lib/ruby_event_store/client.rb', line 50

def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  append_records_to_stream(
    transform((events)),
    stream_name: stream_name,
    expected_version: expected_version
  )
  self
end

#delete_stream(stream_name) ⇒ self

Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.

Parameters:

  • stream_name (String)

    name of the stream to be cleared.

Returns:

  • (self)

77
78
79
80
# File 'lib/ruby_event_store/client.rb', line 77

def delete_stream(stream_name)
  repository.delete_stream(Stream.new(stream_name))
  self
end

#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event

Deserialize event which was serialized for async event handlers Read more

Returns:

  • (Event)

    deserialized event


267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/ruby_event_store/client.rb', line 267

def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
  extract_timestamp = lambda do |m|
    (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601
  end

  mapper.record_to_event(
    SerializedRecord.new(
      event_type: event_type,
      event_id:   event_id,
      data:       data,
      metadata:   ,
      timestamp:  timestamp || timestamp_ = extract_timestamp[serializer.load()],
      valid_at:   valid_at  || timestamp_,
    ).deserialize(serializer)
  )
end

#global_position(event_id) ⇒ Integer

Gets position of the event in global stream

The position is always nonnegative. Global position may have gaps, meaning, there may be event at position 40, but no event at position 39.

Parameters:

  • event_id (String)

Returns:

  • (Integer)

    nonnegno ative integer position of event in global stream

Raises:


120
121
122
# File 'lib/ruby_event_store/client.rb', line 120

def global_position(event_id)
  repository.global_position(event_id)
end

#inspectObject


326
327
328
# File 'lib/ruby_event_store/client.rb', line 326

def inspect
  "#<#{self.class}:0x#{__id__.to_s(16)}>"
end

Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.

Parameters:

  • event_ids (String, Array<String>)

    ids of events

  • stream_name (String)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)

66
67
68
69
# File 'lib/ruby_event_store/client.rb', line 66

def link(event_ids, stream_name:, expected_version: :any)
  repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version))
  self
end

#metadataHash

Read additional metadata which will be added for published events Read more

Returns:

  • (Hash)

288
289
290
# File 'lib/ruby_event_store/client.rb', line 288

def 
  @metadata.value || EMPTY_HASH
end

#overwrite(events_or_event) ⇒ self

Overwrite existing event(s) with the same ID.

Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. Read more

Examples:

Add data and metadata to existing events


events = event_store.read.limit(10).to_a
events.each do |ev|
  ev.data[:tenant_id] = 1
  ev.[:server_id] = "eu-west-2"
end
event_store.overwrite(events)

Change event type


events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
  NewType.new(
    event_id: ev.event_id,
    data: ev.data,
    metadata: ev.,
  )
end
event_store.overwrite(events)

Parameters:

  • events (Array<Event>, Event)

    event(s) to serialize and overwrite again

Returns:

  • (self)

321
322
323
324
# File 'lib/ruby_event_store/client.rb', line 321

def overwrite(events_or_event)
  repository.update_messages(transform(Array(events_or_event)))
  self
end

#position_in_stream(event_id, stream_name) ⇒ Integer

Gets position of the event in given stream

The position is always nonnegative. Returns nil if the event has no specific position in stream. Raise error if event is not present in stream.

Parameters:

  • event_id (String)
  • stream_name (String)

Returns:

  • (Integer)

    nonnegative integer position of event in stream

Raises:

  • (EventNotInStream)

107
108
109
# File 'lib/ruby_event_store/client.rb', line 107

def position_in_stream(event_id, stream_name)
  repository.position_in_stream(event_id, Stream.new(stream_name))
end

#publish(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists events and notifies subscribed handlers about them

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. / Read more

Returns:

  • (self)

31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/ruby_event_store/client.rb', line 31

def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = (events)
  records         = transform(enriched_events)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  enriched_events.zip(records) do |event, record|
    (
      correlation_id: event..fetch(:correlation_id),
      causation_id:   event.event_id,
    ) do
      broker.(event, record)
    end
  end
  self
end

#readSpecification

Starts building a query specification for reading events. / More info.

Returns:


86
87
88
# File 'lib/ruby_event_store/client.rb', line 86

def read
  Specification.new(SpecificationReader.new(repository, mapper))
end

#streams_of(event_id) ⇒ Array<Stream>

Gets list of streams where event is stored or linked

Returns:

  • (Array<Stream>)

    where event is stored or linked


93
94
95
# File 'lib/ruby_event_store/client.rb', line 93

def streams_of(event_id)
  repository.streams_of(event_id)
end

#subscribe(subscriber, to: ) ⇒ Proc #subscribe(to: , &subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for published events of provided type.

Overloads:

  • #subscribe(subscriber, to: ) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>) (defaults to: )

      types of events to subscribe

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe(to: , &subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>) (defaults to: )

      types of events to subscribe

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)

136
137
138
139
140
# File 'lib/ruby_event_store/client.rb', line 136

def subscribe(subscriber = nil, to:, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  subscriber ||= proc
  broker.add_subscription(subscriber, to)
end

#subscribe_to_all_events(subscriber) ⇒ Proc #subscribe_to_all_events(&subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for all published events

Overloads:

  • #subscribe_to_all_events(subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe_to_all_events(&subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)

152
153
154
155
# File 'lib/ruby_event_store/client.rb', line 152

def subscribe_to_all_events(subscriber = nil, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  broker.add_global_subscription(subscriber || proc)
end

#subscribers_for(event_class) ⇒ Array<Object, Class>

Get list of handlers subscribed to an event

Parameters:

  • to (Class, String)

    type of events to get list of sybscribed handlers

Returns:

  • (Array<Object, Class>)

161
162
163
# File 'lib/ruby_event_store/client.rb', line 161

def subscribers_for(event_class)
  subscriptions.all_for(event_type_resolver.call(event_class))
end

#with_metadata(metadata, &block) ⇒ Object

Set additional metadata for all events published within the provided block Read more

Parameters:

  • metadata (Hash)

    metadata to set for events

  • block (Proc)

    block of code during which the metadata will be added

Returns:

  • (Object)

    last value returned by the provided block


255
256
257
258
259
260
261
# File 'lib/ruby_event_store/client.rb', line 255

def (, &block)
   = ()
  self. = .merge()
  block.call if block_given?
ensure
  self. = 
end

#within(&block) ⇒ Within

Use for starting temporary subscriptions. Read more

Parameters:

  • block (Proc)

    block of code during which the temporary subscriptions will be active

Returns:

  • (Within)

    builder object which collects temporary subscriptions

Raises:

  • (ArgumentError)

244
245
246
247
# File 'lib/ruby_event_store/client.rb', line 244

def within(&block)
  raise ArgumentError if block.nil?
  Within.new(block, broker)
end