Module: CPEE::Notifications

Defined in:
lib/cpee/implementation_notifications.rb

Defined Under Namespace

Classes: CreateSubscription, DeleteSubscription, Overview, SSE, Subscription, Subscriptions, Topics, UpdateSubscription

Class Method Summary collapse

Class Method Details

.implementation(id, opts) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/cpee/implementation_notifications.rb', line 21

# Builds the routing block for the "notifications" resource tree of one
# instance.
#
# The returned Proc is not evaluated here. The DSL words it uses (`on`,
# `resource`, `run`, `get`, `post`, `put`, `delete`, `sse`) are not defined in
# this method, so the Proc is presumably instance-evaluated by the hosting
# server -- confirm against the caller.
#
# Layout handled when the instance exists:
#   notifications                      GET  -> Overview
#   notifications/topics               GET  -> Topics
#   notifications/subscriptions        GET  -> Subscriptions
#                                      POST 'create_subscription' -> CreateSubscription
#   notifications/subscriptions/<any>  GET  -> Subscription
#                                      PUT 'change_subscription'  -> UpdateSubscription
#                                      DELETE -> DeleteSubscription
#   notifications/subscriptions/<any>/sse  SSE -> SSE
#
# If CPEE::Persistence.exists?(id, opts) is falsy, CPEE::FAIL is run instead.
#
# @param id   instance identifier, passed through to the handlers
# @param opts options object, passed through to the handlers
# @return [Proc] the (non-lambda) routing block
def self::implementation(id,opts)
  proc do
    if CPEE::Persistence.exists?(id, opts)
      on(resource('notifications')) do
        run(CPEE::Notifications::Overview) if get

        on(resource('topics')) do
          run(CPEE::Notifications::Topics, opts) if get
        end

        on(resource('subscriptions')) do
          run(CPEE::Notifications::Subscriptions, id, opts) if get
          run(CPEE::Notifications::CreateSubscription, id, opts) if post('create_subscription')

          # Argument-less `resource`: the individual subscription below the collection.
          on(resource) do
            run(CPEE::Notifications::Subscription, id, opts) if get
            run(CPEE::Notifications::UpdateSubscription, id, opts) if put('change_subscription')
            run(CPEE::Notifications::DeleteSubscription, id, opts) if delete

            on(resource('sse')) do
              run(CPEE::Notifications::SSE, id, opts) if sse
            end
          end
        end
      end
    else
      run(CPEE::FAIL)
    end
  end
end

.sse_distributor(opts) ⇒ Object

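Subscribes to the Redis channel patterns `forward:*` and `event:state/change`, forwards `forward:<id>/<key>` messages to the matching SSE connection, and closes an instance's SSE connections after it reports the state `finished` or `abandoned`.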



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/cpee/implementation_notifications.rb', line 186

# Distributes Redis pub/sub messages to the registered SSE connections.
#
# A connection is obtained via opts[:redis_dyn].call("Server SSE") and
# pattern-subscribed to 'forward:*' and 'event:state/change':
#
# * 'forward:<id>/<key>' -- the message is sent to the SSE connection stored at
#   opts[:sse_connections][id][key]. If no such connection exists,
#   DeleteSubscription::set(id, opts, key) is called instead.
# * 'event:state/change' -- everything after the first space of the message is
#   parsed as JSON. When content.state is 'finished' or 'abandoned', all SSE
#   connections of that instance are closed after a 10 second grace period.
#
# Messages that cannot be routed or parsed (channel without "<id>/<key>",
# payload without a space, invalid JSON) are skipped, so that a single bad
# message does not terminate the distributor. The connection is closed even
# when the subscription ends with an exception.
#
# @param opts [Hash] needs :redis_dyn (callable returning a connection that
#   responds to psubscribe and close) and, optionally, :sse_connections
#   ({ id => { key => sse } })
# @return [Object] whatever psubscribe returns; presumably this call blocks
#   while the subscription is active -- confirm against the Redis client
def self::sse_distributor(opts) #{{{
  conn = opts[:redis_dyn].call "Server SSE"
  begin
    conn.psubscribe('forward:*','event:state/change') do |on|
      on.pmessage do |pat, what, message|
        case pat
        when 'forward:*'
          # 'forward:*' also matches channels without a "/<key>" part; those
          # cannot be routed to a connection.
          matched = what.match(/forward:([^\/]+)\/(.+)/)
          next unless matched

          id, key = matched.captures
          sse = opts.dig(:sse_connections,id,key)
          if sse
            sse.send message
          else
            DeleteSubscription::set(id,opts,key)
          end
        when 'event:state/change'
          # The JSON document starts after the first space of the message.
          separator = message.index(' ')
          next unless separator

          mess = begin
            JSON.parse(message[separator+1..-1])
          rescue JSON::ParserError
            nil
          end
          next unless mess.is_a?(Hash)

          state = mess.dig('content','state')
          if state == 'finished' || state == 'abandoned'
            opts.dig(:sse_connections,mess['instance'].to_s)&.each do |_key,sse|
              # Delay closing so that messages still in flight can be delivered;
              # 10 seconds is a heuristic, not a guarantee.
              EM.add_timer(10) do
                sse&.close
              end
            end
          end
        end
      end
    end
  ensure
    # Release the connection even if the subscription loop raised.
    conn.close
  end
end

.sse_heartbeat(opts) ⇒ Object

Sends a `heartbeat` message to every registered SSE connection that is not closed.



212
213
214
215
216
217
218
# File 'lib/cpee/implementation_notifications.rb', line 212

# Sends a heartbeat to every registered SSE connection that is still open.
#
# Walks opts[:sse_connections] ({ id => { key => sse } }) and calls
# sse.send_with_id('heartbeat', '42') on each connection. Entries that are nil
# or report closed? are skipped; a missing :sse_connections table or a nil
# per-instance table is treated as "nothing to do".
#
# @param opts [Hash] options; :sse_connections is read
# @return [Hash, nil] the :sse_connections table (nil when absent)
def self::sse_heartbeat(opts) #{{{
  opts[:sse_connections]&.each do |_id,keys|
    keys&.each do |_key,sse|
      # `unless sse&.closed?` would still call send_with_id on a nil entry.
      next if sse.nil? || sse.closed?

      sse.send_with_id('heartbeat', '42')
    end
  end
end