Class: PubControl

Inherits:
Object
  • Object
show all
Defined in:
lib/pubcontrol.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri) ⇒ PubControl

Returns a new instance of PubControl.



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/pubcontrol.rb', line 20

# Builds a publisher bound to +uri+ with no credentials configured and no
# worker thread running.
#
# @param uri [Object] stored as-is in @uri; the publish methods pass it on
#   as the base URI
def initialize(uri)
  @uri = uri
  # Guards @uri, the auth fields and @thread (see the synchronize blocks).
  @lock = Mutex.new
  # Worker-thread state starts out unset.
  @thread, @thread_cond, @thread_mutex = nil, nil, nil
  @req_queue = Containers::Deque.new
  # Neither basic nor JWT credentials are configured initially.
  @auth_basic_user, @auth_basic_pass = nil, nil
  @auth_jwt_claim, @auth_jwt_key = nil, nil
end

Instance Attribute Details

#req_queue ⇒ Object

Returns the value of attribute req_queue.



18
19
20
# File 'lib/pubcontrol.rb', line 18

# Reader for the pending-request queue.
#
# @return [Object] whatever is currently stored in @req_queue (the
#   constructor assigns a Containers::Deque)
def req_queue
  @req_queue
end

Class Method Details

.pubbatch(reqs) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/pubcontrol.rb', line 132

# Publishes several queued requests with a single PubControl.pubcall and
# then reports the shared outcome to every request's callback.
#
# Each request is indexed as [uri, auth_header, item, callback]. The URI
# and auth header are taken from the FIRST request only; the items of all
# requests are sent together.
#
# @param reqs [Array<Array>] non-empty list of requests
# @raise [RuntimeError] 'reqs length == 0' when +reqs+ is empty
# @return [Array] the callbacks, in request order (nil entries included)
def self.pubbatch(reqs)
  raise 'reqs length == 0' unless reqs.length > 0
  uri, auth_header = reqs[0][0], reqs[0][1]
  items = reqs.map { |req| req[2] }
  callbacks = reqs.map { |req| req[3] }
  # A failed publish is turned into (false, message) instead of propagating.
  ok, message =
    begin
      PubControl.pubcall(uri, auth_header, items)
      [true, '']
    rescue => e
      [false, e.message]
    end
  callbacks.each do |callback|
    next if callback.nil?
    callback.call(ok, message)
  end
end

.pubcall(uri, auth_header, items) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/pubcontrol.rb', line 109

# Sends +items+ to the publish endpoint with one HTTP POST.
#
# The request goes to the host and port named by +uri+, over TLS only when
# the scheme is 'https'. (The connection used to be opened with
# use_ssl: true and no port, which always meant TLS on port 443 regardless
# of what the URI said.)
#
# @param uri [String] base URI; '/publish/' is appended to it
# @param auth_header [String, nil] Authorization header value, or nil to
#   send no Authorization header
# @param items [Array] exported items, sent as the 'items' member of the
#   JSON request body
# @return [nil] on success
# @raise [RuntimeError] 'failed to publish: ...' when the response is not a
#   Net::HTTPSuccess (3xx responses are not treated as success)
def self.pubcall(uri, auth_header, items)
  uri = URI(uri + '/publish/')
  request = Net::HTTP::Post.new(uri.path)
  request.body = { 'items' => items }.to_json.force_encoding('UTF-8')
  request['Authorization'] = auth_header unless auth_header.nil?
  request['Content-Type'] = 'application/json'
  # Honour the URI: explicit port, and TLS only for https.
  response = Net::HTTP.start(uri.host, uri.port,
      use_ssl: uri.scheme == 'https') do |http|
    http.request(request)
  end
  unless response.kind_of? Net::HTTPSuccess
    raise 'failed to publish: ' + response.class.to_s + ' ' +
        response.message
  end
end

.timestamp_utcnow ⇒ Object



155
156
157
158
# File 'lib/pubcontrol.rb', line 155

# Current time as whole seconds since the Unix epoch.
#
# @return [Integer]
def self.timestamp_utcnow
  Time.now.utc.to_i
end

Instance Method Details

#finish ⇒ Object



72
73
74
75
76
77
78
79
80
# File 'lib/pubcontrol.rb', line 72

# Shuts down the worker thread, if one exists: queues a ['stop'] request,
# joins the thread and clears @thread. Does nothing when @thread is nil.
# @lock is held for the whole operation, including the join.
#
# @return [nil]
def finish
  @lock.synchronize do
    next if @thread.nil?
    queue_req(['stop'])
    @thread.join
    @thread = nil
  end
end

#publish(channel, item) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/pubcontrol.rb', line 47

# Publishes +item+ to +channel+ synchronously via PubControl.pubcall.
#
# The URI and auth header are captured together under @lock; the HTTP call
# itself runs after the lock has been released.
#
# @param channel [Object] stored under the 'channel' key of the export
# @param item [#export] object whose #export result is the payload
# @return whatever PubControl.pubcall returns; its errors propagate
def publish(channel, item)
  payload = item.export
  payload['channel'] = channel
  target, auth_header = @lock.synchronize { [@uri, gen_auth_header] }
  PubControl.pubcall(target, auth_header, [payload])
end

#publish_async(channel, item, callback = nil) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/pubcontrol.rb', line 59

# Queues +item+ for publication to +channel+ by the worker thread.
#
# Under @lock it captures the URI and auth header and calls ensure_thread;
# the request is queued after the lock has been released.
#
# @param channel [Object] stored under the 'channel' key of the export
# @param item [#export] object whose #export result is the payload
# @param callback [#call, nil] queued with the request as its last element
# @return whatever queue_req returns
def publish_async(channel, item, callback=nil)
  payload = item.export
  payload['channel'] = channel
  target, auth_header = @lock.synchronize do
    snapshot = [@uri, gen_auth_header]
    ensure_thread
    snapshot
  end
  queue_req(['pub', target, auth_header, payload, callback])
end

#pubworker ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/pubcontrol.rb', line 82

# Worker loop: drains @req_queue in batches of at most 10 requests and
# hands each batch to PubControl.pubbatch, until a 'stop' message is read.
#
# Queue access happens while @thread_mutex is held; the mutex is released
# before pubbatch is called. Requests dequeued ahead of a 'stop' in the
# same batch are still published; anything behind it stays in the queue.
#
# @return [nil]
def pubworker
  stopping = false
  until stopping
    @thread_mutex.lock
    # Only wait on the condition variable when there is nothing queued.
    @thread_cond.wait(@thread_mutex) if @req_queue.length == 0
    if @req_queue.length == 0
      # Woken with an empty queue: release the mutex and go around again.
      @thread_mutex.unlock
      next
    end
    batch = []
    while @req_queue.length > 0 && batch.length < 10
      message = @req_queue.pop_front
      if message[0] == 'stop'
        stopping = true
        break
      end
      # Drop the leading message type; keep uri, auth, item, callback.
      batch.push(message.values_at(1, 2, 3, 4))
    end
    @thread_mutex.unlock
    PubControl.pubbatch(batch) if batch.length > 0
  end
end

#set_auth_basic(username, password) ⇒ Object



33
34
35
36
37
38
# File 'lib/pubcontrol.rb', line 33

# Stores the credentials for basic authentication, under @lock.
#
# @param username [Object] stored in @auth_basic_user
# @param password [Object] stored in @auth_basic_pass
# @return +password+ (the value of the last assignment)
def set_auth_basic(username, password)
  @lock.synchronize { @auth_basic_user, @auth_basic_pass = username, password }
  password
end

#set_auth_jwt(claim, key) ⇒ Object



40
41
42
43
44
45
# File 'lib/pubcontrol.rb', line 40

# Stores the claim and signing key for JWT authentication, under @lock.
#
# @param claim [Object] stored in @auth_jwt_claim
# @param key [Object] stored in @auth_jwt_key
# @return +key+ (the value of the last assignment)
def set_auth_jwt(claim, key)
  @lock.synchronize { @auth_jwt_claim, @auth_jwt_key = claim, key }
  key
end