
Kafka application

master
commit 705a25c122 (55007, 6 months ago)
  1. .gitignore (+33)
  2. README.md (+1)
  3. mvnw (+308)
  4. mvnw.cmd (+205)
  5. pom.xml (+119)
  6. src/main/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplication.java (+13)
  7. src/main/java/com/bfd/crawl/kafkahandler/bean/ResponsePo.java (+61)
  8. src/main/java/com/bfd/crawl/kafkahandler/config/AsyncThreadConfiguration.java (+63)
  9. src/main/java/com/bfd/crawl/kafkahandler/config/Constant.java (+60)
  10. src/main/java/com/bfd/crawl/kafkahandler/config/ZookeeperConfig.java (+25)
  11. src/main/java/com/bfd/crawl/kafkahandler/controller/HandlerDataController.java (+53)
  12. src/main/java/com/bfd/crawl/kafkahandler/enums/ResponseCode.java (+32)
  13. src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandler.java (+160)
  14. src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandlerService.java (+73)
  15. src/main/java/com/bfd/crawl/kafkahandler/service/ProducterHandlerService.java (+93)
  16. src/main/java/com/bfd/crawl/kafkahandler/service/SchHandlerService.java (+102)
  17. src/main/java/com/bfd/crawl/kafkahandler/service/SendService.java (+61)
  18. src/main/java/com/bfd/crawl/kafkahandler/service/StartServcie.java (+60)
  19. src/main/java/com/bfd/crawl/kafkahandler/service/TimeOutHandlerService.java (+66)
  20. src/main/java/com/bfd/crawl/kafkahandler/service/ZookeeperNodeMonitor.java (+66)
  21. src/main/java/com/bfd/crawl/kafkahandler/util/DataUtil.java (+65)
  22. src/main/java/com/bfd/crawl/kafkahandler/util/QueueUtil.java (+16)
  23. src/main/java/com/bfd/crawl/kafkahandler/util/StringUtil.java (+147)
  24. src/main/resources/application.yml (+57)
  25. src/main/resources/logback-spring.xml (+36)
  26. src/test/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplicationTests.java (+13)

.gitignore
@@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

README.md
@@ -0,0 +1 @@
Kafka read/write application

mvnw
@@ -0,0 +1,308 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Apache Maven Wrapper startup batch script, version 3.2.0
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /usr/local/etc/mavenrc ] ; then
. /usr/local/etc/mavenrc
fi
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "$(uname)" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
else
JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=$(java-config --jre-home)
fi
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
[ -n "$CLASSPATH" ] &&
CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
fi
# For Mingw, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] &&
JAVA_HOME="$(cd "$JAVA_HOME" || (echo "cannot cd into $JAVA_HOME."; exit 1); pwd)"
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="$(which javac)"
if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=$(which readlink)
if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
if $darwin ; then
javaHome="$(dirname "\"$javaExecutable\"")"
javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac"
else
javaExecutable="$(readlink -f "\"$javaExecutable\"")"
fi
javaHome="$(dirname "\"$javaExecutable\"")"
javaHome=$(expr "$javaHome" : '\(.*\)/bin')
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=$(cd "$wdir/.." || exit 1; pwd)
fi
# end of workaround
done
printf '%s' "$(cd "$basedir" || exit 1; pwd)"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
# Remove \r in case we run on Windows within Git Bash
# and check out the repository with auto CRLF management
# enabled. Otherwise, we may read lines that are delimited with
# \r\n and produce $'-Xarg\r' rather than -Xarg due to word
# splitting rules.
tr -s '\r\n' ' ' < "$1"
fi
}
log() {
if [ "$MVNW_VERBOSE" = true ]; then
printf '%s\n' "$1"
fi
}
BASE_DIR=$(find_maven_basedir "$(dirname "$0")")
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}; export MAVEN_PROJECTBASEDIR
log "$MAVEN_PROJECTBASEDIR"
##########################################################################################
# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
# This allows using the maven wrapper in projects that prohibit checking in binary data.
##########################################################################################
wrapperJarPath="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar"
if [ -r "$wrapperJarPath" ]; then
log "Found $wrapperJarPath"
else
log "Couldn't find $wrapperJarPath, downloading it ..."
if [ -n "$MVNW_REPOURL" ]; then
wrapperUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
else
wrapperUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
fi
while IFS="=" read -r key value; do
# Remove '\r' from value to allow usage on windows as IFS does not consider '\r' as a separator ( considers space, tab, new line ('\n'), and custom '=' )
safeValue=$(echo "$value" | tr -d '\r')
case "$key" in (wrapperUrl) wrapperUrl="$safeValue"; break ;;
esac
done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties"
log "Downloading from: $wrapperUrl"
if $cygwin; then
wrapperJarPath=$(cygpath --path --windows "$wrapperJarPath")
fi
if command -v wget > /dev/null; then
log "Found wget ... using wget"
[ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--quiet"
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
wget $QUIET "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
else
wget $QUIET --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
fi
elif command -v curl > /dev/null; then
log "Found curl ... using curl"
[ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--silent"
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
curl $QUIET -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath"
else
curl $QUIET --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath"
fi
else
log "Falling back to using Java to download"
javaSource="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.java"
javaClass="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.class"
# For Cygwin, switch paths to Windows format before running javac
if $cygwin; then
javaSource=$(cygpath --path --windows "$javaSource")
javaClass=$(cygpath --path --windows "$javaClass")
fi
if [ -e "$javaSource" ]; then
if [ ! -e "$javaClass" ]; then
log " - Compiling MavenWrapperDownloader.java ..."
("$JAVA_HOME/bin/javac" "$javaSource")
fi
if [ -e "$javaClass" ]; then
log " - Running MavenWrapperDownloader.java ..."
("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$wrapperUrl" "$wrapperJarPath") || rm -f "$wrapperJarPath"
fi
fi
fi
fi
##########################################################################################
# End of extension
##########################################################################################
# If specified, validate the SHA-256 sum of the Maven wrapper jar file
wrapperSha256Sum=""
while IFS="=" read -r key value; do
case "$key" in (wrapperSha256Sum) wrapperSha256Sum=$value; break ;;
esac
done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties"
if [ -n "$wrapperSha256Sum" ]; then
wrapperSha256Result=false
if command -v sha256sum > /dev/null; then
if echo "$wrapperSha256Sum $wrapperJarPath" | sha256sum -c > /dev/null 2>&1; then
wrapperSha256Result=true
fi
elif command -v shasum > /dev/null; then
if echo "$wrapperSha256Sum $wrapperJarPath" | shasum -a 256 -c > /dev/null 2>&1; then
wrapperSha256Result=true
fi
else
echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available."
echo "Please install either command, or disable validation by removing 'wrapperSha256Sum' from your maven-wrapper.properties."
exit 1
fi
if [ $wrapperSha256Result = false ]; then
echo "Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised." >&2
echo "Investigate or delete $wrapperJarPath to attempt a clean download." >&2
echo "If you updated your Maven version, you need to update the specified wrapperSha256Sum property." >&2
exit 1
fi
fi
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=$(cygpath --path --windows "$JAVA_HOME")
[ -n "$CLASSPATH" ] &&
CLASSPATH=$(cygpath --path --windows "$CLASSPATH")
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=$(cygpath --path --windows "$MAVEN_PROJECTBASEDIR")
fi
# Provide a "standardized" way to retrieve the CLI args that will
# work with both Windows and non-Windows executions.
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $*"
export MAVEN_CMD_LINE_ARGS
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
# shellcheck disable=SC2086 # safe args
exec "$JAVACMD" \
$MAVEN_OPTS \
$MAVEN_DEBUG_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

mvnw.cmd
@@ -0,0 +1,205 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Apache Maven Wrapper startup batch script, version 3.2.0
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM set title of command window
title %0
@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %*
if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %*
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
set WRAPPER_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
IF "%%A"=="wrapperUrl" SET WRAPPER_URL=%%B
)
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
if exist %WRAPPER_JAR% (
if "%MVNW_VERBOSE%" == "true" (
echo Found %WRAPPER_JAR%
)
) else (
if not "%MVNW_REPOURL%" == "" (
SET WRAPPER_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
)
if "%MVNW_VERBOSE%" == "true" (
echo Couldn't find %WRAPPER_JAR%, downloading it ...
echo Downloading from: %WRAPPER_URL%
)
powershell -Command "&{"^
"$webclient = new-object System.Net.WebClient;"^
"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
"}"^
"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%WRAPPER_URL%', '%WRAPPER_JAR%')"^
"}"
if "%MVNW_VERBOSE%" == "true" (
echo Finished downloading %WRAPPER_JAR%
)
)
@REM End of extension
@REM If specified, validate the SHA-256 sum of the Maven wrapper jar file
SET WRAPPER_SHA_256_SUM=""
FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
IF "%%A"=="wrapperSha256Sum" SET WRAPPER_SHA_256_SUM=%%B
)
IF NOT %WRAPPER_SHA_256_SUM%=="" (
powershell -Command "&{"^
"$hash = (Get-FileHash \"%WRAPPER_JAR%\" -Algorithm SHA256).Hash.ToLower();"^
"If('%WRAPPER_SHA_256_SUM%' -ne $hash){"^
" Write-Output 'Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised.';"^
" Write-Output 'Investigate or delete %WRAPPER_JAR% to attempt a clean download.';"^
" Write-Output 'If you updated your Maven version, you need to update the specified wrapperSha256Sum property.';"^
" exit 1;"^
"}"^
"}"
if ERRORLEVEL 1 goto error
)
@REM Provide a "standardized" way to retrieve the CLI args that will
@REM work with both Windows and non-Windows executions.
set MAVEN_CMD_LINE_ARGS=%*
%MAVEN_JAVA_EXE% ^
%JVM_CONFIG_MAVEN_PROPS% ^
%MAVEN_OPTS% ^
%MAVEN_DEBUG_OPTS% ^
-classpath %WRAPPER_JAR% ^
"-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^
%WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat"
if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%"=="on" pause
if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE%
cmd /C exit /B %ERROR_CODE%

pom.xml
@@ -0,0 +1,119 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bfd.crawl</groupId>
<artifactId>kafkaHandler</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafkaHandler</name>
<description>kafkaHandler</description>
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-client</artifactId>
<version>2.2.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
<!--JSON-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.17</version>
</dependency>
<!--OKHTTP-->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
<!--redis-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.bfd.util</groupId>
<artifactId>pauseTool</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

src/main/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplication.java
@@ -0,0 +1,13 @@
package com.bfd.crawl.kafkahandler;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaHandlerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaHandlerApplication.class, args);
}
}

src/main/java/com/bfd/crawl/kafkahandler/bean/ResponsePo.java
@@ -0,0 +1,61 @@
package com.bfd.crawl.kafkahandler.bean;
import com.bfd.crawl.kafkahandler.enums.ResponseCode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author:jinming
* @className:ResponsePo
* @version:1.0
* @description:
* @Date:2023/4/3 17:23
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResponsePo {
/**
* Response code
*/
private int code;
/**
* On success, holds the returned data (a JSON string)
*/
private Object data;
/**
* Status message
*/
private String message;
public static ResponsePo success() {
return setStatus(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage());
}
public static ResponsePo error() {
return setStatus(ResponseCode.FAILURE.getCode(), ResponseCode.FAILURE.getMessage());
}
public static ResponsePo setStatus(int code, String message) {
ResponsePo resultBean = new ResponsePo();
resultBean.code = code;
resultBean.message = message;
return resultBean;
}
public ResponsePo(int code, String message) {
this.code = code;
this.message = message;
}
public ResponsePo(ResponseCode responseCode){
this.code = responseCode.getCode();
this.message = responseCode.getMessage();
}
}

src/main/java/com/bfd/crawl/kafkahandler/config/AsyncThreadConfiguration.java
@@ -0,0 +1,63 @@
package com.bfd.crawl.kafkahandler.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @author jinming
* @version 1.0
* @className AsyncThreadConfiguration
* @Date 2022/2/17 18:37
*/
@Configuration
@EnableAsync
public class AsyncThreadConfiguration {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Core pool size
executor.setCorePoolSize(50);
// Maximum pool size
executor.setMaxPoolSize(50);
// Task queue capacity
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("handlerData-");
executor.initialize();
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
@Bean
public Executor producerExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Core pool size
executor.setCorePoolSize(150);
// Maximum pool size
executor.setMaxPoolSize(150);
// Task queue capacity
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("sendData-");
executor.initialize();
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
@Bean
public Executor consumerHandlerExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Core pool size
executor.setCorePoolSize(150);
// Maximum pool size
executor.setMaxPoolSize(150);
// Task queue capacity
executor.setQueueCapacity(150);
executor.setThreadNamePrefix("consumerHandler-");
executor.initialize();
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}

src/main/java/com/bfd/crawl/kafkahandler/config/Constant.java
@@ -0,0 +1,60 @@
package com.bfd.crawl.kafkahandler.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author:jinming
* @className:Constant
* @version:1.0
* @description:
* @Date:2023/8/16 15:26
*/
public class Constant {
/**
* KafkaProducer instances, keyed by task
*/
public static Map<String, KafkaProducer<String, String>> PRODUCER_MAP = new HashMap<>(32);
/**
* KafkaConsumer instances, keyed by task objectKey
*/
public static Map<String, KafkaConsumer<String, String>> CONSUMER_MAP = new HashMap<>(32);
/**
* Consumer run flags, keyed by scenes id
*/
public static Map<Integer, Boolean> CONSUMER_RUNTIME_MAP = new HashMap<>(32);
/**
* Timeout bookkeeping for consumer tasks
*/
public static ConcurrentHashMap<String, Map> TIME_OUT_MAP = new ConcurrentHashMap<>(32);
/**
* Task type value for producers
*/
public final static Integer PRODUCER_TYPE = 1;
/**
* Empty string constant
*/
public static final String EMPTY = "";
/**
* Marker: keys without this substring are not parsed by DataUtil
*/
public static final String NOT_KEY = ":$";
public static final String FORM = "form";
public static final String FIELD = "field";
public static final String VALUE = "value";
public static final String ALL = "*";
public static final String DATATYPE = "dataType";
public static final String STRING_TYPE = "String";
}

src/main/java/com/bfd/crawl/kafkahandler/config/ZookeeperConfig.java
@@ -0,0 +1,25 @@
package com.bfd.crawl.kafkahandler.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author jian.mao
* @date 2024年4月16日
* @description
*/
@Configuration
public class ZookeeperConfig {
@Value("${zookeeper.connection-string}")
private String connectionString;
@Bean
public CuratorFramework curatorFramework() {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();
return curatorFramework;
}
}

src/main/java/com/bfd/crawl/kafkahandler/controller/HandlerDataController.java
@@ -0,0 +1,53 @@
package com.bfd.crawl.kafkahandler.controller;
import com.alibaba.fastjson.JSON;
import com.bfd.crawl.kafkahandler.bean.ResponsePo;
import com.bfd.crawl.kafkahandler.enums.ResponseCode;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* @author:jinming
* @className:HandlerDataController
* @version:1.0
* @description: data handling endpoint
* @Date:2023/7/13 14:25
*/
@RestController
@RequestMapping("/handlerdata")
@Slf4j
public class HandlerDataController {
@GetMapping("/test")
public String test() {
return "test";
}
@PostMapping("/kafka")
public ResponsePo kafkaHandler(@RequestBody String dataJson) {
ResponsePo responsePo = ResponsePo.success();
try {
// Validate that the request body is a JSON object
Map parse = (Map) JSON.parse(dataJson);
} catch (Exception e) {
log.error("Request body is not valid JSON: " + e.getMessage());
responsePo.setCode(ResponseCode.FAILURE.getCode());
responsePo.setMessage(ResponseCode.FAILURE.getMessage());
return responsePo;
}
log.info("New task received: " + dataJson);
try {
QueueUtil.taskQueue.put(dataJson);
} catch (InterruptedException e) {
e.printStackTrace();
}
return responsePo;
}
}
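The endpoint only checks that the body parses as JSON before queueing it; the fields that actually drive a task (scenes_id, name, version, input, output, data) are read later by SchHandlerService. Below is a minimal client sketch using the OkHttp 3.x dependency already declared in pom.xml; the host and payload values are illustrative assumptions, while the port comes from application.yml:

import okhttp3.*;

public class KafkaHandlerClientExample {
public static void main(String[] args) throws Exception {
// Illustrative task payload; field names follow what SchHandlerService reads
String task = "{\"scenes_id\":1,\"version\":1,\"id\":1,\"name\":\"demo\","
+ "\"input\":{\"type\":1,\"timeOut\":600,\"bootstrapServers\":\"localhost:9092\","
+ "\"topic\":\"demo\",\"form\":[]},\"output\":{},\"data\":{}}";
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
// Port 13188 is taken from application.yml; the host is an assumption
.url("http://localhost:13188/handlerdata/kafka")
.post(RequestBody.create(MediaType.parse("application/json"), task))
.build();
try (Response response = client.newCall(request).execute()) {
// Expect {"code":200,"message":"operation succeeded"} on success
System.out.println(response.body().string());
}
}
}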

src/main/java/com/bfd/crawl/kafkahandler/enums/ResponseCode.java
@@ -0,0 +1,32 @@
package com.bfd.crawl.kafkahandler.enums;
/**
* @author:jinming
* @className:ResponseCodeEnum
* @version:1.0
* @description: response result code enum
* @Date:2023/2/28 11:40
*/
public enum ResponseCode {
// Response result codes
SUCCESS(200, "operation succeeded"),
FAILURE(400, "invalid parameters"),
INTERNAL_SERVER_ERROR(500, "internal server error"),
TYPE_NOT_SUPPORT(601, "file type not supported");
private int code;
private String message;
ResponseCode(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
}

src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandler.java
@@ -0,0 +1,160 @@
package com.bfd.crawl.kafkahandler.service;
import com.alibaba.fastjson.JSON;
import com.bfd.crawl.kafkahandler.config.Constant;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
/**
* @author:jinming
* @className:ConsumerHandler
* @version:1.0
* @description:
* @Date:2024/7/9 18:01
*/
@Service
@Slf4j
public class ConsumerHandler {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Async("consumerHandlerExecutor")
public void createConsumer(Integer id, String name, Map admin, Map parse, Map output, String objectKey, String groupId) {
try {
int size = (int) admin.get("size");
String bootstrapServers = (String) admin.get("bootstrapServers");
String autoOffsetReset = (String) admin.get("autoOffsetReset");
String topic = (String) admin.get("topic");
// Kafka consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("auto.offset.reset", autoOffsetReset);
props.put("request.timeout.ms", "60000");
props.put("session.timeout.ms", "60000");
props.put("enable.auto.commit", "false");
// Create the Kafka consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Constant.CONSUMER_MAP.put(objectKey, consumer);
List<String> filter = (List<String>) admin.get("filter");
// Counts forwarded records; starts at 0 so the task exits after exactly `size` records
int index = 0;
while (Constant.CONSUMER_RUNTIME_MAP.get(id)) {
boolean isBreak = false;
ConsumerRecords<String, String> msgList = consumer.poll(1000);
for (ConsumerRecord<String, String> record : msgList) {
try {
//Wrap each record and forward it to the downstream Kafka topic
String value = record.value();
boolean skipRecord = true;
for (String field : filter) {
if (value.contains(field)) {
log.info("id:{}, record matches filter term: {}", id, value);
skipRecord = false;
}
}
// Skip records that match none of the filter terms
if (skipRecord) {
continue;
}
//fieldType toggles custom output fields: 0 = off (use the default output), 1 = on (copy the configured output keys from the record into the result)
int fieldType = (int) admin.get("fieldType");
Map result = new HashMap(32);
Map resultsMap = new HashMap(32);
resultsMap.put("result", value);
result.put("results", JSON.toJSONString(resultsMap));
parse.put("result", result);
if (fieldType != 0) {
resultsMap.remove("result");
Map valueParse = (Map) JSON.parse(value);
Set set = output.keySet();
for (Object o : set) {
resultsMap.put(o, valueParse.get(o));
}
String resultsMapJson = JSON.toJSONString(resultsMap);
result.put("results", resultsMapJson);
parse.put("result", result);
String empty = "{}";
if (resultsMapJson.equals(empty)) {
continue;
}
}
String message = JSON.toJSONString(parse);
QueueUtil.sendQueue.put(message);
//size decides when the task exits: -1 means consume forever (until the timeout expires and the consumer is closed, exiting via an exception)
index++;
if (size != -1 && index == size) {
isBreak = true;
break;
}
} catch (Exception e) {
log.info("id :{},消费者线程读取数据后解析失败", id);
e.printStackTrace();
}
}
// Commit offsets
try {
consumer.commitSync();
} catch (Exception e) {
log.error("提交偏移量失败", e);
}
if (isBreak) {
consumer.close();
Constant.CONSUMER_MAP.remove(objectKey);
Constant.TIME_OUT_MAP.remove(objectKey);
stringRedisTemplate.delete(objectKey);
}
}
consumer.close();
log.info("id :{},消费者线程已经关闭", id);
Constant.CONSUMER_MAP.remove(objectKey);
Constant.TIME_OUT_MAP.remove(objectKey);
try {
stringRedisTemplate.delete(objectKey);
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (IllegalStateException illegalStateException) {
//The stack trace is intentionally not printed: this is the expected exit path when the timeout handler closes the consumer
log.error("Consumer was closed; task " + id + " (" + name + ") consumer thread exiting");
} catch (Throwable throwable) {
throwable.printStackTrace();
log.error("kafka消费线程发生异常" + throwable.getMessage());
Map result = new HashMap(32);
result.put("status", 2);
result.put("results", "");
result.put("message", "未知异常");
parse.put("result", result);
String message = JSON.toJSONString(parse);
try {
QueueUtil.sendQueue.put(message);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}

src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandlerService.java
@@ -0,0 +1,73 @@
package com.bfd.crawl.kafkahandler.service;
import com.alibaba.fastjson.JSON;
import com.bfd.crawl.kafkahandler.config.Constant;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* @author:jinming
* @className:ConsumerHandlerService
* @version:1.0
* @description:
* @Date:2023/8/22 16:54
*/
@Service
@Slf4j
public class ConsumerHandlerService {
@Autowired
private ConsumerHandler consumerHandler;
@Async
public void consumerHandler(Integer id, String name, Map admin, Map parse, Map output, String objectKey) {
Constant.CONSUMER_RUNTIME_MAP.put(id, true);
String topic = (String) admin.get("topic");
String bootstrapServers = (String) admin.get("bootstrapServers");
int totalPartitions = getTotalPartitions(topic, bootstrapServers);
String groupId = UUID.randomUUID().toString();
for (int i = 0; i < totalPartitions; i++) {
consumerHandler.createConsumer(id, name, admin, parse, output, objectKey, groupId);
}
}
private static Integer getTotalPartitions(String topicName, String bootstrapServers) {
int totalPartitions = 1;
// Kafka admin client configuration
Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Create the Kafka admin client
try (AdminClient adminClient = AdminClient.create(adminClientConfig)) {
// Fetch the description of the given topic
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topicName));
Map<String, TopicDescription> topicDescriptionMap = describeTopicsResult.all().get();
// Read and log the topic's partition count
TopicDescription topicDescription = topicDescriptionMap.get(topicName);
if (topicDescription != null) {
totalPartitions = topicDescription.partitions().size();
log.info("Topic: " + topicName + ", Total Partitions: " + totalPartitions);
} else {
log.info("Topic: " + topicName + " not found.");
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return totalPartitions;
}
}

src/main/java/com/bfd/crawl/kafkahandler/service/ProducterHandlerService.java
@@ -0,0 +1,93 @@
package com.bfd.crawl.kafkahandler.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bfd.crawl.kafkahandler.config.Constant;
import com.bfd.crawl.kafkahandler.util.DataUtil;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* @author:jinming
* @className:ProducterHandlerService
* @version:1.0
* @description:
* @Date:2023/8/28 14:40
*/
@Service
@Slf4j
public class ProducterHandlerService {
@Async("producterExecutor")
public void producterHandler(Integer id, String name, Map admin, Map dataMap, Map parse) {
Gson gson = new GsonBuilder().serializeNulls().create();
log.info("任务ID:" + id + "," + name + "生产者线程启动");
String bootstrapServers = (String) admin.get("bootstrapServers");
String topic = (String) admin.get("topic");
String message = "";
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("request.timeout.ms", "60000");
props.put("session.timeout.ms", "60000");
props.put("max.request.size", "47185920");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// Build the document to be written
Map<String, Object> document = new HashMap<String, Object>(16);
// Fields to write
List<Map<String, Object>> form = (List<Map<String, Object>>) admin.get(Constant.FORM);
for (Map<String, Object> map : form) {
//字段名称
String field = (String) map.get(Constant.FIELD);
//数据源path公式
String valuePath = (String) map.get(Constant.VALUE);
//根据path去数据源data下获取字段值
Object value = DataUtil.getValue(valuePath, dataMap);
//字段为空不处理
// if (value == null) {
// continue;
// }
//判断是不是 String类型 ---暂时先判断String类型
//其他类型直接进行赋值--暂定因为前端传过来的字段是根据我们应用进行定义的类型基本一致
document.put(field, value);
}
// message = JSONObject.toJSONString(document);
document.put("isLast", 1);
message = gson.toJson(document);
try {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, message);
producer.send(producerRecord);
producer.flush();
producer.close();
} catch (Throwable throwable) {
throwable.printStackTrace();
log.error("kafka生产线程发生异常" + throwable.getMessage());
}
Map result = new HashMap(32);
result.put("results", message);
result.put("status", 1);
parse.put("result", result);
String nextTask = JSON.toJSONString(parse);
try {
QueueUtil.sendQueue.put(nextTask);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
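For reference, a sketch of the "form" list that producterHandler expects under the input map; the entry values here are illustrative assumptions, with "value" using the DataUtil path convention shown further below:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FormExample {
public static void main(String[] args) {
List<Map<String, Object>> form = new ArrayList<>();
Map<String, Object> entry = new HashMap<>();
// Constant.FIELD: name of the field written into the outgoing document
entry.put("field", "title");
// Constant.VALUE: DataUtil path formula resolved against the task's data map
entry.put("value", "content:$.title");
// Constant.DATATYPE: declared type (informational; values are assigned directly)
entry.put("dataType", "String");
form.add(entry);
System.out.println(form);
}
}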

src/main/java/com/bfd/crawl/kafkahandler/service/SchHandlerService.java
@@ -0,0 +1,102 @@
package com.bfd.crawl.kafkahandler.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.bfd.crawl.kafkahandler.config.Constant;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import com.bfd.crawl.kafkahandler.util.StringUtil;
import com.bfd.util.PauseTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @author:jinming
* @className:SchHandlerService
* @version:1.0
* @description:
* @Date:2023/8/28 14:52
*/
@Service
@Slf4j
public class SchHandlerService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Autowired
private ConsumerHandlerService consumerHandlerService;
@Autowired
private ProducterHandlerService producterHandlerService;
@Autowired
private TimeOutHandlerService timeOutHandlerService;
@Async
public void run() {
//Start the timeout-handling thread first (currently disabled)
// timeOutHandlerService.handlerTimeOut();
while (true) {
try {
String dataJson = QueueUtil.taskQueue.take();
Map parse = (Map) JSON.parse(dataJson);
String objectKey = parse.get("scenes_id").toString();
int id = (int) parse.get("scenes_id");
String name = (String) parse.get("name");
Map dataMap = (Map) parse.get("data");
Map admin = (Map) parse.get("input");
Map output = (Map) parse.get("output");
int scenesId = (int) parse.get("scenes_id");
int version = (int) parse.get("version");
String pauseKey = scenesId + "_" + version;
if (!PauseTool.CACHE.containsKey(pauseKey)) {
log.info("流程:{}的版本:{}已失效,任务跳过", scenesId, version);
continue;
}
// dataMap.put("blueprint_id", parse.get("blueprint_id"));
// dataMap.put("scenes_id", parse.get("scenes_id"));
// dataMap.put("id", parse.get("id"));
// dataMap.put("transfer_id", parse.get("transfer_id"));
// dataMap.put("businessKey", parse.get("businessKey"));
int type = (int) admin.get("type");
int timeOut = (int) admin.get("timeOut");
stringRedisTemplate.opsForHash().put(objectKey, "data", dataJson);
if (timeOut > 0) {
stringRedisTemplate.expire(objectKey, timeOut, TimeUnit.SECONDS);
}
//Timeout bookkeeping; the front end supplies the value for both producers and consumers (consumers default to -1, producers to 600s)
// if (!Constant.TIME_OUT_MAP.containsKey(objectKey)) {
// Map timeOutMap = new HashMap(32);
// timeOutMap.put("timeOut", timeOut);
// timeOutMap.put("startTime", System.currentTimeMillis());
// Constant.TIME_OUT_MAP.put(objectKey, timeOutMap);
// }else {
//
// }
if (type == Constant.PRODUCER_TYPE) {
producterHandlerService.producterHandler(id, name, admin, dataMap, parse);
} else {
//Only start a consumer when no identical one is already running; producers skip this check and produce on every task
if (!Constant.CONSUMER_MAP.containsKey(objectKey)) {
consumerHandlerService.consumerHandler(id, name, admin, parse, output, objectKey);
}
}
} catch (Throwable e) {
e.printStackTrace();
log.error("工作线程发生异常:{}", e.getMessage());
}
}
}
}

src/main/java/com/bfd/crawl/kafkahandler/service/SendService.java
@@ -0,0 +1,61 @@
package com.bfd.crawl.kafkahandler.service;
import com.alibaba.fastjson.JSON;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* @author:jinming
* @className:SendService
* @version:1.0
* @description:
* @Date:2023/7/31 17:53
*/
@Slf4j
@Service
public class SendService {
@Value("${send.topic}")
private String topic;
@Autowired
private KafkaTemplate kafkaTemplate;
@Async
// Must be public: Spring's @Async proxy does not intercept package-private methods
public void sendToKafka() {
while (true) {
if (QueueUtil.sendQueue.size() > 0) {
String message = null;
String version = null;
String id = null;
try {
message = QueueUtil.sendQueue.take();
Map parse = (Map) JSON.parse(message);
version = parse.get("version").toString();
id = parse.get("id").toString();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
kafkaTemplate.send(topic, message);
log.info("id:{},version:{}数据已发出", id, version);
} catch (Exception e) {
log.info("id:{},version:{}数据发送异常:{}", id, version, e.getMessage());
e.printStackTrace();
}
} else {
log.info("任务队列为空,休眠3秒");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

src/main/java/com/bfd/crawl/kafkahandler/service/StartServcie.java
@@ -0,0 +1,60 @@
package com.bfd.crawl.kafkahandler.service;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import com.bfd.util.PauseTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Set;
/**
* @author:jinming
* @className:StartServcie
* @version:1.0
* @description:
* @Date:2023/7/31 17:14
*/
@Service
@Slf4j
public class StartServcie implements ApplicationRunner {
@Autowired
private SchHandlerService schHandlerService;
@Value("${zookeeper.connection-string}")
private String connectionString;
@Value("${zookeeper.publish-node}")
private String nodePath;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Value("${thread.send}")
private int sendNumber;
@Autowired
private SendService sendService;
@Override
public void run(ApplicationArguments args) throws Exception {
PauseTool pauseTool = new PauseTool();
pauseTool.initializeRedisCache(stringRedisTemplate);
pauseTool.setupZookeeperListener(connectionString, nodePath);
for (int i = 0; i < sendNumber; i++) {
sendService.sendToKafka();
}
schHandlerService.run();
try {
Set<String> keys = stringRedisTemplate.keys("*");
for (String key : keys) {
String data = (String) stringRedisTemplate.opsForHash().get(key, "data");
QueueUtil.taskQueue.put(data);
}
} catch (Exception e) {
log.error("Failed to restore unfinished tasks from Redis", e);
}
}
}

src/main/java/com/bfd/crawl/kafkahandler/service/TimeOutHandlerService.java
@@ -0,0 +1,66 @@
package com.bfd.crawl.kafkahandler.service;
import com.bfd.crawl.kafkahandler.config.Constant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author:jinming
* @className:TimeOutHandlerService
* @version:1.0
* @description:
* @Date:2023/8/29 17:54
*/
@Service
@Slf4j
public class TimeOutHandlerService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Async("asyncExecutor")
public void handlerTimeOut() {
while (true) {
try {
if (Constant.TIME_OUT_MAP.size() > 0) {
ConcurrentHashMap.KeySetView<String, Map> strings = Constant.TIME_OUT_MAP.keySet();
for (String string : strings) {
Map map = Constant.TIME_OUT_MAP.get(string);
int timeOut = (int) map.get("timeOut");
long startTime = (long) map.get("startTime");
long timeConsuming = (System.currentTimeMillis() - startTime) / 1000;
//Skip when the timeout is -1 or the elapsed time is still below the timeout
if (timeOut == -1 || timeConsuming < timeOut) {
continue;
}
if (Constant.CONSUMER_MAP.containsKey(string)) {
Constant.CONSUMER_MAP.get(string).close();
}
if (Constant.PRODUCER_MAP.containsKey(string)) {
Constant.PRODUCER_MAP.get(string).close();
}
try {
stringRedisTemplate.delete(string);
} catch (Exception e) {
log.error("Failed to delete Redis key " + string, e);
}
}
Thread.sleep(30000);
} else {
log.info("当前队列为空休眠10s");
Thread.sleep(10000);
}
} catch (Throwable throwable) {
throwable.printStackTrace();
log.error("计时线程发生异常" + throwable.getMessage());
}
}
}
}

src/main/java/com/bfd/crawl/kafkahandler/service/ZookeeperNodeMonitor.java
@@ -0,0 +1,66 @@
package com.bfd.crawl.kafkahandler.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bfd.crawl.kafkahandler.config.Constant;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @author jian.mao
* @date 2024年4月17日
* @description
*/
@Component
@Slf4j
public class ZookeeperNodeMonitor {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Autowired
private CuratorFramework curatorFramework;
@Value("${zookeeper.publish-node}")
private String nodePath;
@PostConstruct
public void init() {
try {
// Create the node listener
NodeCache nodeCache = new NodeCache(curatorFramework, nodePath);
nodeCache.start();
// Listen for node data changes
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData();
String nodeData = new String(data);
log.info("Node data changed: " + nodeData);
JSONObject jsonObject = JSON.parseObject(nodeData);
int scenesId = jsonObject.getIntValue("scenes_id");
Constant.CONSUMER_RUNTIME_MAP.put(scenesId, false);
if (Constant.PRODUCER_MAP.containsKey(nodeData)) {
Constant.PRODUCER_MAP.get(nodeData).close();
// PRODUCER_MAP is keyed by the node data string, so remove by that key
Constant.PRODUCER_MAP.remove(nodeData);
}
try {
stringRedisTemplate.delete(nodeData);
} catch (Exception e) {
log.error("Failed to delete Redis key " + nodeData, e);
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}

src/main/java/com/bfd/crawl/kafkahandler/util/DataUtil.java
@@ -0,0 +1,65 @@
package com.bfd.crawl.kafkahandler.util;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.bfd.crawl.kafkahandler.config.Constant;
/**
* @author:jinming
* @className:DataUtil
* @version:1.0
* @description: resolves a dataValue from a path formula
* @Date:2023/11/1 9:54
*/
@Slf4j
public class DataUtil {
/**
* @param key the path formula to resolve
* @param dataMap the source data map
* @return the resolved dataValue, or null when evaluation fails
*/
public static Object getValue(String key, Map dataMap) {
try {
//Empty formula: return the empty string
if (key.equals(Constant.EMPTY)) {
return Constant.EMPTY;
}
//Keys without the ":$" marker are literal values and are returned as-is
if (!key.contains(Constant.NOT_KEY)) {
return key;
}
Object dataValue;
String isJson = "#json#";
if (key.contains(isJson)) {
//First split: take the part before #json#
String[] keySplit = key.split(isJson);
String firstDataKey = keySplit[0];
String[] firstDataKeySplit = firstDataKey.split(":");
//Fetch the JSON string for the first part from the data map and parse it
String dataJson = (String) dataMap.get(firstDataKeySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
//Use the latter half of the first part to pull the nested JSON string out of that JSONObject
String firstDataKeyJson = (String) JSONPath.eval(dataJsonObject, firstDataKeySplit[1]);
String secDataKey = keySplit[1];
JSONObject firstDataJsonObject = JSON.parseObject(firstDataKeyJson);
dataValue = JSONPath.eval(firstDataJsonObject, secDataKey);
return dataValue;
}
String[] keySplit = key.split(":");
String jsonPath = keySplit[1];
String dataJson = (String) dataMap.get(keySplit[0]);
JSONObject dataJsonObject = JSON.parseObject(dataJson);
dataValue = JSONPath.eval(dataJsonObject, jsonPath);
return dataValue;
} catch (Exception e) {
log.error("JSONPath formula evaluation failed", e);
return null;
}
}
}
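A usage sketch of the path conventions getValue implements; the map contents are illustrative:

import java.util.HashMap;
import java.util.Map;
import com.bfd.crawl.kafkahandler.util.DataUtil;

public class DataUtilExample {
public static void main(String[] args) {
Map dataMap = new HashMap();
// Values in the data map are JSON strings
dataMap.put("content", "{\"title\":\"hello\",\"payload\":\"{\\\"lang\\\":\\\"en\\\"}\"}");
// No ":$" marker: the formula is a literal and is returned unchanged
System.out.println(DataUtil.getValue("kafka", dataMap)); // kafka
// "<dataKey>:<JSONPath>": evaluates the path against the JSON stored under dataKey
System.out.println(DataUtil.getValue("content:$.title", dataMap)); // hello
// "#json#" chains two lookups: the first yields a nested JSON string,
// which is parsed again and evaluated with the second JSONPath
System.out.println(DataUtil.getValue("content:$.payload#json#$.lang", dataMap)); // en
}
}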

src/main/java/com/bfd/crawl/kafkahandler/util/QueueUtil.java
@@ -0,0 +1,16 @@
package com.bfd.crawl.kafkahandler.util;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author:jinming
* @className:QueueUtil
* @version:1.0
* @description:
* @Date:2023/7/13 15:00
*/
public class QueueUtil {
public static LinkedBlockingDeque<String> taskQueue = new LinkedBlockingDeque<String>();
public static LinkedBlockingDeque<String> sendQueue = new LinkedBlockingDeque<String>();
}

src/main/java/com/bfd/crawl/kafkahandler/util/StringUtil.java
@@ -0,0 +1,147 @@
package com.bfd.crawl.kafkahandler.util;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author jinming
* @version 1.0
* @className StringUtil
* @Date 2022/1/21 11:46
*/
@Slf4j
public class StringUtil {
public static boolean hasValue(String str) {
return str != null && !"".equals(str.trim());
}
public static String getRegexGroup(String regex, String str, int id) {
String resultStr = "";
if (hasValue(str)) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(str);
if (m.find()) {
resultStr = m.group(id);
}
}
if ("".equals(resultStr)) {
}
return resultStr;
}
/**
*
* @param str1 first input string
* @param str2 second input string
* @return a new string containing the union of their characters
*/
public static String stringUnion(String str1, String str2) {
// Merge the characters of both strings into a set, which drops duplicates automatically
Set<Character> unionSet = new HashSet<>();
for (char c : str1.toCharArray()) {
unionSet.add(c);
}
for (char c : str2.toCharArray()) {
unionSet.add(c);
}
// Convert the set back to a string
StringBuilder result = new StringBuilder();
for (char c : unionSet) {
result.append(c);
}
return result.toString();
}
// Recursively insert a value along the given key path
public static void insertValue(JSONObject jsonObject, String[] path, String value) {
if (path.length == 0) {
return;
}
String key = path[0];
if (path.length == 1) {
jsonObject.put(key, value);
} else {
JSONObject subObject = jsonObject.getJSONObject(key);
if (subObject == null) {
subObject = new JSONObject();
jsonObject.put(key, subObject);
}
String[] newPath = new String[path.length - 1];
System.arraycopy(path, 1, newPath, 0, newPath.length);
insertValue(subObject, newPath, value);
}
}
public static Set<String> getEmailAddress(String message) {
Set<String> emailList = new HashSet<>();
Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+");
Matcher m = pattern.matcher(message);
while (m.find()) {
emailList.add(m.group(0));
}
return emailList;
}
public static String getMd5(String string) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] bs = md5.digest(string.getBytes("UTF-8"));
StringBuilder sb = new StringBuilder(40);
for (byte x : bs) {
if ((x & 0xff) >> 4 == 0) {
sb.append("0").append(Integer.toHexString(x & 0xff));
} else {
sb.append(Integer.toHexString(x & 0xff));
}
}
return sb.toString();
} catch (Exception e) {
//log.error("Failed to compute MD5", e);
return "nceaform" + System.currentTimeMillis();
}
}
public static String removeAllHtmlTags(String str) {
return hasValue(str) ? str.replaceAll("<[^<>]+?>", "") : "";
}
public static String getRegexGroup(Pattern regex, String str, int id) {
String resultStr = "";
if (hasValue(str)) {
Matcher m = regex.matcher(str);
if (m.find()) {
resultStr = m.group(id);
}
}
if ("".equals(resultStr)) {
log.error(regex + " parser error!");
}
return resultStr;
}
public static String getStrByPattern(String str, String regex) {
Pattern pattern = Pattern.compile(regex);
Matcher m = pattern.matcher(str);
return m.find() ? m.group(0) : "";
}
}

src/main/resources/application.yml
@@ -0,0 +1,57 @@
server:
port: 13188
spring:
application:
name: 过滤器 # "filter" (service display name)
boot:
admin:
client:
health:
timeout: 10s
url: http://172.16.12.55:8001
instance:
service-base-url: http://10.10.144.49:9080
kafka:
bootstrap-servers: 172.18.1.146:9092,172.18.1.147:9092,172.18.1.148:9092
producer:
retries: 3
acks: all
batch-size: 4096
buffer-memory: 102476800
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
max.request.size: 47185920
redis:
host: 172.18.1.157
port: 6379
timeout: 10000
database: 6
jedis:
pool:
max-active: 8 # maximum number of active connections in the pool
max-wait: 800 # maximum wait time (ms) for a connection when the pool is exhausted
max-idle: 8 # maximum number of idle connections
min-idle: 2 # minimum number of idle connections
logging:
file:
path: ./logs
thread:
handler: 1
send: 1
zookeeper:
connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181
publish-node: /analyze
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
send:
topic: analyze12321

src/main/resources/logback-spring.xml
@@ -0,0 +1,36 @@
<configuration>
<!-- 属性文件:在properties文件中找到对应的配置项 -->
<springProperty scope="context" name="logging.file.path" source="logging.file.path"/>
<springProperty scope="context" name="logging.level" source="logging.level"/>
<!-- 默认的控制台日志输出,一般生产环境都是后台启动,这个没太大作用 -->
<appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<appender name="GLMAPPER-LOGGERONE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>${logging.level}</level>
</filter>
<file>
${logging.file.path}/kafkaHandler.log
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logging.file.path}/kafkaHandler.log.%d{yyyy-MM-dd}</FileNamePattern>
<MaxHistory>3</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="GLMAPPER-LOGGERONE"/>
<appender-ref ref="STDOUT"/>
</root>
</configuration>

src/test/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplicationTests.java
@@ -0,0 +1,13 @@
package com.bfd.crawl.kafkahandler;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class KafkaHandlerApplicationTests {
@Test
void contextLoads() {
}
}