Class: MessageChannel::Mongodb

Inherits:
Object
  • Object
show all
Defined in:
lib/message_channel/mongodb.rb

Constant Summary collapse

SIZE =
8000
NAME =
"_event_queue"

Instance Method Summary collapse

Constructor Details

#initialize(host: nil, port: nil, db: nil, size: nil, name: nil) ⇒ Mongodb

Returns a new instance of Mongodb.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/message_channel/mongodb.rb', line 11

def initialize( host: nil, port: nil, db: nil, size: nil, name: nil )
  @host  =  host  ||  "127.0.0.1"
  @port  =  ( port  ||  27017 ).to_i
  @db    =  db  ||  "test"
  @size  =  ( size  ||  SIZE ).to_i
  @name  =  name  ||  NAME

  @url   =  "mongodb://#{ @host }:#{ @port }/#{ @db }"
  @client  =  ::Mongo::Client.new( @url )

  @threads  =  {}
  @mutex  =  Mutex.new

  @event_queue  =  get_event_queue
end

Instance Method Details

#get_event_queueObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/message_channel/mongodb.rb', line 27

def get_event_queue
  event_queue  =  @mutex.synchronize do
    if  @client.database.collection_names.include?( @name )
      event_queue  =  @client[ @name ]
    else
      event_queue  =  @client[ @name, capped: true, size: @size ]
      event_queue.create
      now  =  Time.now
      doc  =  {
        topic: "reset",
        at: now.strftime("%Y%m%d.%H%M%S.%6L"),
      }
      event_queue.insert_one( doc )
      event_queue
    end
  end
end

#get_event_tail(event_queue) ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'lib/message_channel/mongodb.rb', line 45

def get_event_tail( event_queue )
  filter  =  {}
  if ( enum  =  event_queue.find( {}, { sort: { "$natural" => -1 } } ).to_enum )
    if ( doc  =  enum.next    rescue  nil )
      filter  =  { "_id"=>{ "$gt"=>doc["_id"] } }
    end
  end
  event_tail  =  event_queue.find( filter, { cursor_type: :tailable_await } ).to_enum
end

#listen(*patterns, &block) ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/message_channel/mongodb.rb', line 108

def listen( *patterns, &block )
  if block.nil?
    listen_once( *patterns )
  else
    listen_each( *patterns ) do |topic, items|
      block.call( topic, items )
    end
  end
end

#listen_each(*patterns, &block) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/message_channel/mongodb.rb', line 86

def listen_each( *patterns, &block )
  patterns.each do |pattern|
    @threads[pattern]  =  ::Thread.start( pattern ) do |pttrn|
      begin
        event_queue  =  get_event_queue
        event_tail  =  get_event_tail( event_queue )
        while ( doc  =  event_tail.next )
          items  =  JSON.parse( doc.to_json, symbolize_names: true )
          topic  =  items[:topic]
          if File.fnmatch( pttrn, topic, File::FNM_PATHNAME )
            items.delete( :_id )
            items.delete( :topic )
            block.call( topic, items )
          end
        end
      rescue => e
        nil
      end
    end
  end
end

#listen_once(*patterns) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/message_channel/mongodb.rb', line 55

def listen_once( *patterns )
  queue  =  Queue.new
  threads  =  {}
  patterns.each do |pattern|
    threads[pattern]  =  ::Thread.start( pattern ) do |pttrn|
      event_queue  =  get_event_queue
      event_tail  =  get_event_tail( event_queue )
      begin
        while ( doc  =  event_tail.next )
          items  =  JSON.parse( doc.to_json, symbolize_names: true )
          topic  =  items[:topic]
          if File.fnmatch( pttrn, topic, File::FNM_PATHNAME )
            items.delete( :_id )
            items.delete( :topic )
            queue.push  [topic, items]
          end
        end
      rescue => e
        nil
      end
    end
  end

  topic, items  =  queue.pop
  patterns.each do |pattern|
    threads[pattern].kill    rescue  nil
    threads.delete( pattern )    rescue  nil
  end
  [topic, items]
end

#notify(topic, **items) ⇒ Object



125
126
127
128
# File 'lib/message_channel/mongodb.rb', line 125

def notify( topic, **items )
  items[:topic]  =  topic
  @event_queue.insert_one( items )
end

#unlisten(**patterns) ⇒ Object



118
119
120
121
122
123
# File 'lib/message_channel/mongodb.rb', line 118

def unlisten( **patterns )
  patterns.each do |pattern|
    @threads[pattern].kill    rescue  nil
    @threads.delete( pattern )    rescue  nil
  end
end