# coding=utf-8
"""Kafka helpers: publish results to Kafka and feed consumed messages into a task queue."""
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
import traceback
import time
import datetime
import queue
from logUtil import get_logger

logger = get_logger("crawlWebsrcCode.log")


def kafkaProduce(topic, resultData, address):
    """Write ``resultData`` to Kafka: send it to every listed topic and print each send result.

    :param topic: a topic name, or several separated by commas. Surrounding
        whitespace is stripped and empty entries are skipped.
    :param resultData: message payload. No ``value_serializer`` is configured,
        so kafka-python expects this to already be ``bytes``.
    :param address: bootstrap server(s), a ``'host[:port]'`` string or a list of them.
    """
    # ``address`` is passed through unchanged so a list of servers also works
    # (string formatting would turn a list into its repr).
    producer = KafkaProducer(bootstrap_servers=address, request_timeout_ms=120000)
    try:
        for tc in (name.strip() for name in topic.split(',')):
            if not tc:
                continue
            future = producer.send(tc, resultData)
            # Block until the broker acknowledges (or 60 s pass) so send errors surface here.
            result = future.get(timeout=60)
            print(result)
        producer.flush()
    finally:
        # Always release the producer's network connections and background thread.
        producer.close()


def writeTxt(filePath, result):
    """Append ``result`` followed by a newline to the UTF-8 text file at ``filePath``."""
    # ``with`` guarantees the file handle is closed even if the write fails.
    with open(filePath, 'a', encoding='utf-8') as f:
        f.write(result + '\n')


def KafkaConsume(topic, address, group_id, task_queue, logger, idle_timeout_ms=10000):
    """Watch a Kafka topic and put every message value (decoded as UTF-8) onto ``task_queue``.

    Runs until an unexpected error occurs. The error is then printed (and
    logged when ``logger`` is given) and the function returns.

    :param topic: topic to subscribe to.
    :param address: bootstrap server(s) of the Kafka cluster.
    :param group_id: consumer group id. With no committed offset for the group,
        consumption starts from the earliest available offset.
    :param task_queue: queue that receives each message value as ``str``.
    :param logger: logger used to record unexpected errors; may be ``None``.
    :param idle_timeout_ms: how long the message iterator may stay idle before
        the "no tasks" notice is printed and the loop sleeps 10 s. ``None``
        keeps kafka-python's default of blocking indefinitely.
    :return: None
    """
    consumer = None
    try:
        extra = {}
        if idle_timeout_ms is not None:
            # Without a consumer timeout, iterating the consumer never finishes
            # and the idle notice below could never be reached.
            extra['consumer_timeout_ms'] = idle_timeout_ms
        consumer = KafkaConsumer(topic, auto_offset_reset='earliest',
                                 fetch_max_bytes=1024768000, fetch_max_wait_ms=5000,
                                 bootstrap_servers=address, group_id=group_id, **extra)
        i = 1
        while True:
            for msg in consumer:
                if msg.value is None:
                    # Null-value record: nothing to decode, and str(None, encoding=...) would raise.
                    continue
                print('第{}条数据'.format(i))
                data = str(msg.value, encoding="utf-8")
                print(data)
                task_queue.put(data)
                i = i + 1
            # The iterator stopped because it was idle for idle_timeout_ms.
            print('暂无任务------')
            time.sleep(10)
    except Exception:
        # Top-level boundary of the consumer loop: report and stop.
        print('kafka未知异常----')
        traceback.print_exc()
        if logger is not None:
            logger.exception('kafka未知异常----')
    finally:
        if consumer is not None:
            consumer.close()


if __name__ == '__main__':
    # Example producer call:
    # resultData = {'id': '中文', 'url': 'https://zh.wikipedia.org/zh/%E8%94%A1%E8%8B%B1%E6%96%87'}
    # kafkaProduce('test', json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),'121.4.41.194:8008')
    task_queue = queue.Queue()
    KafkaConsume('fq-Taobao-eccontent', '39.129.129.172:6666,39.129.129.172:6668,39.129.129.172:6669,39.129.129.172:6670,39.129.129.172:6671', 'news_sche_8', task_queue, logger)
    # KafkaConsume('zxbnewstopic','120.133.14.71:9992','group3',task_queue,logger)