mongodb使用docker搭建replicaSet集群与变更监听(最新推荐)(mongodb python)奔走相告

随心笔谈12个月前发布 admin
100 0

package io.github.puhaiyang;

import com.google.common.collect.Lists;
import com.mongodb.client.*;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;

public class MongodbWatchTestMain {

public static void main(String[] args) throws Exception {
String uri=”mongodb://admin:123456@192.168.1.11:27017,192.168.1.12:27017/?replicaSet=haiyangReplset”;
MongoClient mongoClient=MongoClients.create(uri);
MongoDatabase mongoDatabase=mongoClient.getDatabase(“my-test-db”);
String myTestCollectionName=”myTestCollection”;
//获取出collection
MongoCollection<Document> mongoCollection=initCollection(mongoDatabase, myTestCollectionName);
//进行watch
CompletableFuture.runAsync(() -> {
while (true) {
List<Bson> pipeline=Lists.newArrayList(
Aggregates.match(Filters.in(“ns.coll”, myTestCollectionName)),
Aggregates.match(Filters.in(“operationType”, Arrays.asList(“insert”, “update”, “replace”, “delete”)))
);
ChangeStreamIterable<Document> changeStream=mongoDatabase.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);

changeStream.forEach(event -> {
String collectionName=Objects.requireNonNull(event.getNamespace()).getCollectionName();
System.out.println(“——–> event:” + event.toString());
});
}
});

//数据变更测试
{
Thread.sleep(3_000);
InsertOneResult insertResult=mongoCollection.insertOne(new Document(“test”, “sample movie document”));
System.out.println(“Success! Inserted document id: ” + insertResult.getInsertedId());
UpdateResult updateResult=mongoCollection.updateOne(new Document(“test”, “sample movie document”), Updates.set(“field2”, “sample movie document update”));
System.out.println(“Updated ” + updateResult.getModifiedCount() + ” document.”);
DeleteResult deleteResult=mongoCollection.deleteOne(new Document(“field2”, “sample movie document update”));
System.out.println(“Deleted ” + deleteResult.getDeletedCount() + ” document.”);
}

new Scanner(System.in).next();
}

private static MongoCollection<Document> initCollection(MongoDatabase mongoDatabase, String myTestCollectionName) {
ArrayList<Document> existsCollections=mongoDatabase.listCollections().into(new ArrayList<>());
Optional<Document> existsCollInfoOpl=existsCollections.stream().filter(doc -> StringUtils.equals(myTestCollectionName, doc.getString(“name”))).findFirst();
existsCollInfoOpl.ifPresent(collInfo -> {
//确保开启了changeStreamPreAndPost
Document changeStreamPreAndPostImagesEnable=collInfo.get(“options”, Document.class).get(“changeStreamPreAndPostImages”, Document.class);
if (changeStreamPreAndPostImagesEnable !=null && !changeStreamPreAndPostImagesEnable.getBoolean(“enabled”)) {
Document mod=new Document();
mod.put(“collMod”, myTestCollectionName);
mod.put(“changeStreamPreAndPostImages”, new Document(“enabled”, true));
mongoDatabase.runCommand(mod);
}
});
if (!existsCollInfoOpl.isPresent()) {
CreateCollectionOptions collectionOptions=new CreateCollectionOptions();
//创建collection时开启ChangeStreamPreAndPostImages
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
mongoDatabase.createCollection(myTestCollectionName, collectionOptions);
}
return mongoDatabase.getCollection(myTestCollectionName);
}
}

© 版权声明

相关文章