Module: CPEE::Notifications

Defined in:
lib/cpee/implementation_notifications.rb

Defined Under Namespace

Classes: CreateSubscription, DeleteSubscription, Overview, SSE, Subscription, Subscriptions, Topics, UpdateSubscription

Class Method Summary collapse

Class Method Details

.implementation(id, opts) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/cpee/implementation_notifications.rb', line 20

# Builds the routing block for the "notifications" resource tree.
#
# Nothing is executed here: the method only returns a proc. The DSL calls used
# inside it (+on+, +resource+, +run+, +get+, +post+, +put+, +delete+, +sse+)
# are not defined in this method and must be provided by whatever object
# later evaluates the proc.
#
# Declared tree (handlers run only when the given predicate is truthy):
#   notifications                      get               -> Overview
#   notifications/topics               get               -> Topics(opts)
#   notifications/subscriptions        get               -> Subscriptions(id, opts)
#                                      post 'subscribe'  -> CreateSubscription(id, opts)
#   notifications/subscriptions/<r>    get               -> Subscription(id, opts)
#                                      put 'subscribe'   -> UpdateSubscription(id, opts)
#                                      delete            -> DeleteSubscription(id, opts)
#   notifications/subscriptions/<r>/sse  sse             -> SSE(id, opts)
#
# <r> is a bare +resource+ call without a name (presumably a wildcard for the
# subscription key -- confirm against the DSL in use).
#
# @param id   forwarded unchanged to the subscription-related handlers
# @param opts forwarded unchanged to every handler except Overview
# @return [Proc] the routing block
def self.implementation(id, opts)
  proc do
    on resource('notifications') do
      run(CPEE::Notifications::Overview) if get
      on resource('topics') do
        run(CPEE::Notifications::Topics, opts) if get
      end
      on resource('subscriptions') do
        run(CPEE::Notifications::Subscriptions, id, opts) if get
        run(CPEE::Notifications::CreateSubscription, id, opts) if post('subscribe')
        on resource do
          run(CPEE::Notifications::Subscription, id, opts) if get
          run(CPEE::Notifications::UpdateSubscription, id, opts) if put('subscribe')
          run(CPEE::Notifications::DeleteSubscription, id, opts) if delete
          on resource('sse') do
            run(CPEE::Notifications::SSE, id, opts) if sse
          end
        end
      end
    end
  end
end

.sse_distributor(opts) ⇒ Object

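Subscribes to the Redis channels `forward:*` and `event:state/change` and relays their messages to the registered SSE connections.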



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/cpee/implementation_notifications.rb', line 181

# Relays Redis pub/sub traffic to the registered SSE connections.
#
# Obtains a connection by calling +opts[:redis_dyn]+, pattern-subscribes to
# 'forward:*' and 'event:state/change', and handles each message:
#
# * 'forward:<id>/<key>': the payload is sent to the SSE connection stored at
#   +opts[:sse_connections][id.to_i][key]+. If none is registered,
#   +DeleteSubscription::set(id, opts, key)+ is called instead. Channel names
#   that lack the "<id>/<key>" shape are ignored.
# * 'event:state/change': everything after the first space of the payload is
#   parsed as JSON. When content.state is 'finished' or 'abandoned', every SSE
#   connection of that instance is closed after a 10 second delay.
#
# The connection is closed when the subscription ends, including when it ends
# because of an exception (the exception is still propagated).
#
# @param opts must provide :redis_dyn (callable returning the connection) and
#   may provide :sse_connections (nested lookup: instance id -> key -> sse)
def self::sse_distributor(opts) #{{{
  conn = opts[:redis_dyn].call
  conn.psubscribe('forward:*','event:state/change') do |on|
    on.pmessage do |pat, what, message|
      if pat == 'forward:*'
        target = what.match(/forward:([^\/]+)\/(.+)/)
        # A channel such as "forward:foo" matches the subscription pattern but
        # carries no key; skip it instead of crashing the whole distributor.
        next unless target
        id, key = target.captures
        if sse = opts.dig(:sse_connections,id.to_i,key)
          sse.send message
        else
          DeleteSubscription::set(id,opts,key)
        end
      elsif pat == 'event:state/change'
        mess = JSON.parse(message[message.index(' ')+1..-1])
        state = mess.dig('content','state')
        if state == 'finished' || state == 'abandoned'
          opts.dig(:sse_connections,mess.dig('instance').to_i)&.each do |_key,sse|
            EM.add_timer(10) do # just to be sure that all messages arrived. 10 seconds should be enough ... we think ... therefore we are (not sure)
              sse.close
            end
          end
        end
      end
    end
  end
ensure
  # Release the connection even if subscribing or a handler raised.
  conn&.close
end

.sse_heartbeat(opts) ⇒ Object

Sends a `heartbeat` event to every registered SSE connection that is not closed.



207
208
209
210
211
212
213
# File 'lib/cpee/implementation_notifications.rb', line 207

# Sends a heartbeat to every registered SSE connection that is still open.
#
# Walks +opts[:sse_connections]+ (outer entries keyed by id, each holding
# key/connection pairs) and calls +send_with_id('heartbeat', '42')+ on each
# connection whose +closed?+ is falsy. Missing tables and nil connection
# entries are skipped rather than raising.
#
# @param opts container responding to +dig+; :sse_connections may be absent
# @return whatever the outer +each+ returns, or nil when no table is present
def self::sse_heartbeat(opts) #{{{
  opts.dig(:sse_connections)&.each do |_id,keys|
    keys&.each do |_key,sse|
      # `unless sse&.closed?` would still call send_with_id on a nil entry,
      # so test for nil explicitly before touching the connection.
      next if sse.nil? || sse.closed?
      sse.send_with_id('heartbeat', '42')
    end
  end
end