mongodb使用docker搭建replicaSet集群与变更监听(最新推荐)(mongodb python)奔走相告

随心笔谈9个月前发布 admin
214 00
🌐 经济型:买域名、轻量云服务器、用途:游戏 网站等 《腾讯云》特点:特价机便宜 适合初学者用 点我优惠购买
🚀 拓展型:买域名、轻量云服务器、用途:游戏 网站等 《阿里云》特点:中档服务器便宜 域名备案事多 点我优惠购买
🛡️ 稳定型:买域名、轻量云服务器、用途:游戏 网站等 《西部数码》 特点:比上两家略贵但是稳定性超好事也少 点我优惠购买

文章摘要

这篇文章主要介绍了使用MongoDB的Java API进行数据变化监控和同步测试。代码中定义了一个名为MongodbWatchTestMain的类,实现了对MongoDB数据的变化监听功能。该类通过创建MongoDB客户端并配置相关选项,初始化了一个测试用例的MongoDB数据库和集合。核心功能包括: 1. **数据变化监控**:通过MongoDB的`watch` API对指定集合进行数据变化监听,捕获更新、删除等操作的变更文档。2. **同步测试**:测试了数据插入、更新和删除操作,并通过`CompletableFuture`实现了异步变化检测的阻塞执行。3. **测试功能**:在主线程中执行数据操作,并通过键盘输入退出。 文章重点讲述了如何配置MongoDB的同步选项,以及如何使用异步机制实现数据变化的实时监控。代码中还涵盖了MongoDB客户端的创建、集合的初始化以及数据操作的测试部分。

package io.github.puhaiyang;

import com.google.common.collect.Lists;
import com.mongodb.client.*;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;

public class MongodbWatchTestMain {

public static void main(String[] args) throws Exception {
String uri=”mongodb://admin:123456@192.168.1.11:27017,192.168.1.12:27017/?replicaSet=haiyangReplset”;
MongoClient mongoClient=MongoClients.create(uri);
MongoDatabase mongoDatabase=mongoClient.getDatabase(“my-test-db”);
String myTestCollectionName=”myTestCollection”;
//获取出collection
MongoCollection<Document> mongoCollection=initCollection(mongoDatabase, myTestCollectionName);
//进行watch
CompletableFuture.runAsync(() -> {
while (true) {
List<Bson> pipeline=Lists.newArrayList(
Aggregates.match(Filters.in(“ns.coll”, myTestCollectionName)),
Aggregates.match(Filters.in(“operationType”, Arrays.asList(“insert”, “update”, “replace”, “delete”)))
);
ChangeStreamIterable<Document> changeStream=mongoDatabase.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);

changeStream.forEach(event -> {
String collectionName=Objects.requireNonNull(event.getNamespace()).getCollectionName();
System.out.println(“——–> event:” + event.toString());
});
}
});

//数据变更测试
{
Thread.sleep(3_000);
InsertOneResult insertResult=mongoCollection.insertOne(new Document(“test”, “sample movie document”));
System.out.println(“Success! Inserted document id: ” + insertResult.getInsertedId());
UpdateResult updateResult=mongoCollection.updateOne(new Document(“test”, “sample movie document”), Updates.set(“field2”, “sample movie document update”));
System.out.println(“Updated ” + updateResult.getModifiedCount() + ” document.”);
DeleteResult deleteResult=mongoCollection.deleteOne(new Document(“field2”, “sample movie document update”));
System.out.println(“Deleted ” + deleteResult.getDeletedCount() + ” document.”);
}

new Scanner(System.in).next();
}

private static MongoCollection<Document> initCollection(MongoDatabase mongoDatabase, String myTestCollectionName) {
ArrayList<Document> existsCollections=mongoDatabase.listCollections().into(new ArrayList<>());
Optional<Document> existsCollInfoOpl=existsCollections.stream().filter(doc -> StringUtils.equals(myTestCollectionName, doc.getString(“name”))).findFirst();
existsCollInfoOpl.ifPresent(collInfo -> {
//确保开启了changeStreamPreAndPost
Document changeStreamPreAndPostImagesEnable=collInfo.get(“options”, Document.class).get(“changeStreamPreAndPostImages”, Document.class);
if (changeStreamPreAndPostImagesEnable !=null && !changeStreamPreAndPostImagesEnable.getBoolean(“enabled”)) {
Document mod=new Document();
mod.put(“collMod”, myTestCollectionName);
mod.put(“changeStreamPreAndPostImages”, new Document(“enabled”, true));
mongoDatabase.runCommand(mod);
}
});
if (!existsCollInfoOpl.isPresent()) {
CreateCollectionOptions collectionOptions=new CreateCollectionOptions();
//创建collection时开启ChangeStreamPreAndPostImages
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
mongoDatabase.createCollection(myTestCollectionName, collectionOptions);
}
return mongoDatabase.getCollection(myTestCollectionName);
}
}

© 版权声明

相关文章