亚洲精品中文免费|亚洲日韩中文字幕制服|久久精品亚洲免费|一本之道久久免费

      
      

            <dl id="hur0q"><div id="hur0q"></div></dl>

                圖解 Kafka 網(wǎng)絡(luò)層實現(xiàn)機制之上篇

                圖解 Kafka 網(wǎng)絡(luò)層實現(xiàn)機制之上篇

                在上一篇中,主要帶大家深度剖析了「 生產(chǎn)者元數(shù)據(jù) 」的拉取、管理全流程,今天我們就來聊聊 Kafka 是如何對 Java NIO 進行封裝的 ,本系列總共分為3篇,主要剖析以下幾個問題:

              1. 針對 Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來實現(xiàn)最基礎(chǔ)的網(wǎng)絡(luò)連接以及讀寫操作的?
              2. 剖析 KafkaChannel 是如何對傳輸層、讀寫 buffer 操作進行封裝的?
              3. 剖析工業(yè)級 NIO 實戰(zhàn):如何基于位運算來控制事件的監(jiān)聽以及拆包、粘包是如何實現(xiàn)的?
              4. 剖析 Kafka 是如何封裝 Selector 多路復(fù)用器的?
              5. 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進行連接以及網(wǎng)絡(luò)讀寫的?
              6. 剖析 Kafka 網(wǎng)絡(luò)發(fā)送消息和接收響應(yīng)的整個過程是怎樣的?
              7. 本篇只討論前3個問題,剩余的放到后2篇中。

                認(rèn)真讀完這篇文章,我相信你會對 Kafka 封裝 Java NIO 源碼有更加深刻的理解。

                這篇文章干貨很多,希望你可以耐心讀完。

                01 總體概述

                上篇剖析了「 生產(chǎn)者元數(shù)據(jù)的拉取和管理的全過程 」,此時發(fā)送消息的時候就有了元數(shù)據(jù),但是還沒有進行網(wǎng)絡(luò)通信,而網(wǎng)絡(luò)通信是一個相對復(fù)雜的過程,對于 Java 系統(tǒng)來說網(wǎng)絡(luò)通信一般會采用 NIO 庫來實現(xiàn),所以 Kafka 對 Java NIO 封裝了統(tǒng)一的框架,來實現(xiàn)多路復(fù)用的網(wǎng)絡(luò) I/O 操作 。

                為了方便大家理解,所有的源碼只保留骨干。

                02 Kafka 對 Java NIO 的封裝

                如果大家對 Java NIO 不了解的話,可以看下這個文檔,這里就不過多介紹了。

                https://pdai.tech/md/java/io/java-io-nio.html

                我們來看看 Kafka 對 Java NIO 組件做了哪些封裝? 這里先說下結(jié)果,后面會深度剖析。

              8. TransportLayer:它是一個接口,封裝了底層 NIO 的 SocketChannel。
              9. NetworkReceive:封裝了 NIO 的 ByteBuffer 中的讀 Buffer, 對網(wǎng)絡(luò)編程中的粘包、拆包經(jīng)典實現(xiàn) 。
              10. NetworkSend:封裝了 NIO 的 ByteBuffer 中的寫 Buffer。
              11. KafkaChannel:對 TransportLayer、NetworkReceive、NetworkSend 進一步封裝,屏蔽了底層的實現(xiàn)細(xì)節(jié),對上層更友好。
              12. KafkaSelector:封裝了 NIO 的 Selector 多路復(fù)用器組件。
              13. 接下來我們挨個對上面組件進行剖析。

                02 TransportLayer 封裝過程

                TransportLayer 接口是對 NIO 中 「 SocketChannel 」 的封裝。它的實現(xiàn)類總共有 2 個:

              14. PlaintextTransportLayer:明文網(wǎng)絡(luò)傳輸實現(xiàn)。
              15. SslTransportLayer:SSL 加密網(wǎng)絡(luò)傳輸實現(xiàn)。
              16. 本篇只剖析 PlaintextTransportLayer 的實現(xiàn)。

                github 源碼地址如下:

                https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.javapublic class PlaintextTransportLayer implements TransportLayer { // java nio 中 SelectionKey 事件 private final SelectionKey key; // java nio 中的SocketChannel private final SocketChannel socketChannel; // 安全相關(guān) private final Principal principal = KafkaPrincipal.ANONYMOUS; // 初始化 public PlaintextTransportLayer(SelectionKey key) throws IOException { // 對 NIO 中 SelectionKey 類的對象引用 this.key = key; // 對 NIO 中 SocketChannel 類的對象引用 this.socketChannel = (SocketChannel) key.channel(); }}

                從上面代碼可以看出,該類就是 對底層 NIO 的 socketChannel 封裝引用 。將構(gòu)造函數(shù)的 SelectionKey 類對象賦值給 key,然后從 key 中取出對應(yīng)的 SocketChannel 賦值給 socketChannel,這樣就完成了初始化工作。

                接下來,我們看看幾個重要方法是如何使用這2個 NIO 組件的。

                02.1 finishConnect()

                @Override// 判斷網(wǎng)絡(luò)連接是否完成public boolean finishConnect() throws IOException { // 1. 調(diào)用socketChannel的finishConnect方法,返回該連接是否已經(jīng)連接完成 boolean connected = socketChannel.finishConnect(); // 2. 如果網(wǎng)絡(luò)連接完成以后就刪除對OP_CONNECT事件的監(jiān)聽,同時添加對OP_READ事件的監(jiān)聽 if (connected) // 事件操作 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); // 3. 最后返回網(wǎng)絡(luò)連接 return connected;}

                該方法主要用來 判斷網(wǎng)絡(luò)連接是否完成 ,如果完成就關(guān)注 「 OP_READ 」 事件,并取消 「 OP_CONNECT 」 事件。

              17. 首先調(diào)用 socketChannel 通道的 finishConnect() 判斷連接是否完成。
              18. 如果網(wǎng)絡(luò)連接完成以后就刪除對 OP_CONNECT 事件的監(jiān)聽,同時添加對 OP_READ 事件的監(jiān)聽,因為連接完成后就可能接收數(shù)據(jù)了。
              19. 最后返回網(wǎng)絡(luò)連接 connected。
              20. 二進制位運算事件監(jiān)聽

                這里通過「 二進制位運算 」巧妙的解決了網(wǎng)絡(luò)事件的監(jiān)聽操作,實現(xiàn)非常經(jīng)典。

                通過 socketChannel 在 Selector 多路復(fù)用器注冊事件返回 SelectionKey ,SelectionKey 的類型包括:

              21. OP_READ:可讀事件,值為:1<<0 == 1 == 00000001。
              22. OP_WRITE:可寫事件,值為:1<<2 == 4 == 00000100。
              23. OP_CONNECT:客戶端連接服務(wù)端的事件,一般為創(chuàng)建 SocketChannel 客戶端 channel,值為:1<<3 == 8 ==00001000。
              24. OP_ACCEPT:服務(wù)端接收客戶端連接的事件,一般為創(chuàng)建 ServerSocketChannel 服務(wù)端 channel,值為:1<<4 == 16 == 00010000。
              25. key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

                首先” “符號代表按位取反,”&”代表按位取與,通過 key.interestOps() 獲取當(dāng)前的事件,然后和 OP_CONNECT事件取反「 11110111 」 后按位與操作。

                所以,”& xx” 代表刪除 xx 事件, 有就刪除,沒有就不變 ;而 “| xx” 代表將 xx 事件添加進去。

                02.2 read()

                @Overridepublic int read(ByteBuffer dst) throws IOException { // 調(diào)用 NIO 的通道實現(xiàn)數(shù)據(jù)的讀取 return socketChannel.read(dst);}

                該方法主要用來 把 socketChannel 里面的數(shù)據(jù)讀取緩沖區(qū) ByteBuffer 里 ,通過調(diào)用 socketChannel.read() 實現(xiàn)。

                02.3 write()

                @Overridepublic int write(ByteBuffer src) throws IOException { return socketChannel.write(src);}

                該方法主要用來 把緩沖區(qū) ByteBuffer 的數(shù)據(jù)寫到 SocketChannel 里 ,通過調(diào)用 socketChannel.write() 實現(xiàn)。

                大家都知道在網(wǎng)絡(luò)編程中,一次讀寫操作并一定能把數(shù)據(jù)讀寫完,所以就需要判斷是否讀寫完成,勢必會涉及數(shù)據(jù)的「 拆包 」、「 粘包 」操作。 這些操作比較繁瑣,因此 Kafka 將 ByteBuffer 的讀寫操作進行重新封裝,分別對應(yīng) NetworkReceive 讀操作、NetworkSend 寫操作,對于上層調(diào)用無需判斷是否讀寫完成,更加友好 。

                接下來我們就來分別剖析下這2個類的實現(xiàn)。

                03 NetworkReceive 封裝過程

                public class NetworkReceive implements Receive { …. // 空 ByteBuffer private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); private final String source; // 存儲響應(yīng)消息數(shù)據(jù)長度 private final ByteBuffer size; // 響應(yīng)消息數(shù)據(jù)的最大長度 private final int maxSize; // ByteBuffer 內(nèi)存池 private final MemoryPool memoryPool; // 已讀取字節(jié)大小 private int requestedBufferSize = -1; // 存儲響應(yīng)消息數(shù)據(jù)體 private ByteBuffer buffer; // 初始化構(gòu)造函數(shù) public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) { this.source = source; // 分配4個字節(jié)大小的數(shù)據(jù)長度 this.size = ByteBuffer.allocate(4); this.buffer = null; // 能接收消息的最大長度 this.maxSize = maxSize; this.memoryPool = memoryPool; }}

              26. EMPTY_BUFFER:空 Buffer,值為 ByteBuffer.allocate(0)。
              27. source:final類型,用來確定對應(yīng) channel id。
              28. size:final類型,存儲響應(yīng)消息數(shù)據(jù)長度,大小為4字節(jié)。
              29. maxSize:final類型,接收響應(yīng)消息數(shù)據(jù)的最大長度。
              30. memoryPool:final類型,ByteBuffer 內(nèi)存池。
              31. requestedBufferSize:已讀取字節(jié)大小。
              32. buffer:存儲響應(yīng)消息數(shù)據(jù)體。
              33. 從屬性可以看出,包含2個 ByteBuffer,分別是 size 和 buffer。這里重點說下源碼中的 size字段 的初始化。通過長度編碼方式實現(xiàn),上來就先分配了 4字節(jié) 大小的 ByteBuffer 來存儲響應(yīng)消息數(shù)據(jù)長度,即32位,與 Java int 占用相同的字節(jié)數(shù),完全滿足表示消息長度的值。

                介紹完字段后,我們來深度剖析下該類的幾個重要的方法。

                03.1 readFrom()

                public long readFrom(ScatteringByteChannel channel) throws IOException { // 讀取數(shù)據(jù)總大小 int read = 0; // 1.判斷響應(yīng)消息數(shù)據(jù)長度的 ByteBuffer 是否讀完 if (size.hasRemaining()) { // 2.還有剩余,直接讀取消息數(shù)據(jù)的長度 int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); // 3.每次讀取后,累加到總讀取數(shù)據(jù)大小里 read += bytesRead; // 4.判斷響應(yīng)消息數(shù)據(jù)長度的緩存是否讀完了 if (!size.hasRemaining()) { // 5.重置position size.rewind(); // 6.讀取響應(yīng)消息數(shù)據(jù)長度 int receiveSize = size.getInt(); // 7.如果有異常就拋出 if (receiveSize maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); // 8.將讀到數(shù)據(jù)長度賦值已讀取字節(jié)大小,即數(shù)據(jù)體的大小 requestedBufferSize = receiveSize; if (receiveSize == 0) { buffer = EMPTY_BUFFER; } } } // 9.如果數(shù)據(jù)體buffer還沒有分配,且響應(yīng)消息數(shù)據(jù)頭已讀完 if (buffer == null && requestedBufferSize != -1) { // 10.分配requestedBufferSize字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體buffer buffer = memoryPool.tryAllocate(requestedBufferSize); if (buffer == null) log.trace("Broker low on memory – could not allocate buffer of size {} for source {}", requestedBufferSize, source); } // 11.判斷buffer是否分配成功 if (buffer != null) { // 12.把channel里的數(shù)據(jù)讀到buffer中 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); // 13.累計讀取數(shù)據(jù)總大小 read += bytesRead; } // 14. 返回總大小 return read;}

                該方法主要用來 把對應(yīng) channel 中的數(shù)據(jù)讀到 ByteBuffer 中 ,包括響應(yīng)消息數(shù)據(jù)長度的 size 和響應(yīng)消息數(shù)據(jù)體長度的 buffer,可能會被多次調(diào)用,每次都需要判斷 size 和 buffer 的狀態(tài)并讀取。

                在讀取時,先讀取4字節(jié)到 size 中,再根據(jù) size 的大小為 buffer 分配內(nèi)存,然后讀滿整個 buffer 時就表示讀取完成了。

                通過短短的30行左右代碼就解決了工業(yè)級「 拆包 」 、「 粘包 」 問題,相當(dāng)?shù)慕?jīng)典 。

                如果要解決「 粘包 」問題,就是在每個響應(yīng)數(shù)據(jù)中間插入一個特殊的字節(jié)大小的「 分隔符 」,這里就在響應(yīng)消息體前面插入4個字節(jié),代表響應(yīng)消息自己本身的數(shù)據(jù)大小,如下圖所示:

                具體「 拆包 」的操作步驟如下:

              34. 調(diào)用 size.hasRemaining() 返回 position 至 limit 之間的字節(jié)大小 來判斷響應(yīng)消息數(shù)據(jù)長度的 ByteBuffer 是否讀完。
              35. 當(dāng)未讀完則通過調(diào)用 NIO 的方法 channel.read(size), 直接把讀取4字節(jié)的響應(yīng)消息數(shù)據(jù)的長度寫入到 ByteBuffer size 中 ,如果已經(jīng)讀取到了4字節(jié),此時 position=4,與 limit 相同, 表示 ByteBuffer size 已經(jīng)讀滿了 。
              36. 每次讀取后,累加到總讀取數(shù)據(jù)大小里
              37. 再次判斷響應(yīng)消息數(shù)據(jù)長度的緩存是否讀完了。
              38. 如果讀完了,先重置 position 位置為0,此時就可以從 ByteBuffer 中讀取數(shù)據(jù)了,然后 調(diào)用 size.getInt() 從 ByteBuffer 當(dāng)前 position 位置讀取4個字節(jié),并轉(zhuǎn)化成int 類型數(shù)值賦給 receiveSize ,即響應(yīng)體的長度。
              39. 如果有異常就拋出,包括響應(yīng)數(shù)據(jù)體的長度無效或者大于最大長度等。
              40. 將讀到響應(yīng)數(shù)據(jù)長度賦值 requestedBufferSize,即數(shù)據(jù)體的大小。
              41. 如果響應(yīng)數(shù)據(jù)體 buffer 還沒有分配,且響應(yīng)數(shù)據(jù)頭已讀完,分配 requestedBufferSize 字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體 buffer。
              42. 如果 buffer 分配成功, 表示 size 已讀完,此時直接把 channel 里的響應(yīng)數(shù)據(jù)讀到跟它大小一致的 ByteBuffer 中 ,再次累計讀取數(shù)據(jù)總大小。
              43. 最后返回數(shù)據(jù)總大小。
              44. 03.2 complete()

                @Overridepublic boolean complete() { // 響應(yīng)消息頭已讀完 && 響應(yīng)消息體已讀完 return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

                該方法主要用來判斷是否都讀取完成, 即響應(yīng)頭大小和響應(yīng)體大小都讀取完 。

                03.3 size()

                // 返回大小public int size() { return payload().limit() + size.limit();}public ByteBuffer payload() { return this.buffer;}

                該方法主要用來返回 響應(yīng)頭和響應(yīng)體還有多少數(shù)據(jù)需要讀出 。

                此時已經(jīng)剖析完讀 Buffer 的封裝,接下來我們看看寫 Buffer。

                04 NetworkSend 封裝過程

                github 源碼地址如下:

                https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Send.java

                調(diào)用關(guān)系圖如下:

                04.1 Send 接口

                我們先看一下接口 Send 都定義了哪些方法。

                public interface Send { // 要把數(shù)據(jù)寫入目標(biāo)的 channel id String destination(); // 要發(fā)送的數(shù)據(jù)是否發(fā)送完了 boolean completed(); // 把數(shù)據(jù)寫到對應(yīng) channel 中 long writeTo(GatheringByteChannel channel) throws IOException; // 發(fā)送數(shù)據(jù)的大小 long size();}

                Send 作為要發(fā)送數(shù)據(jù)的接口, 子類 ByteBufferSend 實現(xiàn) complete() 方法用于判斷是否已經(jīng)發(fā)送完成,實現(xiàn) writeTo() 方法來實現(xiàn)寫入數(shù)據(jù)到Channel中。

                04.2 ByteBufferSend 類

                ByteBufferSend 類實現(xiàn)了 Send 接口, 即實現(xiàn)了數(shù)據(jù)從 ByteBuffer 數(shù)組發(fā)送到 channel :

                public class ByteBufferSend implements Send { private final String destination; // 總共要寫多少字節(jié)數(shù)據(jù) private final int size; // 用于寫入channel里的ByteBuffer數(shù)組,說明kafka一次最大傳輸字節(jié)是有限定的 protected final ByteBuffer[] buffers; // 總共還剩多少字節(jié)沒有寫完 private int remaining; private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer… buffers) { this.destination = destination; this.buffers = buffers; for (ByteBuffer buffer : buffers) remaining += buffer.remaining(); // 計算需要寫入字節(jié)的總和 this.size = remaining; }}

                我們來看下這個類中的幾個重要字段:

              45. destination:數(shù)據(jù)寫入的目標(biāo) channel id。
              46. size:總共需要往 channel 里寫多少字節(jié)數(shù)據(jù)。
              47. buffers:ByteBuffer數(shù)組類型,用來存儲要寫入 channel 里的數(shù)據(jù)。
              48. remaining:ByteBuffer數(shù)組所有的ByteBuffer 還剩多少字節(jié)沒有寫完。
              49. 介紹完字段后,我們來深度剖析下該類的幾個重要的方法。

                04.2.1 writeTo()

                @Override// 將字節(jié)流數(shù)據(jù)寫入到channel中public long writeTo(GatheringByteChannel channel) throws IOException { // 1.調(diào)用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù) long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); // 2.計算還剩多少字節(jié)沒有寫入傳輸層 remaining -= written; // 每次發(fā)送 都檢查是否 pending = TransportLayers.hasPendingWrites(channel); return written;}

                該方法主要用來 把 buffers 數(shù)組寫入到 SocketChannel里 ,因為在網(wǎng)絡(luò)編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調(diào)用底層 channel.write(buffers) 方法會返回「 已經(jīng)寫入成功多少字節(jié) 」的返回值,這樣調(diào)用一次后就知道已經(jīng)寫入多少字節(jié)了。

                04.2.2 some other

                @Overridepublic String destination() { // 返回對應(yīng)的channel id return destination;}@Overridepublic boolean completed() { // 判斷是否完成 即沒有剩余&pending=false return remaining <= 0 && !pending;}/** * always returns false as there will be not be any * pending writes since we directly write to socketChannel. */@Overridepublic boolean hasPendingWrites() { // 在PLAINTEXT下 pending 始終為 false return false;}@Overridepublic long size() { // 返回寫入字節(jié)的總和 return this.size;}

                04.3 NetworkSend 類

                NetworkSend 類繼承了 ByteBufferSend 類,真正用來寫 Buffer。

                public class NetworkSend extends ByteBufferSend { // 實例化 public NetworkSend(String destination, ByteBuffer buffer) { // 調(diào)用父類的方法初始化 super(destination, sizeBuffer(buffer.remaining()), buffer); } // 用來構(gòu)造4個字節(jié)的 sizeBuffer private static ByteBuffer sizeBuffer(int size) { // 先分配一個4個字節(jié)的ByteBuffer ByteBuffer sizeBuffer = ByteBuffer.allocate(4); // 寫入size長度值 sizeBuffer.putInt(size); // 重置 position sizeBuffer.rewind(); // 返回 sizeBuffer return sizeBuffer; }}

                該類相對簡單些,就是構(gòu)建一個發(fā)往 channel 對應(yīng)的節(jié)點 id 的消息數(shù)據(jù),它的實例化過程如下:

              50. 先分配一個4個字節(jié)的 ByteBuffer 的變量 sizeBuffer,再把要發(fā)送的數(shù)據(jù)長度賦值給 sizeBuffer。
              51. 此時 sizeBuffer 的響應(yīng)頭字節(jié)數(shù)和 sizeBuffer 的響應(yīng)數(shù)據(jù)就都有了。
              52. 然后調(diào)用父類 ByteBufferSend 的方法進行初始化。
              53. 另外 ByteBuffer[] 為兩個 buffer,可以理解為一個消息頭 buffer 即 size,一個消息體 buffer。消息頭 buffer 的長度為4byte,存放的是消息體 buffer 的長度。而消息體 buffer 是上層傳入的業(yè)務(wù)數(shù)據(jù),所以 send 就是持有一個待發(fā)送的 ByteBuffer 。

                接下來我們來看看 KafkaChannel 是如何對上面幾個類進行封裝的。

                05 KafkaChannel 封裝過程

                github 源碼地址如下:

                https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.javapublic class KafkaChannel implements AutoCloseable { …. // 節(jié)點 id private final String id; // 傳輸層對象 private final TransportLayer transportLayer; …. // 最大能接收請求的字節(jié)數(shù) private final int maxReceiveSize; // 內(nèi)存池,用來分配指定大小的 ByteBuffer private final MemoryPool memoryPool; // NetworkReceive 類的實例 private NetworkReceive receive; // NetworkSend 類的實例 private Send send; // 是否關(guān)閉連接 private boolean disconnected; …. // 連接狀態(tài) private ChannelState state; // 需要連接的遠(yuǎn)端地址 private SocketAddress remoteAddress; // 初始化 public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator,int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) { this.id = id; this.transportLayer = transportLayer; this.authenticatorCreator = authenticatorCreator; this.authenticator = authenticatorCreator.get(); this.networkThreadTimeNanos = 0L; this.maxReceiveSize = maxReceiveSize; this.memoryPool = memoryPool; this.metadataRegistry = metadataRegistry; this.disconnected = false; this.muteState = ChannelMuteState.NOT_MUTED; this.state = ChannelState.NOT_CONNECTED; }}

                我們來看下這個類中的幾個重要字段:

              54. id:channel 對應(yīng)的節(jié)點 id。
              55. transportLayer:傳輸層對象。
              56. maxReceiveSize:最大能接收請求的字節(jié)數(shù)。
              57. memoryPool:內(nèi)存池,用來分配指定大小的 ByteBuffer。
              58. receive:NetworkReceive 類的實例。
              59. send:NetworkSend 類的實例。
              60. disconnected:是否關(guān)閉連接。
              61. state:KafkaChannel 的狀態(tài)。
              62. remoteAddress:需要連接的遠(yuǎn)端地址。
              63. 從屬性可以看出, 有3個最重要的成員變量:TransportLayer、NetworkReceive、Send 。KafkaChannel 通過 TransportLayer 進行讀寫操作,NetworkReceive 用來讀取,Send 用來寫出。

                為了封裝普通和加密的Channel「 TransportLayer根據(jù)網(wǎng)絡(luò)協(xié)議的不同,提供不同的子類 」而對于 KafkaChannel 提供統(tǒng)一的接口,「 這是策略模式很好的應(yīng)用 」。

              64. 每個 NetworkReceive 代表一個單獨的響應(yīng),KafkaChannel 讀取的數(shù)據(jù)會存儲到 NetworkReceive 中,當(dāng) NetworkReceive 讀滿,一個請求就完整讀取了。
              65. 每個 Send 代表一個單獨的請求,需要寫出時只需賦值此變量,之后調(diào)用 write() 方法將其中的數(shù)據(jù)寫出。
              66. 介紹完字段后,我們來深度剖析下其 網(wǎng)絡(luò)讀寫操作 是如何實現(xiàn)的?

                05.1 setSend()

                public void setSend(Send send) { if (this.send != null) throw new IllegalStateException(“Attempt to begin a send operation with prior send operation still in progress, connection id is ” + id); // 設(shè)置要發(fā)送消息的字段 this.send = send; // 調(diào)用傳輸層增加寫事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}// PlaintextTransportLayer 類方法@Overridepublic void addInterestOps(int ops) { //通過 key.interestOps() | ops 來添加事件 key.interestOps(key.interestOps() | ops);}

                該方法主要用來 預(yù)發(fā)送,即在發(fā)送網(wǎng)絡(luò)請求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中 ,然后調(diào)用傳輸層方法增加對這個 channel 上「 OP_WRITE 」事件的關(guān)注。當(dāng)真正執(zhí)行發(fā)送的時候,會從 send 中讀取數(shù)據(jù)。

                05.2 write()

                public long write() throws IOException { // 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了 if (send == null) return 0; midWrite = true; // 調(diào)用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去 return send.writeTo(transportLayer);}

                該方法主要用來 把保存在 send 上的數(shù)據(jù)真正發(fā)送出去 。

              67. 首先判斷要發(fā)送的 send 是否為空,如果為空則表示在 KafkaChannel 的 Buffer 的數(shù)據(jù)都發(fā)送完畢了。
              68. 如果不為空就調(diào)用ByteBufferSend.writeTo() 方法通過網(wǎng)絡(luò) I/O 操作將數(shù)據(jù)發(fā)送出去。
              69. 05.3 read()

                public long read() throws IOException { // 如果receive為空表示數(shù)據(jù)已經(jīng)讀完,需要重新實例化對象 if (receive == null) { // 確保分配了 NetworkReceive receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } //如果未讀完,嘗試讀取該對象 long bytesReceived = receive(this.receive); if (this.receive.requiredMemoryAmountKnown() && !this.receive.memoryAllocated() && isInMutableState()) { //pool must be out of memory, mute ourselves. mute(); } return bytesReceived;}

                該方法主要用來 把從網(wǎng)絡(luò)I/O操作中讀出的數(shù)據(jù)保存到 NetworkReceive 中 。

              70. 判斷 receive 是否為空,如果為空 表示上次已讀完 ,需要重新實例化 NetworkReceive 對象。
              71. 如果 receive 不為空, 表示未讀完,此時讀取的還是原先的 NetworkReceive 對象 ,然后再調(diào)用 receive() 方法嘗試把 channel 的數(shù)據(jù)讀到 NetworkReceive 對象中。
              72. 最后返回讀到的字節(jié)數(shù)。
              73. 05.4 maybeCompleteReceive()

                public NetworkReceive maybeCompleteReceive() { if (receive != null && receive.complete()) { receive.payload().rewind(); NetworkReceive result = receive; receive = null; return result; } return null;}// NetworkReceivepublic boolean complete() { return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

                該方法主要用來 判斷數(shù)據(jù)已經(jīng)讀取完畢了 ,而判斷是否讀完的條件是 NetworkReceive 里的 buffer 是否用完 ,包括上面說過的表示響應(yīng)消息頭 size ByteBuffer 和響應(yīng)消息體本身的 buffer ByteBuffer。這兩個都讀完才算真正讀完了。

              74. 當(dāng) buffer 讀完后調(diào)用 rewind 重置 position位置。
              75. 將 receive 賦值給結(jié)果集 result
              76. 此時讀完后將 receive 清空,以便下次讀。
              77. 最后返回結(jié)果集 result,完成一次讀操作。
              78. 05.5 maybeCompleteSend()

                // 可能完成發(fā)送public Send maybeCompleteSend() { if (send != null && send.completed()) { midWrite = false; transportLayer.removeInterestOps(SelectionKey.OP_WRITE); Send result = send; send = null; return result; } return null;}// PlaintextTransportLayer 類方法@Overridepublic void removeInterestOps(int ops) { // 通過 key.interestOps() & ~ops 來刪除事件 key.interestOps(key.interestOps() & ~ops);}// ByteBufferSend@Overridepublic boolean completed() { return remaining <= 0 && !pending;}

                該方法主要用來 是否寫數(shù)據(jù)完畢了 ,而判斷的寫數(shù)據(jù)完畢的條件是 buffer 中沒有剩余且pending為false 。

              79. 當(dāng)寫數(shù)據(jù)完畢后,取消傳輸層對 OP_WRITE 事件的監(jiān)聽,完成一次寫操作。
              80. 將 send 賦值給結(jié)果集 result。
              81. 此時讀完后將 send 清空,以便下次寫。
              82. 最后返回結(jié)果集 result,完成一次寫操作。
              83. 最后我們來聊聊事件注冊和取消的具體時機,以便更好的理解網(wǎng)絡(luò) I/O 操作。

                06 事件注冊與取消時機

                我們知道 Java NIO 是基于 epoll 模型來實現(xiàn)的。所有基于 epoll 的框架,都有3個階段:

              84. 注冊事件(OP_CONNECT, OP_ACCEPT, OP_READ, OP_WRITE)。
              85. 輪詢網(wǎng)絡(luò)I/O是否就緒。
              86. 執(zhí)行實際網(wǎng)絡(luò)I/O操作。
              87. 這里我們來看下相關(guān)事件是何時被注冊和取消的。

                06.1 OP_CONNECT 事件

                06.1.1 OP_CONNECT 事件注冊時機

                在 Selector 發(fā)起網(wǎng)絡(luò)連接的時候進行「 OP_CONNECT 」事件注冊。

                public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { SocketChannel socketChannel = SocketChannel.open(); SelectionKey key = null; try { // 注冊 OP_CONNECT 到 selector 上 key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT); } catch (IOException | RuntimeException e){}}

                06.1.2 OP_CONNECT 事件取消時機

                在 PlainTransportLayer 明文傳輸層完成連接的時候取消 「 OP_CONNECT 」事件。

                public boolean finishConnect() throws IOException { // 刪除連接事件,添加讀事件 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);}

                06.2 OP_READ 事件

                06.2.1 OP_READ 事件注冊時機

                從上面也可以看出,「 OP_READ 」事件的注冊和「 OP_CONNECT 」事件的取消是同時進行的。

                06.2.2 OP_READ 事件取消時機

                由于 「 OP_READ 」事件是要一直監(jiān)聽是否有新數(shù)據(jù)到來,所以不會取消。并且因為是 Java NIO 使用的 「 epoll 的 LT 模式 」,只要「 讀緩沖區(qū) 」有數(shù)據(jù),就會一直觸發(fā)。

                06.3 OP_WRITE 事件

                06.3.1 OP_WRITE 事件注冊時機

                在 KafkaChannel 真正發(fā)送網(wǎng)絡(luò)請求之前注冊「 OP_WRITE 」事件。

                public void setSend(Send send) { // 調(diào)用傳輸層增加寫事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}

                06.3.2 OP_WRITE 事件取消時機

                public Send maybeCompleteSend() { if (send != null && send.completed()) { //完成一次發(fā)送后取消 OP_WRITE 事件 transportLayer.removeInterestOps(SelectionKey.OP_WRITE); }}

                06.4 事件總結(jié)

              88. 對于不同事件類型的「 事件就緒 」:
              89. OP_READ事件就緒:即當(dāng)有新數(shù)據(jù)到來,需要去讀取。由于是基于 LT 模式,只要讀緩沖區(qū)有數(shù)據(jù),會一直觸發(fā)。
              90. OP_WRITE事件就緒:即本地 socketchannel 緩沖區(qū)有沒有寫滿。如果沒有寫滿的話,就會一直觸發(fā)寫事件。所以要避免「 寫的死循環(huán) 」問題,寫完就要取消寫事件。
              91. OP_CONNECT事件就緒: 即 connect 連接完成。
              92. OP_ACCEPT事件就緒:即有新的連接進來,調(diào)用 accept處理。
              93. 不同類型事件處理方式是不一樣的:
              94. OP_CONNECT事件:注冊1次,連接成功之后,就取消了。有且僅有1次。
              95. OP_READ事件:注冊之后不取消,一直監(jiān)聽。
              96. OP_WRITE事件:每調(diào)用一次send,注冊1次。send成功,取消注冊。
              97. 07 總結(jié)

                這里,我們一起來總結(jié)一下這篇文章的重點。

                1、帶你先整體的梳理了 Kafka 對 Java NIO 封裝的組件以及調(diào)用關(guān)系圖。

                2、分別帶你梳理了傳輸層 TransportLayer 的明文網(wǎng)絡(luò)傳輸層的實現(xiàn)、網(wǎng)絡(luò)讀操作 NetworkReceive、網(wǎng)絡(luò)寫操作 NetworkSend 的實現(xiàn)、以及 KafkaChannel 是如何進一步對上面組件進行封裝提供更加友好的網(wǎng)絡(luò)連接、讀寫操作的。

                3、最后剖析了網(wǎng)絡(luò) I/O 操作過程中的事件注冊和取消時機。

                鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
                用戶投稿
                上一篇 2022年6月23日 06:23
                下一篇 2022年6月23日 06:23

                相關(guān)推薦

                聯(lián)系我們

                聯(lián)系郵箱:admin#wlmqw.com
                工作時間:周一至周五,10:30-18:30,節(jié)假日休息