Module: CPEE::Notifications

Defined in:
lib/cpee/implementation_notifications.rb

Defined Under Namespace

Classes: CreateSubscription, DeleteSubscription, Overview, SSE, Subscription, Subscriptions, Topics, UpdateSubscription

Class Method Summary collapse

Class Method Details

.implementation(id, opts) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/cpee/implementation_notifications.rb', line 6

# Builds the route declaration for the "notifications" resource tree and
# returns it as a Proc.
#
# The DSL calls used inside the Proc (+on+, +resource+, +run+, +get+, +post+,
# +put+, +delete+, +sse+) are not defined in this method, so the Proc is
# presumably evaluated in the context of the hosting server framework
# (NOTE(review): looks like a Riddl server description — confirm).
#
# Declared layout (handler classes live under CPEE::Notifications):
#   notifications                        get    -> Overview
#   notifications/topics                 get    -> Topics
#   notifications/subscriptions          get    -> Subscriptions
#                                        post   -> CreateSubscription
#   notifications/subscriptions/<x>      get    -> Subscription
#                                        put    -> UpdateSubscription
#                                        delete -> DeleteSubscription
#   notifications/subscriptions/<x>/sse  sse    -> SSE
#
# id::   handed unchanged to the subscription handlers
#        (presumably the instance id — verify against caller)
# opts:: handed unchanged to every handler except Overview
def self::implementation(id,opts)
  Proc.new do
    on resource "notifications" do
      # Overview is the only handler that receives neither id nor opts.
      run CPEE::Notifications::Overview if get
      on resource "topics" do
        # Topics receives opts only, no id.
        run CPEE::Notifications::Topics, opts if get
      end
      on resource "subscriptions" do
        run CPEE::Notifications::Subscriptions, id, opts if get
        # 'subscribe' is the argument given to +post+ (and +put+ below);
        # presumably the name of the expected message — TODO confirm.
        run CPEE::Notifications::CreateSubscription, id, opts if post 'subscribe'
        # +resource+ without a name: presumably matches any single path
        # segment, i.e. one concrete subscription — TODO confirm.
        on resource do
          run CPEE::Notifications::Subscription, id, opts if get
          run CPEE::Notifications::UpdateSubscription, id, opts if put 'subscribe'
          run CPEE::Notifications::DeleteSubscription, id, opts if delete
          on resource 'sse' do
            run CPEE::Notifications::SSE, id, opts if sse
          end
        end
      end
    end
  end
end

.sse_distributor(opts) ⇒ Object

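Subscribes to the Redis patterns `forward:*` and `event:state/change` and relays the received messages to the SSE connections registered in `opts[:sse_connections]`.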



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/cpee/implementation_notifications.rb', line 167

# Relays Redis pub/sub traffic to the registered SSE connections.
#
# Subscribes to the patterns 'forward:*' and 'event:state/change' on a
# connection obtained from opts[:redis_dyn]:
#
# * Channels of the form 'forward:<id>/<key>': the message is sent verbatim
#   to the connection stored at opts[:sse_connections][id.to_i][key], if one
#   is registered. Channels that do not have this form are ignored.
# * 'event:state/change': everything after the first space of the message is
#   parsed as JSON. When its content.state is 'finished' or 'abandoned',
#   every SSE connection registered for that instance is closed 10 seconds
#   later. Messages without a space are ignored.
#
# The call lasts as long as +psubscribe+ does (presumably it blocks until
# the subscription ends — depends on the Redis client in use). The
# connection is closed on the way out, including when an exception is raised
# while subscribed.
#
# opts:: Hash; reads :redis_dyn (callable returning a connection) and
#        :sse_connections (instance id => key => SSE connection).
def self::sse_distributor(opts) #{{{
  conn = opts[:redis_dyn].call
  conn.psubscribe('forward:*','event:state/change') do |on|
    on.pmessage do |pat, what, message|
      if pat == 'forward:*'
        target = what.match(/forward(-end)?:([^\/]+)\/(.+)/)
        next unless target # no "<id>/<key>" part: nothing to route to
        _, id, key = target.captures
        opts.dig(:sse_connections,id.to_i,key)&.send message
      elsif pat == 'event:state/change'
        sep = message.index(' ')
        next unless sep # no payload after the leading token
        mess = JSON.parse(message[sep+1..-1])
        state = mess.dig('content','state')
        if state == 'finished' || state == 'abandoned'
          opts.dig(:sse_connections,mess.dig('instance').to_i)&.each do |_key,sse|
            EM.add_timer(10) do # just to be sure that all messages arrived. 10 seconds should be enough ... we think ... therefore we are (not sure)
              sse.close
            end
          end
        end
      end
    end
  end
ensure
  # Runs on normal return and on exceptions; conn is nil only when obtaining
  # the connection itself failed.
  conn&.close
end

.sse_heartbeat(opts) ⇒ Object

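Calls `send_with_id('heartbeat', '42')` on every SSE connection registered in `opts[:sse_connections]` that is not closed.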



189
190
191
192
193
194
195
# File 'lib/cpee/implementation_notifications.rb', line 189

# Sends a heartbeat to every open SSE connection.
#
# Walks opts[:sse_connections] (outer key => inner key => SSE connection) and
# calls send_with_id('heartbeat', '42') on each connection that is present
# and not closed. Nil entries and closed connections are skipped; a missing
# :sse_connections entry makes the call a no-op.
#
# opts:: Hash; reads :sse_connections.
def self::sse_heartbeat(opts) #{{{
  opts[:sse_connections]&.each do |_id,keys|
    keys.each do |_key,sse|
      # A nil entry must be skipped explicitly: `unless sse&.closed?` lets nil
      # through, because nil&.closed? is nil (falsy).
      next if sse.nil? || sse.closed?
      sse.send_with_id('heartbeat', '42')
    end
  end
end