本文共 4641 字,大约阅读时间需要 15 分钟。
Change Streams是MongoDB从3.6版本开始提供的一项强大功能,用于实时监控数据库中的数据变更事件。它类似于关系数据库中的触发器,但基于不同的原理,能够为应用程序提供实时的增量数据推送。
Change Streams定义为数据变化的事件流,旨在帮助开发者跟踪MongoDB数据库中文档的增删改查操作。它通过订阅机制,实时捕捉数据库中的变更事件,并将这些事件推送给注册的回调函数。这种机制特别适用于需要实时响应数据变化的场景。
Change Streams的核心实现依赖于MongoDB的oplog(操作日志)结构。oplog记录了所有对数据库集合的增删改查操作。Change Stream通过在oplog上建立一个可滚动的游标(tailable cursor),持续追踪所有变更操作。具体来说,它会监控所有复制集上的变更事件,并根据配置的回调函数将事件内容推送给客户端应用。
Change Stream能够监控以下类型的变更事件:
从MongoDB 6.0版本开始,Change Stream还支持DDL(数据定义语言)事件的通知,例如createIndexes和dropIndexes操作。
Change Streams适用于以下场景:
监控
数据分析
数据同步
消息推送
在使用Change Streams时,可能会遇到应用服务中断的情况。为了保证数据变更的可用性,Change Stream支持断点恢复机制。具体来说:
Spring Boot提供了对MongoDB的集成支持,可以通过Spring Boot Starter Data MongoDB依赖,将Change Streams与Spring应用集成。以下是具体配置步骤:
在项目的依赖管理中添加以下配置:
org.springframework.boot spring-boot-starter-data-mongodb
在application.yml中配置MongoDB的连接信息:
spring: data: mongodb: uri: mongodb://hushang:123456@192.168.75.100:28017,192.168.75.100:28018,192.168.75.100:28019/test?authSource=admin&replicaSet=rs0
创建一个自定义的MessageListener类,用于接收变更事件:
package com.hs.learn.changestream;import lombok.extern.slf4j.Slf4j;import org.springframework.data.mongodb.core.messaging.Message;import org.springframework.data.mongodb.core.messaging.MessageListener;import org.springframework.stereotype.Component;@Component@Slf4jpublic class DocumentMessageListener implements MessageListener
在配置类中注册Change Stream监听器:
package com.hs.learn.config;import com.hs.learn.changestream.DocumentMessageListener;import com.mongodb.ReadConcern;import com.mongodb.ReadPreference;import com.mongodb.TransactionOptions;import com.mongodb.WriteConcern;import com.mongodb.client.model.changestream.FullDocument;import org.bson.Document;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.mongodb.MongoDatabaseFactory;import org.springframework.data.mongodb.MongoTransactionManager;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.data.mongodb.core.aggregation.Aggregation;import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;import org.springframework.data.mongodb.core.query.Criteria;import java.util.concurrent.TimeUnit;@Configurationpublic class MongodbConfig { @Bean public MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) { Executor executor = new ThreadPoolExecutor(5, 5, TimeUnit.MILLISECONDS.toSeconds(30), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); MessageListenerContainer container = new DefaultMessageListenerContainer(template, executor) { @Override public boolean isAutoStartup() { return true; } }; ChangeStreamRequest request = ChangeStreamRequest.builder(documentMessageListener) .collection("emp") .filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete")))) .fullDocumentLookup(FullDocument.UPDATE_LOOKUP) .build(); container.register(request, Document.class); return container; }}
通过MongoDB Shell验证Change Streams的运行:
rs0 [direct: primary] test> db.emp.insertOne({ name: "hushang", age: 24 })
控制台输出将显示变更事件的详细信息:
Received Message in collection: emptrawsource: ChangeStreamDocument{...}tconverted: Document{...}
以上配置和实现提供了一个完整的Change Streams使用示例,能够帮助开发者了解如何在Spring Boot应用中集成MongoDB的变更事件流。
转载地址:http://ycffk.baihongyu.com/