chroma新增、删除、知识库应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

67 lines
2.0 KiB

  1. # coding=utf-8
  2. from kafka import KafkaProducer
  3. from kafka import KafkaConsumer
  4. import json
  5. import traceback
  6. import time
  7. import traceback
  8. import datetime
  9. import queue
  10. from logUtil import get_logger
  11. logger = get_logger("crawlWebsrcCode.log")
  12. """
  13. kafka
  14. """
  15. def kafkaProduce(topic,resultData,address):
  16. producer = KafkaProducer(bootstrap_servers = '{}'.format(address),request_timeout_ms=120000)
  17. topics = topic.split(',')
  18. for tc in topics:
  19. future = producer.send(tc,resultData)
  20. result = future.get(timeout=60)
  21. producer.flush()
  22. print (result)
  23. #写入文件
  24. def writeTxt(filePath,result):
  25. f = open(filePath,'a',encoding='utf-8')
  26. f.write(result.encode('utf-8').decode('unicode_escape')+'\n')
  27. f.close
  28. def KafkaConsume(topic,address,group_id,task_queue,logger):
  29. '''
  30. kafka
  31. :param topic:
  32. :param address:
  33. :param group_id:
  34. :param task_queue:
  35. :return:
  36. '''
  37. try:
  38. consumer = KafkaConsumer(topic, auto_offset_reset='earliest',fetch_max_bytes=1024768000,fetch_max_wait_ms=5000, bootstrap_servers=address,group_id = group_id)
  39. i = 1
  40. while True:
  41. for msg in consumer:
  42. print('第{}条数据'.format(i))
  43. data = str(msg.value, encoding = "utf-8")
  44. print(data)
  45. task_queue.put(data)
  46. i = i+1
  47. else:
  48. print('暂无任务------')
  49. time.sleep(10)
  50. except Exception as e:
  51. print('kafka未知异常----')
  52. traceback.print_exc()
  53. def writeTxt(filePath,result):
  54. f = open(filePath,'a')
  55. f.write(result+'\n')
  56. f.close
  57. if __name__ == '__main__':
  58. # resultData = {'id': '中文', 'url': 'https://zh.wikipedia.org/zh/%E8%94%A1%E8%8B%B1%E6%96%87'}
  59. # kafkaProduce('test', json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),'121.4.41.194:8008')
  60. task_queue = queue.Queue()
  61. KafkaConsume('fq-Taobao-eccontent','39.129.129.172:6666,39.129.129.172:6668,39.129.129.172:6669,39.129.129.172:6670,39.129.129.172:6671','news_sche_8',task_queue,logger)
  62. # KafkaConsume('zxbnewstopic','120.133.14.71:9992','group3',task_queue,logger)