Class: Rubyists::Dapr::Service::Subscriber
- Inherits:
-
Object
- Object
- Rubyists::Dapr::Service::Subscriber
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/dapr/service/subscriber.rb
Overview
The Subscriber class is a simple implementation of a Dapr subscriber service.
Instance Attribute Summary collapse
-
#handler ⇒ Object
readonly
Returns the value of attribute handler.
-
#pubsub_name ⇒ Object
readonly
Returns the value of attribute pubsub_name.
-
#runtime_proto ⇒ Object
readonly
Returns the value of attribute runtime_proto.
-
#service_proto ⇒ Object
readonly
Returns the value of attribute service_proto.
-
#topics ⇒ Object
readonly
Returns the value of attribute topics.
Instance Method Summary collapse
- #handle_event!(topic_event, topic_call) ⇒ Object
-
#initialize(pubsub_name:, topics:, handler: nil, service_proto: ::Dapr::Proto::Runtime::V1::AppCallback::Service, runtime_proto: ::Dapr::Proto::Runtime::V1) ⇒ Subscriber
constructor
Create a new Subscriber instance.
-
#start!(grpc_port: nil, listen_address: '0.0.0.0') ⇒ Subscriber
Start the subscriber service.
-
#subscriptions ⇒ Array
The list of subscriptions for the Subscriber.
Constructor Details
#initialize(pubsub_name:, topics:, handler: nil, service_proto: ::Dapr::Proto::Runtime::V1::AppCallback::Service, runtime_proto: ::Dapr::Proto::Runtime::V1) ⇒ Subscriber
Create a new Subscriber instance.
25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/dapr/service/subscriber.rb', line 25 def initialize(pubsub_name:, topics:, handler: nil, service_proto: ::Dapr::Proto::Runtime::V1::AppCallback::Service, runtime_proto: ::Dapr::Proto::Runtime::V1) @topics = Array(topics) @pubsub_name = pubsub_name @service_proto = service_proto @runtime_proto = runtime_proto @handler = handler end |
Instance Attribute Details
#handler ⇒ Object (readonly)
Returns the value of attribute handler.
14 15 16 |
# File 'lib/dapr/service/subscriber.rb', line 14 def handler @handler end |
#pubsub_name ⇒ Object (readonly)
Returns the value of attribute pubsub_name.
14 15 16 |
# File 'lib/dapr/service/subscriber.rb', line 14 def pubsub_name @pubsub_name end |
#runtime_proto ⇒ Object (readonly)
Returns the value of attribute runtime_proto.
14 15 16 |
# File 'lib/dapr/service/subscriber.rb', line 14 def runtime_proto @runtime_proto end |
#service_proto ⇒ Object (readonly)
Returns the value of attribute service_proto.
14 15 16 |
# File 'lib/dapr/service/subscriber.rb', line 14 def service_proto @service_proto end |
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
14 15 16 |
# File 'lib/dapr/service/subscriber.rb', line 14 def topics @topics end |
Instance Method Details
#handle_event!(topic_event, topic_call) ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/dapr/service/subscriber.rb', line 58 def handle_event!(topic_event, topic_call) return handler&.call(topic_event, topic_call) if handler.respond_to?(:call) logger.warn('Unhandled event: event handler does not respond to #call', topic_event:, topic_call:, handler:) end |
#start!(grpc_port: nil, listen_address: '0.0.0.0') ⇒ Subscriber
if grpc_port is not provided, the service will listen on the port returned by the #port method.
the service will listen on all interfaces by default.
Start the subscriber service. This method will block until the service is terminated. The service will listen on the specified port and address.
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/dapr/service/subscriber.rb', line 47 def start!(grpc_port: nil, listen_address: '0.0.0.0') server = GRPC::RpcServer.new grpc_port ||= port listener = "#{listen_address}:#{grpc_port}" server.add_http2_port(listener, :this_port_is_insecure) server.handle(service) logger.warn('Starting Dapr Subscriber service', listen_address:, grpc_port:) server.run_till_terminated_or_interrupted([1, +'int', +'SIGQUIT']) self end |
#subscriptions ⇒ Array
Returns The list of subscriptions for the Subscriber.
68 69 70 71 72 |
# File 'lib/dapr/service/subscriber.rb', line 68 def subscriptions @subscriptions ||= topics.map do |topic| runtime_proto::TopicSubscription.new(pubsub_name:, topic:) end end |