Ezekielx
Ezekielx
发布于 2026-06-13 / 1 阅读
0
0

大数据项目 在线教育实时数仓-02-业务数据采集平台

一、前言

上一期在这里🥰:大数据项目 在线教育实时数仓-01-用户行为采集平台 - 滕王阁

简介

在线教育业务流程可以以一个普通用户的浏览足迹为例进行说明,用户点开在线教育网站首页开始浏览,可能会通过分类查询也可能通过全文检索寻找自己中意的课程,这些课程都是存储在后台管理系统中的。

当用户寻找到自己中意的课程,可能会想要购买,将商品添加到购物车后发现需要登录,登录后对课程进行结算,这时候购物车的管理和课程订单信息的生成都会对业务数据库产生影响,会生成相应的订单数据和支付数据。

订单正式生成之后,还会对订单进行跟踪处理,直到订单全部完成。

在线教育的主要业务流程包括用户前台浏览课程时的课程详情的管理,用户课程加入购物车进行支付时用户个人中心&支付服务的管理,用户支付完成后订单后台服务的管理,这些流程涉及到了十几个甚至几十个业务数据表,甚至更多。

二、表结构

以下为本在线教育系统涉及到的业务数据表结构关系。这 25 张表以订单表、用户表、 课程信息表、测验表和用户章节进度表为中心,延伸出了支付表、订单明细表、章节表、课程评价表、科目表、试卷表、知识点表等,用户表提供用户的详细信息,支付表提供该订单的支付详情,订单详情表提供订单的课程等情况,课程表给订单明细表提供课程的详细信息。

3BS0LQ71-1.png

3BS0LQ71-2.png

分类表(base_category_info)

字段名 字段说明
id 编号(主键)
category_name 分类名称
create_time 创建时间
update_time 更新时间
deleted 是否删除

省份表(base_province)

字段名 字段说明
id 编号(主键)
name 省份名称
region_id 大区id
area_code 行政区位码
iso_code 国际编码
iso_3166_2 ISO3166 编码

来源表(base_source)

字段名 字段说明
id 编号(主键)
source_site 引流来源名称
source_url 引流来源链接

科目表(base_subject_info)

字段名 字段说明
id 编号(主键)
subject_name 科目名称
category_id 分类
create_time 创建时间
update_time 更新时间
deleted 是否删除

加购表(cart_info)

字段名 字段说明
id 编号(主键)
user_id 用户id
course_id 课程id
course_name 课程名称(冗余)
cart_price 放入购物车时价格
img_url 图片URL
session_id 会话id
create_time 创建时间
update_time 修改时间
deleted 是否删除
sold 是否已售

章节表(chapter_info)

字段名 字段说明
id 编号(主键)
chapter_name 章节名称
course_id 课程id
video_id 视频id
publisher_id 发布者id
is_free 是否免费
create_time 创建时间
update_time 更新时间
deleted 是否删除

章节评价表(comment_info)

字段名 字段说明
id 编号(主键)
user_id 用户id
chapter_id 章节id
course_id 课程id
comment_txt 评价内容
create_time 创建时间
deleted 是否删除

课程信息表(course_info)

字段名 字段说明
id 编号(主键)
course_name 课程名称
course_slogan 课程标语
course_cover_url 课程封面URL
subject_id 科目id
teacher 讲师名称
publisher_id 发布者id
chapter_num 章节数
origin_price 原价
reduce_amount 优惠金额
actual_price 实际价格
course_introduce 课程介绍
create_time 创建时间
update_time 更新时间
deleted 是否删除

收藏表(favor_info)

字段名 字段说明
id 编号(主键)
course_id 课程id
user_id 用户id
create_time 创建时间
update_time 更新时间
deleted 是否删除

知识点表(knowledge_point)

字段名 字段说明
id 编号(主键)
point_txt 知识点内容
point_level 知识点级别
course_id 课程id
chapter_id 章节id
publisher_id 发布者id
create_time 创建时间
update_time 修改时间
deleted 是否删除

订单明细表(order_detail)

字段名 字段说明
id 编号(主键)
course_id 课程id
course_name 课程名称
order_id 订单编号
user_id 用户id
origin_amount 原始金额
coupon_reduce 优惠券减免金额
final_amount 最终金额
session_id 会话id (当前会话id 继承购物车中会话id)
create_time 创建时间
update_time 更新时间

订单表(order_info)

字段名 字段说明
id 编号(主键)
user_id 用户id
origin_amount 原始金额
coupon_reduce 优惠券减免
final_amount 最终金额
order_status 订单状态
out_trade_no 订单交易编号(第三方支付用)
trade_body 订单描述(第三方支付用)
session_id 会话id
province_id 省份id
create_time 创建时间
expire_time 失效时间
update_time 更新时间

支付表(payment_info)

字段名 字段说明
id 编号(主键)
out_trade_no 对外业务编号
order_id 订单编号
alipay_trade_no 支付宝交易编号
total_amount 支付金额
trade_body 交易内容
payment_type 支付类型
payment_status 支付状态
create_time 创建时间
update_time 更新时间
callback_content 回调信息
callback_time 回调时间

课程评价表(review_info)

字段名 字段说明
id 编号(主键)
user_id 用户id
course_id 课程id
review_txt 评价内容
review_stars 评价星级
create_time 创建时间
deleted 是否删除

测验表(test_exam)

字段名 字段说明
id 编号(主键)
paper_id 试卷id
user_id 用户id
score 分数
duration_sec 所用时长
create_time 创建时间
submit_time 提交时间
update_time 更新时间
deleted 是否删除

测验问题表(test_exam_question)

字段名 字段说明
id 编号(主键)
exam_id 考试id
paper_id 试卷id
question_id 问题id
user_id 用户id
answer 答案
is_correct 是否正确
score 本题得分
create_time 创建时间
update_time 更新时间
deleted 是否删除

试卷表(test_paper)

字段名 字段说明
id 编号(主键)
paper_title 试卷名称
course_id 课程id
create_time 创建时间
update_time 更新时间
publisher_id 发布者id
deleted 是否删除

试卷问题表(test_paper_question)

字段名 字段说明
id 编号(主键)
paper_id 试卷id
question_id 题目id
score 题目分值
create_time 创建时间
deleted 是否删除
publisher_id 发布者id

知识点问题表(test_point_question)

字段名 字段说明
id 编号(主键)
point_id 知识点id
question_id 问题id
create_time 创建时间
publisher_id 发布者id
deleted 是否删除

问题信息表(test_question_info)

字段名 字段说明
id 编号(主键)
question_txt 题目内容
chapter_id 章节id
course_id 课程id
question_type 题目类型
create_time 创建时间
update_time 更新时间
publisher_id 发布者id
deleted 是否删除

问题选项表(test_question_option)

字段名 字段说明
id 编号(主键)
option_txt 选项内容
question_id 题目id
is_correct 是否正确
create_time 创建时间
update_time 更新时间
deleted 是否删除

用户章节进度表(user_chapter_process)

字段名 字段说明
id 编号(主键)
course_id 课程id
chapter_id 章节id
user_id 用户id
position_sec 播放位置(秒)
create_time 创建时间
update_time 更新时间
deleted 是否删除

用户表(user_info)

字段名 字段说明
id 编号(主键)
login_name 用户名称
nick_name 用户昵称
passwd 用户密码
real_name 用户姓名
phone_num 手机号
email 邮箱
head_img 头像URL
user_level 用户级别
birthday 用户生日
gender 性别 M男,F女
create_time 创建时间
operate_time 修改时间
status 状态

视频表(video_info)

字段名 字段说明
id 编号(主键)
video_name 视频名称
during_sec 时长
video_status 状态 未上传,上传中,上传完
video_size 大小
video_url 视频存储路径
video_source_id 云端资源编号
version_id 版本号
chapter_id 章节id
course_id 课程id
publisher_id 发布者id
create_time 创建时间
update_time 更新时间
deleted 是否删除

VIP 变化表(vip_change_detail)

字段名 字段说明
id 编号(主键)
user_id 用户id
from_vip 原VIP等级
to_vip 新VIP等级
create_time 创建时间

三、业务数据采集模块

业务数据是数据仓库的重要数据来源。用户、课程、订单、支付、评价等数据最初都存储在业务数据库中,若要在数仓中进行统一建模和统计分析,就需要先将这些数据同步到数据仓库中。

在离线数仓中,数据计算通常以天为周期,因此业务数据同步一般也按天进行。也就是说,每天定时从业务数据库中抽取数据,写入数据仓库,供后续分层建模和指标统计使用。

业务数据同步时,常见的同步策略主要有两种:全量同步增量同步

1、数据同步策略

业务数据同步策略主要分为两类:

  • 全量同步
  • 增量同步

全量同步

3BS0LQ71-3.png

全量同步是指每天将业务数据库中某张表的全部数据完整同步到数据仓库中。

这种方式实现简单,不需要判断哪些数据发生了变化,只需要每天重新同步整张表即可。因此,全量同步是保证业务数据库和数据仓库数据一致性较为直接的方式。

不过,当表数据量较大、每日变化数据较少时,如果仍然每天同步整张表,就会产生较多重复传输和重复存储。

增量同步

3BS0LQ71-4.png

增量同步是指每天只同步业务数据库中新增或发生变化的数据。

相比全量同步,增量同步可以减少重复数据的传输和存储,更适合数据量较大、但每日变化比例较低的表。

需要注意的是,采用增量同步的表通常需要在首日先进行一次全量同步,用于初始化历史数据,之后再每天同步新增和变化数据。

2、数据同步策略选择

同步策略 优点 缺点
全量同步 逻辑简单,实现方便 当表数据量较大且每日变化较少时,会重复同步和存储大量相同数据,效率较低
增量同步 同步效率高,避免重复同步和存储未变化数据 实现逻辑相对复杂,需要将每日新增及变化数据与历史数据进行整合
  • 如果表数据量较小,或者每日变化数据占比较高,可以采用全量同步。
  • 如果表数据量较大,且每日变化数据占比较低,更适合采用增量同步。
  • 如果后续数仓建模需要保留每日快照,也可以对部分表采用全量同步。

本项目中,各业务表的同步策略如下:

3BS0LQ71-5.png

由于后续数仓建模需要,cart_info 表既需要进行全量同步,也需要进行增量同步。后续章节会结合具体建模过程进行说明。

3、数据同步工具选型

常见的数据同步工具较多,按照同步原理可以大致分为两类:

  1. 基于查询的离线批量同步工具
  2. 基于数据库变更日志的实时同步工具

基于查询的离线批量同步工具

这类工具的代表有:

  • DataX
  • Sqoop

它们通常通过 select 查询的方式从业务数据库中读取数据,再将数据写入目标存储系统。

全量同步通常使用这类工具实现,因为全量同步只需要每天查询整张表即可。

增量同步也可以使用这类工具实现,但通常要求业务表中存在类似下面的时间字段:

  • create_time
  • update_time

用于筛选出指定时间范围内新增或发生变化的数据。

基于变更日志的实时同步工具

这类工具的代表有:

  • Maxwell
  • Canal

它们通过监听数据库的变更日志来捕获数据变化。例如在 MySQL 中,可以通过监听 binlog 获取表中的 insertupdatedelete 操作。

通常用于增量同步,因为它能够较完整地记录数据的变化过程。

增量同步方案对比

增量同步方案 DataX / Sqoop Maxwell / Canal
同步原理 基于 select 查询获取数据 基于数据库变更日志获取数据
对数据库的要求 表中通常需要存在 create_timeupdate_time 等时间字段,用于筛选新增和变化数据 数据库需要开启变更日志,例如 MySQL 需要开启 binlog
数据中间状态 离线批量同步,只能获取某个时间点的最终状态;如果一条数据一天内变化多次,中间状态无法获取 实时捕获所有变更操作,可以获取数据变化过程中的中间状态
适用场景 适合全量同步,或简单的离线增量同步 适合实时增量同步,尤其适合需要完整变更过程的场景

在本项目中,业务数据同步工具选择:

  • 全量同步采用 DataX
  • 增量同步采用 Maxwell

4、安装 DataX

这个项目怎么又不维护了,成天学过时的技术是吧😡。

DataX 是阿里开源的一款离线数据同步工具,主要用于在不同数据源之间进行批量数据传输。

它可以把数据从一个地方读取出来,再写入到另一个地方。例如:

  • 从 MySQL 同步到 HDFS
  • 从 MySQL 同步到 Hive
  • 从 Oracle 同步到 MySQL
  • 从文本文件同步到数据库

DataX 的核心思想是 Reader + Writer

  • Reader:负责从源端读取数据
  • Writer:负责把数据写入目标端

例如,从 MySQL 同步数据到 HDFS 时:

  • mysqlreader 负责读取 MySQL 数据
  • hdfswriter 负责写入 HDFS

DataX 通常用于业务数据的全量同步,也可以配合时间字段实现简单的增量同步。

软件包下载

GitHub 官方仓库地址:https://github.com/alibaba/DataX

README.TXT 里面有下载地址。

百度网盘下载地址:https://pan.baidu.com/s/1BeGFoCIST1u5U7t3Dn303Q?pwd=vzke

下载好后将软件压缩包使用 MobeXterm 上传到 hadoop101 的 ~/ansible/files 目录下。

和前面安装软件的过程一样,复制文件名,将之前写好的 deploy_package.yml 中的变量进行替换,注意 hosts 改为 hadoop101,这个软件不需要所以集群都安装。

替换后的 deploy_package.yml:

- name: Deploy package
  hosts: hadoop101

  vars:
    pkg_name: datax.tar.gz
    dest_dir: datax-202309

  tasks:
    - name: Copy archive
      copy:
        src: "./files/{{ pkg_name }}"
        dest: "/opt/{{ pkg_name }}"

    - name: Get top directory name from archive
      shell: tar -tf /opt/{{ pkg_name }} | head -1 | cut -d/ -f1
      register: archive_dir

    - name: Extract archive
      command: tar -xzf /opt/{{ pkg_name }} -C /opt

    - name: Rename directory
      command: mv /opt/{{ archive_dir.stdout | trim }} /opt/{{ dest_dir }}
      args:
        creates: /opt/{{ dest_dir }}

    - name: Change ownership
      file:
        path: "/opt/{{ dest_dir }}"
        owner: vaultattic
        group: vaultattic
        recurse: yes

    - name: Remove archive
      file:
        path: "/opt/{{ pkg_name }}"
        state: absent

运行 deploy_package.yml:

ansible-playbook deploy_package.yml

执行完后可以在各节点的 /opt 目录下查看 Datax 目录是否只存在 hadoop101 上。

cluster-run.sh ls /opt
[vaultattic@hadoop101 ansible]$ cluster-run.sh ls /opt
--------- hadoop101 ----------
amazon-corretto-8
data_mocker
datax-202309
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
--------- hadoop102 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
--------- hadoop103 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
[vaultattic@hadoop101 ansible]$

使用 set_env.yml 配置 DATAX_HOME 环境变量。

复制要配置环境变量的目录路径,将 set_env.yml 中的变量进行替换:

---
- name: Configure environment variables
  hosts: hadoop101

  vars:
    env_name: DATAX_HOME
    env_path: /opt/datax-202309

  tasks:

    - name: Add environment variables to /home/vaultattic/.bashrc
      lineinfile:
        path: "/home/vaultattic/.bashrc"
        line: "export {{ env_name }}={{ env_path }}"
        create: yes

    - name: Add PATH for {{ env_name }}
      lineinfile:
        path: "/home/vaultattic/.bashrc"
        line: "export PATH=${{ env_name }}/bin:$PATH"

运行 set_env.yml:

ansible-playbook set_env.yml

执行 playbook 后刷新环境变量,并执行一个自检任务:

source ~/.bashrc
python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/job.json
[vaultattic@hadoop101 ansible]$ source ~/.bashrc
[vaultattic@hadoop101 ansible]$ python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/job.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2026-06-08 17:23:19.099 [main] INFO  MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2026-06-08 17:23:19.103 [main] INFO  MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2026-06-08 17:23:19.169 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2026-06-08 17:23:19.177 [main] INFO  Engine - the machine info  =>

        osInfo: Linux amd64 6.12.0-124.8.1.el10_1.x86_64
        jvmInfo:        Amazon.com Inc. 1.8 25.482-b08
        cpu num:        2

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size
        PS Eden Space                  | 256.00MB                       | 256.00MB
        Code Cache                     | 240.00MB                       | 2.44MB
        Compressed Class Space         | 1,024.00MB                     | 0.00MB
        PS Survivor Space              | 42.50MB                        | 42.50MB
        PS Old Gen                     | 683.00MB                       | 683.00MB
        Metaspace                      | -0.00MB                        | 0.00MB


2026-06-08 17:23:19.189 [main] INFO  Engine -
{
        "setting":{
                "speed":{
                        "channel":1
                },
                "errorLimit":{
                        "record":0,
                        "percentage":0.02
                }
        },
        "content":[
                {
                        "reader":{
                                "name":"streamreader",
                                "parameter":{
                                        "column":[
                                                {
                                                        "value":"DataX",
                                                        "type":"string"
                                                },
                                                {
                                                        "value":20250101,
                                                        "type":"long"
                                                },
                                                {
                                                        "value":"2025-01-01 00:00:00",
                                                        "type":"date"
                                                },
                                                {
                                                        "value":true,
                                                        "type":"bool"
                                                },
                                                {
                                                        "value":"test",
                                                        "type":"bytes"
                                                }
                                        ],
                                        "sliceRecordCount":100000
                                }
                        },
                        "writer":{
                                "name":"streamwriter",
                                "parameter":{
                                        "print":false,
                                        "encoding":"UTF-8"
                                }
                        }
                }
        ]
}

2026-06-08 17:23:19.221 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false
2026-06-08 17:23:19.221 [main] INFO  JobContainer - DataX jobContainer starts job.
2026-06-08 17:23:19.222 [main] INFO  JobContainer - Set jobId = 0
2026-06-08 17:23:19.237 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2026-06-08 17:23:19.237 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do prepare work .
2026-06-08 17:23:19.237 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2026-06-08 17:23:19.238 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2026-06-08 17:23:19.238 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2026-06-08 17:23:19.238 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2026-06-08 17:23:19.239 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2026-06-08 17:23:19.260 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2026-06-08 17:23:19.269 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2026-06-08 17:23:19.272 [job-0] INFO  JobContainer - Running by standalone Mode.
2026-06-08 17:23:19.285 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2026-06-08 17:23:19.292 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2026-06-08 17:23:19.293 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2026-06-08 17:23:19.309 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2026-06-08 17:23:19.713 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[404]ms
2026-06-08 17:23:19.714 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2026-06-08 17:23:29.297 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.069s |  All Task WaitReaderTime 0.091s | Percentage 100.00%
2026-06-08 17:23:29.297 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2026-06-08 17:23:29.298 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2026-06-08 17:23:29.298 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.
2026-06-08 17:23:29.298 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2026-06-08 17:23:29.299 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/datax-v202309/hook
2026-06-08 17:23:29.300 [job-0] INFO  JobContainer -
         [total cpu info] =>
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] =>
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s

2026-06-08 17:23:29.300 [job-0] INFO  JobContainer - PerfTrace not enable!
2026-06-08 17:23:29.301 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.069s |  All Task WaitReaderTime 0.091s | Percentage 100.00%
2026-06-08 17:23:29.302 [job-0] INFO  JobContainer -
任务启动时刻                    : 2026-06-08 17:23:19
任务结束时刻                    : 2026-06-08 17:23:29
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

[vaultattic@hadoop101 ansible]$

读写失败总数如果为 0 说明安装成功🥰。

DataX 的使用方式很简单:根据数据同步的目标选择对应的 Reader 和 Writer,然后将相关参数写入一个 JSON 配置文件,最后通过 DataX 命令提交任务即可。

例如,将 MySQL 数据同步到 HDFS 时,需要使用:

  • mysqlreader:负责读取 MySQL 数据
  • hdfswriter:负责写入 HDFS 数据

如果是将 HDFS 数据同步到 MySQL,则需要使用:

  • hdfsreader:负责读取 HDFS 数据
  • mysqlwriter:负责写入 MySQL 数据

DataX 任务提交命令

DataX 任务通过 datax.py 脚本提交,基本命令格式如下:

python $DATAX_HOME/bin/datax.py path/to/your/job.json

其中,path/to/your/job.json 是 DataX 任务配置文件路径。

例如,在 DataX 安装目录下执行某个任务:

任务提交后,DataX 会根据 JSON 配置文件中的 Reader 和 Writer 信息,完成数据读取、传输和写入。

DataX 配置文件格式

DataX 的同步任务通过 JSON 文件进行描述,本项目中,全量表同步使用的插件和 JSON 配置文档如下:

数据方向 DataX 插件 插件文档
MySQL -> DataX mysqlreade https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
DataX -> HDFS hdfswriter https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

编写配置文件时,可以先通过命令查看指定 Reader 和 Writer 的配置模板。

例如,查看 mysqlreaderhdfswriter 的配置模板:

python $DATAX_HOME/bin/datax.py -r mysqlreader -w hdfswriter
[vaultattic@hadoop101 ~]$ python $DATAX_HOME/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
[vaultattic@hadoop101 ~]$

从结构上看,DataX 配置文件最外层是 job,其中主要包含两部分:

配置项 说明
setting 用于配置整个同步任务的运行参数,例如并发数、限速等
content 用于配置数据源和目标端,也就是 Reader 和 Writer

其中,content 中又包含:

配置项 说明
reader 数据读取端配置,指定从哪里读取数据
writer 数据写入端配置,指定将数据写到哪里

Reader 和 Writer 的具体参数会根据插件不同而有所差异。例如,mysqlreader 需要配置 MySQL 连接信息、表名、字段等;hdfswriter 需要配置 HDFS 地址、输出路径、文件格式等。

常见配置项含义如下:

配置项 说明
name 插件名称,例如 mysqlreaderhdfswriter
parameter 插件参数配置,不同插件参数不同
connection 数据源或目标端连接信息
column 读取或写入的字段列表
setting.speed.channel 同步任务的并发通道数

更多参数可以参考 DataX GitHub 官网仓库 README.TXT 中的一个支持表格:https://github.com/alibaba/DataX 。

3BS0LQ71-6.png

5、安装 Maxwell

Maxwell 是一款基于 MySQL binlog 的实时数据采集工具,主要用于捕获 MySQL 中的数据变更,并将变更数据发送到 Kafka、Kinesis、RabbitMQ、Redis 等目标端。

它的工作方式可以简单理解为:Maxwell 伪装成 MySQL 的一个从库,实时监听 MySQL 的 binlog 日志。当业务表发生 insertupdatedelete 操作时,Maxwell 会解析这些变更,并将变更内容封装成 JSON 格式输出。

例如:

MySQL 执行插入语句:

INSERT INTO school.student(id, name, age)
VALUES(1, 'tom', 18);

Maxwell 输出示例:

{
  "database": "school",
  "table": "student",
  "type": "insert",
  "ts": 1700000000,
  "xid": 203401,
  "commit": true,
  "data": {
    "id": 1,
    "name": "tom",
    "age": 18
  }
}

MySQL 执行修改语句:

UPDATE school.student
SET age = 19
WHERE id = 1;

Maxwell 输出示例:

{
  "database": "school",
  "table": "student",
  "type": "update",
  "ts": 1700000060,
  "xid": 203522,
  "commit": true,
  "data": {
    "id": 1,
    "name": "tom",
    "age": 19
  },
  "old": {
    "age": 18
  }
}

MySQL 执行删除语句:

DELETE FROM school.student
WHERE id = 1;

Maxwell 输出示例:

{
  "database": "school",
  "table": "student",
  "type": "delete",
  "ts": 1700000120,
  "xid": 203689,
  "commit": true,
  "data": {
    "id": 1,
    "name": "tom",
    "age": 19
  }
}

Maxwell 常用于业务数据的增量同步。相比每天通过 SQL 查询增量数据,Maxwell 可以更完整地捕获数据的变化过程,尤其是一条数据在一天内多次变化时,也可以记录每一次变更。

软件包下载

Maxwell 从 1.30.0 开始就不支持 JDK 8 了,使用这个项目要用的话得去他们的 GitHub 仓库下载历史版本,不能用最新版。

去官网下载最新版使用。

本来是按照原教程使用最后一个支持 JDK 8 的版本的,也就是 1.29.2。但这有出现了一个问题,我使用的 MySQL 数据库版本也没按教程的来,用的是新版 8.4.8,而 Maxwell 1.29.2 在运行时会使用 SHOW MASTER STATUS; 命令,这个命令在 MySQL 8.4.8 已经被废弃了,所以启动时会报错:

ERROR 1064 (42000): You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near 'MASTER STATUS' at line 1

现在只能被迫使用最新版的 Maxwell,单独将他的 JDK 改为更高的版本😵‍💫。

3BS0LQ71-7.png

没找到还有后续,实测官网的那个最新版也不可以使用,我当时该用官网的最新版后就没测试了,后面整体测试时又出现了一样的问题,后面才官方的那个甚至也不是最新版(可能是忘记更新官网下载链接了),最新版得去他们的 GitHub Release 里面找,而且这个 BUG 到 1.42.1 才修复,还得用很高的版本🥲。

3BS0LQ71-8.png

GitHub 官方仓库下载链接:https://github.com/zendesk/maxwell/releases/download/v1.44.1/maxwell-1.44.1.tar.gz

百度网盘下载链接:https://pan.baidu.com/s/1c9mHGQzVviUSrh6j_tCx9g?pwd=3a3n

下载好后将软件压缩包使用 MobeXterm 上传到 hadoop101 的 ~/ansible/files 目录下。

和前面安装软件的过程一样,复制文件名,将之前写好的 deploy_package.yml 中的变量进行替换,注意 hosts 改为 hadoop101,这个软件不需要所以集群都安装。

替换后的 deploy_package.yml:

- name: Deploy package
  hosts: hadoop101

  vars:
    pkg_name: maxwell-1.44.1.tar.gz
    dest_dir: maxwell-1.44.1

  tasks:
    - name: Copy archive
      copy:
        src: "./files/{{ pkg_name }}"
        dest: "/opt/{{ pkg_name }}"

    - name: Get top directory name from archive
      shell: tar -tf /opt/{{ pkg_name }} | head -1 | cut -d/ -f1
      register: archive_dir

    - name: Extract archive
      command: tar -xzf /opt/{{ pkg_name }} -C /opt

    - name: Rename directory
      command: mv /opt/{{ archive_dir.stdout | trim }} /opt/{{ dest_dir }}
      args:
        creates: /opt/{{ dest_dir }}

    - name: Change ownership
      file:
        path: "/opt/{{ dest_dir }}"
        owner: vaultattic
        group: vaultattic
        recurse: yes

    - name: Remove archive
      file:
        path: "/opt/{{ pkg_name }}"
        state: absent

运行 deploy_package.yml:

ansible-playbook deploy_package.yml

执行完后可以在各节点的 /opt 目录下查看 Maxwell 目录是否只存在 hadoop101 上。

cluster-run.sh ls /opt
[vaultattic@hadoop101 ansible]$ cluster-run.sh ls /opt
--------- hadoop101 ----------
amazon-corretto-8
data_mocker
datax-202309
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
maxwell-1.44.1
zookeeper-3.8.6
--------- hadoop102 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
--------- hadoop103 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
[vaultattic@hadoop101 ansible]$

安装 JDK 11

上面解释了新版本 Maxwell 不兼容 JDK 8,老版本 Maxwell 不兼容 MySQL 8.4.8,所以下面给新版 Maxwell 单独使用 JDK 11。

其实如果像我一样喜欢用新版的话一开始就应该全部换成 JDK 11 的,我会看了一下 JDK 11 可以全部兼容这个项目,JDK 8 还是太老了😥。

这里用 OpenJDK 的发行版 Corretto 8,下载地址:Downloads for Amazon Corretto 8 - Amazon Corretto 8

Corretto 8 百度网盘下载链接:https://pan.baidu.com/s/1RPezDnEcdqbc93pu6H_vWQ?pwd=ikqu

下载好后将软件压缩包使用 MobeXterm 上传到 hadoop101 的 ~/ansible/files 目录下。

和前面安装软件的过程一样,复制文件名,将之前写好的 deploy_package.yml 中的变量进行替换,注意 hosts 改为 hadoop101,这个软件不需要所以集群都安装。

替换后的 deploy_package.yml:

- name: Deploy package
  hosts: hadoop101

  vars:
    pkg_name: amazon-corretto-11.0.31.11.1-linux-x64.tar.gz
    dest_dir: amazon-corretto-11

  tasks:
    - name: Copy archive
      copy:
        src: "./files/{{ pkg_name }}"
        dest: "/opt/{{ pkg_name }}"

    - name: Get top directory name from archive
      shell: tar -tf /opt/{{ pkg_name }} | head -1 | cut -d/ -f1
      register: archive_dir

    - name: Extract archive
      command: tar -xzf /opt/{{ pkg_name }} -C /opt

    - name: Rename directory
      command: mv /opt/{{ archive_dir.stdout | trim }} /opt/{{ dest_dir }}
      args:
        creates: /opt/{{ dest_dir }}

    - name: Change ownership
      file:
        path: "/opt/{{ dest_dir }}"
        owner: vaultattic
        group: vaultattic
        recurse: yes

    - name: Remove archive
      file:
        path: "/opt/{{ pkg_name }}"
        state: absent

运行 deploy_package.yml:

ansible-playbook deploy_package.yml

执行完后可以在各节点的 /opt 目录下查看 amazon-corretto-11 目录是否只存在 hadoop101 上。

cluster-run.sh ls /opt
[vaultattic@hadoop101 ansible]$ cluster-run.sh ls /opt
--------- hadoop101 ----------
amazon-corretto-11
amazon-corretto-8
data_mocker
datax-202309
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
maxwell-1.44.1
zookeeper-3.8.6
--------- hadoop102 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
--------- hadoop103 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
[vaultattic@hadoop101 ansible]$

开启 MySQL binlog

Maxwell 通过监听 MySQL binlog 获取数据变更,因此需要先开启 MySQL binlog,并将 binlog 格式设置为 ROW

官方文档:Quick Start - Maxwell's Daemon

3BS0LQ71-9.png

编辑 MySQL 配置文件:

sudo vim /etc/my.cnf

[mysqld] 下添加或修改以下配置:

server_id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=edu
参数 说明
server_id MySQL 服务编号,开启 binlog 时必须配置
log-bin 开启 binlog,并指定 binlog 文件名前缀
binlog_format binlog 日志格式,Maxwell 要求为 row
binlog-do-db 指定记录 binlog 的数据库,这里表示只记录 edu

3BS0LQ71-10.png

修改完成后,重启 MySQL:

sudo systemctl restart mysqld

创建 Maxwell 用户并授权

Maxwell 需要连接 MySQL,并读取 binlog 日志,因此建议创建一个专门的 MySQL 用户。

使用 MySQL 的 root 用户登录 MySQL 命令行(密码 123456):

mysql -u root -p

在 MySQL 命令行中执行:

SET GLOBAL validate_password.policy=LOW;
SET GLOBAL validate_password.length=6;

CREATE DATABASE IF NOT EXISTS maxwell;

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
CREATE USER 'maxwell'@'localhost' IDENTIFIED BY '123456';

GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';

FLUSH PRIVILEGES;

修改 Maxwell 配置文件

进入 Maxwell 安装目录:

cd /opt/maxwell-1.44.1

复制配置文件模板:

cp config.properties.example config.properties

编辑配置文件:

vim config.properties

更改或添加以下配置项(直接粘贴到配置文件或者取消注释后修改都行):

producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092

host=hadoop101
port=3306
user=maxwell
password=123456
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true

kafka_topic=topic_db

3BS0LQ71-11.png

编写 Maxwell 集群管理脚本

前面说的单独使用 JDK 11 就写在启动脚本里面了。

创建启动脚本:

vim ~/bin/maxwell-start.sh
#!/bin/bash

host=$(hostname)

echo "========== 启动 ${host} Maxwell =========="

export JAVA_HOME=/opt/amazon-corretto-11
export PATH=$JAVA_HOME/bin:$PATH

MAXWELL_HOME=/opt/maxwell-1.44.1

PID=$(jps -l | grep 'com.zendesk.maxwell.Maxwell' | awk '{print $1}')

if [ -n "$PID" ]; then
    echo "Maxwell 已运行,PID=$PID"
    exit 0
fi

mkdir -p $MAXWELL_HOME/logs

nohup $MAXWELL_HOME/bin/maxwell \
--config $MAXWELL_HOME/config.properties \
> $MAXWELL_HOME/logs/maxwell.log 2>&1 &

sleep 3

PID=$(jps -l | grep 'com.zendesk.maxwell.Maxwell' | awk '{print $1}')

if [ -n "$PID" ]; then
    echo "Maxwell 启动成功,PID=$PID"
else
    echo "Maxwell 启动失败,请查看日志:$MAXWELL_HOME/logs/maxwell.log"
fi

创建停止脚本:

vim ~/bin/maxwell-stop.sh
#!/bin/bash

host=$(hostname)

echo "========== 停止 ${host} Maxwell =========="

export JAVA_HOME=/opt/amazon-corretto-11
export PATH=$JAVA_HOME/bin:$PATH

PID=$(jps -l | grep 'com.zendesk.maxwell.Maxwell' | awk '{print $1}')

if [ -z "$PID" ]; then
    echo "Maxwell 未运行"
    exit 0
fi

kill $PID

sleep 3

PID=$(jps -l | grep 'com.zendesk.maxwell.Maxwell' | awk '{print $1}')

if [ -z "$PID" ]; then
    echo "Maxwell 停止成功"
else
    kill -9 $PID
    echo "Maxwell 已强制停止"
fi

创建状态检查脚本:

vim ~/bin/maxwell-status.sh
#!/bin/bash

host=$(hostname)

echo "========== 查看 ${host} Maxwell 状态 =========="

export JAVA_HOME=/opt/amazon-corretto-11
export PATH=$JAVA_HOME/bin:$PATH

PID=$(jps -l | grep 'com.zendesk.maxwell.Maxwell' | awk '{print $1}')

if [ -n "$PID" ]; then
    echo "Maxwell 正在运行,PID=$PID"
else
    echo "Maxwell 未运行"
fi

给脚本添加执行权限:

chmod +x ~/bin/maxwell-*.sh

6、全量表数据同步

同步目标

全量表指每天需要完整同步一份快照数据的业务表。

本项目中,全量表数据由 DataX 从 MySQL 业务库同步到 HDFS,后续由 Hive 外部表进行加载和分析。

全量同步的目标路径格式如下:/origin_data/edu/db/表名_full/日期

3BS0LQ71-13.png

全量表数据同步链路如下:

flowchart LR A[MySQL 业务库 edu] --> B[DataX] B --> C[HDFS 原始数据层] C --> D[Hive ODS 层]
  • DataX 通过 mysqlreader 读取 MySQL 表数据;
  • DataX 通过 hdfswriter 将数据写入 HDFS;
  • HDFS 路径中包含日期分区,用于保存每日快照;
  • 后续 Hive ODS 层基于该路径加载数据。

DataX 配置文件生成脚本

如果每张表都手写一个 DataX JSON 配置文件,工作量较大,而且容易出错。

因此这里编写一个 Python 脚本,根据 MySQL 的元数据信息自动生成 DataX 配置文件。

生成逻辑如下:

  • 查询 information_schema.COLUMNS 获取表字段;
  • 根据 MySQL 字段类型映射为 HDFSWriter 支持的字段类型;
  • 自动拼接 mysqlreaderhdfswriter 配置;
  • 将 JSON 配置文件写入指定目录 /opt/datax-202309/job/import

安装 Python MySQL 驱动(原教程用的 Python 2 的驱动,我换成 Python 3 的了):

sudo yum install -y python3 python3-pip
python3 -m pip install PyMySQL

创建 gen_import_config.py 脚本:

vim ~/bin/gen_import_config.py
import json
import getopt
import os
import sys
import pymysql

# MySQL 相关配置
mysql_host = "hadoop101"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"

# HDFS NameNode 相关配置
hdfs_nn_host = "hadoop101"
hdfs_nn_port = "8020"

# DataX 配置文件输出目录
output_path = "/opt/datax-202309/job/import"

def get_connection():
    return pymysql.connect(
        host=mysql_host,
        port=int(mysql_port),
        user=mysql_user,
        password=mysql_passwd,
        charset="utf8"
    )

def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()

    sql = """
        SELECT COLUMN_NAME, DATA_TYPE
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s
          AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
    """

    cursor.execute(sql, [database, table])
    result = cursor.fetchall()

    cursor.close()
    connection.close()

    return result

def get_mysql_columns(database, table):
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))

def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "integer": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "mediumint": "bigint",
            "decimal": "string",
            "numeric": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "varbinary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string",
            "tinytext": "string",
            "mediumtext": "string",
            "longtext": "string",
            "json": "string"
        }
        return mappings.get(mysql_type.lower(), "string")

    meta = get_mysql_meta(database, table)

    return list(
        map(
            lambda x: {
                "name": x[0],
                "type": type_mapping(x[1])
            },
            meta
        )
    )

def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": mysql_user,
                            "password": mysql_passwd,
                            "column": get_mysql_columns(source_database, source_table),
                            "splitPk": "",
                            "connection": [
                                {
                                    "table": [
                                        source_table
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:mysql://{}:{}/{}?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true".format(
                                            mysql_host,
                                            mysql_port,
                                            source_database
                                        )
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "defaultFS": "hdfs://{}:{}".format(
                                hdfs_nn_host,
                                hdfs_nn_port
                            ),
                            "fileType": "text",
                            "path": "${targetdir}",
                            "fileName": source_table,
                            "column": get_hive_columns(source_database, source_table),
                            "writeMode": "append",
                            "fieldDelimiter": "\t",
                            "compress": "gzip"
                        }
                    }
                }
            ]
        }
    }

    if not os.path.exists(output_path):
        os.makedirs(output_path)

    file_name = "{}.{}.json".format(source_database, source_table)
    file_path = os.path.join(output_path, file_name)

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(job, f, ensure_ascii=False, indent=4)

    print("生成配置文件:{}".format(file_path))

def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(
        args,
        "-d:-t:",
        ["sourcedb=", "sourcetbl="]
    )

    for opt_name, opt_value in options:
        if opt_name in ("-d", "--sourcedb"):
            source_database = opt_value
        if opt_name in ("-t", "--sourcetbl"):
            source_table = opt_value

    if source_database == "" or source_table == "":
        print("用法:python3 gen_import_config.py -d database -t table")
        sys.exit(1)

    generate_json(source_database, source_table)

if __name__ == "__main__":
    main(sys.argv[1:])

脚本使用方式如下:

python3 ~/bin/gen_import_config.py -d 数据库名 -t 表名

例如生成 base_province 表的 DataX 配置文件:

python3 ~/bin/gen_import_config.py -d edu -t base_province

执行后会生成:/opt/datax-202309/job/import/edu.base_province.json

批量生成全量表 DataX 配置文件

为了批量生成所有全量表的 DataX 配置文件,可以再封装一个 Shell 脚本。

创建脚本:

vim ~/bin/gen_import_config.sh
#!/bin/bash

python3 ~/bin/gen_import_config.py -d edu -t base_category_info
python3 ~/bin/gen_import_config.py -d edu -t base_source
python3 ~/bin/gen_import_config.py -d edu -t base_province
python3 ~/bin/gen_import_config.py -d edu -t base_subject_info
python3 ~/bin/gen_import_config.py -d edu -t cart_info
python3 ~/bin/gen_import_config.py -d edu -t chapter_info
python3 ~/bin/gen_import_config.py -d edu -t course_info
python3 ~/bin/gen_import_config.py -d edu -t knowledge_point
python3 ~/bin/gen_import_config.py -d edu -t test_paper
python3 ~/bin/gen_import_config.py -d edu -t test_paper_question
python3 ~/bin/gen_import_config.py -d edu -t test_point_question
python3 ~/bin/gen_import_config.py -d edu -t test_question_info
python3 ~/bin/gen_import_config.py -d edu -t user_chapter_process
python3 ~/bin/gen_import_config.py -d edu -t test_question_option
python3 ~/bin/gen_import_config.py -d edu -t video_info

添加执行权限:

chmod +x ~/bin/gen_import_config.sh

执行脚本:

gen_import_config.sh
[vaultattic@hadoop101 ~]$ gen_import_config.sh
生成配置文件:/opt/datax-202309/job/import/edu.base_category_info.json
生成配置文件:/opt/datax-202309/job/import/edu.base_source.json
生成配置文件:/opt/datax-202309/job/import/edu.base_province.json
生成配置文件:/opt/datax-202309/job/import/edu.base_subject_info.json
生成配置文件:/opt/datax-202309/job/import/edu.cart_info.json
生成配置文件:/opt/datax-202309/job/import/edu.chapter_info.json
生成配置文件:/opt/datax-202309/job/import/edu.course_info.json
生成配置文件:/opt/datax-202309/job/import/edu.knowledge_point.json
生成配置文件:/opt/datax-202309/job/import/edu.test_paper.json
生成配置文件:/opt/datax-202309/job/import/edu.test_paper_question.json
生成配置文件:/opt/datax-202309/job/import/edu.test_point_question.json
生成配置文件:/opt/datax-202309/job/import/edu.test_question_info.json
生成配置文件:/opt/datax-202309/job/import/edu.user_chapter_process.json
生成配置文件:/opt/datax-202309/job/import/edu.test_question_option.json
生成配置文件:/opt/datax-202309/job/import/edu.video_info.json
[vaultattic@hadoop101 ~]$

查看生成结果:

ll /opt/datax-202309/job/import/
[vaultattic@hadoop101 ~]$ ll /opt/datax-202309/job/import/
总计 60
-rw-r--r--. 1 vaultattic vaultattic 1618  6月10日 13:45 edu.base_category_info.json
-rw-r--r--. 1 vaultattic vaultattic 1608  6月10日 13:45 edu.base_province.json
-rw-r--r--. 1 vaultattic vaultattic 1604  6月10日 13:45 edu.base_source.json
-rw-r--r--. 1 vaultattic vaultattic 1616  6月10日 13:45 edu.base_subject_info.json
-rw-r--r--. 1 vaultattic vaultattic 1600  6月10日 13:45 edu.cart_info.json
-rw-r--r--. 1 vaultattic vaultattic 1606  6月10日 13:45 edu.chapter_info.json
-rw-r--r--. 1 vaultattic vaultattic 1604  6月10日 13:45 edu.course_info.json
-rw-r--r--. 1 vaultattic vaultattic 1612  6月10日 13:45 edu.knowledge_point.json
-rw-r--r--. 1 vaultattic vaultattic 1602  6月10日 13:45 edu.test_paper.json
-rw-r--r--. 1 vaultattic vaultattic 1620  6月10日 13:45 edu.test_paper_question.json
-rw-r--r--. 1 vaultattic vaultattic 1620  6月10日 13:45 edu.test_point_question.json
-rw-r--r--. 1 vaultattic vaultattic 1618  6月10日 13:45 edu.test_question_info.json
-rw-r--r--. 1 vaultattic vaultattic 1622  6月10日 13:45 edu.test_question_option.json
-rw-r--r--. 1 vaultattic vaultattic 1622  6月10日 13:45 edu.user_chapter_process.json
-rw-r--r--. 1 vaultattic vaultattic 1602  6月10日 13:45 edu.video_info.json
[vaultattic@hadoop101 ~]$

base_province 表为例,测试生成的 DataX 配置文件是否可用。

DataX 的 HDFSWriter 要求目标路径提前存在,因此需要先用以下命令创建目录:

hadoop fs -mkdir -p /origin_data/edu/db/base_province_full/2022-02-21

执行 DataX 同步任务:

python /opt/datax-202309/bin/datax.py \
-p"-Dtargetdir=/origin_data/edu/db/base_province_full/2022-02-21" \
/opt/datax-202309/job/import/edu.base_province.json
[vaultattic@hadoop101 ~]$ python /opt/datax-202309/bin/datax.py \
-p"-Dtargetdir=/origin_data/edu/db/base_province_full/2022-02-21" \
/opt/datax-202309/job/import/edu.base_province.json

......

2026-06-10 15:33:46.216 [job-0] INFO  JobContainer -
任务启动时刻                    : 2026-06-10 15:33:33
任务结束时刻                    : 2026-06-10 15:33:46
任务总计耗时                    :                 13s
任务平均流量                    :               70B/s
记录写入速度                    :              3rec/s
读出记录总数                    :                  34
读写失败总数                    :                   0

[vaultattic@hadoop101 ~]$

查看同步结果:

hadoop fs -ls /origin_data/edu/db/base_province_full/2022-02-21
[vaultattic@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db/base_province_full/2022-02-21
Found 1 items
-rw-r--r--   3 vaultattic supergroup        563 2026-06-10 15:33 /origin_data/edu/db/base_province_full/2022-02-21/base_province__57efe611_b1fa_4d08_9ade_df1b9f7734eb.gz
[vaultattic@hadoop101 ~]$

如果目录下出现 .gz 文件,说明同步成功。

全量表数据同步脚本

为了方便后续使用,可以封装一个全量表同步脚本。

创建 mysql_to_hdfs_full.sh 脚本:

vim ~/bin/mysql_to_hdfs_full.sh
#!/bin/bash

DATAX_HOME=/opt/datax-202309
DATAX_JOB=$DATAX_HOME/job

handle_targetdir() {
    hadoop fs -rm -r -f "$1" >/dev/null 2>&1
    hadoop fs -mkdir -p "$1"
}

import_data() {
    local datax_config=$1
    local target_dir=$2

    handle_targetdir "$target_dir"

    echo "========== 开始同步:$datax_config =========="
    echo "目标路径:$target_dir"

    python "$DATAX_HOME/bin/datax.py" \
    -p"-Dtargetdir=$target_dir" \
    "$datax_config" >/tmp/datax_run.log 2>&1

    if [ $? -ne 0 ]; then
        echo "同步失败,日志如下:"
        cat /tmp/datax_run.log
        rm -f /tmp/datax_run.log
        exit 1
    else
        echo "同步成功:$target_dir"
    fi

    rm -f /tmp/datax_run.log
}

if [ $# -ne 2 ]; then
    echo "用法:mysql_to_hdfs_full.sh {表名|all} 日期"
    echo "示例:mysql_to_hdfs_full.sh base_province 2022-02-21"
    echo "示例:mysql_to_hdfs_full.sh all 2022-02-21"
    exit 1
fi

tab=$1
do_date=$2

case $tab in
base_category_info|base_source|base_province|base_subject_info|cart_info|chapter_info|course_info|knowledge_point|test_paper|test_paper_question|test_point_question|test_question_info|user_chapter_process|test_question_option|video_info)
    import_data "$DATAX_JOB/import/edu.${tab}.json" "/origin_data/edu/db/${tab}_full/$do_date"
;;
all)
    for tmp in base_category_info base_source base_province base_subject_info cart_info chapter_info course_info knowledge_point test_paper test_paper_question test_point_question test_question_info user_chapter_process test_question_option video_info
    do
        import_data "$DATAX_JOB/import/edu.${tmp}.json" "/origin_data/edu/db/${tmp}_full/$do_date"
    done
;;
*)
    echo "参数错误:$tab"
    echo "用法:mysql_to_hdfs_full.sh {表名|all} 日期"
    echo "示例:mysql_to_hdfs_full.sh base_province 2022-02-21"
    echo "示例:mysql_to_hdfs_full.sh all 2022-02-21"
    exit 1
;;
esac

添加执行权限:

chmod +x ~/bin/mysql_to_hdfs_full.sh

用法:

mysql_to_hdfs_full.sh {表名|all} 日期

同步所有表查看脚本是否可用:

mysql_to_hdfs_full.sh all 2022-02-21
[vaultattic@hadoop101 ~]$ mysql_to_hdfs_full.sh all 2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.base_category_info.json ==========
目标路径:/origin_data/edu/db/base_category_info_full/2022-02-21
同步成功:/origin_data/edu/db/base_category_info_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.base_source.json ==========
目标路径:/origin_data/edu/db/base_source_full/2022-02-21
同步成功:/origin_data/edu/db/base_source_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.base_province.json ==========
目标路径:/origin_data/edu/db/base_province_full/2022-02-21
同步成功:/origin_data/edu/db/base_province_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.base_subject_info.json ==========
目标路径:/origin_data/edu/db/base_subject_info_full/2022-02-21
同步成功:/origin_data/edu/db/base_subject_info_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.cart_info.json ==========
目标路径:/origin_data/edu/db/cart_info_full/2022-02-21
同步成功:/origin_data/edu/db/cart_info_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.chapter_info.json ==========
目标路径:/origin_data/edu/db/chapter_info_full/2022-02-21
同步成功:/origin_data/edu/db/chapter_info_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.course_info.json ==========
目标路径:/origin_data/edu/db/course_info_full/2022-02-21
同步成功:/origin_data/edu/db/course_info_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.knowledge_point.json ==========
目标路径:/origin_data/edu/db/knowledge_point_full/2022-02-21
同步成功:/origin_data/edu/db/knowledge_point_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.test_paper.json ==========
目标路径:/origin_data/edu/db/test_paper_full/2022-02-21
同步成功:/origin_data/edu/db/test_paper_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.test_paper_question.json ==========
目标路径:/origin_data/edu/db/test_paper_question_full/2022-02-21
同步成功:/origin_data/edu/db/test_paper_question_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.test_point_question.json ==========
目标路径:/origin_data/edu/db/test_point_question_full/2022-02-21
同步成功:/origin_data/edu/db/test_point_question_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.test_question_info.json ==========
目标路径:/origin_data/edu/db/test_question_info_full/2022-02-21
同步成功:/origin_data/edu/db/test_question_info_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.user_chapter_process.json ==========
目标路径:/origin_data/edu/db/user_chapter_process_full/2022-02-21
同步成功:/origin_data/edu/db/user_chapter_process_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.test_question_option.json ==========
目标路径:/origin_data/edu/db/test_question_option_full/2022-02-21
同步成功:/origin_data/edu/db/test_question_option_full/2022-02-21
========== 开始同步:/opt/datax-202309/job/import/edu.video_info.json ==========
目标路径:/origin_data/edu/db/video_info_full/2022-02-21
同步成功:/origin_data/edu/db/video_info_full/2022-02-21
[vaultattic@hadoop101 ~]$

检查全量表同步结果:

hadoop fs -ls /origin_data/edu/db
[vaultattic@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db
Found 15 items
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:50 /origin_data/edu/db/base_category_info_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:50 /origin_data/edu/db/base_province_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:50 /origin_data/edu/db/base_source_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:50 /origin_data/edu/db/base_subject_info_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:51 /origin_data/edu/db/cart_info_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:51 /origin_data/edu/db/chapter_info_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:51 /origin_data/edu/db/course_info_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:51 /origin_data/edu/db/knowledge_point_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:52 /origin_data/edu/db/test_paper_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:52 /origin_data/edu/db/test_paper_question_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:52 /origin_data/edu/db/test_point_question_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:52 /origin_data/edu/db/test_question_info_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:53 /origin_data/edu/db/test_question_option_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:53 /origin_data/edu/db/user_chapter_process_full
drwxr-xr-x   - vaultattic supergroup          0 2026-06-10 15:53 /origin_data/edu/db/video_info_full
[vaultattic@hadoop101 ~]$

7、增量表数据同步

同步目标

增量表是指业务系统中会持续发生新增、修改、删除的数据表。

这里使用 Maxwell 监听 MySQL 的 binlog,将业务表的变更数据实时写入 Kafka,再由 Flume 消费 Kafka 数据并写入 HDFS。

增量同步的目标路径格式如下:/origin_data/edu/db/表名_inc/日期

本项目中,增量表同步链路如下:

flowchart LR A[MySQL edu binlog] --> B[Maxwell] B --> C[Kafka topic_db] C --> D[Flume Kafka Source] D --> E[Flume HDFS Sink] E --> F[HDFS 原始数据层] F --> G[Hive ODS 层]
  • MySQL 开启 binlog;
  • Maxwell 监听 MySQL binlog;
  • Maxwell 将数据变更写入 Kafka 的 topic_db
  • Flume 从 Kafka 读取数据;
  • Flume 自定义拦截器提取 tablets
  • Flume 写入 HDFS 的对应增量目录。

创建 Kafka 业务数据 Topic

创建一个名为 topic_db 的 Topic:

kafka-topics.sh \
--bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 \
--create \
--topic topic_db \
--partitions 3 \
--replication-factor 3

查看 Topic 信息:

kafka-topics.sh \
--bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 \
--describe \
--topic topic_db
[vaultattic@hadoop101 ~]$ kafka-topics.sh \
--bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 \
--describe \
--topic topic_db
Topic: topic_db TopicId: 74LB_CQnS_G8t9ngr1ZWlQ PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: topic_db Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3      Elr: N/A        LastKnownElr: N/A
        Topic: topic_db Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1      Elr: N/A        LastKnownElr: N/A
        Topic: topic_db Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2      Elr: N/A        LastKnownElr: N/A
[vaultattic@hadoop101 ~]$

Maxwell 通道测试

Maxwell 配置文件就用之前安装时候就改好了,现在使用 Maxwell 启动脚本启动测试一下。

[vaultattic@hadoop101 ~]$ maxwell-start.sh
========== 启动 hadoop101 Maxwell ==========
Maxwell 启动成功,PID=88153
[vaultattic@hadoop101 ~]$

再新建一个终端,启动一个 Kafka 消费者消费 kafka_db 用来查看输出。

kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 \
--topic topic_db

再新建一个终端,使用 mock.sh 生成一天的模拟数据:

mock.sh 2022-02-21

这里我卡了好久不知道是什么问题,Maxwell 一直捕获不到 MySQL 的更新,重启后将教程里面的 /etc/my.cnf 文件配置中的 server-id=1 字段改为了 server_id=1 就跑通了,不确定是环境出了一点问题还是字段写错了的问题,没有精力复现了😥。

如果刚刚运行的 Kafka 消费者终端输出 JSON 内容就说明跑通了。

Flume 配置

业务数据 Flume 运行在 hadoop103,负责从 Kafka 消费 topic_db 数据并写入 HDFS。

业务增量数据链路如下:

flowchart LR A["MySQL binlog"] --> B["Maxwell"] B --> C["Kafka topic_db"] C --> D["Kafka Source"] D --> E["TimestampAndTableNameInterceptor"] E --> F["File Channel"] F --> G["HDFS Sink"] G --> H["/origin_data/edu/db/%{tableName}_inc/%Y-%m-%d"]

Flume 需要从 Maxwell JSON 中获取:

  • table 字段:用于生成表名目录
  • ts 字段:用于生成日期目录

HDFS 路径格式为:

/origin_data/edu/db/%{tableName}_inc/%Y-%m-%d
  • %{tableName} 来自 Flume Header 中的 tableName
  • %Y-%m-%d 来自 Flume Header 中的 timestamp

原教程中使用的是魔改版的 Maxwell,而原版 Maxwell 输出 JSON 中的 ts 字段表示数据实际发生变更的时间。因此,Flume 写入 HDFS 时生成的日期目录,不是模拟数据中的业务日期,而是当前数据写入 MySQL 的日期。例如执行 mock.sh 2022-02-21 后,HDFS 目录不是 2022-02-21,而是实际执行当天的日期比如 2026-6-12

新增业务数据拦截器:

打开之前的 Maven 项目,在 cn.vaultattic.flume.interceptor 包中新建一个 TimestampAndTableNameInterceptor 类,代码如下:

package cn.vaultattic.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // 当前没有需要初始化的资源
    }

    @Override
    public Event intercept(Event event) {
        // 获取 Event Header
        Map<String, String> headers = event.getHeaders();

        // 获取 Event Body
        String body = new String(event.getBody(), StandardCharsets.UTF_8);

        try {
            // 解析 Maxwell 输出的 JSON 字符串
            JSONObject jsonObject = JSONObject.parseObject(body);

            // 获取 Maxwell 输出的 ts 字段
            Long ts = jsonObject.getLong("ts");

            // 获取 Maxwell 输出的 table 字段
            String tableName = jsonObject.getString("table");

            // ts 或 table 为空时,丢弃该条数据
            if (ts == null || tableName == null || tableName.length() == 0) {
                return null;
            }

            // Maxwell 的 ts 是秒级时间戳,Flume HDFS Sink 需要毫秒级时间戳
            headers.put("timestamp", String.valueOf(ts * 1000));

            // 写入表名,用于 HDFS 路径中的 %{tableName}
            headers.put("tableName", tableName);

            return event;
        } catch (Exception e) {
            // JSON 解析失败时,丢弃该条数据
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // 使用迭代器删除无效 Event
        Iterator<Event> iterator = events.iterator();

        while (iterator.hasNext()) {
            Event event = iterator.next();

            if (intercept(event) == null) {
                iterator.remove();
            }
        }

        return events;
    }

    @Override
    public void close() {
        // 当前没有需要释放的资源
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            // 返回当前拦截器实例
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {
            // 当前没有额外配置项
        }
    }
}

重新打包为 .jar 文件。

3BS0LQ71-14.png

.jar 包上传到 hadoop101 的 ~/ansible/files

使用之前的 distribute_file.yml,将 Jar 包分发到各节点的依赖目录中。

运行 deploy_package.yml:

ansible-playbook distribute_file.yml

业务增量 Flume 使用单独的 File Channel 目录,避免和前文用户行为日志消费链路冲突。

hadoop103 创建目录:

mkdir -p /opt/flume-1.11.0/file_channel_db/checkpoint /opt/flume-1.11.0/file_channel_db/data

hadoop103 创建 Flume 配置文件:

vim /opt/flume-1.11.0/job/kafka-to-hdfs-db.conf
# agent 中包含一个 source、一个 channel 和一个 sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 使用 Kafka Source 消费 topic_db
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

# Kafka Broker 地址
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092

# 订阅业务数据 Topic
a1.sources.r1.kafka.topics = topic_db

# 消费者组 ID
a1.sources.r1.kafka.consumer.group.id = flume_db_group

# 每批拉取的最大事件数
a1.sources.r1.batchSize = 5000

# 每批最大等待时间
a1.sources.r1.batchDurationMillis = 2000

# 没有历史 offset 时,从最新位置开始消费
a1.sources.r1.kafka.consumer.auto.offset.reset = latest

# 使用业务数据拦截器提取 ts 和 table 字段
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.vaultattic.flume.interceptor.TimestampAndTableNameInterceptor$Builder

# 使用 File Channel,提高可靠性
a1.channels.c1.type = file

# File Channel checkpoint 目录
a1.channels.c1.checkpointDir = /opt/flume-1.11.0/file_channel_db/checkpoint

# File Channel 数据目录
a1.channels.c1.dataDirs = /opt/flume-1.11.0/file_channel_db/data

# Channel 容量
a1.channels.c1.capacity = 1000000

# 单次事务处理数量
a1.channels.c1.transactionCapacity = 10000

# keep-alive 时间
a1.channels.c1.keep-alive = 6

# 使用 HDFS Sink 写入 HDFS
a1.sinks.k1.type = hdfs

# HDFS 输出路径,按表名和日期分区
a1.sinks.k1.hdfs.path = /origin_data/edu/db/%{tableName}_inc/%Y-%m-%d

# HDFS 文件名前缀
a1.sinks.k1.hdfs.filePrefix = db

# HDFS 文件后缀
a1.sinks.k1.hdfs.fileSuffix = .gz

# 使用 Event Header 中的 timestamp 作为分区时间
a1.sinks.k1.hdfs.useLocalTimeStamp = false

# 不开启时间取整
a1.sinks.k1.hdfs.round = false

# 文件滚动时间,单位秒
a1.sinks.k1.hdfs.rollInterval = 10

# 文件滚动大小,单位字节
a1.sinks.k1.hdfs.rollSize = 134217728

# 文件滚动条数,0 表示不按条数滚动
a1.sinks.k1.hdfs.rollCount = 0

# 每批写入 HDFS 的事件数
a1.sinks.k1.hdfs.batchSize = 5000

# 使用压缩流写入
a1.sinks.k1.hdfs.fileType = CompressedStream

# 使用 gzip 压缩
a1.sinks.k1.hdfs.codeC = gzip

# 写入文本格式
a1.sinks.k1.hdfs.writeFormat = Text

# 空闲文件关闭时间
a1.sinks.k1.hdfs.idleTimeout = 120

# 正在写入文件的临时前缀
a1.sinks.k1.hdfs.inUsePrefix = .

# 正在写入文件的临时后缀
a1.sinks.k1.hdfs.inUseSuffix = .tmp

# 绑定 source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

业务增量 Flume 单独使用一个脚本管理,避免和前面的用户行为日志采集 Flume 混在一起。

hadoop101 创建:

vim ~/bin/flume-db.sh
#!/usr/bin/env bash

set -euo pipefail

FLUME_HOME="/opt/flume-1.11.0"
FLUME_CONF="${FLUME_HOME}/job/kafka-to-hdfs-db.conf"
AGENT_NAME="a1"
LOG_FILE="${FLUME_HOME}/logs/flume-db.out"
PID_FILE="${FLUME_HOME}/logs/flume-db.pid"

HOSTS=(
  "hadoop103"
)

start() {
  for host in "${HOSTS[@]}"; do
    echo "========== 启动 ${host} 业务数据 Flume =========="

    ssh "$host" "
      if [ -f '${PID_FILE}' ]; then
        pid=\$(cat '${PID_FILE}' 2>/dev/null || true)

        if [ -n \"\$pid\" ] && kill -0 \"\$pid\" 2>/dev/null; then
          echo '${host} 业务数据 Flume 已在运行'
          exit 0
        fi
      fi

      mkdir -p '${FLUME_HOME}/logs'

      nohup '${FLUME_HOME}/bin/flume-ng' agent \
        --conf '${FLUME_HOME}/conf' \
        --conf-file '${FLUME_CONF}' \
        --name '${AGENT_NAME}' \
        -Dflume.root.logger=INFO,console \
        > '${LOG_FILE}' 2>&1 < /dev/null &

      echo \$! > '${PID_FILE}'
    "

    sleep 3

    if ssh "$host" "
      pid=\$(cat '${PID_FILE}' 2>/dev/null || true)
      [ -n \"\$pid\" ] && kill -0 \"\$pid\" 2>/dev/null
    "; then
      echo "${host} 业务数据 Flume 启动成功"
    else
      echo "${host} 业务数据 Flume 启动失败,请查看日志:${LOG_FILE}"
    fi
  done
}

stop() {
  for host in "${HOSTS[@]}"; do
    echo "========== 停止 ${host} 业务数据 Flume =========="

    ssh "$host" "
      if [ -f '${PID_FILE}' ]; then
        pid=\$(cat '${PID_FILE}' 2>/dev/null || true)

        if [ -n \"\$pid\" ] && kill -0 \"\$pid\" 2>/dev/null; then
          kill \"\$pid\"
          sleep 2
        fi

        if [ -n \"\$pid\" ] && kill -0 \"\$pid\" 2>/dev/null; then
          kill -9 \"\$pid\" 2>/dev/null || true
        fi

        rm -f '${PID_FILE}'
        echo '${host} 业务数据 Flume 停止完成'
      else
        echo '${host} 业务数据 Flume 未运行'
      fi
    "
  done
}

status() {
  for host in "${HOSTS[@]}"; do
    echo "========== 查看 ${host} 业务数据 Flume =========="

    ssh "$host" "
      if [ -f '${PID_FILE}' ]; then
        pid=\$(cat '${PID_FILE}' 2>/dev/null || true)

        if [ -n \"\$pid\" ] && kill -0 \"\$pid\" 2>/dev/null; then
          echo '${host} 业务数据 Flume 运行中'
        else
          echo '${host} 业务数据 Flume 未运行'
        fi
      else
        echo '${host} 业务数据 Flume 未运行'
      fi
    "
  done
}

case "${1:-}" in
  start)
    start
    ;;
  stop)
    stop
    ;;
  restart)
    stop
    sleep 3
    start
    ;;
  status)
    status
    ;;
  *)
    echo "用法: $0 {start|stop|restart|status}"
    exit 1
    ;;
esac

增加权限:

chmod +x ~/bin/flume-db.sh

启动:

flume-db.sh start

查看状态:

flume-db.sh status

停止:

flume-db.sh stop

重启:

flume-db.sh restart

Flume 通道测试

生成模拟业务数据:

mock.sh 2022-02-21

查看 HDFS 是否写入 Maxwell 生成的 JSON 数据:

hadoop fs -text /origin_data/edu/db/order_info_inc/*/db*.gz | head
[vaultattic@hadoop101 ansible]$ hadoop fs -text /origin_data/edu/db/order_info_inc/*/db*.gz | head
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59384,"commit":true,"data":{"id":21786,"user_id":235,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"437716997615379","trade_body":"MyBatis等1件商品","session_id":"976ff4ab-c6e7-4064-bf6f-f62b13d739f8","province_id":7,"create_time":"2022-02-21 20:02:38","expire_time":"2022-02-21 20:17:38","update_time":null}}
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59412,"commit":true,"data":{"id":21787,"user_id":139,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"844262756327171","trade_body":"RabbitMQ消息中间件等1件商品","session_id":"0d332629-d848-4712-86ac-115011fb18cf","province_id":32,"create_time":"2022-02-21 21:21:26","expire_time":"2022-02-21 21:36:26","update_time":null}}
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59436,"commit":true,"data":{"id":21788,"user_id":340,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"125574159976478","trade_body":"尚硅谷_Linux系统管理教程(Linux运维+云计算学科发布)等1件商品","session_id":"692f56ef-01c7-4ff7-8656-fd93b58d0654","province_id":18,"create_time":"2022-02-21 13:52:35","expire_time":"2022-02-21 14:07:35","update_time":null}}
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59492,"commit":true,"data":{"id":21789,"user_id":212,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"635887645543553","trade_body":"RabbitMQ消息中间件等1件商品","session_id":"8cc553ae-8092-4e8c-9c5f-03c6015e1a8b","province_id":33,"create_time":"2022-02-21 17:17:55","expire_time":"2022-02-21 17:32:55","update_time":null}}
{"database":"edu","table":"order_info","type":"update","ts":1781269716,"xid":59526,"commit":true,"data":{"id":21789,"user_id":212,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1003","out_trade_no":"635887645543553","trade_body":"RabbitMQ消息中间件等1件商品","session_id":"8cc553ae-8092-4e8c-9c5f-03c6015e1a8b","province_id":33,"create_time":"2022-02-21 17:17:55","expire_time":"2022-02-21 17:32:55","update_time":"2022-02-21 17:32:55"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59650,"commit":true,"data":{"id":21791,"user_id":150,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"314556962332941","trade_body":"SpringMVC等1件商品","session_id":"255146a9-03dd-4e92-afe4-5a5d5f68b5ea","province_id":21,"create_time":"2022-02-21 18:43:43","expire_time":"2022-02-21 18:58:43","update_time":null}}
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59647,"commit":true,"data":{"id":21790,"user_id":478,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"656755863954573","trade_body":"Android与H5互调等1件商品","session_id":"227d7721-8243-4a5a-877f-62f2e0f88cf5","province_id":23,"create_time":"2022-02-21 16:12:31","expire_time":"2022-02-21 16:27:31","update_time":null}}
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59695,"commit":true,"data":{"id":21792,"user_id":477,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"658767236614391","trade_body":"尚硅谷大数据技术之HBase等1件商品","session_id":"962dc5db-f30f-4c6d-8373-98c3f3acc811","province_id":19,"create_time":"2022-02-21 14:23:19","expire_time":"2022-02-21 14:38:19","update_time":null}}
{"database":"edu","table":"order_info","type":"update","ts":1781269716,"xid":59671,"commit":true,"data":{"id":21786,"user_id":235,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1002","out_trade_no":"437716997615379","trade_body":"MyBatis等1件商品","session_id":"976ff4ab-c6e7-4064-bf6f-f62b13d739f8","province_id":7,"create_time":"2022-02-21 20:02:38","expire_time":"2022-02-21 20:17:38","update_time":"2022-02-21 20:02:54"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu","table":"order_info","type":"insert","ts":1781269716,"xid":59712,"commit":true,"data":{"id":21793,"user_id":1,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1001","out_trade_no":"913737331653931","trade_body":"ES6教程-涵盖ES6-ES11等1件商品","session_id":"f54047d0-9522-484c-915b-a7ae8074e6c2","province_id":15,"create_time":"2022-02-21 22:57:00","expire_time":"2022-02-21 23:12:00","update_time":null}}
text: Unable to write to output stream.
text: Unable to write to output stream.
[vaultattic@hadoop101 ansible]$

如果输出的是 JSON 数据就说明跑通了。

增量表首日全量同步

Maxwell 是通过监听 MySQL binlog 来采集增量数据的。

因此,Maxwell 启动之后发生的 insertupdatedelete 操作可以被采集到,但如果某些数据在 Maxwell 启动之前就已经存在,这部分历史数据不会自动进入 Kafka。

所以对于增量表,一般需要在首日先进行一次初始化同步,将当前表中的历史数据同步到 Kafka。后续再由 Maxwell 实时监听 binlog,持续采集新增和变更数据。

这个首日初始化过程可以使用 Maxwell 提供的 maxwell-bootstrap 工具完成。

编写首日初始化脚本:

vim ~/bin/mysql_to_kafka_inc_init.sh
#!/bin/bash

export JAVA_HOME=/opt/amazon-corretto-11
export PATH=$JAVA_HOME/bin:$PATH

MAXWELL_HOME=/opt/maxwell-1.44.1

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap \
    --database edu \
    --table "$1" \
    --config $MAXWELL_HOME/config.properties
}

for table in cart_info comment_info favor_info order_detail order_info payment_info review_info test_exam test_exam_question user_info vip_change_detail
do
    echo "========== 初始化增量表:${table} =========="
    import_data "$table"
done

添加执行权限:

chmod +x ~/bin/mysql_to_kafka_inc_init.sh

如果之前测试过增量同步,为了方便观察结果,可以先删除 HDFS 上已有的增量表目录。

删除所有 _inc 目录:

hadoop fs -rm -r -f /origin_data/edu/db/*_inc
[vaultattic@hadoop101 ansible]$ hadoop fs -rm -r -f /origin_data/edu/db/*_inc
Deleted /origin_data/edu/db/cart_info_inc
Deleted /origin_data/edu/db/comment_info_inc
Deleted /origin_data/edu/db/favor_info_inc
Deleted /origin_data/edu/db/order_detail_inc
Deleted /origin_data/edu/db/order_info_inc
Deleted /origin_data/edu/db/payment_info_inc
Deleted /origin_data/edu/db/review_info_inc
Deleted /origin_data/edu/db/test_exam_inc
Deleted /origin_data/edu/db/test_exam_question_inc
Deleted /origin_data/edu/db/user_chapter_process_inc
Deleted /origin_data/edu/db/user_info_inc
Deleted /origin_data/edu/db/vip_change_detail_inc
[vaultattic@hadoop101 ansible]$

再运行全量同步脚本:

mysql_to_kafka_inc_init.sh
[vaultattic@hadoop101 ansible]$ mysql_to_kafka_inc_init.sh
========== 初始化增量表:cart_info ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:comment_info ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:favor_info ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:order_detail ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:order_info ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:payment_info ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:review_info ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:test_exam ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:test_exam_question ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:user_info ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
========== 初始化增量表:vip_change_detail ==========
connecting to jdbc:mysql://hadoop101:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
[vaultattic@hadoop101 ansible]$

检查同步结果:

hadoop fs -ls /origin_data/edu/db
[vaultattic@hadoop101 ansible]$ hadoop fs -ls /origin_data/edu/db | grep _inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/cart_info_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/comment_info_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/favor_info_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/order_detail_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/order_info_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/payment_info_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/review_info_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/test_exam_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/test_exam_question_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/user_info_inc
drwxr-xr-x   - vaultattic supergroup          0 2026-06-12 21:34 /origin_data/edu/db/vip_change_detail_inc
[vaultattic@hadoop101 ansible]$

三、数仓环境准备

1、安装 Hive

Hive 是基于 Hadoop 的数据仓库工具,可以将 HDFS 上的结构化数据映射成表,并使用类 SQL 的方式进行查询和分析。

在离线数仓中,Hive 主要用于:

  • 管理 HDFS 上的数据文件;
  • 建立 ODS、DIM、DWD、DWS、ADS 等数仓分层表;
  • 使用 SQL 完成离线数据清洗、转换和统计分析。

Hive 本身不负责存储数据,真实数据仍然存储在 HDFS 中。Hive 主要维护的是表结构、字段、分区、文件位置等元数据信息。

我以前有写过一个详细的安装步骤,感兴趣可以看看:Hive 环境搭建(基于 Hadoop + MySQL) - 滕王阁,所以一些配置的来源就不写了。

软件包下载

这里选择 Hive 3.1.3 版本。虽然当前 Hive 最新版为 4.0+,但还在更新中,所以选择较为稳定的 3.0+ 的最后这个版本。

Hive 官方下载链接:https://archive.apache.org/dist/hive/hive-3.1.3/

百度网盘链接:https://pan.baidu.com/s/1O3Skv-6OwtMnBt3Gm_5vqQ?pwd=kapr

下载好后将软件压缩包使用 MobeXterm 上传到 hadoop101 的 ~/ansible/files 目录下。

和前面安装软件的过程一样,复制文件名,将之前写好的 deploy_package.yml 中的变量进行替换,注意 hosts 改为 hadoop101,这个软件不需要所以集群都安装。

替换后的 deploy_package.yml:

- name: Deploy package
  hosts: hadoop101

  vars:
    pkg_name: apache-hive-3.1.3-bin.tar.gz
    dest_dir: hive-3.1.3

  tasks:
    - name: Copy archive
      copy:
        src: "./files/{{ pkg_name }}"
        dest: "/opt/{{ pkg_name }}"

    - name: Get top directory name from archive
      shell: tar -tf /opt/{{ pkg_name }} | head -1 | cut -d/ -f1
      register: archive_dir

    - name: Extract archive
      command: tar -xzf /opt/{{ pkg_name }} -C /opt

    - name: Rename directory
      command: mv /opt/{{ archive_dir.stdout | trim }} /opt/{{ dest_dir }}
      args:
        creates: /opt/{{ dest_dir }}

    - name: Change ownership
      file:
        path: "/opt/{{ dest_dir }}"
        owner: vaultattic
        group: vaultattic
        recurse: yes

    - name: Remove archive
      file:
        path: "/opt/{{ pkg_name }}"
        state: absent

运行 deploy_package.yml:

ansible-playbook deploy_package.yml

执行完后可以在各节点的 /opt 目录下查看 Hive 目录是否存在。

cluster-run.sh ls /opt
[vaultattic@hadoop101 ansible]$ cluster-run.sh ls /opt
--------- hadoop101 ----------
amazon-corretto-11
amazon-corretto-8
data_mocker
datax-202309
flume-1.11.0
hadoop-3.3.6
hive-3.1.3
kafka-3.9.2
maxwell-1.44.1
zookeeper-3.8.6
--------- hadoop102 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
--------- hadoop103 ----------
amazon-corretto-8
flume-1.11.0
hadoop-3.3.6
kafka-3.9.2
zookeeper-3.8.6
[vaultattic@hadoop101 ansible]$

使用 set_env.yml 配置 HIVE_HOME 环境变量。

复制要配置环境变量的目录路径,将 set_env.yml 中的变量进行替换:

---
- name: Configure environment variables
  hosts: hadoop101

  vars:
    env_name: HIVE_HOME
    env_path: /opt/hive-3.1.3

  tasks:

    - name: Add environment variables to /home/vaultattic/.bashrc
      lineinfile:
        path: "/home/vaultattic/.bashrc"
        line: "export {{ env_name }}={{ env_path }}"
        create: yes

    - name: Add PATH for {{ env_name }}
      lineinfile:
        path: "/home/vaultattic/.bashrc"
        line: "export PATH=${{ env_name }}/bin:$PATH"

运行 set_env.yml:

ansible-playbook set_env.yml

执行 playbook 后使用:

source ~/.bashrc
hive --version
[vaultattic@hadoop101 ansible]$ source ~/.bashrc
[vaultattic@hadoop101 ansible]$ hive --version
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hive-3.1.3/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.3.6/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive 3.1.3
Git git://MacBook-Pro.fios-router.home/Users/ngangam/commit/hive -r 4df4d75bf1e16fe0af75aad0b4179c34c07fc975
Compiled by ngangam on Sun Apr 3 16:58:16 EDT 2022
From source with checksum 5da234766db5dfbe3e92926c9bbab2af
[vaultattic@hadoop101 ansible]$

解决依赖冲突

Hive 3.1.2 和 Hadoop 3.3.6 搭配时,可能会遇到部分 jar 包版本冲突问题,需要提前处理。

Hive 自带的 guava 版本较老,而 Hadoop 3.3.6 使用的 guava 版本较新。为了避免运行时报 NoSuchMethodError 等问题,可以将 Hive 中的 guava 替换成 Hadoop 中的版本。

进入 Hive lib 目录:

cd $HIVE_HOME/lib

备份 Hive 自带 guava:

mv guava-19.0.jar guava-19.0.jar.bak

Hive 和 Hadoop 中都可能包含 SLF4J / Log4j 相关 jar 包,启动时可能出现日志绑定冲突。

进入 Hive lib 目录:

cd $HIVE_HOME/lib

备份 Hive 中的 log4j-slf4j-impl

mv log4j-slf4j-impl-2.17.1.jar log4j-slf4j-impl-2.17.1.jar.bak

2、Hive 元数据配置到 MySQL

Hive 的元数据默认可以存储在内置 Derby 数据库中,但 Derby 只适合本地测试,不适合多客户端同时访问。

在实际项目中,通常将 Hive 元数据存储到 MySQL 中。这样 Hive 客户端、HiveServer2、调度任务等都可以共享同一套元数据。

安装 MySQL JDBC 驱动

Hive 需要通过 JDBC 连接 MySQL,因此需要将 MySQL Connector/J 驱动放到 Hive 的 lib 目录下。

MySQL Connector/J 官网下载地址:https://downloads.mysql.com/archives/c-j/,直接选最新版下载😡。

3BS0LQ71-15.png

下载后解压,把里面的 JAR 文件上传到 /opt/hive-3.1.3/lib 目录中(或者我把这个 JAR 文件直接上传百度网盘了:https://pan.baidu.com/s/1aUpV1L4LlGpovVRQ8jSD1A?pwd=lj0f)。

3BS0LQ71-16.png

创建 Hive 元数据库

修改 Hive 配置文件

vim $HIVE_HOME/conf/hive-site.xml
<?xml version="1.0" encoding="UTF-8"?>

<configuration>

    <!-- Hive 元数据库连接地址 -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop101:3306/metastore?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true</value>
    </property>

    <!-- MySQL 8 使用的 JDBC 驱动类 -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>

    <!-- MySQL 用户名 -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <!-- MySQL 密码 -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>

    <!-- Hive 默认仓库目录 -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>

    <!-- 关闭元数据库版本校验 -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <!-- HiveServer2 端口 -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>

    <!-- HiveServer2 绑定地址 -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>0.0.0.0</value>
    </property>

    <!-- 关闭 metastore 事件通知权限校验 -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>

    <!-- Hive CLI 显示查询结果表头 -->
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <!-- Hive CLI 显示当前数据库名 -->
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>

    <!-- 缩短 HiveServer2 启动等待时间 -->
    <property>
        <name>hive.server2.active.passive.ha.enable</name>
        <value>true</value>
    </property>

    <!-- 让部分客户端可以正确读取 JsonSerDe 表结构 -->
    <property>
        <name>metastore.storage.schema.reader.impl</name>
        <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value>
    </property>

</configuration>

创建 Hive 元数据库

登录 MySQL:

mysql -uroot -p123456

创建 Hive 元数据库:

CREATE DATABASE IF NOT EXISTS metastore
DEFAULT CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci;
[vaultattic@hadoop101 ~]$ mysql -uroot -p123456
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 405
Server version: 8.4.8 MySQL Community Server - GPL

Copyright (c) 2000, 2026, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> CREATE DATABASE IF NOT EXISTS metastore
    -> DEFAULT CHARACTER SET utf8mb4
    -> COLLATE utf8mb4_general_ci;
Query OK, 1 row affected (0.01 sec)

mysql>

3、初始化 Hive 元数据库

初始化 metastore

执行初始化命令:

schematool -initSchema -dbType mysql -verbose

出现 schemaTool completed 说明初始化成功。

创建 HDFS 仓库目录

创建 Hive 仓库目录:

hadoop fs -mkdir -p /user/hive/warehouse

设置权限:

hadoop fs -chmod -R 777 /user/hive/warehouse

4、启动 Hive

Hive 可以通过 CLI 直接使用,也可以启动 HiveServer2,供 DataGrip、DBeaver、Beeline 等客户端连接。

启动 Hive CLI

直接执行:

hive

进入 Hive 后查看数据库:

show databases;

正常情况下可以看到:

default

退出 Hive:

quit;

启动 HiveServer2

如果需要通过 JDBC 客户端连接 Hive,可以启动 HiveServer2。

启动 HiveServer2:

nohup hiveserver2 > $HIVE_HOME/logs/hiveserver2.log 2>&1 &

如果 logs 目录不存在,先创建:

mkdir -p $HIVE_HOME/logs

查看进程:

jps

如果看到 RunJar 说明 HiveServer2 已经启动。

查看日志:

tail -f $HIVE_HOME/logs/hiveserver2.log

使用 Beeline 连接 HiveServer2

连接 HiveServer2:

beeline -u jdbc:hive2://hadoop101:10000 -n vaultattic

连接成功后执行:

show databases;

如果能正常返回数据库列表,说明 HiveServer2 可用。

四、后记

后面还有两章,没时间写了,每次都这样😓。

上课只上了第三章的一小半,主要是现在星期六了,星期一就检查成果了,我现在才刚刚把这个第二章搞完。

网络安全还有 3 篇实验报告要写,而且要考试,虽然是开卷但还没开始预习🥲。

早知道,当初就不该下载杀戮尖塔 2😵‍💫,谁能告诉我时间都去哪了😶‍🌫️。

中转站的 Codex 也被奥特曼制裁了,没有 AI 我现在感觉什么也做不了😭。


评论