中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

python如何實(shí)現(xiàn)對(duì)kafka的基本操作

這篇文章主要為大家展示了“python如何實(shí)現(xiàn)對(duì)kafka的基本操作”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“python如何實(shí)現(xiàn)對(duì)kafka的基本操作”這篇文章吧。

成都創(chuàng)新互聯(lián)是一家專業(yè)的成都網(wǎng)站建設(shè)公司,我們專注網(wǎng)站建設(shè)、網(wǎng)站制作、網(wǎng)絡(luò)營銷、企業(yè)網(wǎng)站建設(shè),賣鏈接,廣告投放為企業(yè)客戶提供一站式建站解決方案,能帶給客戶新的互聯(lián)網(wǎng)理念。從網(wǎng)站結(jié)構(gòu)的規(guī)劃UI設(shè)計(jì)到用戶體驗(yàn)提高,創(chuàng)新互聯(lián)力求做到盡善盡美。

-- coding:utf-8 --

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生產(chǎn)者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一個(gè)消費(fèi)者消費(fèi)一個(gè)topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #獲取test主題的分區(qū)信息
print consumer.topics()  #獲取主題列表
print consumer.subscription()  #獲取當(dāng)前消費(fèi)者訂閱的主題
print consumer.assignment()  #獲取當(dāng)前消費(fèi)者topic、分區(qū)信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當(dāng)前消費(fèi)者可消費(fèi)的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,從第1個(gè)偏移量消費(fèi)
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一個(gè)消費(fèi)者訂閱多個(gè)topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #訂閱要消費(fèi)的主題
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當(dāng)前主題的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))
"""消費(fèi)者(手動(dòng)拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #從kafka獲取消息
        if message:
        print message
        time.sleep(1)

def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

以上是“python如何實(shí)現(xiàn)對(duì)kafka的基本操作”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

文章題目:python如何實(shí)現(xiàn)對(duì)kafka的基本操作
標(biāo)題路徑:http://www.rwnh.cn/article28/jdcgjp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗(yàn)、自適應(yīng)網(wǎng)站、網(wǎng)站設(shè)計(jì)、電子商務(wù)、建站公司、品牌網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)
紫阳县| 佳木斯市| 简阳市| 黎川县| 东兰县| 永平县| 任丘市| 瑞昌市| 藁城市| 新闻| 鄄城县| 项城市| 威信县| 惠水县| 梁山县| 德令哈市| 文山县| 双牌县| 乐东| 石台县| 福鼎市| 武夷山市| 大名县| 大埔县| 台南市| 区。| 湘西| 金湖县| 临清市| 达拉特旗| 涡阳县| 清徐县| 舒城县| 琼海市| 冕宁县| 苏尼特右旗| 饶平县| 红原县| 巢湖市| 河北区| 南乐县|