博客
关于我
MongoDB change stream 详解
阅读量:797 次
发布时间:2023-02-09

本文共 4641 字,大约阅读时间需要 15 分钟。

MongoDB Change Streams入门与实践指南

Change Streams是MongoDB从3.6版本开始提供的一项强大功能,用于实时监控数据库中的数据变更事件。它类似于关系数据库中的触发器,但基于不同的原理,能够为应用程序提供实时的增量数据推送。

Change Streams概述

Change Streams定义为数据变化的事件流,旨在帮助开发者跟踪MongoDB数据库中文档的增删改查操作。它通过订阅机制,实时捕捉数据库中的变更事件,并将这些事件推送给注册的回调函数。这种机制特别适用于需要实时响应数据变化的场景。

Change Streams实现原理

Change Streams的核心实现依赖于MongoDB的oplog(操作日志)结构。oplog记录了所有对数据库集合的增删改查操作。Change Stream通过在oplog上建立一个可滚动的游标(tailable cursor),持续追踪所有变更操作。具体来说,它会监控所有复制集上的变更事件,并根据配置的回调函数将事件内容推送给客户端应用。

支持的变更事件类型

Change Stream能够监控以下类型的变更事件:

  • insert/update/delete:插入、更新、删除操作
  • drop:集合被删除
  • rename:集合被重命名
  • dropDatabase:数据库被删除
  • invalidate:由于drop、rename或dropDatabase操作,Change Stream会触发invalidate事件并关闭相关订阅

从MongoDB 6.0版本开始,Change Stream还支持DDL(数据定义语言)事件的通知,例如createIndexes和dropIndexes操作。

Change Streams的使用场景

Change Streams适用于以下场景:

  • 监控

    • 用户需要实时了解数据库中的文档变动情况,例如账户信息的更改、订单状态的变化等。
  • 数据分析

    • 需要基于增量数据进行实时分析的场景,例如用户行为分析、日志处理等。Change Streams可以将数据推送到如Flink、Spark等流处理平台。
  • 数据同步

    • 建立热备份或冷备份的副本数据库,确保数据一致性和可用性。Change Streams可以用于跨地域或跨数据中心的数据同步,支持全球范围内的数据复制。
  • 消息推送

    • 实时推送变更信息到客户端或其他系统。例如,公交车位置信息、物流状态更新等实时消息可以通过Change Streams实现。
  • 故障恢复

    在使用Change Streams时,可能会遇到应用服务中断的情况。为了保证数据变更的可用性,Change Stream支持断点恢复机制。具体来说:

    • 如果应用在接收到某个变更事件时崩溃,重启后可以通过保留上次变更通知中的_id值,继续从上一个中断点继续获取后续变更事件。

    Spring Boot整合Change Streams

    Spring Boot提供了对MongoDB的集成支持,可以通过Spring Boot Starter Data MongoDB依赖,将Change Streams与Spring应用集成。以下是具体配置步骤:

    引入依赖

    在项目的依赖管理中添加以下配置:

    org.springframework.boot
    spring-boot-starter-data-mongodb

    配置MongoDB URI

    在application.yml中配置MongoDB的连接信息:

    spring:    data:        mongodb:            uri: mongodb://hushang:123456@192.168.75.100:28017,192.168.75.100:28018,192.168.75.100:28019/test?authSource=admin&replicaSet=rs0

    配置Change Stream监听器

    创建一个自定义的MessageListener类,用于接收变更事件:

    package com.hs.learn.changestream;import lombok.extern.slf4j.Slf4j;import org.springframework.data.mongodb.core.messaging.Message;import org.springframework.data.mongodb.core.messaging.MessageListener;import org.springframework.stereotype.Component;@Component@Slf4jpublic class DocumentMessageListener implements MessageListener
    { @Override public void onMessage(Message
    message) { log.info("收到消息:{}", message.getProperties().getCollectionName()); log.info("原始内容:{}", message.getRaw()); log.info("转换后的内容:{}", message.getBody()); }}

    配置MessageListenerContainer

    在配置类中注册Change Stream监听器:

    package com.hs.learn.config;import com.hs.learn.changestream.DocumentMessageListener;import com.mongodb.ReadConcern;import com.mongodb.ReadPreference;import com.mongodb.TransactionOptions;import com.mongodb.WriteConcern;import com.mongodb.client.model.changestream.FullDocument;import org.bson.Document;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.mongodb.MongoDatabaseFactory;import org.springframework.data.mongodb.MongoTransactionManager;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.data.mongodb.core.aggregation.Aggregation;import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;import org.springframework.data.mongodb.core.query.Criteria;import java.util.concurrent.TimeUnit;@Configurationpublic class MongodbConfig {    @Bean    public MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {        Executor executor = new ThreadPoolExecutor(5, 5, TimeUnit.MILLISECONDS.toSeconds(30), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());        MessageListenerContainer container = new DefaultMessageListenerContainer(template, executor) {            @Override            public boolean isAutoStartup() {                return true;            }        };        ChangeStreamRequest request = ChangeStreamRequest.builder(documentMessageListener)                .collection("emp")                .filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete"))))                .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)                .build();        container.register(request, Document.class);        return container;    }}

    测试

    通过MongoDB Shell验证Change Streams的运行:

    rs0 [direct: primary] test> db.emp.insertOne({ name: "hushang", age: 24 })

    控制台输出将显示变更事件的详细信息:

    Received Message in collection: emptrawsource: ChangeStreamDocument{...}tconverted: Document{...}

    以上配置和实现提供了一个完整的Change Streams使用示例,能够帮助开发者了解如何在Spring Boot应用中集成MongoDB的变更事件流。

    转载地址:http://ycffk.baihongyu.com/

    你可能感兴趣的文章
    Mac:Permission denied XXX
    查看>>
    macaca 测试web(2)
    查看>>
    Macbook / pro卡顿怎么处理?这些方法让它满血复活!
    查看>>
    MacBook Air怎么重新输入wifi密码
    查看>>
    MacBook开机出现问号文件夹?别急 可能是这些原因!
    查看>>
    MacBook键盘突然失灵?这几个排查步骤一定要试试!
    查看>>
    Macbook风扇突然一直狂转?一文搞定各种可能原因
    查看>>
    MacBook黑屏/白屏开不了机?一文搞定所有可能的解决方案!
    查看>>
    Machine Learning in Action -- 树回归
    查看>>
    Machine Learning Project Walkthrough: Preparing the features
    查看>>
    macOS Big Sur 11.0.1 上未弹出应用程序
    查看>>
    MacOS:创建目录出现 Read-only file system
    查看>>
    macOS使用django安装mysqlclient遇到的问题(mysqlclient 1.3.3 or newer is required)
    查看>>
    macOS系统上安装JDK
    查看>>
    MacType Decency 项目常见问题解决方案
    查看>>
    Mac下IDEA更换Maven仓库
    查看>>
    Mac下MySQL 报错:Error1045(28000)解决办法
    查看>>
    Mac下redis安装和启动
    查看>>
    Mac下如何配置环境变量
    查看>>
    Mac下安装jdk
    查看>>