/*
 * Decompiled with CFR 0.152.
 */
package com.bfd.crawler.utils.kafka.reader;

import com.bfd.crawler.utils.MyDateUtil;
import com.bfd.crawler.utils.kafka.reader.ConsumerInstance;
import com.bfd.dp.kfk.util.LoadConfig;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.log4j.Logger;

/**
 * Worker that repeatedly polls a {@link ConsumerInstance} for one topic and
 * hands every received message to a shared {@link ArrayBlockingQueue}.
 *
 * <p>The loop runs on the thread that executes {@link #run()} and ends when
 * {@link #stop()} is called or when that thread is interrupted. The consumer
 * is closed when the loop ends, whatever the reason.
 */
public class TopicReader
implements Runnable {
    /** Pattern used to detect that the calendar day has changed. */
    private static final String DAY_PATTERN = "yyyy-MM-dd";
    /** Minimum time between two throughput reports, in milliseconds. */
    private static final long STATS_INTERVAL_MS = 100000L;
    /** Pause after a poll that produced nothing (no message or a failed poll), in milliseconds. */
    private static final long IDLE_SLEEP_MS = 1000L;

    /** Loop guard; volatile so that {@link #stop()} called from another thread is seen by the reader. */
    private volatile boolean flag = true;
    public static final Logger LOG = Logger.getLogger(TopicReader.class);
    private String topic = null;
    private String groupid = null;
    private ArrayBlockingQueue<String> kfkqueue;
    private ConsumerInstance consumer = new ConsumerInstance();

    /**
     * Creates a reader without topic, group id or target queue.
     */
    public TopicReader() {
    }

    /**
     * Creates a reader for the given topic.
     *
     * <p>The consumer group id is read from the property
     * {@code DSP.topicreader.groupid} of the configuration loaded from
     * {@code ../etc/config}; {@code DSPkafkacrawl} is used when the property
     * is not set.
     *
     * @param kafkaTopic topic to read from
     * @param kfkqueue   queue that receives every message read from the topic
     */
    public TopicReader(String kafkaTopic, ArrayBlockingQueue<String> kfkqueue) {
        this.topic = kafkaTopic;
        this.groupid = LoadConfig.getInstance().getPro("../etc/config").getProperty("DSP.topicreader.groupid", "DSPkafkacrawl");
        this.kfkqueue = kfkqueue;
    }

    /**
     * Logs the group id and topic, then runs the read loop until the reader
     * is stopped or the thread is interrupted.
     */
    @Override
    public void run() {
        LOG.info("groupId is " + this.groupid);
        LOG.info(this.getThreadName() + ": work on topic " + this.topic);
        this.readTopic();
    }

    /** Returns the name of the calling thread, used as a prefix in log lines. */
    private String getThreadName() {
        return Thread.currentThread().getName();
    }

    /**
     * Poll loop: fetches one result per iteration, forwards messages to the
     * queue, pauses when nothing was obtained and periodically logs counters.
     */
    private void readTopic() {
        LOG.info(this.getThreadName() + "." + this.topic + " execute readTopic method!");
        long getMessageNums = 0L;
        long todayGetMessageNum = 0L;
        String dateStr = MyDateUtil.getStr(new Date(), DAY_PATTERN);
        long beginTime = System.currentTimeMillis();
        try {
            while (this.flag) {
                ConsumerInstance.RetInfo retInfo = this.poll();

                // Roll the daily counter over BEFORE counting, so the first
                // message of a new day is not thrown away by the reset.
                String today = MyDateUtil.getStr(new Date(), DAY_PATTERN);
                if (!today.equals(dateStr)) {
                    todayGetMessageNum = 0L;
                    dateStr = today;
                }

                if (retInfo == null) {
                    // The poll failed (already logged): back off instead of
                    // dereferencing null or re-processing a stale result.
                    this.pause();
                } else {
                    LOG.info(this.getThreadName() + "." + this.topic + " get one kfk message. " + retInfo.message);
                    if (retInfo.ret == 1) {
                        // Only polls that delivered a message are counted.
                        ++todayGetMessageNum;
                        ++getMessageNums;
                        this.enqueue(retInfo.message);
                    } else if (retInfo.ret == 0) {
                        if (this.flag) {
                            this.pause();
                        }
                    } else if (retInfo.ret == -1) {
                        LOG.error(this.getThreadName() + "\terror occurs when reading from kafka");
                    }
                }

                long now = System.currentTimeMillis();
                if (now - beginTime > STATS_INTERVAL_MS) {
                    LOG.info(this.getThreadName() + ".topic is " + this.topic + ".last 100 second get message nums is " + getMessageNums);
                    LOG.info(this.getThreadName() + ".today get message total is " + todayGetMessageNum);
                    getMessageNums = 0L;
                    beginTime = now;
                }
            }
        } finally {
            // Release the consumer even when the loop dies with a runtime exception.
            this.consumer.close();
            LOG.warn(this.getThreadName() + " exit!");
        }
    }

    /**
     * Fetches the next result from the consumer.
     *
     * @return the result, or {@code null} when the consumer threw (the
     *         failure is logged here) or returned nothing
     */
    private ConsumerInstance.RetInfo poll() {
        try {
            return this.consumer.get(this.topic, this.groupid);
        }
        catch (Exception e) {
            LOG.error("get topic:" + this.topic + " message error!", e);
            return null;
        }
    }

    /**
     * Puts one message on the target queue, blocking while the queue is full.
     * An interrupt while waiting restores the thread's interrupt status and
     * ends the read loop; any other failure is logged and the message dropped.
     */
    private void enqueue(String jsonStr) {
        try {
            this.kfkqueue.put(jsonStr);
        }
        catch (InterruptedException e) {
            LOG.error(this.getThreadName() + "\tputqueue error,data=" + jsonStr, e);
            this.abortOnInterrupt();
        }
        catch (Exception e) {
            LOG.error(this.getThreadName() + "\tputqueue error,data=" + jsonStr, e);
        }
    }

    /**
     * Sleeps for {@link #IDLE_SLEEP_MS}. An interrupt during the sleep
     * restores the thread's interrupt status and ends the read loop.
     */
    private void pause() {
        try {
            Thread.sleep(IDLE_SLEEP_MS);
        }
        catch (InterruptedException e) {
            LOG.error(this.getThreadName() + "\tsleep 1s error");
            this.abortOnInterrupt();
        }
    }

    /**
     * Re-asserts the interrupt status that catching InterruptedException
     * cleared and stops the loop, so the worker does not keep spinning on
     * blocking calls that would now fail immediately.
     */
    private void abortOnInterrupt() {
        Thread.currentThread().interrupt();
        this.flag = false;
    }

    /**
     * Asks the read loop to finish. The loop checks the request once per
     * iteration, so a blocking call that is in progress completes first.
     */
    public void stop() {
        this.flag = false;
    }
}

