#coding:utf8 import traceback from pykafka import KafkaClient # from pykafka import partitioners # from pykafka.simpleconsumer import OwnedPartition, OffsetType import json from tqdm import tqdm # from kafka import KafkaProducer from pykafka.simpleconsumer import OwnedPartition, OffsetType def send_kafka(data,logging): try: producer = None # client = KafkaClient(hosts='172.26.28.30:9092', socket_timeout_ms=10 * 1000) topic = 'analyze' # producer = client.topics[topic].get_sync_producer(**{'max_request_size': 3000012 * 5}) #producer = client.topics[topic].get_producer(sync=True) client = KafkaClient(hosts='172.26.28.30:9092', socket_timeout_ms=10 * 1000) # topic = client.topics['analyze'] producer = client.topics[topic].get_producer() data1=json.dumps(data,ensure_ascii=False) producer.produce(bytes(data1, encoding='utf-8')) # kafkaProduce(topic,bytes(data1, encoding='utf-8')) logging.info("数据推入kafka!") except Exception as e: logging.info(traceback.format_exc()) logging.info('写入kafka失败') # def kafkaProduce(topic,resultData): # producer = KafkaProducer(bootstrap_servers = '{}'.format("172.26.28.30:9092")) # topics = topic.split(',') # for tc in topics: # future = producer.send(tc,resultData) # producer.flush() def consumer(): # topic = 'ais_caiji_kg_210'.encode('utf-8') # client = KafkaClient(hosts='172.16.3.153:9092,172.16.3.154:9092,172.16.3.155:9092') # topic = 'test_mysql_topic'.encode('utf-8') # client = KafkaClient(hosts='localhost:9092') # topic = client.topics[topic] # consumer = topic.get_simple_consumer(consumer_group='test1', # auto_commit_enable=True, # 去重消费 # auto_commit_interval_ms=1000, # # consumer_id='test1', # 消费者ID # reset_offset_on_start=True, # # auto_offset_reset=OffsetType.LATEST, # consumer_timeout_ms=100000) # c = 0 # for msg in consumer: # c += 1 # if msg: # val = msg.value.decode('utf-8') # print(c,val) # client = KafkaClient(hosts='localhost:9092') # topic = client.topics['test_mysql_topic'] client = KafkaClient(hosts='172.26.28.30:9092') topic = client.topics['analyze'] consumer = topic.get_simple_consumer(consumer_group='my_consumer_group', auto_offset_reset=OffsetType.LATEST, reset_offset_on_start=True) # 消费数据 for message in consumer: if message is not None: print(message.offset, message.value.decode()) if __name__=="__main__": # send_kafka() consumer()