Class: Rubyists::Dapr::Service::Subscriber

Inherits:
Object
  • Object
show all
Includes:
SemanticLogger::Loggable
Defined in:
lib/dapr/service/subscriber.rb

Overview

The Subscriber class is a simple implementation of a Dapr subscriber service.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pubsub_name:, topics:, handler: nil, service_proto: ::Dapr::Proto::Runtime::V1::AppCallback::Service, runtime_proto: ::Dapr::Proto::Runtime::V1) ⇒ Subscriber

Create a new Subscriber instance.

Parameters:

  • pubsub_name (String)

    name of the pubsub component

  • topics (String|Array<String>)

    topic (or topics) to subscribe to

  • handler (Proc|Object) (defaults to: nil)

    handler to call when an event is received. Must respond to #call

  • service_proto (Class) (defaults to: ::Dapr::Proto::Runtime::V1::AppCallback::Service)

    Dapr Runtime Service Class to use for the subscriber

  • runtime_proto (Module) (defaults to: ::Dapr::Proto::Runtime::V1)

    Dapr Runtime Proto Module to use for the subscriber



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/dapr/service/subscriber.rb', line 25

def initialize(pubsub_name:,
               topics:,
               handler: nil,
               service_proto: ::Dapr::Proto::Runtime::V1::AppCallback::Service,
               runtime_proto: ::Dapr::Proto::Runtime::V1)
  @topics = Array(topics)
  @pubsub_name = pubsub_name
  @service_proto = service_proto
  @runtime_proto = runtime_proto
  @handler = handler
end

Instance Attribute Details

#handlerObject (readonly)

Returns the value of attribute handler.



14
15
16
# File 'lib/dapr/service/subscriber.rb', line 14

def handler
  @handler
end

#pubsub_nameObject (readonly)

Returns the value of attribute pubsub_name.



14
15
16
# File 'lib/dapr/service/subscriber.rb', line 14

def pubsub_name
  @pubsub_name
end

#runtime_protoObject (readonly)

Returns the value of attribute runtime_proto.



14
15
16
# File 'lib/dapr/service/subscriber.rb', line 14

def runtime_proto
  @runtime_proto
end

#service_protoObject (readonly)

Returns the value of attribute service_proto.



14
15
16
# File 'lib/dapr/service/subscriber.rb', line 14

def service_proto
  @service_proto
end

#topicsObject (readonly)

Returns the value of attribute topics.



14
15
16
# File 'lib/dapr/service/subscriber.rb', line 14

def topics
  @topics
end

Instance Method Details

#handle_event!(topic_event, topic_call) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/dapr/service/subscriber.rb', line 58

def handle_event!(topic_event, topic_call)
  return handler&.call(topic_event, topic_call) if handler.respond_to?(:call)

  logger.warn('Unhandled event: event handler does not respond to #call',
              topic_event:,
              topic_call:,
              handler:)
end

#start!(grpc_port: nil, listen_address: '0.0.0.0') ⇒ Subscriber

Note:

if grpc_port is not provided, the service will listen on the port returned by the #port method.

Note:

the service will listen on all interfaces by default.

Start the subscriber service. This method will block until the service is terminated. The service will listen on the specified port and address.

Parameters:

  • grpc_port (Integer) (defaults to: nil)

    the port to listen on

  • listen_address (String) (defaults to: '0.0.0.0')

    the address to listen on

Returns:



47
48
49
50
51
52
53
54
55
56
# File 'lib/dapr/service/subscriber.rb', line 47

def start!(grpc_port: nil, listen_address: '0.0.0.0')
  server = GRPC::RpcServer.new
  grpc_port ||= port
  listener = "#{listen_address}:#{grpc_port}"
  server.add_http2_port(listener, :this_port_is_insecure)
  server.handle(service)
  logger.warn('Starting Dapr Subscriber service', listen_address:, grpc_port:)
  server.run_till_terminated_or_interrupted([1, +'int', +'SIGQUIT'])
  self
end

#subscriptionsArray

Returns The list of subscriptions for the Subscriber.

Returns:

  • (Array)

    The list of subscriptions for the Subscriber



68
69
70
71
72
# File 'lib/dapr/service/subscriber.rb', line 68

def subscriptions
  @subscriptions ||= topics.map do |topic|
    runtime_proto::TopicSubscription.new(pubsub_name:, topic:)
  end
end