您当前的位置:首页 > 电脑百科 > 数据库 > 百科

数据实时同步之MongoDB

时间:2020-07-07 11:00:54  来源:  作者:
数据实时同步之MongoDB

 

转载本文需注明出处:微信公众号EAWorld,违者必究。

​前言:

随着传统企业的发展,企业数据呈现多样化,海量化,难以实现数据快速分析。MongoDB是当前很多企业使用的,当日积月累数据很大时,就可能会忽略历史数据的价值,可以把数据实时同步到其他储存:HBASE、HIVE、HDFS文件等等。在当前大数据、云计算的时代潮流下,实现数据价值,对企业决策力、洞察发现力极其有益。

在MongoDB 3.6 之后版本,提供Change Streams API。但目前数据量庞大的仍还是3.6之前版本的历史悠久企业。这些资产数据是不可缺少的,所以当使用3.6之前版本,两步走:首先对历史库数据迁移。再开始监听MongoDB库增量变化,实现MongoDB的监听和实时同步(Oplog)。

目录:

1.Oplog简介

2.MongoDB服务配置启动

3.Oplog获取和查看

4.简单JAVA代码实现

1.Oplog简介

1、认识

当在MongoDB的Primary下,我们进行操作库表时,这些操作会以特殊格式储存在local库下的一个固定集合中(下面会介绍到)。Secondary(次)就会通过获取主的oplog,来进行同步数据,并且存储自己的Oplog。所以Oplog 也是Mongodb Replication的重要组成了。

2、大小

Mongodb默认将其大小分配的是5%的空闲磁盘空间。也可以在创建 mongod 服务时,在mongo.conf中oplogSize自定义参数设置,单位是mb,如果不指定,不同操作系统上的 oplog 默认大小不同,具体为以下:

For 64-bit linux, Solaris, and FreeBSD systems:可以分配 5% 的剩余空间。如果分配的值仍小于 1GB,那么会分配 1GB。

For 64-bit OS X systems:分配 183MB。

For 32-bit systems:分配 48MB。

oplog的内存占比速度与系统处理写请求的速度相当,所以很快就会增量更新数据。时间上完全可以支持实时同步。

3、oplog库表

oplog会自动创建在local库的collection:

a、master/slave 架构下:local.oplog.$main

数据实时同步之MongoDB

 

b、replica sets 架构下:local.oplog.rs

数据实时同步之MongoDB

 

c、sharding 架构下,mongos下不能查看oplog,可到每一片去

2.MongoDB服务配置启动

1.解压当前目录

tar zxvf mongodb-linux-x86_64-3.2.22.tgz -C ./ mongodb-3.2.22

2.创建data、logs/mongodb.log文件夹

数据实时同步之MongoDB

 

3. bin下创建mongodb.conf自定义配置

数据实时同步之MongoDB

 

4. 创建启动脚本

start-mongodb.sh,赋权chmod +x start-mongodb.sh

数据实时同步之MongoDB

 

5. 启动 ./start-mongodb.sh

6.测试

./mongo,默认进入的collections是test,PRIMARY节点

数据实时同步之MongoDB

 


数据实时同步之MongoDB

 

3.oplog获取和查看

1. oplog数据结构

分析oplog中字段的含义

  • ts: 8字节的时间戳,由4字节unix timestamp + 4字节自增计数表示。这个值很重要,在选举(如master宕机时)新primary时,会选择ts最大的那个secondary作为新primary
  • op:1字节的操作类型
  • "i":insert
  • "u":update
  • "d":delete
  • "c":db cmd
  • "db":声明当前数据库 (其中ns 被设置成为=>数据库名称+ '.')
  • "n": no op,即空操作,其会定期执行以确保时效性
  • ns:操作所在的namespace
  • o:操作所对应的document,即当前操作的内容(比如更新操作时要更新的的字段和值)
  • o2: 在执行更新操作时的where条件,仅限于update时才有该属性

2. 查看oplog的基本信息

通过"db.printReplicationInfo()"命令可以查看oplog的信息

数据实时同步之MongoDB

 

字段说明:

  • configured oplog size:oplog文件大小
  • log length start to end: oplog日志的启用时间段
  • oplog first event time: 第一个事务日志的产生时间
  • oplog last event time: 最后一个事务日志的产生时间
  • now: 现在的时间

3、查看oplog日志数据

这里我们一般会重视数据的变化,所以列出insert、update、delete示例

添加一条数据:

db.test.insert({"name":"这是一侧测试","age":"18"})

oplog日志数据:

{
    "ts" : Timestamp(1588728789, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "i",
    "ns" : "runoob.test",
    "o" : {
        "_id" : ObjectId("5eb213d5ce1474899c3a2482"),
        "name" : "这是一侧测试",
        "age" : "18"
    }
}

修改:

db.test.update({"_id": ObjectId("5eb213d5ce1474899c3a2482")},{$set:{"name":"这是修改的测试","age":"20"}},false,true)
{
    "ts" : Timestamp(1588730210, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "u",
    "ns" : "runoob.test",
    "o2" : {
        "_id" : ObjectId("5eb213d5ce1474899c3a2482")
    },
    "o" : {
        "$set" : {
            "name" : "这是修改的测试",
            "age" : "20"
        }
    }
}

删除:

db.test.remove({"name" : "这是修改的测试"})
{
    "ts" : Timestamp(1588730347, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "d",
    "ns" : "runoob.test",
    "o" : {
        "_id" : ObjectId("5eb213d5ce1474899c3a2482")
    }
}

4.简单Java代码实现

1、maven依赖引入

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.41</version>
</dependency>
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>mongo-java-driver</artifactId>
  <version>3.2.2</version>
</dependency>

2、配置文件resource/mongo-config.properties

connectionsPerHost=10
connectTimeout=10000
cursorFinalizerEnabled=true
maxWaitTime=120000
threadsAllowedToBlockForConnectionMultiplier=5
readSecondary=false
socketTimeout=0
socketKeepAlive=false
write=0
writeTimeout=0
journal=false
hostConfString=127.0.0.1:27017
userName=adminUser
useCollection=admin
password=adminPass

3、MongoDBUtil.java工具类

/**
 * @author wxb
 * @date 2019-10-12 11:26
 */
public class MongoDBUtil {
    private static MongoClient mongoClient;
    private static Properties properties;
    private static WriteConcern concern;
    static {
        try {
            InputStream inputStream = MongoDBUtil.class.getClassLoader().getResourceAsStream("mongo-config.properties");
            properties = new Properties();
            properties.load(inputStream);
            concern = new WriteConcern(Integer.parseInt(properties.getProperty("write")),
                    Integer.parseInt(properties.getProperty("writeTimeout")));
            concern.withJournal(Boolean.valueOf(properties.getProperty("journal")));//读取journal参数值

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 初始化,返回客户端
     */
    public static MongoClient initMongoHasUser() throws IOException {
        List<ServerAddress> adds = new ArrayList<>();
        String[] address = properties.getProperty("hostConfString").split(":");//读取服务IP地址和端口号
        ServerAddress serverAddress = new ServerAddress(address[0], Integer.valueOf(address[1]));
        adds.add(serverAddress);
        List<MongoCredential> credentials = new ArrayList<>();
        MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(
                properties.getProperty("userName"),
                properties.getProperty("useCollection"),
                properties.getProperty("passWord").toCharArray());
        credentials.add(mongoCredential);
        MongoClientOptions options = MongoClientOptions.builder()
                .connectionsPerHost(Integer.parseInt(properties.getProperty("connectionsPerHost")))
                .connectTimeout(Integer.parseInt(properties.getProperty("connectTimeout")))
                .cursorFinalizerEnabled(Boolean.valueOf(properties.getProperty("cursorFinalizerEnabled")))
                .maxWaitTime(Integer.parseInt(properties.getProperty("maxWaitTime")))
                .threadsAllowedToBlockForConnectionMultiplier(Integer.parseInt(properties
                        .getProperty("threadsAllowedToBlockForConnectionMultiplier")))
                .socketTimeout(Integer.valueOf(properties.getProperty("socketTimeout")))
                .socketKeepAlive(Boolean.valueOf(properties.getProperty("socketKeepAlive")))
                .writeConcern(concern)
                .build();
        if (adds.size() > 1){
            mongoClient = new MongoClient(adds, credentials, options);
        }else {
            mongoClient = new MongoClient(adds.get(0), credentials, options);
        }
        return mongoClient;
    }
}

4、MongoDBOpLog.java 集成了库验证、表查询、数据动态获取

4.1测试初始化客户端-持有数据库

    public class MongoDBOpLog {
    private static MongoClient mongoClient;
    public static void main(String[] args) throws InterruptedException {
        initMongoClient();
        //获取local库
        MongoDatabase database = getDatabase("local");
        //监控库oplog.$main
        MongoCollection<Document> runoob = getCollection(database, "oplog.$main");
        //处理
        dataProcessing(runoob);
    }
    private static void initMongoClient() {
        try {
            mongoClient = MongoDBUtil.initMongoHasUser();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static MongoDatabase getDatabase(String dataBase) {
        if (!mongoClient.getDatabaseNames().contains(dataBase)) {
            throw new RuntimeException(dataBase + " no exist !");
        }
        MongoDatabase mongoDatabase = mongoClient.getDatabase(dataBase);
        return mongoDatabase;
    }

4.2获取表对象

  /**
     * 获取表对象
          * @param mongoDatabase
     * @param testCollection
     * @return
     */
    public static MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, String testCollection) {
        MongoCollection<Document> collection = null;
        try {
            //获取数据库dataBase下的集合collecTion,如果没有将自动创建
            collection = mongoDatabase.getCollection(testCollection);
        } catch (Exception e) {
            throw new RuntimeException("获取" + mongoDatabase.getName() + "数据库下的" + testCollection + "集合 failed !" + e);
        }
        return collection;
    }

4.3获取数据流处理

    /**
     * 解析操作类型
     * @param op
     * @return
     */
    private static String getEventType(String op) {
        switch (op) {
            case "i":
                return "insert";
            case "u":
                return "update";
            case "d":
                return "delete";
            default:
                return "other";
        }
    }
    /**
     * 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空
     * @return JSONObject
     */
    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {
        JSONObject columns = new JSONObject();// 存放变化后的字段
        result.put("columns", columns);
        result.put("condition", new JSONObject()); // 条件
        for (Map.Entry<String, Object> entry : document.entrySet()) {
            if (entry.getKey().equalsIgnoreCase("_id")) {
                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());
                continue;
            }
            columns.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
      case "d":                return "delete";            default:                return "other";        }    }    /**     * 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空     * @return JSONObject     */    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {        JSONObject columns = new JSONObject();// 存放变化后的字段        result.put("columns", columns);        result.put("condition", new JSONObject()); // 条件        for (Map.Entry<String, Object> entry : document.entrySet()) {            if (entry.getKey().equalsIgnoreCase("_id")) {                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());                continue;            }            columns.put(entry.getKey(), entry.getValue());        }        return result;    }

4.4数据流标准化

    /**
     * 解析操作类型
     * @param op
     * @return
     */
    private static String getEventType(String op) {
        switch (op) {
            case "i":
                return "insert";
            case "u":
                return "update";
            case "d":
                return "delete";
            default:
                return "other";
        }
    }
    /**
     * 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空
     * @return JSONObject
     */
    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {
        JSONObject columns = new JSONObject();// 存放变化后的字段
        result.put("columns", columns);
        result.put("condition", new JSONObject()); // 条件
        for (Map.Entry<String, Object> entry : document.entrySet()) {
            if (entry.getKey().equalsIgnoreCase("_id")) {
                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());
                continue;
            }
            columns.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
      case "d":                return "delete";            default:                return "other";        }    }    /**     * 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空     * @return JSONObject     */    private static JSONObject resultRow(Document document, JSONObject result, String eventType) {        JSONObject columns = new JSONObject();// 存放变化后的字段        result.put("columns", columns);        result.put("condition", new JSONObject()); // 条件        for (Map.Entry<String, Object> entry : document.entrySet()) {            if (entry.getKey().equalsIgnoreCase("_id")) {                columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());                continue;            }            columns.put(entry.getKey(), entry.getValue());        }        return result;    }

5、结果

5.1新增

数据实时同步之MongoDB

 

5.2更新

数据实时同步之MongoDB

 

5.3删除

数据实时同步之MongoDB

 

实践

目前普元数据服务共享平台DSP(Data Service Platform),已经集成离线开发和在线开发实现单表和多表同步到HBASE的实践,做到了这一步,并且对客户的需求完成交付。

总之,对于当前企业数据库MongoDB,无论是使用Change Streams,还是Oplog增量同步,实现数据汇聚、搭建数据服务共享平台,提取价值、长久规划,都是必不可少的。

关于作者: 雨声,现任普元高级开发工程师,熟悉软件开发的大数据、Java、常用消息组件等主流技术,有数据采集、消息推送、数据清洗、实时计算、数据可视化的完整开发经验。

关于EAWorld:微服务,DevOps,数据治理,移动架构原创技术分享。



Tags:数据实时同步   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
背景在大数据时代,存在大量基于数据的业务。数据需要在不同的系统之间流动、整合。通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据。传统...【详细内容】
2020-10-29  Tags: 数据实时同步  点击:(167)  评论:(0)  加入收藏
转载本文需注明出处:微信公众号EAWorld,违者必究。​前言:随着传统企业的发展,企业数据呈现多样化,海量化,难以实现数据快速分析。MongoDB是当前很多企业使用的,当日积月累数据很...【详细内容】
2020-07-07  Tags: 数据实时同步  点击:(71)  评论:(0)  加入收藏
▌简易百科推荐
1增1.1【插入单行】insert [into] <表名> (列名) values (列值)例:insert into Strdents (姓名,性别,出生日期) values (&#39;开心朋朋&#39;,&#39;男&#39;,&#39;1980/6/15&#3...【详细内容】
2021-12-27  快乐火车9d3    Tags:SQL   点击:(1)  评论:(0)  加入收藏
最近发现还有不少做开发的小伙伴,在写存储过程的时候,在参考已有的不同的写法时,往往很迷茫, 不知道各种写法孰优孰劣,该选用哪种写法,以及各种写法的优缺点,本文以一个简单的查询...【详细内容】
2021-12-23  linux上的码农    Tags:sql   点击:(9)  评论:(0)  加入收藏
《开源精选》是我们分享Github、Gitee等开源社区中优质项目的栏目,包括技术、学习、实用与各种有趣的内容。本期推荐的HasorDB 是一个全功能数据库访问工具,提供对象映射、丰...【详细内容】
2021-12-22  GitHub精选    Tags:HasorDB   点击:(5)  评论:(0)  加入收藏
作者丨Rafal Grzegorczyk译者丨陈骏策划丨孙淑娟【51CTO.com原创稿件】您是否还在手动对数据库执行各种脚本?您是否还在浪费时间去验证数据库脚本的正确性?您是否还需要将...【详细内容】
2021-12-22    51CTO  Tags:Liquibase   点击:(3)  评论:(0)  加入收藏
场景描述:由于生产环境的表比较复杂,字段很多。这里我们做下简化,只为说明今天要聊的问题。有两张表 tab1,tab2: tab1 数据如下: tab2 数据如下: 然后给你看下,我用来统计 name=&#3...【详细内容】
2021-12-20  Bald    Tags:SQL   点击:(5)  评论:(0)  加入收藏
前言知识无底,学海无涯,知识点虽然简单,但是比较多,所以将MySQL的基础写出来,方便自己以后查找,还有就是分享给大家。一、SQL简述1.SQL的概述Structure Query Language(结构化查...【详细内容】
2021-12-16  谣言止于独立思考    Tags:SQL基础   点击:(13)  评论:(0)  加入收藏
前言作为一名测试工程师,工作中在对测试结果进行数据比对的时候,或多或少要和数据库打交道的,要和数据库打交道,那么一些常用的 SQL 查询语法必须要掌握。最近有部分做测试小伙...【详细内容】
2021-12-14  柠檬班软件测试    Tags:SQL   点击:(15)  评论:(0)  加入收藏
话说C是面向内存的编程语言。数据要能存得进去,取得出来,且要考虑效率。不管是顺序存储还是链式存储,其寻址方式总是很重要。顺序存储是连续存储。同质结构的数组通过其索引表...【详细内容】
2021-12-08  小智雅汇    Tags:数据存储   点击:(17)  评论:(0)  加入收藏
概述DBConvert Studio 是一款强大的跨数据库迁移和同步软件,可在不同数据库格式之间转换数据库结构和数据。它将成熟、稳定、久经考验的 DBConvert 和 DBSync 核心与改进的现...【详细内容】
2021-11-17  雪竹聊运维    Tags:数据库   点击:(26)  评论:(0)  加入收藏
一、前言 大家好,我是小诚,《从0到1-全面深刻理解MySQL系列》已经来到第四章,这一章节的主要从一条SQL执行的开始,由浅入深的解析SQL语句由客户端到服务器的完整执行流程,最...【详细内容】
2021-11-09  woaker    Tags:SQL   点击:(35)  评论:(0)  加入收藏
相关文章
    无相关信息
最新更新
栏目热门
栏目头条