亚洲精品中文免费|亚洲日韩中文字幕制服|久久精品亚洲免费|一本之道久久免费

      
      

            <dl id="hur0q"><div id="hur0q"></div></dl>

                Apache Doris 整合 Iceberg + Flink CDC 構(gòu)建實時湖倉一體的聯(lián)邦查詢

                Apache Doris 整合 Iceberg + Flink CDC 構(gòu)建實時湖倉一體的聯(lián)邦查詢

                導讀:這是一篇非常完整全面的應用技術(shù)干貨,手把手教你如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實時湖倉一體的聯(lián)邦查詢分析架構(gòu)。按照本文中步驟一步步完成,完整體驗搭建操作的完整過程。

                作者 Apache Doris PMC 成員 張家鋒

                1.概覽

                這篇教程將展示如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實時湖倉一體的聯(lián)邦查詢分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,同時本教程整個環(huán)境是都基于偽分布式環(huán)境搭建,大家按照步驟可以一步步完成。完整體驗整個搭建操作的過程。

                1.1 軟件環(huán)境

                本教程的演示環(huán)境如下:

              1. Centos7
              2. Apahce doris 1.1
              3. Hadoop 3.3.3
              4. hive 3.1.3
              5. Fink 1.14.4
              6. flink-sql-connector-mysql-cdc-2.2.1
              7. Apache Iceberg 0.13.2
              8. JDK 1.8.0_311
              9. MySQL 8.0.29
              10. wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gzwget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gzwget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgzwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

                1.2 系統(tǒng)架構(gòu)

                我們整理架構(gòu)圖如下

              11. 首先我們從Mysql數(shù)據(jù)中使用Flink 通過 Binlog完成數(shù)據(jù)的實時采集
              12. 然后再Flink 中創(chuàng)建 Iceberg 表,Iceberg的元數(shù)據(jù)保存在hive里
              13. 最后我們在Doris中創(chuàng)建Iceberg外表
              14. 在通過Doris 統(tǒng)一查詢?nèi)肟谕瓿蓪ceberg里的數(shù)據(jù)進行查詢分析,供前端應用調(diào)用,這里iceberg外表的數(shù)據(jù)可以和Doris內(nèi)部數(shù)據(jù)或者Doris其他外部數(shù)據(jù)源的數(shù)據(jù)進行關(guān)聯(lián)查詢分析
              15. Doris湖倉一體的聯(lián)邦查詢架構(gòu)如下:

              16. Doris 通過 ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer
              17. 同時支持 Elasticsearch 外表
              18. 1.0版本支持Hive外表
              19. 1.1版本支持Iceberg外表
              20. 1.2版本支持Hudi 外表
              21. 2.環(huán)境安裝部署

                2.1 安裝Hadoop、Hive

                tar zxvf hadoop-3.3.3.tar.gztar zxvf apache-hive-3.1.3-bin.tar.gz

                配置系統(tǒng)環(huán)境變量

                export HADOOP_HOME=/data/hadoop-3.3.3export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopexport HADOOP_HDFS_HOME=$HADOOP_HOMEexport HIVE_HOME=/data/hive-3.1.3export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf

                2.2 配置hdfs

                2.2.1 core-site.xml

                vi etc/hadoop/core-site.xml

                fs.defaultFS hdfs://localhost:9000

                2.2.2 hdfs-site.xml

                vi etc/hadoop/hdfs-site.xml

                dfs.replication 1 dfs.namenode.name.dir /data/hdfs/namenode dfs.datanode.data.dir /data/hdfs/datanode

                2.2.3 修改Hadoop啟動腳本

                sbin/start-dfs.sh

                sbin/stop-dfs.sh

                在文件開始加上下面的內(nèi)容

                HDFS_DATANODE_USER=rootHADOOP_SECURE_DN_USER=hdfsHDFS_NAMENODE_USER=rootHDFS_SECONDARYNAMENODE_USER=root

                sbin/start-yarn.sh

                sbin/stop-yarn.sh

                在文件開始加上下面的內(nèi)容

                YARN_RESOURCEMANAGER_USER=rootHADOOP_SECURE_DN_USER=yarnYARN_NODEMANAGER_USER=root

                2.3 配置yarn

                這里我改變了Yarn的一些端口,因為我是單機環(huán)境和Doris 的一些端口沖突。你可以不啟動yarn

                vi etc/hadoop/yarn-site.xml

                yarn.resourcemanager.address jiafeng-test:50056 yarn.resourcemanager.scheduler.address jiafeng-test:50057 yarn.resourcemanager.resource-tracker.address jiafeng-test:50058 yarn.resourcemanager.admin.address jiafeng-test:50059 yarn.resourcemanager.webapp.address jiafeng-test:9090 yarn.nodemanager.localizer.address 0.0.0.0:50060 yarn.nodemanager.webapp.address 0.0.0.0:50062

                vi etc/hadoop/mapred-site.xm

                mapreduce.jobhistory.address 0.0.0.0:10020 mapreduce.jobhistory.webapp.address 0.0.0.0:19888 mapreduce.shuffle.port 50061

                2.2.4 啟動hadoop

                sbin/start-all.sh

                2.4 配置Hive

                2.4.1 創(chuàng)建hdfs目錄

                hdfs dfs -mkdir -p /user/hive/warehousehdfs dfs -mkdir /tmphdfs dfs -chmod g+w /user/hive/warehousehdfs dfs -chmod g+w /tmp

                2.4.2 配置hive-site.xml

                javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName root javax.jdo.option.ConnectionPassword MyNewPass4! hive.metastore.warehouse.dir /user/hive/warehouse location of default database for the warehouse hive.metastore.uris Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. javax.jdo.PersistenceManagerFactoryClass org.datanucleus.api.jdo.JDOPersistenceManagerFactory hive.metastore.schema.verification false datanucleus.schema.autoCreateAll true

                2.4.3 配置 hive-env.sh

                加入以下內(nèi)容

                HADOOP_HOME=/data/hadoop-3.3.3

                2.4.4 hive元數(shù)據(jù)初始化

                schematool -initSchema -dbType mysql

                2.4.5 啟動hive metaservice

                后臺運行

                nohup bin/hive –service metaservice 1>/dev/null 2>&1 &

                驗證

                lsof -i:9083COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAMEjava 20700 root 567u IPv6 54605348 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)

                2.5 安裝MySQL

                具體請參照這里:

                使用 Flink CDC 實現(xiàn) MySQL 數(shù)據(jù)實時入 Apache Doris

                2.5.1 創(chuàng)建MySQL數(shù)據(jù)庫表并初始化數(shù)據(jù)

                CREATE DATABASE demo;USE demo;CREATE TABLE userinfo ( id int NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL DEFAULT ‘flink’, address VARCHAR(1024), phone_number VARCHAR(512), email VARCHAR(255), PRIMARY KEY (`id`))ENGINE=InnoDB ;INSERT INTO userinfo VALUES (10001,’user_110′,’Shanghai’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10002,’user_111′,’xian’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10003,’user_112′,’beijing’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10004,’user_113′,’shenzheng’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10005,’user_114′,’hangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10006,’user_115′,’guizhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10007,’user_116′,’chengdu’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10008,’user_117′,’guangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10009,’user_118′,’xian’,’13347420870′, NULL);

                2.6 安裝 Flink

                tar zxvf flink-1.14.4-bin-scala_2.12.tgz

                然后需要將下面的依賴拷貝到Flink安裝目錄下的lib目錄下,具體的依賴的lib文件如下:

                下面將幾個Hadoop和Flink里沒有的依賴下載地址放在下面

                wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jarwget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jarwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

                其他的:

                hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jarhadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jarhadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jarhadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jaradoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jarhadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jarhive-3.1.3/lib/hive-exec-3.1.3.jarhive-3.1.3/lib/hive-metastore-3.1.3.jarhive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar

                2.6.1 啟動Flink

                bin/start-cluster.sh

                啟動后的界面如下:

                2.6.2 進入 Flink SQL Client

                bin/sql-client.sh embedded

                開啟 checkpoint,每隔3秒做一次 checkpoint

                Checkpoint 默認是不開啟的,我們需要開啟 Checkpoint 來讓 Iceberg 可以提交事務。 并且,mysql-cdc 在 binlog 讀取階段開始前,需要等待一個完整的 checkpoint 來避免 binlog 記錄亂序的情況。

                注意:

                這里是演示環(huán)境,checkpoint的間隔設置比較短,線上使用,建議設置為3-5分鐘一次checkpoint。

                Flink SQL> SET execution.checkpointing.interval = 3s;[INFO] Session property has been set.

                2.6.3 創(chuàng)建Iceberg Catalog

                CREATE CATALOG hive_catalog WITH ( ‘type’=’iceberg’, ‘catalog-type’=’hive’, ‘uri’=’thrift://localhost:9083’, ‘clients’=’5’, ‘property-version’=’1’, ‘warehouse’=’hdfs://localhost:8020/user/hive/warehouse’);

                查看catalog

                Flink SQL> show catalogs;+—————–+| catalog name |+—————–+| default_catalog || hive_catalog |+—————–+2 rows in set

                2.6.4 創(chuàng)建 Mysql CDC 表

                CREATE TABLE user_source ( database_name STRING METADATA VIRTUAL, table_name STRING METADATA VIRTUAL, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( ‘connector’ = ‘mysql-cdc’, ‘hostname’ = ‘localhost’, ‘port’ = ‘3306’, ‘username’ = ‘root’, ‘password’ = ‘MyNewPass4!’, ‘database-name’ = ‘demo’, ‘table-name’ = ‘userinfo’ );

                查詢CDC表:

                select * from user_source;

                2.6.5 創(chuàng)建Iceberg表

                —查看catalogshow catalogs;—使用cataloguse catalog hive_catalog;–創(chuàng)建數(shù)據(jù)庫CREATE DATABASE iceberg_hive; –使用數(shù)據(jù)庫use iceberg_hive;

                2.6.5.1 創(chuàng)建表

                CREATE TABLE all_users_info ( database_name STRING, table_name STRING, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED ) WITH ( ‘catalog-type’=’hive’ );

                從CDC表里插入數(shù)據(jù)到Iceberg表里

                use catalog default_catalog; insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

                在web界面可以看到任務的運行情況

                然后停掉任務,我們?nèi)ゲ樵僫ceberg表

                select * from hive_catalog.iceberg_hive.all_users_info

                可以看到下面的結(jié)果

                我們?nèi)dfs上可以看到hive目錄下的數(shù)據(jù)及對應的元數(shù)據(jù)

                我們也可以通過Hive建好Iceberg表,然后通過Flink將數(shù)據(jù)插入到表里

                下載Iceberg Hive運行依賴

                wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar

                在hive shell下執(zhí)行:

                SET engine.hive.enabled=true; SET iceberg.engine.hive.enabled=true; SET iceberg.mr.catalog=hive; add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

                創(chuàng)建表

                CREATE EXTERNAL TABLE iceberg_hive( `id` int, `name` string)STORED BY ‘org.apache.iceberg.mr.hive.HiveIcebergStorageHandler’ LOCATION ‘hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’TBLPROPERTIES ( ‘iceberg.mr.catalog’=’hadoop’, ‘iceberg.mr.catalog.hadoop.warehouse.location’=’hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’ );

                然后再Flink SQL Client下執(zhí)行下面語句將數(shù)據(jù)插入到Iceber表里

                INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, ‘c’);INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, ‘zhangfeng’);

                查詢這個表

                select * from hive_catalog.iceberg_hive.iceberg_hive

                可以看到下面的結(jié)果

                3. Doris 查詢 Iceberg

                Apache Doris 提供了 Doris 直接訪問 Iceberg 外部表的能力,外部表省去了繁瑣的數(shù)據(jù)導入工作,并借助 Doris 本身的 OLAP 的能力來解決 Iceberg 表的數(shù)據(jù)分析問題:

              22. 支持 Iceberg 數(shù)據(jù)源接入Doris
              23. 支持 Doris 與 Iceberg 數(shù)據(jù)源中的表聯(lián)合查詢,進行更加復雜的分析操作
              24. 3.1安裝Doris

                這里我們不在詳細講解Doris的安裝,如果你不知道怎么安裝Doris請參照官方文檔:快速入門

                3.2 創(chuàng)建Iceberg外表

                CREATE TABLE `all_users_info` ENGINE = ICEBERGPROPERTIES (“iceberg.database” = “iceberg_hive”,”iceberg.table” = “all_users_info”,”iceberg.hive.metastore.uris” = “thrift://localhost:9083″,”iceberg.catalog.type” = “HIVE_CATALOG”);

                參數(shù)說明:

                • ENGINE 需要指定為 ICEBERG
                • PROPERTIES 屬性:
                  • iceberg.hive.metastore.uris:Hive Metastore 服務地址
                  • iceberg.database:掛載 Iceberg 對應的數(shù)據(jù)庫名
                  • iceberg.table:掛載 Iceberg 對應的表名,掛載 Iceberg database 時無需指定。
                  • iceberg.catalog.type:Iceberg 中使用的 catalog 方式,默認為 HIVE_CATALOG,當前僅支持該方式,后續(xù)會支持更多的 Iceberg catalog 接入方式。

                mysql> CREATE TABLE `all_users_info` -> ENGINE = ICEBERG -> PROPERTIES ( -> “iceberg.database” = “iceberg_hive”, -> “iceberg.table” = “all_users_info”, -> “iceberg.hive.metastore.uris” = “thrift://localhost:9083”, -> “iceberg.catalog.type” = “HIVE_CATALOG” -> );Query OK, 0 rows affected (0.23 sec) mysql> select * from all_users_info;+—————+————+——-+———-+———–+————–+——-+| database_name | table_name | id | name | address | phone_number | email |+—————+————+——-+———-+———–+————–+——-+| demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL || demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL || demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL || demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL || demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL || demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL || demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL || demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL || demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |+—————+————+——-+———-+———–+————–+——-+9 rows in set (0.18 sec)

                3.3 同步掛載

                當 Iceberg 表 Schema 發(fā)生變更時,可以通過 REFRESH 命令手動同步,該命令會將 Doris 中的 Iceberg 外表刪除重建。

                — 同步 Iceberg 表REFRESH TABLE t_iceberg; — 同步 Iceberg 數(shù)據(jù)庫REFRESH DATABASE iceberg_test_db;

                3.4 Doris 和 Iceberg 數(shù)據(jù)類型對應關(guān)系

                支持的 Iceberg 列類型與 Doris 對應關(guān)系如下表:

                ICEBERG

                DORIS

                描述

                BOOLEAN

                BOOLEAN

                INTEGER

                INT

                LONG

                BIGINT

                FLOAT

                FLOAT

                DOUBLE

                DOUBLE

                DATE

                DATE

                TIMESTAMP

                DATETIME

                Timestamp 轉(zhuǎn)成 Datetime 會損失精度

                STRING

                STRING

                UUID

                VARCHAR

                使用 VARCHAR 來代替

                DECIMAL

                DECIMAL

                TIME

                不支持

                FIXED

                不支持

                BINARY

                不支持

                STRUCT

                不支持

                LIST

                不支持

                MAP

                不支持

                3.5 注意事項

                • Iceberg 表 Schema 變更不會自動同步,需要在 Doris 中通過 REFRESH 命令同步 Iceberg 外表或數(shù)據(jù)庫。
                • 當前默認支持的 Iceberg 版本為 0.12.0,0.13.x,未在其他版本進行測試。后續(xù)后支持更多版本。

                3.6 Doris FE 配置

                下面幾個配置屬于 Iceberg 外表系統(tǒng)級別的配置,可以通過修改 fe.conf 來配置,也可以通過 ADMIN SET CONFIG 來配置。

                • iceberg_table_creation_strict_mode
                • 創(chuàng)建 Iceberg 表默認開啟 strict mode。 strict mode 是指對 Iceberg 表的列類型進行嚴格過濾,如果有 Doris 目前不支持的數(shù)據(jù)類型,則創(chuàng)建外表失敗。
                • iceberg_table_creation_interval_second
                • 自動創(chuàng)建 Iceberg 表的后臺任務執(zhí)行間隔,默認為 10s。
                • max_iceberg_table_creation_record_size
                • Iceberg 表創(chuàng)建記錄保留的最大值,默認為 2000. 僅針對創(chuàng)建 Iceberg 數(shù)據(jù)庫記錄。

                4. 總結(jié)

                這里Doris On Iceberg我們只演示了Iceberg單表的查詢,你還可以聯(lián)合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等進行聯(lián)合查詢分析,通過Doris對外提供統(tǒng)一的查詢分析入口。

                自此我們完整從搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介紹完了,Doris朝著數(shù)據(jù)倉庫和數(shù)據(jù)融合的架構(gòu)演進,支持湖倉一體的聯(lián)邦查詢,給我們的開發(fā)帶來更多的便利,更高效的開發(fā),省去了很多數(shù)據(jù)同步的繁瑣工作,快快來體驗吧。

                鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
                用戶投稿
                上一篇 2022年6月24日 09:12
                下一篇 2022年6月24日 09:13

                相關(guān)推薦

                聯(lián)系我們

                聯(lián)系郵箱:admin#wlmqw.com
                工作時間:周一至周五,10:30-18:30,節(jié)假日休息