查询知识库应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

25 lines
804 B

  1. #coding:utf8
  2. import traceback
  3. import json
  4. from kafka import KafkaProducer
  5. from text_analysis.read_config import load_config
  # Configuration is loaded once, when this module is imported; the functions
  # in this file read its "kafka" section.
  config = load_config()
  def send_kafka(data, logging):
      """Serialize *data* to JSON and push it to the configured Kafka topic(s).

      This is a best-effort operation: any failure is logged and swallowed, so
      the caller never sees an exception from this function.

      Args:
          data: JSON-serializable object to publish.
          logging: Logger-like object providing ``info`` and ``exception``
              (the stdlib ``logging`` module or a ``logging.Logger`` both work).

      Returns:
          None.
      """
      try:
          topic = config["kafka"]["topic"]
          # ensure_ascii=False keeps non-ASCII text readable in the payload
          # instead of escaping it to \uXXXX sequences.
          payload = json.dumps(data, ensure_ascii=False).encode('utf-8')
          kafkaProduce(topic, payload)
          logging.info("数据推入kafka!")
      except Exception:
          # Deliberate catch-all: publishing must not break the caller.
          # ``exception`` records the message at ERROR level together with the
          # traceback, so failures are visible to error-level monitoring.
          logging.exception('写入kafka失败')
  def kafkaProduce(topic, resultData):
      """Send *resultData* to every topic named in *topic*.

      Args:
          topic: One topic name, or several separated by commas. Whitespace
              around each name is ignored and empty entries are skipped.
          resultData: Message value handed to ``KafkaProducer.send`` unchanged;
              ``send_kafka`` passes UTF-8 encoded bytes.

      Raises:
          Whatever ``KafkaProducer`` raises on connection or send failure;
          nothing is caught here.
      """
      producer = KafkaProducer(
          bootstrap_servers='{}'.format(config["kafka"]["bootstrap_servers"]),
          max_request_size=52428800,  # 50 MiB
      )
      try:
          for name in topic.split(','):
              name = name.strip()
              if not name:
                  # Tolerate stray commas such as "a,,b" or a trailing ",".
                  continue
              producer.send(name, resultData)
          # Block until every queued message has actually been delivered.
          producer.flush()
      finally:
          # A new producer is created per call, so it must be closed here;
          # otherwise its sockets and background thread are leaked.
          producer.close()