千问开源大模型
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

65 lines
1.9 KiB

# coding=utf-8
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
import traceback
import time
import traceback
import datetime
import queue
from logUtil import get_logger
"""
写到kafka
"""
def kafkaProduce(topic,resultData,address):
producer = KafkaProducer(bootstrap_servers = '{}'.format(address),request_timeout_ms=120000)
topics = topic.split(',')
for tc in topics:
future = producer.send(tc,resultData)
result = future.get(timeout=60)
producer.flush()
print (result)
#写入文件
def writeTxt(filePath,result):
f = open(filePath,'a',encoding='utf-8')
f.write(result.encode('utf-8').decode('unicode_escape')+'\n')
f.close
def KafkaConsume(topic,address,group_id,task_queue,logger):
'''
监控kafka,读取数据写到任务队列
:param topic:
:param address:
:param group_id:
:param task_queue:
:return:
'''
try:
consumer = KafkaConsumer(topic, auto_offset_reset='earliest',fetch_max_bytes=1024768000,fetch_max_wait_ms=5000, bootstrap_servers=address,group_id = group_id)
i = 1
while True:
for msg in consumer:
print('第{}条数据'.format(i))
data = str(msg.value, encoding = "utf-8")
print(data)
task_queue.put(data)
i = i+1
else:
print('暂无任务------')
time.sleep(10)
except Exception as e:
print('kafka未知异常----')
traceback.print_exc()
def writeTxt(filePath,result):
f = open(filePath,'a')
f.write(result+'\n')
f.close
if __name__ == '__main__':
resultData = {'id': '中文', 'url': 'https://zh.wikipedia.org/zh/%E8%94%A1%E8%8B%B1%E6%96%87'}
kafkaProduce('test', json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),'172.26.28.30:9092')
#task_queue = queue.Queue()
#KafkaConsume('fq-Taobao-eccontent','39.129.129.172:6666,39.129.129.172:6668,39.129.129.172:6669,39.129.129.172:6670,39.129.129.172:6671','news_sche_8',task_queue,logger)
# KafkaConsume('zxbnewstopic','120.133.14.71:9992','group3',task_queue,logger)