科技改變生活 · 科技引領未來
作者:kyledong
來源:微信公眾號:騰訊云大數(shù)據(jù)
出處:http://mp.weixin.qq.com/s?__biz=MzUzNTc0NTcyMw==&mid=2247485657&idx=1&sn=f6fa360a7c35cb6f03e78222ca8691cb
CDC 變更數(shù)據(jù)捕獲技術(shù) 可以將 源數(shù)據(jù)庫的增量變動記錄,同步到一個或多個數(shù)據(jù)目的。本文基于 騰訊云 O ceanus 提供的 Flink CDC 引擎, 著重介紹 Flink 在變更數(shù)據(jù)捕獲技術(shù)中的應用。
一、CDC 是什么?
CDC 是變更數(shù)據(jù)捕獲(Change Data Capture)技術(shù)的縮寫,它可以將源數(shù)據(jù)庫(Source)的增量變動記錄,同步到一個或多個數(shù)據(jù)目的(Sink)。在同步過程中,還可以對數(shù)據(jù)進行一定的處理,例如分組(GROUP BY)、多表的關(guān)聯(lián)(JOIN)等。
例如對于電商平臺,用戶的訂單會實時寫入到某個源數(shù)據(jù)庫;A 部門需要將每分鐘的實時數(shù)據(jù)簡單聚合處理后保存到 Redis 中以供查詢,B 部門需要將當天的數(shù)據(jù)暫存到 Elasticsearch 一份來做報表展示,C 部門也需要一份數(shù)據(jù)到 ClickHouse 做實時數(shù)倉。隨著時間的推移,后續(xù) D 部門、E 部門也會有數(shù)據(jù)分析的需求,這種場景下,傳統(tǒng)的拷貝分發(fā)多個副本方法很不靈活,而 CDC 可以實現(xiàn)一份變動記錄,實時處理并投遞到多個目的地。
下圖是一個示例,通過騰訊云 Oceanus 提供的 Flink CDC 引擎,可以將某個 MySQL 的數(shù)據(jù)庫表的變動記錄,實時同步到下游的 Redis、Elasticsearch、ClickHouse 等多個接收端。這樣大家可以各自分析自己的數(shù)據(jù)集,互不影響,同時又和上游數(shù)據(jù)保持實時的同步。
二、CDC 的實現(xiàn)原理
通常來講,CDC 分為 主動查詢 和 事件接收 兩種技術(shù)實現(xiàn)模式。
對于主動查詢而言,用戶通常會在數(shù)據(jù)源表的某個字段中,保存上次更新的時間戳或版本號等信息,然后下游通過不斷的查詢和與上次的記錄做對比,來確定數(shù)據(jù)是否有變動,是否需要同步。這種方式優(yōu)點是不涉及數(shù)據(jù)庫底層特性,實現(xiàn)比較通用;缺點是要對業(yè)務表做改造,且實時性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對數(shù)據(jù)庫的壓力較大。
事件接收模式可以通過觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來實現(xiàn)。當數(shù)據(jù)源表發(fā)生變動時,會通過附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來。下游可以通過數(shù)據(jù)庫底層的協(xié)議,訂閱并消費這些事件,然后對數(shù)據(jù)庫變動記錄做重放,從而實現(xiàn)同步。這種方式的優(yōu)點是實時性高,可以精確捕捉上游的各種變動;缺點是部署數(shù)據(jù)庫的事件接收和解析器(例如 Debezium、Canal 等),有一定的學習和運維成本,對一些冷門的數(shù)據(jù)庫支持不夠。
綜合來看,事件接收模式整體在實時性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫實現(xiàn),建議使用 Debezium ( http://debezium.io/documentation/reference/1.4/connectors/index.html ) 來實現(xiàn)變更數(shù)據(jù)的捕獲(下圖來自 Debezium 官方文檔 [http://debezium.io/documentation/reference/architecture.html] )。如果使用的只有 MySQL,則還可以用 Canal ( http://github.com/alibaba/canal) 。
三、為什么選 Flink?
從上圖可以看到,Debezium 官方架構(gòu)圖中,是通過 Kafka Streams 直接實現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因為 Flink 相對 Kafka Streams 而言,有如下優(yōu)勢:
而且 Flink Table / SQL 模塊將數(shù)據(jù)庫表和變動記錄流(例如 CDC 的數(shù)據(jù)流)看做是 同一事物的兩面 ( http://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/dynamic_tables.html) ,因此內(nèi)部提供的 Upsert 消息結(jié)構(gòu)( +I 表示新增、 -U 表示記錄更新前的值、 +U 表示記錄更新后的值, -D 表示刪除)可以與 Debezium 等生成的變動記錄一一對應。
四、Flink CDC 的使用方法
目前 Flink CDC 支持兩種數(shù)據(jù)源輸入方式。
例如 MySQL -> Debezium -> Kafka -> Flink -> PostgreSQL。適用于已經(jīng)部署好了 Debezium,希望暫存一部分數(shù)據(jù)到 Kafka 中以供多次消費,只需要 Flink 解析并分發(fā)到下游的場景。
在該場景下,由于 CDC 變更記錄會暫存到 Kafka 一段時間,因此可以在這期間任意啟動/重啟 Flink 作業(yè)進行消費;也可以部署多個 Flink 作業(yè)對這些數(shù)據(jù)同時處理并寫到不同的數(shù)據(jù)目的(Sink)庫表中,實現(xiàn)了 Source 變動與 Sink 的解耦。
例如我們有個 MySQL 數(shù)據(jù)庫,需要實時將內(nèi)容同步到 PostgreSQL 中。假設已經(jīng)安裝部署好 Debezium 并開始消費 PostgreSQL 的變更日志,這些日志在持續(xù)寫入名為 YourDebeziumTopic 的 Kafka 主題中。
我們可以新建一個 Flink SQL 作業(yè),然后輸入如下 SQL 代碼(連接參數(shù)都是虛擬的,僅供參考):
CREATE TABLE `Data_Input` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;kafka&39;, -- 可選 &39;kafka&39;,&39;kafka-0.11&39;. 注意選擇對應的內(nèi)置 Connector &39;topic&39; = &39;YourDebeziumTopic&39;, -- 替換為您要消費的 Topic &39;scan.startup.mode&39; = &39;earliest-offset&39; -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一種 &39;properties.bootstrap.servers&39; = &39;10.0.1.2:9092&39;, -- 替換為您的 Kafka 連接地址 &39;properties.group.id&39; = &39;YourGroup&39;, -- 必選參數(shù), 一定要指定 Group ID -- 定義數(shù)據(jù)格式 (Debezium JSON 格式) &39;format&39; = &39;debezium-json&39;, &39;debezium-json.schema-include&39; = &39;false&39;,); CREATE TABLE `Data_Output` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;jdbc&39;, &39;url&39; = &39;jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchema=mySchema&reWriteBatchedInserts=true&39;, -- 請?zhí)鎿Q為您的實際 PostgreSQL 連接參數(shù) &39;table-name&39; = &39;MyTable&39;, -- 需要寫入的數(shù)據(jù)表 &39;username&39; = &39;user&39;, -- 數(shù)據(jù)庫訪問的用戶名(需要提供 INSERT 權(quán)限) &39;password&39; = &39;helloworld&39; -- 數(shù)據(jù)庫訪問的密碼); INSERT INTO `Data_Output` SELECt * FROM `Data_Input`;
如果在流計算 Oceanus 界面上,可以勾選 kafka 和 jdbc 兩個內(nèi)置的 Connector:
隨后直接開始運行作業(yè),F(xiàn)link 就會源源不斷的消費 YourDebeziumTopic 這個 Kafka 主題中 Debezium 寫入的記錄,然后輸出到下游的 MySQL 數(shù)據(jù)庫中,實現(xiàn)了數(shù)據(jù)同步。
我們還可以跳過 Debezium 和 Kafka 的中轉(zhuǎn),使用 Flink CDC Connectors ( http://github.com/ververica/flink-cdc-connectors ) 對上游數(shù)據(jù)源的變動進行直接的訂閱處理。從內(nèi)部實現(xiàn)上講,F(xiàn)link CDC Connectors 內(nèi)置了一套 Debezium 和 Kafka 組件,但這個細節(jié)對用戶屏蔽,因此用戶看到的數(shù)據(jù)鏈路如下圖所示:
同樣的,這次我們有個 MySQL 數(shù)據(jù)庫,需要實時將內(nèi)容同步到 PostgreSQL 中。但我們沒有也不想安裝 Debezium 等額外組件,那我們可以新建一個 Flink SQL 作業(yè),然后輸入如下 SQL 代碼(連接參數(shù)都是虛擬的,僅供參考):
CREATE TABLE `Data_Input` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;mysql-cdc&39;, -- 可選 &39;mysql-cdc&39; 和 &39;postgres-cdc&39; &39;hostname&39; = &39;192.168.10.22&39;, -- 數(shù)據(jù)庫的 IP &39;port&39; = &39;3306&39;, -- 數(shù)據(jù)庫的訪問端口 &39;username&39; = &39;debezium&39;, -- 數(shù)據(jù)庫訪問的用戶名(需要提供 SHOW DATAbaseS, REPLICATION SLAVE, REPLICATION CLIENT, SELECt, RELOAD 權(quán)限) &39;password&39; = &39;hello@world!&39;, -- 數(shù)據(jù)庫訪問的密碼 &39;database-name&39; = &39;YourData&39;, -- 需要同步的數(shù)據(jù)庫 &39;table-name&39; = &39;YourTable&39; -- 需要同步的數(shù)據(jù)表名);CREATE TABLE `Data_Output` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;jdbc&39;, &39;url&39; = &39;jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchema=mySchema&reWriteBatchedInserts=true&39;, -- 請?zhí)鎿Q為您的實際 PostgreSQL 連接參數(shù) &39;table-name&39; = &39;MyTable&39;, -- 需要寫入的數(shù)據(jù)表 &39;username&39; = &39;user&39;, -- 數(shù)據(jù)庫訪問的用戶名(需要提供 INSERT 權(quán)限) &39;password&39; = &39;helloworld&39; -- 數(shù)據(jù)庫訪問的密碼);INSERT INTO `Data_Output` SELECT * FROM `Data_Input`;
如果在流計算頁面,可以選擇內(nèi)置的 mysql-cdc 和 jdbc Connector:
注意
需要使用 Flink CDC Connectors ( http://github.com/ververica/flink-cdc-connectors) 附加組件。騰訊云 Oceanus 已經(jīng)自帶了 MySQL-CDC Connector,如果自行部署的話,需要下載 jar 包并將其放入 Flink 的 lib 目錄下。
訪問數(shù)據(jù)庫時,請確保連接的用戶足夠權(quán)限(PostgreSQL 用戶 看這里 [ http://debezium.io/documentation/reference/connectors/postgresql.htmlpostgresql-permissions] ,MySQL 用戶 看這里 [http://debezium.io/documentation/reference/connectors/mysql.htmlsetting-up-mysql] )。
五、Flink CDC 模塊的實現(xiàn)
flink-json 模塊中的 org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory 是負責構(gòu)造解析 Debezium JSON 格式的工廠類;同樣地, org.apache.flink.formats.json.canal.CanalJsonFormatFactory 負責 Canal JSON 格式。這些類已經(jīng)內(nèi)置在 Flink 1.11 的發(fā)行版中,直接可以使用,無需附加任何程序包。
對于 Debezium JSON 格式而言,F(xiàn)link 將具體的解析邏輯放在了 org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchemaDebeziumJsonDeserializationSchema 類中。
上圖表示 Debezium JSON 的一條更新(Update)消息,它表示上游已將 id=123 的數(shù)據(jù)更新,且字段內(nèi)包含了更新前的舊值,以及更新后的新值。
那么,F(xiàn)link 是如何解析并生成對應的 Flink 消息呢?我們看下這個類的 deserialize 方法:
GenericRowData before = (GenericRowData) payload.getField(0); // 更新前的數(shù)據(jù)GenericRowData after = (GenericRowData) payload.getField(1); // 更新后的數(shù)據(jù)String op = payload.getField(2).toString(); // 獲取 &34;op&34; 字段的類型if (OP_CREATE.equals(op) || OP_READ.equals(op)) { // 如果是創(chuàng)建 (c) 或快照讀取 (r) 消息 after.setRowKind(RowKind.INSERT); // 設置消息類型為新建 (+I) out.collect(after); // 發(fā)送給下游} else if (OP_UPDATE.equals(op)) { // 如果是更新 (u) 消息 before.setRowKind(RowKind.UPDATE_BEFORE); // 把更新前的數(shù)據(jù)類型設置為撤回 (-U) after.setRowKind(RowKind.UPDATE_AFTER); // 把更新后的數(shù)據(jù)類型設置為更新 (+U) out.collect(before); // 發(fā)送兩條數(shù)據(jù)給下游 out.collect(after);} else if (OP_DELETE.equals(op)) { // 如果是刪除 (d) 消息 before.setRowKind(RowKind.DELETE); // 將消息類型設置為刪除 (-D) out.collect(before); // 發(fā)送給下游} else { ... // 異常處理邏輯}
從上述邏輯可以看出,對于每一種 Debezium 的操作碼( op 字段的類型),都可以用 Flink 的 RowKind 類型來表示。對于插入 +I 和刪除 D ,都只需要一條消息即可;而對于更新,則涉及刪除舊數(shù)據(jù)和寫入新數(shù)據(jù),因此需要 -U 和 +U 兩條消息來對應。
特別地,在 MySQL、PostgreSQL 等支持 Upsert(原子操作的 Update or Insert)語義的數(shù)據(jù)庫中,通常前一個 -U 消息可以省略,只把后一個 +U 消息用作實際的更新操作即可,這個優(yōu)化在 Flink 中也有實現(xiàn)。
因此可以看到,Debezium 到 Flink 消息的轉(zhuǎn)換邏輯是非常簡單和自然的,這也多虧了 Flink 先進的設計理念,很早就提出并實現(xiàn)了 Upsert 數(shù)據(jù)流和動態(tài)數(shù)據(jù)表之間的映射關(guān)系。
(1)flink-connector-debezium 模塊
我們在使用 Flink CDC Connectors 時,也會好奇它究竟是如何做到的不需要安裝和部署外部服務就可以實現(xiàn) CDC 的。當我們閱讀 flink-connector-mysql-cdc 的源碼時,可以看到它內(nèi)部依賴了 flink-connector-debezium 模塊,而這個模塊將 Debezium Embedded ( http://github.com/debezium/debezium/tree/master/debezium-embedded ) 嵌入到了 Connector 中。
flink-connector-debezium 的數(shù)據(jù)源實現(xiàn)類為 com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction ,它集成了 Flink 中的 RichSourceFunction 并實現(xiàn)了 CheckpointedFunction 以支持快照保存狀態(tài)。
通常而言,對于 SourceFunction,我們可以從它的 run 方法入手分析。它的核心代碼如下:
this.engine = DebeziumEngine.create(Connect.class) .using(properties) // 初始化 Debezium 所需的參數(shù) .notifying(debeziumConsumer) // 收到批量的變更消息, 則 Debezium 會回調(diào) DebeziumChangeConsumer 來反序列化并向下游輸出數(shù)據(jù) .using(OffsetCommitPolicy.always()) .using( (success, message, error) -> { if (!success && error != null) { this.reportError(error); } }) .build();... executor.execute(engine); // 向 Executor 提交 Debezium 線程以啟動運行
可以看到,這個 SourceFunction 使用一些預先定義的參數(shù),初始化了一個嵌入式的 DebeziumEngine(Java 的 Runnable ),然后提交給線程池(executor)去執(zhí)行。這個 Debezium 線程會批量接收 binlog 信息并回調(diào)傳入的 debeziumConsumer 以反序列化消息并交給 Flink 來處理。本類的其他方法主要負責初始化狀態(tài)和保存快照,這里略過。
這里我們再來看一下 DebeziumChangeConsumer 的實現(xiàn),它的最核心的方法是 handleBatch 。當 Debezium 收到一批新的事件時,會調(diào)用這個方法來通知我們的 Connector 進行處理。這里有個 for 循環(huán)輪詢的邏輯:
for (ChangeEvent event : changeEvents) { // 輪詢各個事件 SourceRecord record = event.value(); if (isHeartbeatEvent(record)) { // 如果時心跳包 // 只更新當前 offset 信息, 然后繼續(xù)(不進行實際處理) synchronized (checkpointLock) { debeziumOffset.setSourcePartition(record.sourcePartition()); debeziumOffset.setSourceOffset(record.sourceOffset()); } continue; } deserialization.deserialize(record, debeziumCollector); // 反序列化這條消息 if (isInDbSnapshotPhase) { // 如果處于數(shù)據(jù)庫快照期, 需要阻止 Flink 檢查點(Checkpoint)生成 if (!lockHold) { MemoryUtils.UNSAFE.monitorEnter(checkpointLock); lockHold = true; ... } if (!isSnapshotRecord(record)) { // 如果已經(jīng)不在數(shù)據(jù)庫快照期了, 就釋放鎖, 允許 Flink 正常生成檢查點(Checkpoint) MemoryUtils.UNSAFE.monitorExit(checkpointLock); isInDbSnapshotPhase = false; ... } } // 更新當前 offset 信息, 并向下游 Flink 算子發(fā)送數(shù)據(jù) emitRecordsUnderCheckpointLock( debeziumCollector.records, record.sourcePartition(), record.sourceOffset());}
可以看到邏輯比較簡單,只需要關(guān)注 checkpointLock 這個對象:只有持有這個對象的鎖時,才允許 Flink 進行檢查點的生成。
當作業(yè)處于數(shù)據(jù)庫快照期(即作業(yè)剛啟動時,需全量同步源數(shù)據(jù)庫的一份完整快照,此時收到的數(shù)據(jù)類型是 Debezium 的 SnapshotRecord ),則不允許 Flink 進行 Checkpoint 即檢查點的生成,以避免作業(yè)崩潰恢復后狀態(tài)不一致;同樣地,如果正在向下游算子發(fā)送數(shù)據(jù)并更新 offset 信息時,也不允許快照的進行。這些操作都是為了保證 Exacly-Once(精確一致)語義。
這里也解釋了在作業(yè)剛啟動時,如果數(shù)據(jù)庫較大(同步時間較久),F(xiàn)link 剛開始的 Checkpoint 永遠失?。ǔ瑫r)的原因:只有當 Flink 完整同步了全量數(shù)據(jù)后,才可以進行增量數(shù)據(jù)的處理,以及 Checkpoint 的生成。
(2)flink-connector-mysql-cdc 模塊
而對于 flink-connector-mysql-cdc 模塊而言,它主要涉及到 MySQLTableSource 的聲明和實現(xiàn)。
我們知道,F(xiàn)link 是通過 Java 的 SPI(Service Provider Interface)機制動態(tài)加載 Connector 的,因此我們首先看這個模塊的 src/main/resources/meta-INF/services/org.apache.flink.table.factories.Factory 文件,里面內(nèi)容指向 com.alibaba.ververica.cdc.connectors.mysql.table.MySQLTableSourceFactory 。
打開這個工廠類,我們可以看到它定義了該 Connector 所需的參數(shù),例如 MySQL 數(shù)據(jù)庫的用戶名、密碼、表名等信息,并負責 MySQLTableSource 實例的具體創(chuàng)建,而 MySQLTableSource 類對這些參數(shù)做轉(zhuǎn)換,最終會生成一個上文提到的 DebeziumSourceFunction 對象。
因此我們可以發(fā)現(xiàn),這個模塊作用是一個 MySQL 參數(shù)的封裝和轉(zhuǎn)換層,最終的邏輯實現(xiàn)仍然是由 flink-connector-debezium 完成的。
六、MySQL CDC 常見問題&優(yōu)化
由于 Flink 的 CDC 功能還比較新(1.11 版本剛開始支持,1.12 版本逐步完善),因而在應用過程中,很可能會遇到有各種問題。鑒于大多數(shù)客戶的數(shù)據(jù)源都是 MySQL,我們這里整理了客戶常見的一些問題和優(yōu)化方案,希望能夠幫助到大家。
Debezium 報錯:binlog probably contains events generated with statement or mixed based replication format
當前的 Binlog 格式被設置為了 STATEMENT 或者 MIXED , 這兩種都不被 Debezium 支持。為了使用 Flink CDC 功能,需要把 MySQL 的 binlog-format 設置為 ROW :
SET GLOBAL binlog_format = &39;ROW&39;;SET GLOBAL binlog_row_image = &39;FULL&39;;
如果您使用的是騰訊云的 TencentDB for MySQL,請確認下面設置:
Debezium 報錯:User does not have the &39;LOCK TABLES&39; privilege required to obtain a consistent snapshot 或 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s)
請對作業(yè)中指定的 MySQL 用戶賦予如下權(quán)限: SELECT, RELOAD, SHOW DATAbaseS, REPLICATION SLAVE, REPLICATION CLIENT ,例如:
GRANT SELECT , RELOAD, SHOW DATAbaseS , REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO &39;用戶名&39; IDENTIFIED BY &39;密碼&39; ;
FLUSH PRIVILEGES ;
如果您使用的數(shù)據(jù)庫不允許或者不希望使用 RELOAD 進行全局鎖,則還需要授予 LOCK TABLES 權(quán)限以令 Debezium 嘗試進行表級鎖。 注意,表級鎖會導致更長的數(shù)據(jù)庫鎖定時間!
如果希望徹底跳過鎖(對數(shù)據(jù)的一致性要求不高,但要求數(shù)據(jù)庫不能被鎖),則可以在 WITH 參數(shù)中設置 &39;debezium.snapshot.locking.mode&39; = &39;none&39; 參數(shù)來跳過鎖操作。但請注意,同步過程中千萬不要隨意變更庫表的結(jié)構(gòu)。
前文講過,F(xiàn)link CDC Connector 在初始的全量快照同步階段,會屏蔽掉快照的執(zhí)行,因此如果 Flink Checkpoint 需要執(zhí)行的話,就會因為一直無法獲得 checkpointLock 對象的鎖而超時。
可以設置 Flink 的 execution.checkpointing.tolerable-failed-checkpoint 參數(shù)以容忍更多的 Checkpoint 失敗事件,同時可以調(diào)大 Checkpoint 周期,避免作業(yè)因 Checkpoint 失敗而一直重啟。
如果發(fā)現(xiàn)數(shù)據(jù)庫中的某些數(shù)據(jù)在 CDC 同步后有缺失,請確認是否仍在使用 Flink 舊版 1.10 的 Flink SQL WITH 語法(例如 WITH 參數(shù)中的 connector.type 是 舊語法 [ http://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.htmljdbc-connector] , connector 是 新語法 [ http://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.htmlhow-to-create-a-jdbc-table] )。
舊版語法的 Connector 在 JDBC 批量寫入 Upsert 數(shù)據(jù)(例如數(shù)據(jù)庫的更新記錄)時,并未考慮到 Upsert 與 Delete 消息之間的順序關(guān)系,因此會出現(xiàn)錯亂的問題,請盡快遷移到新版的 Flink SQL 語法。
默認情況下,如果遇到異常的數(shù)據(jù)(例如消費的 Kafka topic 在無意間混入了其他數(shù)據(jù)),F(xiàn)link 會立刻崩潰重啟,然后從上個快照點(Checkpoint)重新消費。由于某條異常數(shù)據(jù)的存在,作業(yè)會永遠因為異常而重啟。可以在 WITH 參數(shù)中加入 &39;debezium-json.ignore-parse-errors&39; = &39;true&39; 來應對這個問題。
Debezium 服務端發(fā)生異常并恢復后,由于可能沒有及時記錄崩潰前的現(xiàn)場,可能會退化為 At least once 模式,即同樣的數(shù)據(jù)可能被發(fā)送多次,造成下游結(jié)果不準確。
為了應對這個問題,新版的 Flink 1.12 增加了一個 table.exec.source.cdc-events-duplicate 配置項(可以編輯 flink-conf.yaml 文件來配置),建議將其設置為 true 以對這些重復數(shù)據(jù)進行去重。
但是需要注意,該選項需要數(shù)據(jù)源表 定義了主鍵 ,否則也無法進行去重操作。
七、未來展望
在 Flink 1.11 版本中,CDC 功能首次被集成到內(nèi)核中。由于 Flink 1.11.0 版本有個 嚴重 Bug ( http://issues.apache.org/jira/browse/Flink-18461 ) 造成 Upsert 數(shù)據(jù)無法寫入下游,我們建議使用 1.11.1 及以上版本。
在 1.12 版本上,F(xiàn)link 還在配置項中增加了前文提到的 table.exec.source.cdc-events-duplicate 等選項以更好地支持 CDC 去重;還支持 Avro 格式的 Debezium 數(shù)據(jù)流,而不僅僅限于 JSON 了。另外,這個版本增加了對 Maxwell ( http://maxwells-daemon.io/) 格式的 CDC 數(shù)據(jù)流支持,
為了更好地完善 CDC 功能模塊,F(xiàn)link 社區(qū)創(chuàng)建了 [Flink-18822] 以追蹤關(guān)于該模塊的進展??梢詮闹锌吹?,F(xiàn)link 1.13 主要著力于支持更多的類型( Flink-18758 [ http://issues.apache.org/jira/browse/Flink-18758 ] ),以及允許從 Debezium Avro、Canal 等數(shù)據(jù)流中讀取一些元數(shù)據(jù)信息等。
而在更遠的規(guī)劃中,F(xiàn)link 還可能支持基于 CDC 的內(nèi)存數(shù)據(jù)庫緩存,這樣我們可以在內(nèi)存中動態(tài)地 JOIN 一個數(shù)據(jù)庫的副本,而不必每次都查詢源庫,這將極大地提升作業(yè)的處理能力,并降低數(shù)據(jù)庫的查詢壓力。
作者:kyledong
來源:微信公眾號:騰訊云大數(shù)據(jù)
出處:http://mp.weixin.qq.com/s?__biz=MzUzNTc0NTcyMw==&mid=2247485657&idx=1&sn=f6fa360a7c35cb6f03e78222ca8691cb
李同
版權(quán)所有 未經(jīng)許可不得轉(zhuǎn)載
增值電信業(yè)務經(jīng)營許可證備案號:遼ICP備14006349號
網(wǎng)站介紹 商務合作 免責聲明 - html - txt - xml