千问开源大模型
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

65 lines
1.9 KiB

  1. # coding=utf-8
  2. from kafka import KafkaProducer
  3. from kafka import KafkaConsumer
  4. import json
  5. import traceback
  6. import time
  7. import traceback
  8. import datetime
  9. import queue
  10. from logUtil import get_logger
"""
Kafka producer/consumer helper utilities.
"""
  14. def kafkaProduce(topic,resultData,address):
  15. producer = KafkaProducer(bootstrap_servers = '{}'.format(address),request_timeout_ms=120000)
  16. topics = topic.split(',')
  17. for tc in topics:
  18. future = producer.send(tc,resultData)
  19. result = future.get(timeout=60)
  20. producer.flush()
  21. print (result)
  22. #写入文件
  23. def writeTxt(filePath,result):
  24. f = open(filePath,'a',encoding='utf-8')
  25. f.write(result.encode('utf-8').decode('unicode_escape')+'\n')
  26. f.close
  27. def KafkaConsume(topic,address,group_id,task_queue,logger):
  28. '''
  29. kafka
  30. :param topic:
  31. :param address:
  32. :param group_id:
  33. :param task_queue:
  34. :return:
  35. '''
  36. try:
  37. consumer = KafkaConsumer(topic, auto_offset_reset='earliest',fetch_max_bytes=1024768000,fetch_max_wait_ms=5000, bootstrap_servers=address,group_id = group_id)
  38. i = 1
  39. while True:
  40. for msg in consumer:
  41. print('第{}条数据'.format(i))
  42. data = str(msg.value, encoding = "utf-8")
  43. print(data)
  44. task_queue.put(data)
  45. i = i+1
  46. else:
  47. print('暂无任务------')
  48. time.sleep(10)
  49. except Exception as e:
  50. print('kafka未知异常----')
  51. traceback.print_exc()
  52. def writeTxt(filePath,result):
  53. f = open(filePath,'a')
  54. f.write(result+'\n')
  55. f.close
  56. if __name__ == '__main__':
  57. resultData = {'id': '中文', 'url': 'https://zh.wikipedia.org/zh/%E8%94%A1%E8%8B%B1%E6%96%87'}
  58. kafkaProduce('test', json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),'172.26.28.30:9092')
  59. #task_queue = queue.Queue()
  60. #KafkaConsume('fq-Taobao-eccontent','39.129.129.172:6666,39.129.129.172:6668,39.129.129.172:6669,39.129.129.172:6670,39.129.129.172:6671','news_sche_8',task_queue,logger)
  61. # KafkaConsume('zxbnewstopic','120.133.14.71:9992','group3',task_queue,logger)