Class: Magellan::Gcs::Proxy::PubsubSubscription

Inherits:
Object
  • Object
show all
Defined in:
lib/magellan/gcs/proxy/pubsub_subscription.rb

Defined Under Namespace

Classes: MessageWrapper

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, delay: 1) ⇒ PubsubSubscription

Returns a new instance of PubsubSubscription.



9
10
11
12
# File 'lib/magellan/gcs/proxy/pubsub_subscription.rb', line 9

def initialize(name, delay: 1)
  @name = name
  @delay = delay
end

Instance Attribute Details

#delayObject (readonly)

Returns the value of attribute delay.



8
9
10
# File 'lib/magellan/gcs/proxy/pubsub_subscription.rb', line 8

def delay
  @delay
end

#nameObject (readonly)

Returns the value of attribute name.



8
9
10
# File 'lib/magellan/gcs/proxy/pubsub_subscription.rb', line 8

def name
  @name
end

Instance Method Details

#listenObject



14
15
16
17
18
19
20
21
22
# File 'lib/magellan/gcs/proxy/pubsub_subscription.rb', line 14

def listen
  loop do
    if msg = wait_for_message
      yield msg
    else
      sleep delay
    end
  end
end

#pull_reqObject



24
25
26
# File 'lib/magellan/gcs/proxy/pubsub_subscription.rb', line 24

def pull_req
  @pull_req ||= Google::Apis::PubsubV1::PullRequest.new(max_messages: 1, return_immediately: true)
end

#wait_for_messageObject



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/magellan/gcs/proxy/pubsub_subscription.rb', line 28

def wait_for_message
  # #<Google::Apis::PubsubV1::ReceivedMessage:0x007fdc440b58d8
  #   @ack_id="...",
  #   @message=#<Google::Apis::PubsubV1::Message:0x007fdc440be140
  #     @attributes={"download_files"=>"[\"gs://bucket1/path/to/file1\"]"},
  #     @message_id="50414480536440",
  #     @publish_time="2016-12-17T08:08:35.599Z">>
  res = GCP.pubsub.pull_subscription(name, pull_req)
  msg = (res.received_messages || []).first
  msg.nil? ? nil : MessageWrapper.new(self, msg)
end