You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
#coding:utf8 from collections import deque from kafka import KafkaProducer import json from config_loader import load_config
# 加载配置 config = load_config()
# 创建全局双向队列实例 task_queue = deque()
def add_task(task): """将任务添加到队列尾部
"""
task_queue.append(task)
def add_task_left(task): """将任务添加到队列头部
"""
task_queue.appendleft(task)
def get_task(): """从队列头部获取任务
"""
return task_queue.popleft()
def get_task_right(): """从队列尾部获取任务
"""
return task_queue.pop()
def get_size(): """获取队列长度
:return: """
return len(task_queue)
def task_done(result): """标记任务完成并将结果发送到 Kafka""" # 初始化 Kafka 生产者 producer = KafkaProducer( bootstrap_servers=config['kafka']['bootstrap_servers'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) producer.send(config['kafka']['topic'], result) producer.flush() producer.close()
|