programing

MongoDB 컬렉션의 변경 내용을 청취하는 방법

instargram 2023. 4. 2. 09:42
반응형

MongoDB 컬렉션의 변경 내용을 청취하는 방법

저는 MongoDB를 데이터 스토어로 하여 일종의 백그라운드 작업 큐 시스템을 만들고 있습니다.작업자가 작업을 처리하기 전에 MongoDB 컬렉션에 대한 삽입을 "리슨"하려면 어떻게 해야 합니까?

이전과 같은 변경이 있는지 확인하기 위해 몇 초마다 폴링을 해야 합니까?아니면 스크립트가 삽입될 때까지 기다릴 수 있는 방법이 있습니까?

현재 진행 중인 PHP 프로젝트입니다만, Ruby 또는 언어에 구애받지 않고 자유롭게 답변해 주십시오.

당신이 생각하는 것은 방아쇠처럼 들린다.MongoDB는 트리거를 지원하지 않지만 일부 사용자는 몇 가지 트릭을 사용하여 "스스로 롤링"하고 있습니다.여기서의 열쇠는 oplog입니다.

복제 세트에서 MongoDB를 실행하면 모든 MongoDB 작업이 작업 로그(oplog)에 기록됩니다.oplog는 기본적으로 데이터를 수정한 실행 목록입니다.복제본 세트는 이 oplog에서 변경사항을 들은 후 변경사항을 로컬로 적용하여 작동합니다.

이 말이 익숙하게 들리나요?

여기에서는 전체 프로세스를 자세히 설명할 수 없습니다.몇 페이지의 문서이지만 필요한 도구를 사용할 수 있습니다.

먼저 oplog에 대한 몇 가지 기록 - 간단한 설명 - 컬렉션 레이아웃(oplog 포함)

또한 조정 가능한 커서를 활용할 수도 있습니다.이를 통해 변경을 폴링하는 대신 이를 청취할 수 있습니다.복제에는 조정 가능한 커서가 사용되므로 이는 지원되는 기능입니다.

MongoDB는 소위 말하는 것을 가지고 있으며, MongoDB는 데이터를 청취자에게 푸시할 수 있습니다.

A capped collection는 기본적으로 크기가 고정되어 삽입만 가능한 컬렉션입니다.작성 방법은 다음과 같습니다.

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB 맞춤 커서 (Jonathan H. Way의 원본 게시물)

루비

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (Robert Stewart 지음)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl(최대 기준)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

기타 자원:

Ruby/Node.js Tutorial: MongoDB의 캡이 있는 컬렉션에서 삽입물을 재생하는 응용 프로그램을 만드는 과정을 안내합니다.

조정 가능한 커서에 대해 자세히 설명하는 문서입니다.

조정 가능한 커서를 사용하는 PHP, Ruby, Python 및 Perl의 예.

이것 좀 봐.스트림 변경

2018년 1월 10일 - 릴리즈 3.6

*편집: 이 방법에 대한 기사를 작성했습니다.https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


mongodb 3.6 신기능 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

changeStreams를 사용하려면 데이터베이스가 복제 세트여야 합니다.

레플리케이션 세트의 상세한 것에 대하여는, https://docs.mongodb.com/manual/replication/ 를 참조해 주세요.

기본적으로 데이터베이스는 "독립 실행형"입니다.

스탠드아론을 레플리카 세트로 변환하는 방법:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


다음은 이 사용 방법에 대한 실제 적용 예입니다.
* 특히 노드의 경우.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

다음 중 하나:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-sethttpsdocs.mongodb.com/manual/tutorial/
https://docs.mongodb.com/manual/tutorial/change-streams-examplehttpsdocs.mongodb.com/manual/tutorial/

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

MongoDB 3.6 이후 Change Streams라는 새로운 알림 API가 등장할 예정입니다.예에 대해서는, 이 블로그의 투고를 참조해 주세요.예:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

MongoDB 버전 3.6에는 OpLog 상단에 기본적으로 API인 변경 스트림이 포함되어 있어 트리거/알림 같은 사용 사례가 가능합니다.

다음은 Java의 예에 대한 링크입니다.http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

NodeJS의 예는 다음과 같습니다.

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });

또는 표준 Mongo Find And Update 메서드를 사용하여 콜백 실행 시 콜백 내에서 EventEmitter 이벤트(노드 내)를 실행할 수도 있습니다.

이 이벤트를 듣고 있는 애플리케이션 또는 아키텍처의 다른 부분은 업데이트에 대해 통지되며, 관련 데이터도 전송됩니다.이것은 Mongo로부터 통지를 받는 매우 간단한 방법입니다.

이러한 답변의 대부분은 새로운 기록만 제공할 뿐 업데이트는 하지 않거나 매우 비효율적입니다.

신뢰할 수 있는 퍼포먼스 방법은 로컬 DB에 맞춤 가능한 커서를 작성하는 것뿐입니다.oplog.rs 컬렉션에서는 MongoDB에 대한 모든 변경을 가져와 원하는 작업을 수행합니다(MongoDB는 복제를 지원하기 위해 내부적으로도 이 작업을 수행합니다).

oplog 내용 설명:https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Node.js 라이브러리의 예에서는 oplog를 사용하여 수행할 수 있는 작업에 관한 API를 제공합니다.https://github.com/cayasso/mongo-oplog

MongoDB Stitch라는 멋진 서비스 세트가 있습니다.스티치 기능/트리거를 조사합니다.이것은 클라우드 기반 유료 서비스(AWS)입니다.당신의 경우 삽입물에서는 Javascript로 작성된 커스텀 함수를 호출할 수 있습니다.

여기에 이미지 설명 입력

사실 출력을 보는 대신 mongoose 스키마에서 제공하는 미들웨어를 사용하여 새로운 것을 삽입해도 알 수 없는 이유는 무엇입니까?

이 삽입이 완료된 후 새 문서를 삽입하고 작업을 수행할 수 있습니다.

여기서 확인할 수 있는 작업용 Java 예가 있습니다.

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

여기에 제시된 QUERY OPTIONS가 핵심입니다.

또한 매번 모든 데이터를 로드할 필요가 없는 경우 찾기 쿼리를 변경할 수도 있습니다.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

3.6 이후 데이터베이스 사용이 허용되면 다음과 같은 데이터베이스 트리거 유형이 허용됩니다.

  • 이벤트 주도 트리거 - 관련 문서 자동 갱신, 다운스트림 서비스 알림, 혼합 워크로드 지원 데이터 전파, 데이터 무결성 및 감사에 도움이 됩니다.
  • 예약된 트리거 - 예약된 데이터 검색, 전파, 아카이브 및 분석 워크로드에 유용합니다.

Atlas 계정에 로그인하고Triggers인터페이스 및 새 트리거 추가:

여기에 이미지 설명 입력

자세한 설정 또는 세부 정보를 보려면 각 섹션을 확장하십시오.

언급URL : https://stackoverflow.com/questions/9691316/how-to-listen-for-changes-to-a-mongodb-collection

반응형