2021-02-08 分類: 網(wǎng)站建設(shè)
宜人貸蜂巢團(tuán)隊,由Michael創(chuàng)立于2013年,通過使用互聯(lián)網(wǎng)科技手段助力金融生態(tài)和諧健康發(fā)展。自成立起一直致力于多維度數(shù)據(jù)閉環(huán)平臺建設(shè)。目前團(tuán)隊規(guī)模超過百人,涵蓋征信、
圖1 - API網(wǎng)關(guān)項目框架
圖中描繪了API網(wǎng)關(guān)系統(tǒng)的處理流程,以及與服務(wù)注冊發(fā)現(xiàn)、日志分析、報警系統(tǒng)、各類爬蟲的關(guān)系。其中API網(wǎng)關(guān)系統(tǒng)接收請求,對請求進(jìn)行編解碼、鑒權(quán)、限流、加解密,再基于Eureka服務(wù)注冊發(fā)現(xiàn)模塊,將請求發(fā)送到有效的服務(wù)節(jié)點上;網(wǎng)關(guān)及抓取系統(tǒng)的日志,會被收集到elk平臺中,做業(yè)務(wù)分析及報警處理。
二、BIO vs NIO
API網(wǎng)關(guān)承載數(shù)倍于爬蟲的流量,提升服務(wù)器的并發(fā)處理能力、縮短系統(tǒng)的響應(yīng)時間,通信模型的選擇是至關(guān)重要的,是選擇BIO,還是NIO?
1. Streamvs Buffer & 阻塞 vs 非阻塞
BIO是面向流的,io的讀寫,每次只能處理一個或者多個bytes,如果數(shù)據(jù)沒有讀寫完成,線程將一直等待于此,而不能暫時跳過io或者等待io讀寫完成異步通知,線程滯留在io讀寫上,不能充分利用機(jī)器有限的線程資源,造成server的吞吐量較低,見圖2。而NIO與此不同,面向Buffer,線程不需要滯留在io讀寫上,采用操作系統(tǒng)的epoll模式,在io數(shù)據(jù)準(zhǔn)備好了,才由線程來處理,見圖3。
NioEvenrLoopGroup的創(chuàng)建,具體執(zhí)行過程是執(zhí)行類MultithreadEventExecutorGroup的構(gòu)造方法:
其中,創(chuàng)建細(xì)節(jié)見下:
chooser的創(chuàng)建細(xì)節(jié),見下:
DefaultEventExecutorChooserFactory根據(jù)線程數(shù)創(chuàng)建具體的EventExecutorChooser,線程數(shù)如果等于2^n,可使用按位與替代取模運(yùn)算,節(jié)省cpu的計算資源,見源碼:
- @SuppressWarnings("unchecked")
- @Override
- public EventExecutorChooser newChooser(EventExecutor[] executors) {
- if (isPowerOfTwo(executors.length)) {
- return new PowerOfTowEventExecutorChooser(executors);
- } else {
- return new GenericEventExecutorChooser(executors);
- }
- }
- private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
- private final AtomicInteger idx = new AtomicInteger();
- private final EventExecutor[] executors;
- PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- return executors[idx.getAndIncrement() & executors.length - 1];
- }
- }
- private static final class GenericEventExecutorChooser implements EventExecutorChooser {
- private final AtomicInteger idx = new AtomicInteger();
- private final EventExecutor[] executors;
- GenericEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- return executors[Math.abs(idx.getAndIncrement() % executors.length)];
- }
- }
newChild(executor, args)的創(chuàng)建細(xì)節(jié),見下:
MultithreadEventExecutorGroup的newChild方法是一個抽象方法,故使用NioEventLoopGroup的newChild方法,即調(diào)用NioEventLoop的構(gòu)造函數(shù):
- @Override
- protected EventLoop newChild(Executor executor, Object... args) throws Exception {
- return new NioEventLoop(this, executor, (SelectorProvider) args[0],
- ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
- }
在這里先看下NioEventLoop的類層次關(guān)系:
創(chuàng)建任務(wù)隊列tailTasks(內(nèi)部為有界的LinkedBlockingQueue):
創(chuàng)建線程的任務(wù)隊列taskQueue(內(nèi)部為有界的LinkedBlockingQueue),以及任務(wù)過多防止系統(tǒng)宕機(jī)的拒絕策略rejectedHandler。
其中tailTasks和taskQueue均是任務(wù)隊列,而優(yōu)先級不同,taskQueue的優(yōu)先級高于tailTasks,定時任務(wù)的優(yōu)先級高于taskQueue。
五、ServerBootstrap初始化及啟動
了解了Netty線程池NioEvenrLoopGroup的創(chuàng)建過程后,下面看下API網(wǎng)關(guān)服務(wù)ServerBootstrap的是如何使用線程池引入服務(wù)中,為高并發(fā)訪問服務(wù)的。
API網(wǎng)關(guān)ServerBootstrap初始化及啟動代碼,見下:
- serverBootstrap = new ServerBootstrap();
- bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());
- workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());
- serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())
- .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())
- .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())
- // Memory pooled
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childHandler(channelInitializer);
- ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();
- log.info("API-gateway started on port: {}", config.getPort());
- future.channel().closeFuture().sync();
API網(wǎng)關(guān)系統(tǒng)使用netty自帶的線程池,共有三組線程池,分別為bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暫不作介紹)。其中,bossGroup用于接收客戶端的TCP連接,workerGroup用于處理I/O、執(zhí)行系統(tǒng)task和定時任務(wù),executorGroup用于處理網(wǎng)關(guān)業(yè)務(wù)加解密、限流、路由,及將請求轉(zhuǎn)發(fā)給后端的抓取服務(wù)等業(yè)務(wù)操作。
六、Channel與線程池的綁定
ServerBootstrap初始化后,通過調(diào)用bind(port)方法啟動Server,bind的調(diào)用鏈如下:
- AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister
其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化過程指定channel為NioServerSocketChannel.class,至此將NioServerSocketChannel與bossGroup綁定到一起,bossGroup負(fù)責(zé)客戶端連接的建立。那么NioSocketChannel是如何與workerGroup綁定到一起的?
調(diào)用鏈AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead:
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final Channel child = (Channel) msg;
- child.pipeline().addLast(childHandler);
- for (Entry<ChannelOption<?>, Object> e: childOptions) {
- try {
- if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
- logger.warn("Unknown channel option: " + e);
- }
- } catch (Throwable t) {
- logger.warn("Failed to set a channel option: " + child, t);
- }
- }
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
- try {
- childGroup.register(child).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
其中,childGroup.register(child)就是將NioSocketChannel與workderGroup綁定到一起,那又是什么觸發(fā)了ServerBootstrapAcceptor的channelRead方法?
其實當(dāng)一個 client 連接到 server 時,Java 底層的 NIO ServerSocketChannel 會有一個SelectionKey.OP_ACCEPT 就緒事件,接著就會調(diào)用到 NioServerSocketChannel.doReadMessages方法。
- @Override
- protected int doReadMessages(List<Object> buf) throws Exception {
- SocketChannel ch = javaChannel().accept();
- try {
- if (ch != null) {
- buf.add(new NioSocketChannel(this, ch));
- return 1;
- }
- } catch (Throwable t) { …
- }
- return 0;
- }
javaChannel().accept() 會獲取到客戶端新連接的SocketChannel,實例化為一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this),由此可知, 我們創(chuàng)建的這個NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 。
接下來就經(jīng)由 Netty 的 ChannelPipeline 機(jī)制,將讀取事件逐級發(fā)送到各個 handler 中,于是就會觸發(fā)前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。
至此,分析了Netty線程池的初始化、ServerBootstrap的啟動及channel與線程池的綁定過程,能夠看出Netty中線程池的優(yōu)雅設(shè)計,使用不同的線程池負(fù)責(zé)連接的建立、IO讀寫等,為API網(wǎng)關(guān)項目的高并發(fā)訪問提供了技術(shù)基礎(chǔ)。
七、總結(jié)
網(wǎng)站題目:宜人貸蜂巢API網(wǎng)關(guān)技術(shù)解密之Netty使用實踐
文章分享:http://www.rwnh.cn/news46/99846.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供面包屑導(dǎo)航、定制開發(fā)、動態(tài)網(wǎng)站、網(wǎng)站改版、做網(wǎng)站、網(wǎng)頁設(shè)計公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容