Class: Gcpc::Interceptors::Subscriber::CheckOrderInterceptor

Inherits:
Subscriber::BaseInterceptor
  • Object
show all
Defined in:
lib/gcpc/interceptors/subscriber/check_order_interceptor.rb

Overview

‘CheckOrderInterceptor` checks the order of messages in each group.

Defined Under Namespace

Classes: BaseStrategy

Instance Method Summary collapse

Constructor Details

#initialize(store:, logger: Logger.new(STDOUT), strategy: BaseStrategy.new) ⇒ CheckOrderInterceptor

Returns a new instance of CheckOrderInterceptor.

Parameters:

  • store (#get, #set)
  • logger (Logger) (defaults to: Logger.new(STDOUT))
  • strategy (BaseStrategy) (defaults to: BaseStrategy.new)


59
60
61
62
63
# File 'lib/gcpc/interceptors/subscriber/check_order_interceptor.rb', line 59

def initialize(store:, logger: Logger.new(STDOUT), strategy: BaseStrategy.new)
  @store    = store
  @logger   = logger
  @strategy = strategy
end

Instance Method Details

#handle(data, attributes, message, &block) {|data, attributes, message| ... } ⇒ Object

Parameters:

  • data (String)
  • attributes (Hash)
  • message (Google::Cloud::Pubsub::ReceivedMessage)
  • block (Proc)

Yields:

  • (data, attributes, message)


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/gcpc/interceptors/subscriber/check_order_interceptor.rb', line 69

def handle(data, attributes, message, &block)
  timestamp = @strategy.timestamp(data, attributes, message)

  if timestamp.nil?
    yield data, attributes, message
    return
  end

  group_id = @strategy.group_id(data, attributes, message)
  if swapped?(group_id, timestamp)
    @strategy.on_swapped(data, attributes, message, &block)
    return
  end

  yield data, attributes, message
end