這篇文章將為大家詳細(xì)講解有關(guān)flink中如何使用sql將流式數(shù)據(jù)寫入hive,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
創(chuàng)新互聯(lián)是一家專業(yè)提供威寧企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)、H5技術(shù)、小程序制作等業(yè)務(wù)。10年已為威寧眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。
上一篇介紹了使用sql將流式數(shù)據(jù)寫入文件系統(tǒng),這次我們來(lái)介紹下使用sql將文件寫入hive,對(duì)于如果想寫入已經(jīng)存在的hive表,則至少需要添加以下兩個(gè)屬性. 寫入hive底層還是和寫入文件系統(tǒng)一樣的,所以對(duì)于其他具體的配置參考上一篇.
alter table table_name set TBLPROPERTIES ('is_generic'='false');
alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore');
//如果想使用eventtime分區(qū)
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time');
下面我們講解一下,如何使用java程序來(lái)構(gòu)建一個(gè)flink程序來(lái)寫入hive。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
//構(gòu)造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");
如果目前系統(tǒng)中沒(méi)有存在相應(yīng)的hive表,可以通過(guò)在程序中執(zhí)行相應(yīng)的DDL建表語(yǔ)句來(lái)建表,如果已經(jīng)存在了,就把這段代碼省略,使用上面的hive命令修改現(xiàn)有表,添加相應(yīng)的屬性。
CREATE EXTERNAL TABLE `fs_table`(
`user_id` string,
`order_amount` double)
PARTITIONED BY (
`dt` string,
`h` string,
`m` string)
stored as ORC
TBLPROPERTIES (
'sink.partition-commit.policy.kind'='metastore',
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)
String insertSql = "insert into fs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
tEnv.executeSql(insertSql);
完整的代碼請(qǐng)參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteHive.java
對(duì)于如上的程序和sql,如果配置了是使用eventtime,在此程序中配置了'sink.partition-commit.trigger'='partition-time',最后發(fā)現(xiàn)程序沒(méi)法提交分區(qū)。
分析了一下源碼,問(wèn)題是出在了這個(gè)方法,org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions。先貼上代碼:
@Override
public List<String> committablePartitions(long checkpointId) {
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
checkpointId, watermarks));
}
long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
List<String> needCommit = new ArrayList<>();
Iterator<String> iter = pendingPartitions.iterator();
while (iter.hasNext()) {
String partition = iter.next();
//通過(guò)分區(qū)的值抽取分區(qū)的時(shí)間.
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
//判斷水印是否大于分區(qū)創(chuàng)建時(shí)間+延遲時(shí)間
if (watermark > toMills(partTime) + commitDelay) {
needCommit.add(partition);
iter.remove();
}
}
return needCommit;
}
系統(tǒng)通過(guò)分區(qū)值來(lái)抽取相應(yīng)的分區(qū)創(chuàng)建時(shí)間,然后進(jìn)行比對(duì),比如我們?cè)O(shè)置的pattern是 h:$m:00 , 某一時(shí)刻我們正在往 /2020-07-06/18/20/ 這個(gè)分區(qū)下寫數(shù)據(jù),那么程序根據(jù)分區(qū)值,得到的pattern將會(huì)是2020-07-06 18:20:00,這個(gè)值在sql中是根據(jù)DATA_FORMAT函數(shù)獲取的。
這個(gè)值是帶有時(shí)區(qū)的, 也是我想要的, 比如我們的時(shí)區(qū)設(shè)置為東八區(qū),2020-07-06 18:20:00這個(gè)時(shí)間是東八區(qū)的時(shí)間,換成標(biāo)準(zhǔn)UTC時(shí)間是減去八個(gè)小時(shí),也就是2020-07-06 10:20:00,而源碼中的toMills函數(shù)在處理這個(gè)東八區(qū)的時(shí)間時(shí),并沒(méi)有任何加入任何時(shí)區(qū)的處理,把這個(gè)其實(shí)應(yīng)該是東八區(qū)的時(shí)間當(dāng)做了UTC時(shí)間來(lái)處理,這樣計(jì)算出來(lái)的值就比實(shí)際值大8小時(shí),導(dǎo)致一直沒(méi)有觸發(fā)分區(qū)的提交。
如果我們?cè)跀?shù)據(jù)源構(gòu)造的分區(qū)是UTC時(shí)間,也就是不帶分區(qū)的時(shí)間,那么這個(gè)邏輯就是沒(méi)有問(wèn)題的,但是這樣又不符合我們的實(shí)際情況,比如對(duì)于分區(qū)2020-07-06 18:20:00,我希望我的分區(qū)肯定是東八區(qū)的時(shí)間,而不是比東八區(qū)小8個(gè)小時(shí)的UTC時(shí)間2020-07-06 10:20:00。
所以針對(duì)上述情況,有兩種解決方案,一種是自定義一個(gè)分區(qū)抽取類,第二,就是修改源碼,改一下現(xiàn)在的缺省的時(shí)間分區(qū)抽取類。我個(gè)人認(rèn)為修改一下缺省類更好理解,因?yàn)槟壳皩懭胛募蚳ive這塊配置和概念有點(diǎn)多,我不想太增加過(guò)多的配置來(lái)增加用戶的難度,應(yīng)該盡可能的用缺省值就能使程序很好的運(yùn)行。
我們看下flink中的StreamingFileSink類,構(gòu)造分區(qū)桶的時(shí)候默認(rèn)是使用的DateTimeBucketAssigner,其構(gòu)造分區(qū)路徑就是帶有時(shí)區(qū)概念的,默認(rèn)就用的是本地時(shí)區(qū)。
public DateTimeBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}
這個(gè)問(wèn)題,也不知道算不算一個(gè)bug,我給官方提交了一個(gè)ISSUE,但是官方?jīng)]有采納,不過(guò)我覺(jué)得不符合我的習(xí)慣,所以我對(duì)這個(gè)功能進(jìn)行了修改,讓partition.time-extractor.timestamp-pattern提取的partiiton是帶有時(shí)區(qū)的,默認(rèn)情況下是本地時(shí)區(qū)。如果是非本地時(shí)區(qū),可以指定時(shí)區(qū),通過(guò)參數(shù)partition.time-extractor.time-zone來(lái)指定,我們可以通下面的代碼獲取有效的時(shí)區(qū)。
Set<String> zoneIds = ZoneId.getAvailableZoneIds();
zoneIds.stream().forEach(System.out::println);
比如我們東八區(qū)默認(rèn)使用 Asia/Shanghai。
我基于社區(qū)的flink的tag release-1.11.0-rc4,我改了一下代碼 將代碼放到了github上。
關(guān)于flink中如何使用sql將流式數(shù)據(jù)寫入hive就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
當(dāng)前題目:flink中如何使用sql將流式數(shù)據(jù)寫入hive
本文地址:http://www.rwnh.cn/article16/pcojdg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、網(wǎng)站改版、外貿(mào)網(wǎng)站建設(shè)、定制開(kāi)發(fā)、定制網(wǎng)站、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)