#coding:utf8
"""Helpers for publishing JSON-serialised data to one or more Kafka topics.

Connection settings are read once, at import time, from the ``kafka``
section of the configuration returned by ``load_config()``; the keys used
here are ``topic`` and ``bootstrap_servers``.
"""
import traceback
import json

from kafka import KafkaProducer

from text_analysis.read_config import load_config

config = load_config()

# Upper bound, in bytes, on a single produce request (50 MiB).
_MAX_REQUEST_SIZE = 52428800

# Seconds to wait when collecting a send result. ``flush()`` has already
# blocked until the requests completed, so this is only a safety net.
_DELIVERY_TIMEOUT_S = 10


def send_kafka(data, logging):
    """Serialise *data* to JSON and publish it to the configured topic(s).

    This is a best-effort call: any exception raised while serialising or
    publishing is caught, logged together with its traceback, and NOT
    re-raised.

    Args:
        data: Any object accepted by ``json.dumps``. Non-ASCII characters
            are kept as-is (``ensure_ascii=False``) and the resulting text
            is encoded as UTF-8.
        logging: Object used for reporting; only its ``info`` method is
            called (presumably a ``logging.Logger`` or the ``logging``
            module itself -- confirm against callers).
    """
    try:
        topic = config["kafka"]["topic"]
        payload = json.dumps(data, ensure_ascii=False).encode('utf-8')
        kafkaProduce(topic, payload)
        logging.info("数据推入kafka!")
    except Exception:
        # Deliberately swallowed: callers rely on this function never
        # raising. The traceback is logged so the failure stays visible.
        logging.info(traceback.format_exc())
        logging.info('写入kafka失败')


def kafkaProduce(topic, resultData):
    """Send *resultData* to every topic named in *topic*.

    A producer is created for the duration of the call and is always
    closed before returning, including when sending fails.

    Args:
        topic: Comma-separated topic names. Whitespace around each name is
            stripped and empty entries are ignored.
        resultData: Message value handed to ``KafkaProducer.send``
            (``send_kafka`` passes UTF-8 encoded bytes).

    Raises:
        Exception: Whatever the Kafka client raises when the producer
            cannot be created, or when a message cannot be sent or
            delivered.
    """
    producer = KafkaProducer(
        bootstrap_servers='{}'.format(config["kafka"]["bootstrap_servers"]),
        max_request_size=_MAX_REQUEST_SIZE,
    )
    try:
        stripped = (name.strip() for name in topic.split(','))
        pending = [producer.send(name, resultData) for name in stripped if name]
        producer.flush()
        # send() is asynchronous: a delivery error is stored on the returned
        # future rather than raised, so resolve each one to surface it.
        for future in pending:
            future.get(timeout=_DELIVERY_TIMEOUT_S)
    finally:
        # Release the producer's connections and background resources.
        producer.close()