Class: MessageChannel::Mongodb

Inherits:
Object
  • Object
show all
Defined in:
lib/message_channel/mongodb.rb

Constant Summary collapse

SIZE =
8000
NAME =
"_event_queue"

Instance Method Summary collapse

Constructor Details

#initialize(host: nil, port: nil, db: nil, size: nil, name: nil) ⇒ Mongodb

Returns a new instance of Mongodb.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/message_channel/mongodb.rb', line 11

def initialize( host: nil, port: nil, db: nil, size: nil, name: nil )
  @host  =  host  ||  "127.0.0.1"
  @port  =  ( port  ||  27017 ).to_i
  @db    =  db  ||  "test"
  @size  =  ( size  ||  SIZE ).to_i
  @name  =  name  ||  NAME

  @url   =  "mongodb://#{ @host }:#{ @port }/#{ @db }"
  @client  =  ::Mongo::Client.new( @url )

  @threads  =  {}
  @mutex  =  Mutex.new

  @event_queue  =  get_event_queue
end

Instance Method Details

#get_event_queueObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/message_channel/mongodb.rb', line 27

def get_event_queue
  event_queue  =  @mutex.synchronize do
    if  @client.database.collection_names.include?( @name )
      event_queue  =  @client[ @name ]
    else
      event_queue  =  @client[ @name, capped: true, size: @size ]
      event_queue.create
      now  =  Time.now
      doc  =  {
        topic: "reset",
        at: now.strftime("%Y%m%d.%H%M%S.%6L"),
      }
      event_queue.insert_one( doc )
      event_queue
    end
  end
end

#get_event_tail(event_queue) ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'lib/message_channel/mongodb.rb', line 45

def get_event_tail( event_queue )
  filter  =  {}
  if  enum  =  event_queue.find( {}, { sort: { "$natural" => -1 } } ).to_enum
    if  doc  =  enum.next    rescue  nil
      filter  =  { "_id"=>{ "$gt"=>doc["_id"] } }
    end
  end
  event_tail  =  event_queue.find( filter, { cursor_type: :tailable_await } ).to_enum
end

#listen(*patterns, &block) ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/message_channel/mongodb.rb', line 106

def listen( *patterns, &block )
  if block.nil?
    listen_once( *patterns )
  else
    listen_each( *patterns ) do |topic, items|
      block.call( topic, items )
    end
  end
end

#listen_each(*patterns, &block) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/message_channel/mongodb.rb', line 85

def listen_each( *patterns, &block )
  patterns.each do |pattern|
    @threads[pattern]  =  ::Thread.start(pattern) do |pattern|
      begin
        event_queue  =  get_event_queue
        event_tail  =  get_event_tail( event_queue )
        while  doc  =  event_tail.next
          items  =  JSON.parse( doc.to_json, symbolize_names: true )
          topic  =  items[:topic]
          if File.fnmatch( pattern, topic, File::FNM_PATHNAME )
            items.delete( :_id )
            items.delete( :topic )
            block.call( topic, items )
          end
        end
      ensure
      end
    end
  end
end

#listen_once(*patterns) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/message_channel/mongodb.rb', line 55

def listen_once( *patterns )
  queue  =  Queue.new
  threads  =  {}
  patterns.each do |pattern|
    threads[pattern]  =  ::Thread.start(pattern) do |pattern|
      event_queue  =  get_event_queue
      event_tail  =  get_event_tail( event_queue )
      begin
        while  doc  =  event_tail.next
          items  =  JSON.parse( doc.to_json, symbolize_names: true )
          topic  =  items[:topic]
          if File.fnmatch( pattern, topic, File::FNM_PATHNAME )
            items.delete( :_id )
            items.delete( :topic )
            queue.push  [topic, items]
          end
        end
      ensure
      end
    end
  end

  topic, items  =  queue.pop
  patterns.each do |pattern|
    threads[pattern].kill    rescue  nil
    threads.delete( pattern )    rescue  nil
  end
  [topic, items]
end

#notify(topic, **items) ⇒ Object



123
124
125
126
# File 'lib/message_channel/mongodb.rb', line 123

def notify( topic, **items )
  items[:topic]  =  topic
  @event_queue.insert_one( items )
end

#unlisten(**patterns) ⇒ Object



116
117
118
119
120
121
# File 'lib/message_channel/mongodb.rb', line 116

def unlisten( **patterns )
  patterns.each do |pattern|
    @threads[pattern].kill    rescue  nil
    @threads.delete( pattern )    rescue  nil
  end
end