# NOTE(review): removed web-scrape residue (Gitea page header: topic-selection
# limits and "65 lines / 1.9 KiB" file-size counters) that was not part of the
# original source file.
# coding=utf-8
import datetime
import json
import queue
import time
import traceback

from kafka import KafkaConsumer
from kafka import KafkaProducer

from logUtil import get_logger
"""
|
|
写到kafka
|
|
"""
|
|
def kafkaProduce(topic, resultData, address):
    """Send resultData to one or more Kafka topics and wait for acks.

    :param topic: comma-separated topic name(s), e.g. ``'t1,t2'``
    :param resultData: message payload (bytes)
    :param address: Kafka bootstrap server address(es)
    """
    producer = KafkaProducer(bootstrap_servers='{}'.format(address),
                             request_timeout_ms=120000)
    # BUG FIX: the producer was never closed, leaking the connection on
    # every call; try/finally guarantees cleanup even if a send fails.
    try:
        for tc in topic.split(','):
            future = producer.send(tc, resultData)
            # Block until the broker acknowledges (raises on delivery failure).
            result = future.get(timeout=60)
            print(result)
        # FIX: flush once after all sends instead of once per message.
        producer.flush()
    finally:
        producer.close()
# Write to file (append mode).
def writeTxt(filePath, result):
    """Append *result* to *filePath* (UTF-8), unescaping ``\\uXXXX`` sequences.

    NOTE(review): a second ``writeTxt`` defined later in this file shadows
    this one at import time — confirm which version is intended.

    :param filePath: path of the file to append to
    :param result: text to write; unicode escape sequences are decoded first
    """
    # BUG FIX: the original called `f.close` without parentheses, so the
    # file handle was never explicitly closed; `with` guarantees closure.
    with open(filePath, 'a', encoding='utf-8') as f:
        f.write(result.encode('utf-8').decode('unicode_escape') + '\n')
def KafkaConsume(topic, address, group_id, task_queue, logger):
    '''
    监控kafka,读取数据写到任务队列 (monitor Kafka; push decoded records
    onto the task queue). Loops forever until an exception occurs.

    :param topic: topic name to subscribe to
    :param address: Kafka bootstrap server address(es)
    :param group_id: consumer group id
    :param task_queue: queue.Queue receiving decoded message strings
    :param logger: logger for progress/diagnostic output
    :return: does not return normally
    '''
    try:
        consumer = KafkaConsumer(topic,
                                 auto_offset_reset='earliest',
                                 fetch_max_bytes=1024768000,
                                 fetch_max_wait_ms=5000,
                                 bootstrap_servers=address,
                                 group_id=group_id)
        i = 1
        while True:
            for msg in consumer:
                # BUG FIX: the `logger` parameter was accepted but never
                # used — progress output went to print(); route it through
                # the logger instead.
                logger.info('第{}条数据'.format(i))
                data = str(msg.value, encoding="utf-8")
                logger.info(data)
                task_queue.put(data)
                i = i + 1
            else:
                # for/else: runs when the consumer iterator is exhausted
                # without break, i.e. no records were fetched this round.
                logger.info('暂无任务------')
                time.sleep(10)
    except Exception:
        logger.error('kafka未知异常----')
        traceback.print_exc()
def writeTxt(filePath, result):
    """Append *result* plus a newline to *filePath*.

    NOTE(review): this definition shadows the earlier ``writeTxt`` in this
    file (which also unicode-unescapes and forces UTF-8 encoding) — confirm
    the duplication is intentional.

    :param filePath: path of the file to append to
    :param result: text line to write
    """
    # BUG FIX: `f.close` without parentheses never closed the file;
    # `with` guarantees the handle is released.
    with open(filePath, 'a') as f:
        f.write(result + '\n')
if __name__ == '__main__':
    # Ad-hoc smoke test: send one JSON record to the 'test' topic.
    resultData = {'id': '中文', 'url': 'https://zh.wikipedia.org/zh/%E8%94%A1%E8%8B%B1%E6%96%87'}
    # NOTE(review): encode/decode('unicode_escape')/encode() turns the
    # ASCII-escaped json.dumps output back into raw UTF-8 bytes — appears
    # equivalent to json.dumps(resultData, ensure_ascii=False).encode('utf-8');
    # confirm the round-trip is intentional.
    kafkaProduce('test', json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),'172.26.28.30:9092')
    # Consumer examples kept for reference (broker addresses are hard-coded):
    #task_queue = queue.Queue()
    #KafkaConsume('fq-Taobao-eccontent','39.129.129.172:6666,39.129.129.172:6668,39.129.129.172:6669,39.129.129.172:6670,39.129.129.172:6671','news_sche_8',task_queue,logger)
    # KafkaConsume('zxbnewstopic','120.133.14.71:9992','group3',task_queue,logger)