這篇文章主要為大家展示了“python如何實(shí)現(xiàn)對(duì)kafka的基本操作”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“python如何實(shí)現(xiàn)對(duì)kafka的基本操作”這篇文章吧。
成都創(chuàng)新互聯(lián)是一家專業(yè)的成都網(wǎng)站建設(shè)公司,我們專注網(wǎng)站建設(shè)、網(wǎng)站制作、網(wǎng)絡(luò)營銷、企業(yè)網(wǎng)站建設(shè),賣鏈接,廣告投放為企業(yè)客戶提供一站式建站解決方案,能帶給客戶新的互聯(lián)網(wǎng)理念。從網(wǎng)站結(jié)構(gòu)的規(guī)劃UI設(shè)計(jì)到用戶體驗(yàn)提高,創(chuàng)新互聯(lián)力求做到盡善盡美。
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
"""生產(chǎn)者""" def produce(self): producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) for i in range(4): msg = "msg%d" %i producer.send(self.topic,key=str(i),value=msg) producer.close() """一個(gè)消費(fèi)者消費(fèi)一個(gè)topic""" def consume(self): #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers) consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers) print consumer.partitions_for_topic(self.topic) #獲取test主題的分區(qū)信息 print consumer.topics() #獲取主題列表 print consumer.subscription() #獲取當(dāng)前消費(fèi)者訂閱的主題 print consumer.assignment() #獲取當(dāng)前消費(fèi)者topic、分區(qū)信息 print consumer.beginning_offsets(consumer.assignment()) #獲取當(dāng)前消費(fèi)者可消費(fèi)的偏移量 consumer.seek(TopicPartition(topic=self.topic, partition=0), 1) #重置偏移量,從第1個(gè)偏移量消費(fèi) for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic,message.partition,message.offset, message.key,message.value)) """一個(gè)消費(fèi)者訂閱多個(gè)topic """ def consume2(self): consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) #訂閱要消費(fèi)的主題 print consumer.topics() print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當(dāng)前主題的最新偏移量 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) """消費(fèi)者(手動(dòng)拉取消息)""" def consume3(self): consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) while True: message = consumer.poll(timeout_ms=5) #從kafka獲取消息 if message: print message time.sleep(1)
def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()
以上是“python如何實(shí)現(xiàn)對(duì)kafka的基本操作”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
文章題目:python如何實(shí)現(xiàn)對(duì)kafka的基本操作
標(biāo)題路徑:http://www.rwnh.cn/article28/jdcgjp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗(yàn)、自適應(yīng)網(wǎng)站、網(wǎng)站設(shè)計(jì)、電子商務(wù)、建站公司、品牌網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)