DynamoShake怎么從dynamodb遷移到mongodb,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
創(chuàng)新互聯(lián)長期為近1000家客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為崇義企業(yè)提供專業(yè)的做網(wǎng)站、網(wǎng)站設(shè)計,崇義網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。DynamoDB支持全量和增量的同步,進程啟動后會先進行全量同步,全量同步結(jié)束后進入增量同步的階段。
全量同步分為數(shù)據(jù)同步和索引同步兩部分,數(shù)據(jù)同步用于同步數(shù)據(jù),數(shù)據(jù)同步結(jié)束后將會進行索引的同步,索引同步會同步默認的primary key,用戶自建的索引GSI如果MongoDB是副本集支持,集群版目前暫時不支持同步。
增量同步只同步數(shù)據(jù),不同步增量同步過程中產(chǎn)生的索引。
此外,全量和增量同步階段不支持對原來的庫表進行DDL操作,比如刪表,建表,建索引等。
斷點續(xù)傳
全量同步不支持斷點續(xù)傳功能,增量同步支持斷點續(xù)傳,也就是說如果增量斷開了,一定時間內(nèi)恢復(fù)是可以只進行增量的斷點續(xù)傳。但在某些情況下,比如斷開的時間過久,或者之前的位點(參考下文)丟失,那么都會導(dǎo)致重新觸發(fā)全量同步。
所有源端的表會寫入到目的的一個庫(默認是dynamo-shake)的不同表中,比如用戶有table1,table2,那么同步完后,目的端會有個dynamo-shake的庫,庫里面有table1和table2的表。
在原生的dynamodb中,協(xié)議是包裹了一層類型字段,其格式是“key: type: value”格式,例如用戶插入了一條{hello: 1},那么dynamodb接口獲取的數(shù)據(jù)是{"hello": {"N": 1}}的格式。
Dynamo所有的數(shù)據(jù)類型:
String
Binary
Number
StringSet
NumberSet
BinarySet
Map
List
Boolean
Null
那么我們提供2種轉(zhuǎn)換方式,raw和change,其中raw就是按照裸的dynamodb接口獲取的數(shù)據(jù)寫入:
rszz-4.0-2:PRIMARY> use dynamo-shake switched to db dynamo-shake rszz-4.0-2:PRIMARY> db.zhuzhao.find() { "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : { "L" : [ { "S" : "aa1" }, { "N" : "1234" } ] }, "hello_world" : { "S" : "f2" } } { "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : { "N" : "222" }, "qqq" : { "SS" : [ "h2", "h3" ] }, "hello_world" : { "S" : "yyyyyyyyyyy" }, "test" : { "S" : "aaa" } } { "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : { "L" : [ { "N" : "0" }, { "N" : "1" }, { "N" : "2" } ] }, "hello_world" : { "S" : "測試中文" } }
change表示剝離類型字段:
rszz-4.0-2:PRIMARY> use dynamo-shake switched to db dynamo-shake rszz-4.0-2:PRIMARY> db.zhuzhao.find() { "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : [ "aa1", 1234 ] , "hello_world" : "f2" } { "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : 222, "qqq" : [ "h2", "h3" ] , "hello_world" : "yyyyyyyyyyy", "test" : "aaa" } { "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : [ 0, 1, 2 ], "hello_world" : "測試中文" }
用戶可以根據(jù)自己的需求制定自己的同步類型。
增量的斷點續(xù)傳是根據(jù)位點來實現(xiàn)的,默認的位點是寫入到目的MongoDB中,庫名是dynamo-shake-checkpoint。每個表都會記錄一個checkpoint的表,同樣還會有一個status_table表記錄當前是全量同步還是增量同步階段。
rszz-4.0-2:PRIMARY> use dynamo-shake42-checkpoint switched to db dynamo-shake42-checkpoint rszz-4.0-2:PRIMARY> show collections status_table zz_incr0 zz_incr1 rszz-4.0-2:PRIMARY> rszz-4.0-2:PRIMARY> rszz-4.0-2:PRIMARY> db.status_table.find() { "_id" : ObjectId("5d6e0ef77e592206a8c86bfd"), "key" : "status_key", "status_value" : "incr_sync" } rszz-4.0-2:PRIMARY> db.zz_incr0.find() { "_id" : ObjectId("5d6e0ef17e592206a8c8643a"), "shard_id" : "shardId-00000001567391596311-61ca009c", "father_id" : "shardId-00000001567375527511-6a3ba193", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c8644c"), "shard_id" : "shardId-00000001567406847810-f5b6578b", "father_id" : "shardId-00000001567391596311-61ca009c", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c86456"), "shard_id" : "shardId-00000001567422218995-fe7104bc", "father_id" : "shardId-00000001567406847810-f5b6578b", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c86460"), "shard_id" : "shardId-00000001567438304561-d3dc6f28", "father_id" : "shardId-00000001567422218995-fe7104bc", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c8646a"), "shard_id" : "shardId-00000001567452243581-ed601f96", "father_id" : "shardId-00000001567438304561-d3dc6f28", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c86474"), "shard_id" : "shardId-00000001567466737539-cc721900", "father_id" : "shardId-00000001567452243581-ed601f96", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef27e592206a8c8647e"), "shard_id" : "shardId-00000001567481807517-935745a3", "father_id" : "shardId-00000001567466737539-cc721900", "seq_num" : "", "status" : "done", "worker_id" : "unknown-worker", "iterator_type" : "LATEST", "shard_it" : "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAGsTOg0+3HY+yzzD1cTzc7TPXi/iBi7sA5Q6SGSoaAJ2gz2deQu5aPRW/flYK0pG9ZUvmCfWqe1A5usMFWfVvd+yubMwWSHfV2IPVs36TaQnqpMvsywll/x7IVlCgmsjr6jStyonbuHlUYwKtUSq8t0tFvAQXtKi0zzS25fQpITy/nIb2y/FLppcbV/iZ+ae1ujgWGRoojhJ0FiYPhmbrR5ZBY2dKwEpok+QeYMfF3cEOkA4iFeuqtboUMgVqBh0zUn87iyTFRd6Xm49PwWZHDqtj/jtpdFn0CPoQPj2ilapjh9lYq/ArXMai5DUHJ7xnmtSITsyzUHakhYyIRXQqF2UWbDK3F7+Bx5d4rub1d4S2yqNUYA2eZ5CySeQz7CgvzaZT391axoqKUjjPpdUsm05zS003cDDwrzxmLnFi0/mtoJdGoO/FX9LXuvk8G3hgsDXBLSyTggRE0YM+feER8hPgjRBqbfubhdjUxR+VazwjcVO3pzt2nIkyKPStPXJZIf4cjCagTxQpC/UPMtcwWNo2gQjM2XSkWpj7DGS2E4738biV3mtKXGUXtMFVecxTL/qXy2qpLgy4dD3AG0Z7pE+eJ9qP5YRE6pxQeDlgbERg==", "update_date" : "" } { "_id" : ObjectId("5d6e1d807e592206a8c9a102"), "shard_id" : "shardId-00000001567497561747-03819eba", "father_id" : "shardId-00000001567481807517-935745a3", "seq_num" : "39136900000000000325557205", "status" : "in processing", "worker_id" : "unknown", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAFw/qdbPLjsXMlPalnhh65koia44yz6A1W2uwUyu/MzRUhaaqnI0gPM8ebVgy7dW7dDWLTh/WXYyDNNyXR3Hvk01IfEKDf+FSLMNvh3iELdrO5tRoLtZI2fxxrPZvudRc3KShX0Pvqy2YYwl4nlBR6QezHTWx5H2AU22MGPTx8aMRbjUgPwvgEExRgdzfhG6G9gkc7C71fIc98azwpSm/IW+mV/h/doFndme47k2v8g0GNJvgLSoET7HdJYH3XFdqh5QVDIP4sbz8X1cpN3y8AlT7Muk2/yXOdNeTL6tApuonCrUpJME9/qyBYQVI5dsAHnAWaP2Te3EAvz3ao7oNdnA8O6uz5VF9zFdN1OUHWM40kLUsX4sHve7McEzFLgf4NL1WTAnPN13cFhEm9BS8M7tiJqZ0OzgkbF1AWfq+xg/O6c57/Vvx/G/75DZ8XcWIABgGNkWBET/vLDrgjJQ0PUZJZKNmmbgKKTyHgSl4YOXNEeyH7l6atuc2WaREDjbf7lnQO5No11sz4g3O+AreBcpGVhdZNhGGcrG/wduPYEZfg2hG1sfYiSAM8GipUPMA0PM7JPIJmqCaY90JxRcI1By24tpp9Th45/5rLTGPYJZA==", "update_date" : "" }
其中status_table表中"status_value" : "incr_sync"
表示進入了增量階段。增量的每個shard都會記錄一個checkpoint,關(guān)于具體shard分裂的規(guī)則可以參考dynamodb的guan'fa官方文檔。下面是增量表checkpoint的各個字段的說明:
_id
:mongodb自帶主鍵id
shard_id
:shard的id,每個shard有一個唯一的id
father_id
:父shard的id,shard可能有一個父shard。
seq_num
: 目前處理到的shard內(nèi)部的sequence number,這個是主要的位點信息。
status
: 目前同步的階段,一共有以下幾個狀態(tài):
"not process": 未處理
"no need to process": 沒有必要處理
"prepare stage":準備處理
"in processing": 處理中
"wait father finish":等待父節(jié)點處理完畢再進行處理
"done": 處理完畢
worker_id
:處理的worker id,目前暫未啟用
iterator_type
:shard的遍歷方式
shard_it
:shard的迭代器地址,次要位點信息。
update_date
:checkpoint更新的時間戳
根據(jù)默認的primary key創(chuàng)建一個唯一索引,并且根據(jù)partition key創(chuàng)建shard key。用戶自己的索引gsi目前不進行創(chuàng)建。
本小節(jié)主要介紹DynamoShake的部分架構(gòu)細節(jié)
下圖是基本的一個table的數(shù)據(jù)同步架構(gòu)圖(dynamo-shake會啟動多個并發(fā)線程tableSyncer進行拉取,用戶可控并發(fā)度),fetcher線程從源端dynamodb拉取數(shù)據(jù)后將數(shù)據(jù)推入隊列,緊接著parser線程從隊列中拿取數(shù)據(jù)并進行解析(dynamo協(xié)議轉(zhuǎn)bson),executor負責聚合部分數(shù)據(jù)并寫入mongodb。
fetcher。目前fetcher線程只有1個,用的是協(xié)議轉(zhuǎn)換驅(qū)動是aws提供的driver。fetcher的原理是調(diào)用driver進行批量抓取源庫的數(shù)據(jù),抓到了就塞入隊列中,直到抓完當前table的所有數(shù)據(jù)。fetcher單獨分離出來主要是出于網(wǎng)絡(luò)IO考慮的,目前拉取受網(wǎng)絡(luò)影響,會比較慢。
parser。parser可以啟動多個,默認目前是2個,用戶可以通過FullDocumentParser
進行控制。其主要就是從隊列中讀取數(shù)據(jù),并解析成bson結(jié)構(gòu)。parser解析后,數(shù)據(jù)按條寫入executor的隊列。parser線程單獨獨立出來主要是出于解析比較耗CPU資源考慮。
executor。executor也可以啟動多個,默認目前是4個,用戶可以通過FullDocumentConcurrency
進行控制。executor從隊列中拉取,并進行batch聚合(聚合上限16MB,總條數(shù)1024)后寫入目的mongodb。
當前所有表的數(shù)據(jù)寫入完后,tableSyncer將會退出。
增量整體架構(gòu)如下:
Fetcher線程負責感知stream中shard的變化,Manager負責進行消息的通知,或者創(chuàng)建新的Dispatcher進行消息的處理,一個shard對應(yīng)一個Dispatcher。Dispatcher從源端拉取增量數(shù)據(jù),并通過Batcher進行數(shù)據(jù)解析和打包整合,然后通過executor進行寫入到MongoDB,同時會更新checkpoint。另外,如果是斷點續(xù)傳,那么Dispatcher會從舊的checkpoint位點開始拉取,而不是從頭開始拉。
啟動:./dynamo-shake -conf=dynamo-shake.conf
,配置參數(shù)在dynamo-shake.conf中指定,以下是各個參數(shù)的意義:
id: 修改會影響MongoDB上目的庫的名字
log.file:日志文件,不配置將打印到標準輸出
log.level: log級別。推薦默認。
log.buffer: 打印是否帶緩存。推薦默認。
system_profile:打印內(nèi)部堆棧的端口號。推薦默認。
http_profile:暫未啟用
sync_mode:同步模式,all表示全量+增量,full表示僅全量,incr表示僅增量(目前不支持)
source.access_key_id: dynamodb連接配置參數(shù)
source.secret_access_key: dynamodb連接配置參數(shù)
source.session_token: dynamodb連接配置參數(shù),沒有可以留空
source.region: dynamodb連接配置參數(shù)
filter.collection.white:過濾白名單,只同步指定的表
filter.collection.black:過濾黑名單,不通過指定的表。
qps.full:全量階段限速,1秒鐘發(fā)送多少個請求
qps.full.batch_num:全量階段限速,1個請求最多包括多少個item。
qps.incr:增量階段限速,1秒鐘發(fā)送多少個請求
qps.incr.batch_num:增量階段限速,1個請求最多包括多少個item。
target.type:目的端配置,目前僅支持mongodb
target.address: 目的端mongodb的連接串地址。
target.mongodb.type: mongodb是replica還是sharding
target.mongodb.exist:如果目的庫同名表存在,執(zhí)行什么行為。drop表示刪除,rename表示重命名,留空表示不處理。
full.concurrency:全量同步的線程個數(shù),1個線程對應(yīng)1個表
full.document.concurrency:全量同步1個表內(nèi)的并發(fā)個數(shù)。
full.document.parser:1個表內(nèi)parser線程的個數(shù)
full.enable_index.primary:是否同步dynamodb的primary key。
full.enable_index.user:是否同步用戶自建的索引,目前不支持
convert.type:寫入的模式,raw表示裸寫入, change表示解析類型字段后寫入,參考上述文檔。
increase.concurrency:增量同步并發(fā)參數(shù),1次最多抓取的shard個數(shù)
checkpoint.address = checkpont的存儲地址,默認不配置與目的庫一致。
checkpoint.db = checkpoint寫入的db的名字,默認是$db-checkpoint。
DynamoFullCheck是一個用于校驗DynamoDB和MongoDB數(shù)據(jù)是否一致的工具,目前僅支持全量校驗,不支持增量,也就是說,如果增量同步階段,那么源和目的是不一致的。
DynamoFullCheck只支持單向校驗,也就是校驗DynamoDB的數(shù)據(jù)是否是MongoDB的子集,反向不進行校驗。
另外,還支持抽樣校驗,支持只校驗感興趣的表。
校驗主要分為以下幾部分:
輪廓校驗。首先,校驗兩邊的表中數(shù)目是否一致;接著,校驗索引是否一致(目前沒做索引校驗)。注意,如果表中數(shù)目不一致,將會直接退出,不會進行后續(xù)的校驗。
精確校驗。精確校驗數(shù)據(jù),原理是從源端拉取數(shù)據(jù)并解析,如果有唯一索引,那么根據(jù)唯一索引查找MongoDB的doc,并對比一致性;如果沒有唯一索引,那么會根據(jù)整個doc在MongoDB中進行查找(比較重)。
抽樣原理:
精確校驗的時候,如果啟用抽樣,那么會對每個doc進行抽樣,判斷當前doc是否需要抽樣。原理比較簡單,比如按30%抽樣,那么再0~100中產(chǎn)生一個隨機數(shù),如果是0~30的就校驗,反之不校驗。
DynamoFullCheck由于從源DynamoDB拉取也需要經(jīng)過fetch,parse階段,所以一定程度上,該部分代碼復(fù)用了DynamoShake,不同的是DynamoFullCheck內(nèi)部各個fetcher, parser, executor線程并發(fā)度都是1。
full-check參數(shù)稍微簡單點,直接用的命令行注入,例如:./dynamo-full-check --sourceAccessKeyID=BUIASOISUJPYS5OP3P5Q --sourceSecretAccessKey=TwWV9reJCrZhHKSYfqtTaFHW0qRPvjXb3m8TYHMe --sourceRegion=ap-east-1 -t="10.1.1.1:30441" --sample=300
Usage: dynamo-full-check.darwin [OPTIONS] Application Options: -i, --id= target database collection name (default: dynamo-shake) -l, --logLevel= -s, --sourceAccessKeyID= dynamodb source access key id --sourceSecretAccessKey= dynamodb source secret access key --sourceSessionToken= dynamodb source session token --sourceRegion= dynamodb source region --qpsFull= qps of scan command, default is 10000 --qpsFullBatchNum= batch number in each scan command, default is 128 -t, --targetAddress= mongodb target address -d, --diffOutputFile= diff output file name (default: dynamo-full-check-diff) -p, --parallel= how many threads used to compare, default is 16 (default: 16) -e, --sample= comparison sample number for each table, 0 means disable (default: 1000) --filterCollectionWhite= only compare the given tables, split by ';' --filterCollectionBlack= do not compare the given tables, split by ';' -c, --convertType= convert type (default: raw) -v, --version print version Help Options: -h, --help Show this help message
看完上述內(nèi)容,你們掌握DynamoShake怎么從dynamodb遷移到mongodb的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道,感謝各位的閱讀!
當前文章:DynamoShake怎么從dynamodb遷移到mongodb-創(chuàng)新互聯(lián)
文章鏈接:http://www.rwnh.cn/article42/dcdshc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計公司、品牌網(wǎng)站制作、定制開發(fā)、網(wǎng)站排名、網(wǎng)站維護、網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)