/*
 * Decompiled with CFR 0.152.
 */
package com.bfd.crawler.kafka7;

import com.bfd.crawler.utils.ConfigUtils;
import com.bfd.crawler.utils.MyDateUtil;
import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.log4j.Logger;

/**
 * Reads messages from a Kafka topic with the high-level consumer API and
 * hands each message, decoded to a {@link String}, to a caller-supplied
 * {@link BlockingQueue}.
 *
 * <p>{@link #startReadThread} creates one consumer connector, one worker per
 * message stream and one monitor thread. The monitor waits until every worker
 * has finished, releases the executor and the connector, and starts a fresh
 * set of readers as long as {@link #runFlag} is still {@code true}.
 *
 * <p>All state is static; {@link #runFlag} is shared by every reader started
 * through this class.
 */
public class KfkConsumer {
    public static final Logger LOG = Logger.getLogger(KfkConsumer.class);

    /** Cleared by {@link #stopKfkConsumerThreads()}; checked by workers between messages. */
    public static volatile boolean runFlag = true;

    /** Consumer property under which the ZooKeeper connect string is stored. */
    private static final String ZK_CONNECT_KEY = "zookeeper.connect";

    /** Date pattern used to detect a day change for the per-day counter. */
    private static final String DAY_PATTERN = "yyyy-MM-dd";

    /** Minimum time between two throughput log lines, in milliseconds. */
    private static final long STATS_INTERVAL_MS = 100000L;

    /**
     * Builds the consumer configuration for the given group.
     *
     * <p>Re-reads {@code ../etc/crawl-config.properties} on every call and
     * falls back to built-in defaults for keys that are not configured.
     *
     * @param groupId consumer group id
     * @return the consumer configuration
     */
    private static ConsumerConfig createConsumerConfig(String groupId) {
        Properties props = new Properties();
        ConfigUtils.getInstance().readFile(new File("../etc/crawl-config.properties"));
        props.put(ZK_CONNECT_KEY, ConfigUtils.getInstance().getProp("crawl.kfk.zkConnect", "172.18.1.115:24002/kafka"));
        // Log the key that was actually written; looking up "zk.connect" always printed null.
        LOG.info("zk.connect is " + props.getProperty(ZK_CONNECT_KEY));
        props.put("group.id", groupId);
        LOG.info("groupid is " + groupId);
        props.put("auto.offset.reset", ConfigUtils.getInstance().getProp("auto.offset.reset", "smallest"));
        return new ConsumerConfig(props);
    }

    /**
     * Builds the topic-to-stream-count map for a single topic.
     *
     * @param topic             topic name
     * @param threadNumPerTopic number of streams requested for the topic
     * @return a new mutable map containing exactly one entry
     */
    public static Map<String, Integer> getTopicMap(String topic, int threadNumPerTopic) {
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, threadNumPerTopic);
        return topicMap;
    }

    /**
     * Sums the stream counts of all topics; this is the number of workers needed.
     *
     * @param topicCountMap topic-to-stream-count map
     * @return total number of streams
     */
    private static int getThreadPoolSize(Map<String, Integer> topicCountMap) {
        int threadPoolSize = 0;
        for (Integer streamCount : topicCountMap.values()) {
            threadPoolSize += streamCount.intValue();
        }
        return threadPoolSize;
    }

    /**
     * Starts a monitor thread that waits for {@code cdLatch} to reach zero,
     * shuts {@code executor} down and, if {@link #runFlag} is still set,
     * starts a new set of readers with the same arguments.
     *
     * <p>This overload does not know the consumer connector and therefore
     * cannot release it.
     *
     * @param cdLatch           latch counted down once by every worker
     * @param queue             destination queue passed on to the restarted readers
     * @param topic             topic passed on to the restarted readers
     * @param threadNumPerTopic stream count passed on to the restarted readers
     * @param groupId           consumer group passed on to the restarted readers
     * @param executor          executor running the workers; shut down once they are done
     */
    public static void startMonitor(final CountDownLatch cdLatch, final BlockingQueue<String> queue, final String topic, final int threadNumPerTopic, final String groupId, final ExecutorService executor) {
        KfkConsumer.startMonitor(cdLatch, queue, topic, threadNumPerTopic, groupId, executor, null);
    }

    /**
     * Same as the public overload, but additionally shuts down
     * {@code connector} (when non-null) before restarting, so that a restart
     * does not leave the previous connector open.
     */
    private static void startMonitor(final CountDownLatch cdLatch, final BlockingQueue<String> queue, final String topic, final int threadNumPerTopic, final String groupId, final ExecutorService executor, final ConsumerConnector connector) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    LOG.info("monitor thread name:" + Thread.currentThread().getName());
                    cdLatch.await();
                    LOG.info("all thread stop countdonwlatch is 0.runFlag is " + runFlag);
                } catch (InterruptedException e) {
                    // Keep the interrupt status visible and give up monitoring.
                    Thread.currentThread().interrupt();
                    LOG.error("monitor thread interrupted while waiting for readers", e);
                    return;
                }
                executor.shutdownNow();
                if (connector != null) {
                    try {
                        connector.shutdown();
                    } catch (RuntimeException e) {
                        // A failed shutdown must not prevent the restart below.
                        LOG.error("failed to shut down consumer connector", e);
                    }
                }
                if (runFlag) {
                    try {
                        KfkConsumer.startReadThread(queue, topic, threadNumPerTopic, groupId);
                    } catch (RuntimeException e) {
                        LOG.error("failed to restart readers for topic " + topic, e);
                    }
                }
            }
        }).start();
    }

    /**
     * Connects to Kafka and starts one worker per message stream. Each worker
     * puts every message it receives on {@code queue}.
     *
     * @param queue             destination for decoded messages
     * @param topic             topic to read
     * @param threadNumPerTopic number of streams (and workers) to create; must be positive
     * @param groupId           consumer group id
     * @throws IllegalArgumentException if {@code threadNumPerTopic} is not positive
     */
    public static void startReadThread(final BlockingQueue<String> queue, String topic, int threadNumPerTopic, String groupId) {
        if (threadNumPerTopic <= 0) {
            // Fail before a connector is created; the thread pool would reject this size anyway.
            throw new IllegalArgumentException("threadNumPerTopic must be positive: " + threadNumPerTopic);
        }
        Map<String, Integer> topicCountMap = KfkConsumer.getTopicMap(topic, threadNumPerTopic);
        int threadPoolSize = KfkConsumer.getThreadPoolSize(topicCountMap);
        LOG.info("threadPoolSize is " + threadPoolSize);

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(KfkConsumer.createConsumerConfig(groupId));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
        try {
            consumerMap = consumer.createMessageStreams(topicCountMap);
        } catch (RuntimeException e) {
            // Nobody else holds the connector yet, so release it here.
            consumer.shutdown();
            throw e;
        }

        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
        CountDownLatch cdLatch = new CountDownLatch(threadPoolSize);
        KfkConsumer.startMonitor(cdLatch, queue, topic, threadNumPerTopic, groupId, executor, consumer);

        for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> entry : consumerMap.entrySet()) {
            for (KafkaStream<byte[], byte[]> stream : entry.getValue()) {
                executor.execute(new StreamReader(entry.getKey(), stream, queue, cdLatch));
            }
        }
    }

    /**
     * Asks all workers to stop by clearing {@link #runFlag}.
     *
     * <p>Workers test the flag only between messages, and the monitor tests
     * it only after all workers have finished.
     */
    public static void stopKfkConsumerThreads() {
        runFlag = false;
    }

    /**
     * Manual test driver: reads topic {@code fuwenchaotest2} with one stream
     * and logs every message.
     *
     * @param args {@code args[0]} is the consumer group id
     * @throws IllegalArgumentException if no group id is given
     */
    public static void main(String[] args) {
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException("usage: KfkConsumer <groupId>");
        }
        BlockingQueue<String> queue = new LinkedBlockingDeque<String>();
        KfkConsumer.startReadThread(queue, "fuwenchaotest2", 1, args[0]);
        while (true) {
            try {
                LOG.info(queue.take());
            } catch (InterruptedException e) {
                // The driver keeps draining, as before; the exception already cleared the flag.
                LOG.error("interrupted while waiting for a message", e);
            }
        }
    }

    /**
     * Worker that drains one message stream into the destination queue and
     * periodically logs how many messages it has handled.
     */
    private static final class StreamReader implements Runnable {
        private final String topic;
        private final KafkaStream<byte[], byte[]> stream;
        private final BlockingQueue<String> queue;
        private final CountDownLatch doneLatch;

        StreamReader(String topic, KafkaStream<byte[], byte[]> stream, BlockingQueue<String> queue, CountDownLatch doneLatch) {
            this.topic = topic;
            this.stream = stream;
            this.queue = queue;
            this.doneLatch = doneLatch;
        }

        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            LOG.info("begin run method !thread name is " + threadName + ".topic is " + topic);
            try {
                consume(threadName);
            } catch (RuntimeException e) {
                LOG.error(threadName + ".topic is " + topic + ".reader aborted", e);
            } finally {
                // Always release the latch; otherwise a failing worker would leave
                // the monitor waiting forever and the readers would never restart.
                doneLatch.countDown();
                LOG.info("threadname is " + threadName + " thread stop!");
            }
        }

        /** Reads messages until the stream ends, the run flag is cleared or the thread is interrupted. */
        private void consume(String threadName) {
            ConsumerIterator<byte[], byte[]> messages = stream.iterator();
            long intervalCount = 0L;
            long todayCount = 0L;
            String currentDay = MyDateUtil.getStr(new Date(), DAY_PATTERN);
            long intervalStart = System.currentTimeMillis();

            while (messages.hasNext() && runFlag) {
                try {
                    // NOTE(review): decodes with the platform default charset, as before;
                    // confirm the producer's encoding before pinning one explicitly.
                    String payload = new String(messages.next().message());
                    queue.put(payload);

                    // Roll the day over before counting so the first message of a
                    // new day is included in that day's total.
                    String today = MyDateUtil.getStr(new Date(), DAY_PATTERN);
                    if (!today.equals(currentDay)) {
                        todayCount = 0L;
                        currentDay = today;
                    }
                    ++todayCount;
                    ++intervalCount;

                    long now = System.currentTimeMillis();
                    if (now - intervalStart > STATS_INTERVAL_MS) {
                        LOG.info(threadName + ".topic is " + topic + ".last 100 second get message nums is " + intervalCount);
                        LOG.info(threadName + ".today get message total is " + todayCount);
                        intervalCount = 0L;
                        intervalStart = now;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop; the message being enqueued is not delivered.
                    Thread.currentThread().interrupt();
                    LOG.warn(threadName + ".topic is " + topic + ".interrupted while enqueuing message", e);
                    return;
                } catch (RuntimeException e) {
                    // Best effort: log and try the next message.
                    LOG.error(threadName + ".topic is " + topic + ".failed to handle message", e);
                }
            }
        }
    }
}

