You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
#coding:utf8 import traceback import json from kafka import KafkaProducer from text_analysis.read_config import load_config config=load_config()
def send_kafka(data, logging):
    """Serialize ``data`` to JSON and push it to the configured Kafka topic(s).

    The topic string is read from ``config["kafka"]["topic"]`` and passed,
    together with the UTF-8 encoded JSON payload, to ``kafkaProduce``.
    Any exception raised along the way is caught here: the traceback and a
    failure message are written through ``logging.info`` and nothing is
    re-raised, so callers never see the error.

    Args:
        data: Object to serialize with ``json.dumps``; non-ASCII characters
            are emitted as-is (``ensure_ascii=False``).
        logging: Object exposing an ``info(msg)`` method; it receives both
            the success message and the failure output.

    Returns:
        None.
    """
    try:
        target_topics = config["kafka"]["topic"]
        payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        kafkaProduce(target_topics, payload)
        logging.info("数据推入kafka!")
    except Exception:
        # Best-effort delivery: report the failure through the logger and
        # return normally instead of propagating the error.
        failure_trace = traceback.format_exc()
        logging.info(failure_trace)
        logging.info('写入kafka失败')
def kafkaProduce(topic, resultData):
    """Send ``resultData`` to every topic listed in ``topic``.

    A new ``KafkaProducer`` is created for each call, using the address in
    ``config["kafka"]["bootstrap_servers"]``. The producer is always closed
    before returning, including when a send or flush fails, so repeated
    calls do not accumulate open producers.

    Args:
        topic: Topic names as a single comma-separated string; the message
            is sent once to each name produced by ``topic.split(',')``.
        resultData: Message value handed to ``producer.send`` unchanged
            (``send_kafka`` passes UTF-8 encoded bytes).

    Raises:
        Exceptions raised while creating the producer, sending, or flushing
        are not caught here and propagate to the caller.
    """
    producer = KafkaProducer(
        bootstrap_servers='{}'.format(config["kafka"]["bootstrap_servers"]),
        max_request_size=52428800,  # 50 * 1024 * 1024 bytes
    )
    try:
        for tc in topic.split(','):
            producer.send(tc, resultData)
        # Block until the buffered records have been handed to the brokers.
        producer.flush()
    finally:
        # Release the producer's resources even if send/flush raised.
        producer.close()
|