中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!-創(chuàng)新互聯(lián)

bitchat 是一個(gè)基于 Netty 的 IM 即時(shí)通訊框架

成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),西烏珠穆沁企業(yè)網(wǎng)站建設(shè),西烏珠穆沁品牌網(wǎng)站建設(shè),網(wǎng)站定制,西烏珠穆沁網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,西烏珠穆沁網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

項(xiàng)目地址:https://github.com/all4you/bitchat

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

快速開始

bitchat-example模塊提供了一個(gè)服務(wù)端與客戶端的實(shí)現(xiàn)示例,可以參照該示例進(jìn)行自己的業(yè)務(wù)實(shí)現(xiàn)。

啟動服務(wù)端

要啟動服務(wù)端,需要獲取一個(gè) Server 的實(shí)例,可以通過 ServerFactory 來獲取。

目前只實(shí)現(xiàn)了單機(jī)模式下的 Server ,通過 SimpleServerFactory 只需要定義一個(gè)端口即可獲取一個(gè)單機(jī)的 Server 實(shí)例,如下所示:

public class StandaloneServerApplication {
    public static void main(String[] args) {
        Server server = SimpleServerFactory.getInstance()
            .newServer(8864);
        server.start();
    }
}

服務(wù)端啟動成功后,將顯示如下信息:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

啟動客戶端

目前只實(shí)現(xiàn)了直連服務(wù)器的客戶端,通過 SimpleClientFactory 只需要指定一個(gè) ServerAttr 即可獲取一個(gè)客戶端,然后進(jìn)行客戶端與服務(wù)端的連接,如下所示:

public class DirectConnectServerClientApplication {

    public static void main(String[] args) {
        Client client = SimpleClientFactory.getInstance()
            .newClient(ServerAttr.getLocalServer(8864));
        client.connect();

        doClientBiz(client);
    }
}

客戶端連接上服務(wù)端后,將顯示如下信息:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

體驗(yàn)客戶端的功能

目前客戶端提供了三種 Func,分別是:登錄,查看在線用戶列表,發(fā)送單聊消息,每種 Func 有不同的命令格式。

登錄

通過在客戶端中執(zhí)行以下命令?-lo houyi 123456即可實(shí)現(xiàn)登錄,目前用戶中心還未實(shí)現(xiàn),通過 Mock 的方式實(shí)現(xiàn)一個(gè)假的用戶服務(wù),所以輸入任何的用戶名密碼都會登錄成功,并且會為用戶創(chuàng)建一個(gè)用戶id。

登錄成功后,顯示如下:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

查看在線用戶

再啟動一個(gè)客戶端,并且也執(zhí)行登錄,登錄成功后,可以執(zhí)行?-lu命令,獲取在線用戶列表,目前用戶是保存在內(nèi)存中,獲取的結(jié)果如下所示:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

發(fā)送單聊信息

用 gris 這個(gè)用戶向 houyi 這個(gè)用戶發(fā)送單聊信息,只要執(zhí)行?-pc 1 hello,houyi命令即可

其中第二個(gè)參數(shù)數(shù)要發(fā)送消息給那個(gè)用戶的用戶id,第三個(gè)參數(shù)是消息內(nèi)容

消息發(fā)送方,發(fā)送完消息:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

消息接收方,接收到消息:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

客戶端斷線重連

客戶端和服務(wù)端之間維持著心跳,雙方都會檢查連接是否可用,客戶端每隔5s會向服務(wù)端發(fā)送一個(gè) PingPacket,而服務(wù)端接收到這個(gè) PingPacket 之后,會回復(fù)一個(gè) PongPacket,這樣表示雙方都是健康的。

當(dāng)因?yàn)槟撤N原因,服務(wù)端沒有收到客戶端發(fā)送的消息,服務(wù)端將會把該客戶端的連接斷開,同樣的客戶端也會做這樣的檢查。

當(dāng)客戶端與服務(wù)端之間的連接斷開之后,將會觸發(fā)客戶端 HealthyChecker 的 channelInactive 方法,從而進(jìn)行客戶端的斷線重連。

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

整體架構(gòu)

單機(jī)版

單機(jī)版的架構(gòu)只涉及到服務(wù)端、客戶端,另外有兩者之間的協(xié)議層,如下圖所示:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

除了服務(wù)端和客戶端之外,還有三大中心:消息中心,用戶中心,鏈接中心。

  • 消息中心:主要負(fù)責(zé)消息的存儲與歷史、離線消息的查詢

  • 用戶中心:主要負(fù)責(zé)用戶和群組相關(guān)的服務(wù)

  • 鏈接中心:主要負(fù)責(zé)保存客戶端的鏈接,服務(wù)端從鏈接中心獲取客戶端的鏈接,向其推送消息

集群版

單機(jī)版無法做到高可用,性能與可服務(wù)的用戶數(shù)也有一定的限制,所以需要有可擴(kuò)展的集群版,集群版在單機(jī)版的基礎(chǔ)上增加了一個(gè)路由層,客戶端通過路由層來獲得可用的服務(wù)端地址,然后與服務(wù)端進(jìn)行通訊,如下圖所示:

Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!

客戶端發(fā)送消息給另一個(gè)用戶,服務(wù)端接收到這個(gè)請求后,從 Connection中心中獲取目標(biāo)用戶“掛”在哪個(gè)服務(wù)端下,如果在自己名下,那最簡單直接將消息推送給目標(biāo)用戶即可,如果在其他服務(wù)端,則需要將該請求轉(zhuǎn)交給目標(biāo)服務(wù)端,讓目標(biāo)服務(wù)端將消息推送給目標(biāo)用戶。

自定義協(xié)議

通過一個(gè)自定義協(xié)議來實(shí)現(xiàn)服務(wù)端與客戶端之間的通訊,協(xié)議中有如下幾個(gè)字段:

*
* <p>
* The structure of a Packet is like blow:
* +----------+----------+----------------------------+
* |  size    |  value   |  intro                     |
* +----------+----------+----------------------------+
* | 1 bytes  | 0xBC     |  magic number              |
* | 1 bytes  |          |  serialize algorithm       |
* | 4 bytes  |          |  packet symbol             |
* | 4 bytes  |          |  content length            |
* | ? bytes  |          |  the content               |
* +----------+----------+----------------------------+
* </p>
*

每個(gè)字段的含義

所占字節(jié)用途
1魔數(shù),默認(rèn)為 0xBC
1序列化的算法
4Packet 的類型
4Packet 的內(nèi)容長度
?Packet 的內(nèi)容

序列化算法將會決定該 Packet 在編解碼時(shí),使用何種序列化方式。

Packet 的類型將會決定到達(dá)服務(wù)端的字節(jié)流將被反序列化為何種 Packet,也決定了該 Packet 將會被哪個(gè) PacketHandler 進(jìn)行處理。

內(nèi)容長度將會解決 Packet 的拆包與粘包問題,服務(wù)端在解析字節(jié)流時(shí),將會等到字節(jié)的長度達(dá)到內(nèi)容的長度時(shí),才進(jìn)行字節(jié)的讀取。

除此之外,Packet 中還會存儲一個(gè) sync 字段,該字段將指定服務(wù)端在處理該 Packet 的數(shù)據(jù)時(shí)是否需要使用異步的業(yè)務(wù)線程池來處理。

健康檢查

服務(wù)端與客戶端各自維護(hù)了一個(gè)健康檢查的服務(wù),即 Netty 為我們提供的 IdleStateHandler,通過繼承該類,并且實(shí)現(xiàn) channelIdle 方法即可實(shí)現(xiàn)連接 “空閑” 時(shí)的邏輯處理,當(dāng)出現(xiàn)空閑時(shí),目前我們只關(guān)心讀空閑,我們既可以認(rèn)為這條鏈接出現(xiàn)問題了。

那么只需要在鏈接出現(xiàn)問題時(shí),將這條鏈接關(guān)閉即可,如下所示:

public class IdleStateChecker extends IdleStateHandler {

    private static final int DEFAULT_READER_IDLE_TIME = 15;

    private int readerTime;

    public IdleStateChecker(int readerIdleTime) {
        super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS);
        readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime;
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}", 
        IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel());
        ctx.channel().close();
    }

}

另外,客戶端需要額外再維護(hù)一個(gè)健康檢查器,正常情況下他負(fù)責(zé)定時(shí)向服務(wù)端發(fā)送心跳,當(dāng)鏈接的狀態(tài)變成 inActive 時(shí),該檢查器將負(fù)責(zé)進(jìn)行重連,如下所示:

public class HealthyChecker extends ChannelInboundHandlerAdapter {

    private static final int DEFAULT_PING_INTERVAL = 5;

    private Client client;

    private int pingInterval;

    public HealthyChecker(Client client, int pingInterval) {
        Assert.notNull(client, "client can not be null");
        this.client = client;
        this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        schedulePing(ctx);
    }

    private void schedulePing(ChannelHandlerContext ctx) {
        ctx.executor().schedule(() -> {
            Channel channel = ctx.channel();
            if (channel.isActive()) {
                log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName());
                channel.writeAndFlush(new PingPacket());
                schedulePing(ctx);
            }
        }, pingInterval, TimeUnit.SECONDS);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(() -> {
            log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName());
            client.connect();
        }, 5, TimeUnit.SECONDS);
        ctx.fireChannelInactive();
    }

}

業(yè)務(wù)線程池

我們知道,Netty 中維護(hù)著兩個(gè) IO 線程池,一個(gè) boss 主要負(fù)責(zé)鏈接的建立,另外一個(gè) worker 主要負(fù)責(zé)鏈接上的數(shù)據(jù)讀寫,我們不應(yīng)該使用 IO 線程來處理我們的業(yè)務(wù),因?yàn)檫@樣很可能會對 IO 線程造成阻塞,導(dǎo)致新鏈接無法及時(shí)建立或者數(shù)據(jù)無法及時(shí)讀寫。

為了解決這個(gè)問題,我們需要在業(yè)務(wù)線程池中來處理我們的業(yè)務(wù)邏輯,但是這并不是絕對的,如果我們要執(zhí)行的邏輯很簡單,不會造成太大的阻塞,則可以直接在 IO 線程中處理,比如客戶端發(fā)送一個(gè) Ping 服務(wù)端回復(fù)一個(gè) Pong,這種情況是沒有必要在業(yè)務(wù)線程池中進(jìn)行處理的,因?yàn)樘幚硗炅俗罱K還是要交給 IO 線程去寫數(shù)據(jù)。但是如果一個(gè)業(yè)務(wù)邏輯需要查詢數(shù)據(jù)庫或者讀取文件,這種操作往往比較耗時(shí)間,所以就需要將這些操作封裝起來交給業(yè)務(wù)線程池去處理。

服務(wù)端允許客戶端在傳輸?shù)?Packet 中指定采用何種方式進(jìn)行業(yè)務(wù)的處理,服務(wù)端在將字節(jié)流解碼成 Packet 之后,會根據(jù) Packet 中的 sync 字段的值,確定怎樣對該 Packet 進(jìn)行處理,如下所示:

public class ServerPacketDispatcher extends 
    SimpleChannelInboundHandler<Packet> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Packet request) {
        // if the packet should be handled async
        if (request.getAsync() == AsyncHandle.ASYNC) {
            EventExecutor channelExecutor = ctx.executor();
            // create a promise
            Promise<Packet> promise = new DefaultPromise<>(channelExecutor);
            // async execute and get a future
            Future<Packet> future = executor.asyncExecute(promise, ctx, request);
            future.addListener(new GenericFutureListener<Future<Packet>>() {
                @Override
                public void operationComplete(Future<Packet> f) throws Exception {
                    if (f.isSuccess()) {
                        Packet response = f.get();
                        writeResponse(ctx, response);
                    }
                }
            });
        } else {
            // sync execute and get the response packet
            Packet response = executor.execute(ctx, request);
            writeResponse(ctx, response);
        }
    }
}

不止是IM框架

bitchat除了可以作為 IM 框架之外,還可以作為一個(gè)通用的通訊框架。

Packet 作為通訊的載體,通過繼承 AbstractPacket 即可快速實(shí)現(xiàn)自己的業(yè)務(wù),搭配 PacketHandler 作為數(shù)據(jù)處理器即可實(shí)現(xiàn)客戶端與服務(wù)端的通訊。

創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務(wù)器,動態(tài)BGP最優(yōu)骨干路由自動選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動現(xiàn)已開啟,新人活動云服務(wù)器買多久送多久。

本文標(biāo)題:Netty實(shí)戰(zhàn):設(shè)計(jì)一個(gè)IM框架就這么簡單!-創(chuàng)新互聯(lián)
網(wǎng)站地址:http://www.rwnh.cn/article38/cepesp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、全網(wǎng)營銷推廣、搜索引擎優(yōu)化、微信小程序、動態(tài)網(wǎng)站企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司
苍南县| 林周县| 宜章县| 鹤壁市| 巨野县| 九寨沟县| 永新县| 古浪县| 鄂伦春自治旗| 梓潼县| 凉城县| 多伦县| 府谷县| 吉林市| 八宿县| 滁州市| 巴中市| 靖宇县| 板桥市| 双牌县| 夏邑县| 宝应县| 新昌县| 揭西县| 鹤庆县| 乐都县| 修水县| 万全县| 吉林省| 泗阳县| 岑巩县| 鸡东县| 广南县| 大关县| 呈贡县| 桦甸市| 延安市| 齐河县| 天峻县| 玛多县| 冷水江市|