Class: CPEE::Controller

Inherits:
Object
  • Object
show all
Defined in:
lib/cpee/controller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, dir, opts) ⇒ Controller

Returns a new instance of Controller.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/cpee/controller.rb', line 37

# Sets up the controller for one instance: connects to Redis, loads the
# instance attributes and starts a background thread that dispatches incoming
# callback responses to the handlers registered via #callback.
#
# id::   instance id; used to build the "instance:<id>/..." Redis keys
# dir::  not used by this method (kept for interface compatibility)
# opts:: options hash; after CPEE::redis_connect(opts) it is read for :redis
#        (the connection used for commands) and :redis_dyn (a callable that
#        returns a connection, used here for the pattern subscription)
def initialize(id,dir,opts)
  CPEE::redis_connect(opts)

  @redis = opts[:redis]
  @votes = []

  @id = id

  # Attribute names are the last path segment of each matching Redis key.
  @attributes = {}
  @redis.keys("instance:#{id}/attributes/*").each do |key|
    @attributes[File.basename(key)] = @redis.get(key)
  end

  @attributes_helper = AttributesHelper.new
  @thread = nil
  @opts = opts
  @instance = nil
  @loop_guard = {}

  @callback_keys = {}
  @psredis = @opts[:redis_dyn].call

  Thread.new do
    @psredis.psubscribe('callback-response:*','callback-end:*') do |on|
      on.pmessage do |pat, what, message|
        # Both patterns are a literal prefix followed by '*', so the callback
        # key is whatever follows that prefix in the channel name.
        key = what[(pat.length - 1)..-1]
        case pat
        when 'callback-response:*'
          if @callback_keys.has_key?(key)
            # Only the JSON document after the first space is used; the token
            # in front of it is ignored.
            index = message.index(' ')
            m = JSON.parse(message[index+1..-1])
            resp = []
            m['content']['values'].each do |e|
              case e[1][0]
              when 'simple'
                resp << Riddl::Parameter::Simple.new(e[0],e[1][1])
              when 'complex'
                # NOTE(review): the file handle is handed to Riddl and never
                # closed here — confirm that Riddl takes ownership of it.
                resp << Riddl::Parameter::Complex.new(e[0],e[1][1],File.open(e[1][2]))
              end
            end
            @callback_keys[key].send(:callback,resp,m['content']['headers'])
          end
        when 'callback-end:*'
          @callback_keys.delete(key)
        end
      end
    end
  end
end

Instance Attribute Details

#attributes ⇒ Object (readonly)

Returns the value of attribute attributes.



86
87
88
# File 'lib/cpee/controller.rb', line 86

# Returns the @attributes hash that the constructor fills from the
# "instance:<id>/attributes/*" Redis keys (keyed by the last path segment).
def attributes
  @attributes
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



85
86
87
# File 'lib/cpee/controller.rb', line 85

# Returns the instance id that was passed to the constructor.
def id
  @id
end

#loop_guard ⇒ Object (readonly)

Returns the value of attribute loop_guard.



87
88
89
# File 'lib/cpee/controller.rb', line 87

# Returns the @loop_guard hash, which the constructor creates empty.
# NOTE(review): none of the methods shown here write to it; presumably callers
# populate it — confirm.
def loop_guard
  @loop_guard
end

Instance Method Details

#attributes_translated ⇒ Object



93
94
95
# File 'lib/cpee/controller.rb', line 93

# Passes the raw attributes together with the instance's data elements and
# endpoints to AttributesHelper#translate and returns its result.
# NOTE(review): presumably this resolves references inside attribute values —
# confirm against AttributesHelper.
def attributes_translated
  @attributes_helper.translate(attributes,dataelements,endpoints)
end

#base ⇒ Object



109
110
111
# File 'lib/cpee/controller.rb', line 109

# Alias for #base_url.
def base
  base_url
end

#base_url ⇒ Object



100
101
102
# File 'lib/cpee/controller.rb', line 100

# Returns the :url entry of the options hash given to the constructor.
def base_url
  @opts[:url]
end

#callback(hw, key, content) ⇒ Object



189
190
191
192
# File 'lib/cpee/controller.rb', line 189

# Registers +hw+ as the handler for callback +key+ and sends the
# 'activity/content' callback message carrying +content+ plus the key.
#
# hw::      object that receives #callback(values, headers) from the listener
#           thread started in the constructor
# key::     callback key; also merged into the message content as :key
# content:: hash sent with the message (not modified; a merged copy is sent)
#
# Returns +hw+. Errors raised while sending are re-raised.
def callback(hw,key,content)
  # Register before announcing: the listener thread ignores responses whose
  # key is not yet present in @callback_keys.
  @callback_keys[key] = hw
  begin
    CPEE::Message::send(:callback,'activity/content',base,@id,uuid,info,content.merge(:key => key),@redis)
  rescue
    # Do not keep a handler for a callback that was never announced.
    @callback_keys.delete(key)
    raise
  end
  hw
end

#cancel_callback(key) ⇒ Object



194
195
196
# File 'lib/cpee/controller.rb', line 194

# Sends a 'callback-end' message for +key+ with empty content.
# NOTE(review): presumably this ends up on the 'callback-end:<key>' channel,
# where the constructor's listener thread removes the handler — confirm in
# CPEE::Message.
def cancel_callback(key)
  CPEE::Message::send(:'callback-end',key,base,@id,uuid,info,{},@redis)
end

#dataelements ⇒ Object



118
119
120
# File 'lib/cpee/controller.rb', line 118

# Returns the data of the instance object assigned via #instance=.
# @instance is nil after construction, so this raises NoMethodError until an
# instance has been assigned.
def dataelements
  @instance.data
end

#endpoints ⇒ Object



115
116
117
# File 'lib/cpee/controller.rb', line 115

# Returns the endpoints of the instance object assigned via #instance=.
# @instance is nil after construction, so this raises NoMethodError until an
# instance has been assigned.
def endpoints
  @instance.endpoints
end

#host ⇒ Object



97
98
99
# File 'lib/cpee/controller.rb', line 97

# Returns the :host entry of the options hash given to the constructor.
def host
  @opts[:host]
end

#info ⇒ Object



144
145
146
# File 'lib/cpee/controller.rb', line 144

# Returns the 'info' attribute loaded by the constructor (nil if there is none).
def info
  @attributes['info']
end

#instance=(inst) ⇒ Object



112
113
114
# File 'lib/cpee/controller.rb', line 112

# Assigns the instance object that #start, #stop, #dataelements and #endpoints
# delegate to.
def instance=(inst)
  @instance = inst
end

#instance_id ⇒ Object



106
107
108
# File 'lib/cpee/controller.rb', line 106

# Returns the instance id given to the constructor (same value as #id).
def instance_id
  @id
end

#instance_url ⇒ Object



103
104
105
# File 'lib/cpee/controller.rb', line 103

# Builds the URL of this instance by joining the configured :url option and
# the instance id (both converted to strings) with a single '/'.
def instance_url
  url_part = @opts[:url].to_s
  id_part = @id.to_s
  File.join(url_part, id_part)
end

#notify(what, content = {}) ⇒ Object



148
149
150
151
# File 'lib/cpee/controller.rb', line 148

# Sends an :event message named +what+.
#
# what::    event name passed on to CPEE::Message::send
# content:: hash sent as the message content; the translated attributes are
#           stored in it under :attributes (the passed hash itself is modified)
#
# Returns whatever CPEE::Message::send returns.
def notify(what,content={})
  content.store(:attributes, attributes_translated)
  CPEE::Message::send(:event,what,base,@id,uuid,info,content,@redis)
end

#start ⇒ Object



122
123
124
125
126
127
128
129
130
# File 'lib/cpee/controller.rb', line 122

# Runs a "state/change" vote for the state 'running'. If the vote passes the
# instance is started, otherwise it is stopped; in both cases the object
# returned by the instance is kept in @thread and joined.
def start
  approved = vote("state/change", :state => 'running')
  @thread = approved ? @instance.start : @instance.stop
  @thread.join
end

#stop ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/cpee/controller.rb', line 132

# Stops the instance, answers every vote still listed in @votes with +true+,
# and waits for the execution thread if it is still alive.
def stop
  ### ask the instance to stop
  @instance.stop
  ### answer all open votes, otherwise stopping will not work
  # Sending happens in a separate thread because this may be called from a
  # trap context, where doing this work directly is not allowed.
  Thread.new do
    @votes.each { |key| CPEE::Message::send(:'vote-response',key,base,@id,uuid,info,true,@redis) }
  end
  return unless @thread && @thread.alive?
  @thread.join
end

#uuid ⇒ Object



89
90
91
# File 'lib/cpee/controller.rb', line 89

# Returns the 'uuid' attribute loaded by the constructor (nil if there is none).
def uuid
  @attributes['uuid']
end

#vote(what, content = {}) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/cpee/controller.rb', line 153

# Asks every subscriber registered for the vote handler of +what+ and waits
# until as many answers as questions have arrived.
#
# what::    "<topic>/<name>"; subscribers are read from the Redis set
#           "instance:<id>/handlers/<topic>/vote/<name>"
# content:: hash sent with each vote message; :key, :attributes and
#           :subscription are written into it (the passed hash is modified)
#
# Returns true when there are no subscribers or when every collected answer
# was +true+ or the string 'true'; false otherwise.
#
# NOTE(review): the vote messages are sent before the response channels are
# subscribed; an answer published in between would presumably be missed —
# confirm whether the message routing rules this out.
def vote(what,content={})
  topic, name = what.split('/')
  handler = File.join(topic,'vote',name)
  votes = @redis.smembers("instance:#{id}/handlers/#{handler}").map do |client|
    voteid = Digest::MD5.hexdigest(Kernel::rand().to_s)
    content[:key] = voteid
    content[:attributes] = attributes_translated
    content[:subscription] = client
    CPEE::Message::send(:vote,what,base,@id,uuid,info,content,@redis)
    voteid
  end
  return true if votes.empty?

  @votes += votes
  psredis = @opts[:redis_dyn].call
  collect = []
  begin
    channels = votes.flat_map { |e| ["vote-response:#{e}", "vote-end:#{e}"] }
    psredis.subscribe(channels) do |on|
      on.message do |_channel, message|
        # Only the JSON document after the first space is used.
        index = message.index(' ')
        m = JSON.parse(message[index+1..-1])
        collect << (m['content'] == true || m['content'] == 'true')
        @votes.delete m['name']
        cancel_callback m['name']
        # Leaving the subscription makes the blocking subscribe call return.
        psredis.unsubscribe if collect.length >= votes.length
      end
    end
  ensure
    # Each vote obtains its own connection from redis_dyn; release it so
    # repeated votes do not pile up open connections.
    psredis.close
  end
  collect.all?
end