commit 705a25c12267f896a163716d0420556ef12109bd Author: 55007 <55007@maojian> Date: Tue Jan 7 18:08:47 2025 +0800 kafka应用 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..8e1309d --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +kafka读取、写入应用 diff --git a/mvnw b/mvnw new file mode 100644 index 0000000..66df285 --- /dev/null +++ b/mvnw @@ -0,0 +1,308 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Apache Maven Wrapper startup batch script, version 3.2.0 +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /usr/local/etc/mavenrc ] ; then + . /usr/local/etc/mavenrc + fi + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "$(uname)" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME + else + JAVA_HOME="/Library/Java/Home"; export JAVA_HOME + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=$(java-config --jre-home) + fi +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$JAVA_HOME" ] && + JAVA_HOME=$(cygpath --unix "$JAVA_HOME") + [ -n "$CLASSPATH" ] && + CLASSPATH=$(cygpath --path --unix "$CLASSPATH") +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] 
&& + JAVA_HOME="$(cd "$JAVA_HOME" || (echo "cannot cd into $JAVA_HOME."; exit 1); pwd)" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="$(which javac)" + if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=$(which readlink) + if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then + if $darwin ; then + javaHome="$(dirname "\"$javaExecutable\"")" + javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac" + else + javaExecutable="$(readlink -f "\"$javaExecutable\"")" + fi + javaHome="$(dirname "\"$javaExecutable\"")" + javaHome=$(expr "$javaHome" : '\(.*\)/bin') + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=$(cd "$wdir/.." 
|| exit 1; pwd) + fi + # end of workaround + done + printf '%s' "$(cd "$basedir" || exit 1; pwd)" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + # Remove \r in case we run on Windows within Git Bash + # and check out the repository with auto CRLF management + # enabled. Otherwise, we may read lines that are delimited with + # \r\n and produce $'-Xarg\r' rather than -Xarg due to word + # splitting rules. + tr -s '\r\n' ' ' < "$1" + fi +} + +log() { + if [ "$MVNW_VERBOSE" = true ]; then + printf '%s\n' "$1" + fi +} + +BASE_DIR=$(find_maven_basedir "$(dirname "$0")") +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}; export MAVEN_PROJECTBASEDIR +log "$MAVEN_PROJECTBASEDIR" + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +wrapperJarPath="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" +if [ -r "$wrapperJarPath" ]; then + log "Found $wrapperJarPath" +else + log "Couldn't find $wrapperJarPath, downloading it ..." 
+ + if [ -n "$MVNW_REPOURL" ]; then + wrapperUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + else + wrapperUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + fi + while IFS="=" read -r key value; do + # Remove '\r' from value to allow usage on windows as IFS does not consider '\r' as a separator ( considers space, tab, new line ('\n'), and custom '=' ) + safeValue=$(echo "$value" | tr -d '\r') + case "$key" in (wrapperUrl) wrapperUrl="$safeValue"; break ;; + esac + done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" + log "Downloading from: $wrapperUrl" + + if $cygwin; then + wrapperJarPath=$(cygpath --path --windows "$wrapperJarPath") + fi + + if command -v wget > /dev/null; then + log "Found wget ... using wget" + [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--quiet" + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget $QUIET "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" + else + wget $QUIET --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + log "Found curl ... 
using curl" + [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--silent" + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl $QUIET -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" + else + curl $QUIET --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" + fi + else + log "Falling back to using Java to download" + javaSource="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.java" + javaClass="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.class" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaSource=$(cygpath --path --windows "$javaSource") + javaClass=$(cygpath --path --windows "$javaClass") + fi + if [ -e "$javaSource" ]; then + if [ ! -e "$javaClass" ]; then + log " - Compiling MavenWrapperDownloader.java ..." + ("$JAVA_HOME/bin/javac" "$javaSource") + fi + if [ -e "$javaClass" ]; then + log " - Running MavenWrapperDownloader.java ..." 
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$wrapperUrl" "$wrapperJarPath") || rm -f "$wrapperJarPath" + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +# If specified, validate the SHA-256 sum of the Maven wrapper jar file +wrapperSha256Sum="" +while IFS="=" read -r key value; do + case "$key" in (wrapperSha256Sum) wrapperSha256Sum=$value; break ;; + esac +done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" +if [ -n "$wrapperSha256Sum" ]; then + wrapperSha256Result=false + if command -v sha256sum > /dev/null; then + if echo "$wrapperSha256Sum $wrapperJarPath" | sha256sum -c > /dev/null 2>&1; then + wrapperSha256Result=true + fi + elif command -v shasum > /dev/null; then + if echo "$wrapperSha256Sum $wrapperJarPath" | shasum -a 256 -c > /dev/null 2>&1; then + wrapperSha256Result=true + fi + else + echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available." + echo "Please install either command, or disable validation by removing 'wrapperSha256Sum' from your maven-wrapper.properties." + exit 1 + fi + if [ $wrapperSha256Result = false ]; then + echo "Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised." >&2 + echo "Investigate or delete $wrapperJarPath to attempt a clean download." >&2 + echo "If you updated your Maven version, you need to update the specified wrapperSha256Sum property." 
>&2 + exit 1 + fi +fi + +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$JAVA_HOME" ] && + JAVA_HOME=$(cygpath --path --windows "$JAVA_HOME") + [ -n "$CLASSPATH" ] && + CLASSPATH=$(cygpath --path --windows "$CLASSPATH") + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=$(cygpath --path --windows "$MAVEN_PROJECTBASEDIR") +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $*" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +# shellcheck disable=SC2086 # safe args +exec "$JAVACMD" \ + $MAVEN_OPTS \ + $MAVEN_DEBUG_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/mvnw.cmd b/mvnw.cmd new file mode 100644 index 0000000..95ba6f5 --- /dev/null +++ b/mvnw.cmd @@ -0,0 +1,205 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM https://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. 
See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Apache Maven Wrapper startup batch script, version 3.2.0 +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %* +if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %* +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. 
>&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! 
%%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set WRAPPER_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + +FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET WRAPPER_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET WRAPPER_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... 
+ echo Downloading from: %WRAPPER_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%WRAPPER_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM If specified, validate the SHA-256 sum of the Maven wrapper jar file +SET WRAPPER_SHA_256_SUM="" +FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperSha256Sum" SET WRAPPER_SHA_256_SUM=%%B +) +IF NOT %WRAPPER_SHA_256_SUM%=="" ( + powershell -Command "&{"^ + "$hash = (Get-FileHash \"%WRAPPER_JAR%\" -Algorithm SHA256).Hash.ToLower();"^ + "If('%WRAPPER_SHA_256_SUM%' -ne $hash){"^ + " Write-Output 'Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised.';"^ + " Write-Output 'Investigate or delete %WRAPPER_JAR% to attempt a clean download.';"^ + " Write-Output 'If you updated your Maven version, you need to update the specified wrapperSha256Sum property.';"^ + " exit 1;"^ + "}"^ + "}" + if ERRORLEVEL 1 goto error +) + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. 
+set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% ^ + %JVM_CONFIG_MAVEN_PROPS% ^ + %MAVEN_OPTS% ^ + %MAVEN_DEBUG_OPTS% ^ + -classpath %WRAPPER_JAR% ^ + "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^ + %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat" +if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%"=="on" pause + +if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE% + +cmd /C exit /B %ERROR_CODE% diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c78335c --- /dev/null +++ b/pom.xml @@ -0,0 +1,119 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.2.4.RELEASE + + + com.bfd.crawl + kafkaHandler + 0.0.1-SNAPSHOT + kafkaHandler + kafkaHandler + + 8 + + + + de.codecentric + spring-boot-admin-client + 2.2.4 + + + org.springframework.boot + spring-boot-starter + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-web + + + com.google.code.gson + gson + 2.8.8 + + + + com.alibaba + fastjson + 2.0.17 + + + + com.squareup.okhttp3 + okhttp + 3.9.1 + + + org.apache.curator + curator-framework + 5.2.0 + + + org.apache.curator + curator-recipes + 5.2.0 + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.apache.kafka + kafka-clients + 2.3.1 + + + + org.redisson + redisson-spring-boot-starter + 3.13.6 + + + org.springframework.boot + spring-boot-starter-data-redis + + + com.bfd.util + pauseTool + 1.0 + + + + + + + 
org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/src/main/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplication.java b/src/main/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplication.java new file mode 100644 index 0000000..b8b7377 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplication.java @@ -0,0 +1,13 @@ +package com.bfd.crawl.kafkahandler; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaHandlerApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaHandlerApplication.class, args); + } + +} diff --git a/src/main/java/com/bfd/crawl/kafkahandler/bean/ResponsePo.java b/src/main/java/com/bfd/crawl/kafkahandler/bean/ResponsePo.java new file mode 100644 index 0000000..663e7b9 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/bean/ResponsePo.java @@ -0,0 +1,61 @@ +package com.bfd.crawl.kafkahandler.bean; + + + + +import com.bfd.crawl.kafkahandler.enums.ResponseCode; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author:jinming + * @className:ResponsePo + * @version:1.0 + * @description: + * @Date:2023/4/3 17:23 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ResponsePo { + /** + * 响应码 + */ + private int code; + + /** + * 正常放 返回数据 的JSON串 + */ + private Object data; + + /** + * 提示消息 + */ + private String message; + + public static ResponsePo success() { + return setStatus(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getMessage()); + } + + public static ResponsePo error() { + return setStatus(ResponseCode.FAILURE.getCode(), ResponseCode.FAILURE.getMessage()); + } + + public static ResponsePo setStatus(int code, String message) { + ResponsePo resultBean = new ResponsePo(); + resultBean.code = code; + resultBean.message = message; + 
return resultBean; + } + public ResponsePo(int code, String message) { + this.code = code; + this.message = message; + this.data = data; + } + public ResponsePo(ResponseCode responseCode){ + this.code = responseCode.getCode(); + this.message = responseCode.getMessage(); + this.data = data; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/config/AsyncThreadConfiguration.java b/src/main/java/com/bfd/crawl/kafkahandler/config/AsyncThreadConfiguration.java new file mode 100644 index 0000000..5c1fbdd --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/config/AsyncThreadConfiguration.java @@ -0,0 +1,63 @@ +package com.bfd.crawl.kafkahandler.config; + + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +/** + * @author jinming + * @version 1.0 + * @className AsyncThreadConfiguration + * @Date 2022/2/17 18:37 + */ +@Configuration +@EnableAsync +public class AsyncThreadConfiguration { + @Bean + public Executor asyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(50); + // 并发线程的数量限制为2 + executor.setMaxPoolSize(50); + // 线程队列 + executor.setQueueCapacity(50); + executor.setThreadNamePrefix("handlerData-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(true); + return executor; + } + @Bean + public Executor producterExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(150); + // 并发线程的数量限制为2 + executor.setMaxPoolSize(150); + // 线程队列 + executor.setQueueCapacity(1000); + executor.setThreadNamePrefix("sendData-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(true); + return executor; + } + + @Bean + public 
Executor consumerHandlerExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(150); + // 并发线程的数量限制为2 + executor.setMaxPoolSize(150); + // 线程队列 + executor.setQueueCapacity(150); + executor.setThreadNamePrefix("consumerHandler-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(true); + return executor; + } +} diff --git a/src/main/java/com/bfd/crawl/kafkahandler/config/Constant.java b/src/main/java/com/bfd/crawl/kafkahandler/config/Constant.java new file mode 100644 index 0000000..a8fffd6 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/config/Constant.java @@ -0,0 +1,60 @@ +package com.bfd.crawl.kafkahandler.config; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author:jinming + * @className:Constant + * @version:1.0 + * @description: + * @Date:2023/8/16 15:26 + */ +public class Constant { + /** + * kafka producter 对象存储 + */ + public static Map> PRODUCER_MAP = new HashMap<>(32); + + /** + * kafka consumer 对象存储 + */ + public static Map> CONSUMER_MAP = new HashMap<>(32); + + /** + * kafka runTimeMap 对象存储 + */ + public static Map CONSUMER_RUNTIME_MAP = new HashMap<>(32); + + /** + * kafka consumer 超时问题存储 + */ + public static ConcurrentHashMap TIME_OUT_MAP = new ConcurrentHashMap<>(32); + + /** + * 生产者的任务类型 + */ + public final static Integer PRODUCER_TYPE = 1; + + /** + * 空字符串常量 + */ + public static final String EMPTY = ""; + + /** + * 不需要DataUtil解析的Key + */ + public static final String NOT_KEY = ":$"; + + + public static final String FORM = "form"; + public static final String FIELD = "field"; + public static final String VALUE = "value"; + public static final String ALL= "*"; + public static final String DATATYPE = "dataType"; + public static final String STRING_TYPE = "String"; +} \ No 
newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/config/ZookeeperConfig.java b/src/main/java/com/bfd/crawl/kafkahandler/config/ZookeeperConfig.java new file mode 100644 index 0000000..57efbf2 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/config/ZookeeperConfig.java @@ -0,0 +1,25 @@ +package com.bfd.crawl.kafkahandler.config; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author jian.mao + * @date 2024年4月16日 + * @description + */ +@Configuration +public class ZookeeperConfig { + @Value("${zookeeper.connection-string}") + private String connectionString; + @Bean + public CuratorFramework curatorFramework() { + CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3)); + curatorFramework.start(); + return curatorFramework; + } +} diff --git a/src/main/java/com/bfd/crawl/kafkahandler/controller/HandlerDataController.java b/src/main/java/com/bfd/crawl/kafkahandler/controller/HandlerDataController.java new file mode 100644 index 0000000..53c50d7 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/controller/HandlerDataController.java @@ -0,0 +1,53 @@ +package com.bfd.crawl.kafkahandler.controller; + +import com.alibaba.fastjson.JSON; + +import com.bfd.crawl.kafkahandler.bean.ResponsePo; +import com.bfd.crawl.kafkahandler.enums.ResponseCode; +import com.bfd.crawl.kafkahandler.util.QueueUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +import java.util.Map; + +/** + * @author:jinming + * @className:HandlerDataController + * @version:1.0 + * @description: 处理接口 + * @Date:2023/7/13 14:25 + */ + 
+@RestController +@RequestMapping("/handlerdata") +@Slf4j +public class HandlerDataController { + + @GetMapping("/test") + public String test() { + return "test"; + } + + + + + @PostMapping("/kafka") + public ResponsePo kafkaHandler(@RequestBody String dataJson) { + ResponsePo responsePo = ResponsePo.success(); + try { + Map parse = (Map) JSON.parse(dataJson); + } catch (Exception e) { + log.error("请求格式发生异常" + e.getMessage()); + responsePo.setCode(ResponseCode.FAILURE.getCode()); + responsePo.setMessage(ResponseCode.FAILURE.getMessage()); + return responsePo; + } + log.info("新增任务:" + dataJson); + try { + QueueUtil.taskQueue.put(dataJson); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return responsePo; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/enums/ResponseCode.java b/src/main/java/com/bfd/crawl/kafkahandler/enums/ResponseCode.java new file mode 100644 index 0000000..4b35317 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/enums/ResponseCode.java @@ -0,0 +1,32 @@ +package com.bfd.crawl.kafkahandler.enums; + +/** + * @author:jinming + * @className:ResponseCodeEnum + * @version:1.0 + * @description:响应结果码枚举类 + * @Date:2023/2/28 11:40 + */ +public enum ResponseCode { + //返回结果码枚举类 + SUCCESS(200, "操作成功"), + FAILURE(400, "参数错误"), + INTERNAL_SERVER_ERROR(500, "服务器内部错误"), + TYPE_NOT_SUPPORT(601,"文件类型不支持"); + + private int code; + private String message; + + ResponseCode(int code, String message) { + this.code = code; + this.message = message; + } + + public int getCode() { + return code; + } + + public String getMessage() { + return message; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandler.java b/src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandler.java new file mode 100644 index 0000000..23f8804 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandler.java @@ -0,0 +1,160 @@ +package 
com.bfd.crawl.kafkahandler.service;

import com.alibaba.fastjson.JSON;
import com.bfd.crawl.kafkahandler.config.Constant;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


import javax.annotation.Resource;
import java.util.*;

/**
 * Async worker that runs one Kafka consumer per invocation, filters records,
 * re-wraps them and pushes them onto {@code QueueUtil.sendQueue} for the
 * downstream flow.
 *
 * @author jinming
 * @since 2024/7/9
 */
@Service
@Slf4j
public class ConsumerHandler {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Consume {@code topic} until the runtime flag for {@code id} is cleared,
     * {@code size} records have been forwarded, or the consumer is closed
     * externally (timeout handling closes it; the next poll then throws
     * IllegalStateException, which is the intended exit path).
     *
     * NOTE(review): generic parameters on the Map arguments were lost in the
     * source dump; reconstructed as Map&lt;String, Object&gt; — confirm against callers.
     */
    @Async("consumerHandlerExecutor")
    public void creatConsumer(Integer id, String name, Map<String, Object> admin, Map<String, Object> parse, Map<String, Object> output, String objectKey, String groupId) {
        try {
            int size = (int) admin.get("size");
            String bootstrapServers = (String) admin.get("bootstrapServers");

            String autoOffsetReset = (String) admin.get("autoOffsetReset");
            String topic = (String) admin.get("topic");
            // Kafka consumer configuration (manual offset commit)
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", groupId);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("auto.offset.reset", autoOffsetReset);
            props.put("request.timeout.ms", "60000");
            props.put("session.timeout.ms", "60000");
            props.put("enable.auto.commit", "false");
            // create the consumer and register it so other components can close it
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
            Constant.CONSUMER_MAP.put(objectKey, consumer);
            List<String> filter = (List<String>) admin.get("filter");
            int index = 1;
            while (Constant.CONSUMER_RUNTIME_MAP.get(id)) {
                boolean isBreak = false;
                // NOTE(review): poll(long) is deprecated in newer clients; kept for compatibility
                ConsumerRecords<String, String> msgList = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : msgList) {
                    try {
                        // Re-wrap the record and forward it to the next Kafka stage.
                        String value = record.value();
                        // Only records containing at least one filter keyword are kept.
                        boolean isContinue = true;
                        for (String filed : filter) {
                            if (value.contains(filed)) {
                                log.info("id:{},kafka满足过滤条件:{}", id, value);
                                isContinue = false;
                            }
                        }
                        if (isContinue) {
                            continue;
                        }

                        // fieldType: 0 = wrap whole record; non-0 = project only the keys
                        // declared in `output` out of the record JSON.
                        int fieldType = (int) admin.get("fieldType");
                        Map<String, Object> result = new HashMap<>(32);
                        Map<String, Object> resultsMap = new HashMap<>(32);
                        resultsMap.put("result", value);
                        result.put("results", JSON.toJSONString(resultsMap));
                        parse.put("result", result);
                        if (fieldType != 0) {
                            resultsMap.remove("result");
                            Map<String, Object> valueParse = (Map<String, Object>) JSON.parse(value);
                            for (String o : output.keySet()) {
                                resultsMap.put(o, valueParse.get(o));
                            }
                            String resultsMapJson = JSON.toJSONString(resultsMap);
                            result.put("results", resultsMapJson);
                            parse.put("result", result);
                            String empty = "{}";
                            // nothing projected -> drop the record
                            if (resultsMapJson.equals(empty)) {
                                continue;
                            }
                        }
                        String message = JSON.toJSONString(parse);
                        QueueUtil.sendQueue.put(message);
                        // size == -1 means consume forever; otherwise stop near `size` records.
                        // NOTE(review): index starts at 1 and is compared after increment,
                        // so size-1 records are forwarded before the break — confirm intent.
                        index++;
                        if (size != -1 && index == size) {
                            isBreak = true;
                            break;
                        }
                    } catch (Exception e) {
                        log.info("id :{},消费者线程读取数据后解析失败", id);
                        e.printStackTrace();
                    }
                }
                // manual offset commit (enable.auto.commit=false)
                try {
                    consumer.commitSync();
                } catch (Exception e) {
                    log.error("提交偏移量失败", e);
                }

                if (isBreak) {
                    // No `break` here on purpose: closing the consumer makes the next
                    // poll() throw IllegalStateException, which exits via the catch below.
                    consumer.close();
                    Constant.CONSUMER_MAP.remove(objectKey);
                    Constant.TIME_OUT_MAP.remove(objectKey);
                    stringRedisTemplate.delete(objectKey);
                }
            }
            consumer.close();
            log.info("id :{},消费者线程已经关闭", id);
            Constant.CONSUMER_MAP.remove(objectKey);
            Constant.TIME_OUT_MAP.remove(objectKey);
            try {
                stringRedisTemplate.delete(objectKey);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (IllegalStateException illegalStateException) {
            // Expected exit path when the consumer is closed externally (timeout) —
            // deliberately not printed as a stack trace.
            log.error("检测到消费者被关闭,任务:" + id + "的\t" + name + "\t消费线程退出");
        } catch (Throwable throwable) {
            throwable.printStackTrace();
            log.error("kafka消费线程发生异常" + throwable.getMessage());
            // Report the failure downstream so the flow does not hang.
            Map<String, Object> result = new HashMap<>(32);
            result.put("status", 2);
            result.put("results", "");
            result.put("message", "未知异常");
            parse.put("result", result);
            String message = JSON.toJSONString(parse);
            try {
                QueueUtil.sendQueue.put(message);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    // NOTE(review): leftover scratch main; JSON.parse("") — kept as-is.
    public static void main(String[] args) {

        String json = "";
        Object parse = JSON.parse(json);

    }
}

// ==== file: src/main/java/com/bfd/crawl/kafkahandler/service/ConsumerHandlerService.java ====
package com.bfd.crawl.kafkahandler.service;

import com.alibaba.fastjson.JSON;
import com.bfd.crawl.kafkahandler.config.Constant;
import com.bfd.crawl.kafkahandler.util.QueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.*; +import java.util.concurrent.ExecutionException; + +/** + * @author:jinming + * @className:ConsumerHandlerService + * @version:1.0 + * @description: + * @Date:2023/8/22 16:54 + */ +@Service +@Slf4j +public class ConsumerHandlerService { + @Autowired + private ConsumerHandler consumerHandler; + + @Async + public void conumerHandler(Integer id, String name, Map admin, Map parse, Map output, String objectKey) { + Constant.CONSUMER_RUNTIME_MAP.put(id, true); + String topic = (String) admin.get("topic"); + String bootstrapServers = (String) admin.get("bootstrapServers"); + int totalPartitions = getTotalPartitions(topic, bootstrapServers); + String groupId = UUID.randomUUID().toString(); + for (int i = 0; i < totalPartitions; i++) { + consumerHandler.creatConsumer(id, name, admin, parse, output, objectKey, groupId); + } + } + + private static Integer getTotalPartitions(String topicName, String bootstrapServers) { + int totalPartitions = 1; + // Kafka管理员客户端的配置 + Properties adminClientConfig = new Properties(); + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + // 创建Kafka管理员客户端 + try (AdminClient adminClient = AdminClient.create(adminClientConfig)) { + // 获取指定Topic的描述信息 + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topicName)); + Map topicDescriptionMap = describeTopicsResult.all().get(); + + // 获取并打印指定Topic的分区数 + TopicDescription topicDescription = topicDescriptionMap.get(topicName); + if (topicDescription != null) { + totalPartitions = topicDescription.partitions().size(); + log.info("Topic: " + topicName + ", Total Partitions: " + totalPartitions); + } else { + log.info("Topic: " + topicName + " not found."); + } + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return 
totalPartitions; + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/service/ProducterHandlerService.java b/src/main/java/com/bfd/crawl/kafkahandler/service/ProducterHandlerService.java new file mode 100644 index 0000000..8fda621 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/service/ProducterHandlerService.java @@ -0,0 +1,93 @@ +package com.bfd.crawl.kafkahandler.service; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bfd.crawl.kafkahandler.config.Constant; +import com.bfd.crawl.kafkahandler.util.DataUtil; +import com.bfd.crawl.kafkahandler.util.QueueUtil; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * @author:jinming + * @className:ProducterHandlerService + * @version:1.0 + * @description: + * @Date:2023/8/28 14:40 + */ +@Service +@Slf4j +public class ProducterHandlerService { + + @Async("producterExecutor") + public void producterHandler(Integer id, String name, Map admin, Map dataMap, Map parse) { + Gson gson = new GsonBuilder().serializeNulls().create(); + log.info("任务ID:" + id + "," + name + "生产者线程启动"); + String bootstrapServers = (String) admin.get("bootstrapServers"); + String topic = (String) admin.get("topic"); + String message = ""; + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("request.timeout.ms", "60000"); + props.put("session.timeout.ms", "60000"); + props.put("max.request.size", "47185920"); + + KafkaProducer producer = new KafkaProducer(props); + + // 设置请求体,包含要写入的文档数据 + Map document = new HashMap(16); + //获取写入字段 + List> form = (List>) admin.get(Constant.FORM); + for (Map map : form) { + //字段名称 + String field = (String) map.get(Constant.FIELD); + //数据源path公式 + String valuePath = (String) map.get(Constant.VALUE); + //根据path去数据源data下获取字段值 + Object value = DataUtil.getValue(valuePath, dataMap); + //字段为空不处理 +// if (value == null) { +// continue; +// } + //判断是不是 String类型 ---暂时先判断String类型 + //其他类型直接进行赋值,--暂定,因为前端传过来的字段是根据我们应用进行定义的,类型基本一致 + document.put(field, value); + } +// message = JSONObject.toJSONString(document); + document.put("isLast", 1); + message = gson.toJson(document); + try { + ProducerRecord producerRecord = new ProducerRecord(topic, message); + producer.send(producerRecord); + producer.flush(); + producer.close(); + } catch (Throwable throwable) { + throwable.printStackTrace(); + log.error("kafka生产线程发生异常" + throwable.getMessage()); + } + Map result = new HashMap(32); + result.put("results", message); + result.put("status", 1); + parse.put("result", result); + String nextTask = JSON.toJSONString(parse); + try { + QueueUtil.sendQueue.put(nextTask); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/service/SchHandlerService.java b/src/main/java/com/bfd/crawl/kafkahandler/service/SchHandlerService.java new file mode 100644 index 0000000..03d536a --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/service/SchHandlerService.java @@ -0,0 +1,102 @@ +package com.bfd.crawl.kafkahandler.service; + +import com.alibaba.fastjson.JSON; +import 
com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.JSONPath; +import com.bfd.crawl.kafkahandler.config.Constant; +import com.bfd.crawl.kafkahandler.util.QueueUtil; +import com.bfd.crawl.kafkahandler.util.StringUtil; +import com.bfd.util.PauseTool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * @author:jinming + * @className:SchHandlerService + * @version:1.0 + * @description: + * @Date:2023/8/28 14:52 + */ + +@Service +@Slf4j +public class SchHandlerService { + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Autowired + private ConsumerHandlerService consumerHandlerService; + @Autowired + private ProducterHandlerService producterHandlerService; + @Autowired + private TimeOutHandlerService timeOutHandlerService; + + + @Async + public void run() { + //先执行对象的超时处理线程 +// timeOutHandlerService.handlerTimeOut(); + while (true) { + try { + String dataJson = QueueUtil.taskQueue.take(); + Map parse = (Map) JSON.parse(dataJson); + String objectKey = parse.get("scenes_id").toString(); + int id = (int) parse.get("scenes_id"); + String name = (String) parse.get("name"); + Map dataMap = (Map) parse.get("data"); + Map admin = (Map) parse.get("input"); + Map output = (Map) parse.get("output"); + int scenesId = (int) parse.get("scenes_id"); + int version = (int) parse.get("version"); + String pauseKey = scenesId + "_" + version; + if (!PauseTool.CACHE.containsKey(pauseKey)) { + log.info("流程:{}的版本:{}已失效,任务跳过", scenesId, version); + continue; + } +// dataMap.put("blueprint_id", 
parse.get("blueprint_id")); +// dataMap.put("scenes_id", parse.get("scenes_id")); +// dataMap.put("id", parse.get("id")); +// dataMap.put("transfer_id", parse.get("transfer_id")); +// dataMap.put("businessKey", parse.get("businessKey")); + int type = (int) admin.get("type"); + int timeOut = (int) admin.get("timeOut"); + + stringRedisTemplate.opsForHash().put(objectKey, "data", dataJson); + if (timeOut > 0) { + stringRedisTemplate.expire(objectKey, timeOut, TimeUnit.SECONDS); + } + //加入超时的相关信息,该信息针对生产与消费,要求前端进行判断,消费者默认是-1,生产者默认是600s +// if (!Constant.TIME_OUT_MAP.containsKey(objectKey)) { +// Map timeOutMap = new HashMap(32); +// timeOutMap.put("timeOut", timeOut); +// timeOutMap.put("startTime", System.currentTimeMillis()); +// Constant.TIME_OUT_MAP.put(objectKey, timeOutMap); +// }else { +// +// } + if (type == Constant.PRODUCER_TYPE) { + producterHandlerService.producterHandler(id, name, admin, dataMap, parse); + } else { + //根据传入的条件判断当前是否有一样的程序在执行,若有则不会重复启动对应的处理线程,生产者不需要这个判断,每次收到既生产就可以 + if (!Constant.CONSUMER_MAP.containsKey(objectKey)) { + consumerHandlerService.conumerHandler(id, name, admin, parse, output, objectKey); + } + } + } catch (Throwable e) { + e.printStackTrace(); + log.error("工作线程发生异常:{}", e.getMessage()); + } + + } + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/service/SendService.java b/src/main/java/com/bfd/crawl/kafkahandler/service/SendService.java new file mode 100644 index 0000000..addbe29 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/service/SendService.java @@ -0,0 +1,61 @@ +package com.bfd.crawl.kafkahandler.service; + +import com.alibaba.fastjson.JSON; +import com.bfd.crawl.kafkahandler.util.QueueUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Async; +import 
org.springframework.stereotype.Service; + +import java.util.Map; + +/** + * @author:jinming + * @className:SendService + * @version:1.0 + * @description: + * @Date:2023/7/31 17:53 + */ +@Slf4j +@Service +public class SendService { + @Value("${send.topic}") + private String topic; + @Autowired + private KafkaTemplate kafkaTemplate; + + @Async() + void sendToKafka() { + while (true) { + if (QueueUtil.sendQueue.size() > 0) { + String message = null; + String version = null; + String id = null; + try { + message = QueueUtil.sendQueue.take(); + Map parse = (Map) JSON.parse(message); + version = parse.get("version").toString(); + id = parse.get("id").toString(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + kafkaTemplate.send(topic, message); + log.info("id:{},version:{}数据已发出", id, version); + } catch (Exception e) { + log.info("id:{},version:{}数据发送异常:{}", id, version, e.getMessage()); + e.printStackTrace(); + } + } else { + log.info("任务队列为空,休眠3秒"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } +} diff --git a/src/main/java/com/bfd/crawl/kafkahandler/service/StartServcie.java b/src/main/java/com/bfd/crawl/kafkahandler/service/StartServcie.java new file mode 100644 index 0000000..db8638d --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/service/StartServcie.java @@ -0,0 +1,60 @@ +package com.bfd.crawl.kafkahandler.service; + +import com.bfd.crawl.kafkahandler.util.QueueUtil; +import com.bfd.util.PauseTool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Set; + +/** + * @author:jinming + 
* @className:StartServcie + * @version:1.0 + * @description: + * @Date:2023/7/31 17:14 + */ +@Service +@Slf4j +public class StartServcie implements ApplicationRunner { + @Autowired + private SchHandlerService schHandlerService; + @Value("${zookeeper.connection-string}") + private String connectionString; + @Value("${zookeeper.publish-node}") + private String nodePath; + @Resource + private StringRedisTemplate stringRedisTemplate; + @Value("${thread.send}") + private int sendNumber; + @Autowired + private SendService sendService; + + @Override + public void run(ApplicationArguments args) throws Exception { + PauseTool pauseTool = new PauseTool(); + pauseTool.initializeRedisCache(stringRedisTemplate); + pauseTool.setupZookeeperListener(connectionString, nodePath); + for (int i = 0; i < sendNumber; i++) { + sendService.sendToKafka(); + } + schHandlerService.run(); + try { + Set keys = stringRedisTemplate.keys("*"); + for (String key : keys) { + String data = (String) stringRedisTemplate.opsForHash().get(key, "data"); + QueueUtil.taskQueue.put(data); + } + } catch (Exception e) { + + } + + + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/service/TimeOutHandlerService.java b/src/main/java/com/bfd/crawl/kafkahandler/service/TimeOutHandlerService.java new file mode 100644 index 0000000..8dea6b2 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/service/TimeOutHandlerService.java @@ -0,0 +1,66 @@ +package com.bfd.crawl.kafkahandler.service; + +import com.bfd.crawl.kafkahandler.config.Constant; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author:jinming + * @className:TimeOutHandlerService + * @version:1.0 + * @description: + * @Date:2023/8/29 17:54 
+ */ +@Service +@Slf4j +public class TimeOutHandlerService { + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Async("asyncExecutor") + public void handlerTimeOut() { + while (true) { + try { + if (Constant.TIME_OUT_MAP.size() > 0) { + ConcurrentHashMap.KeySetView strings = Constant.TIME_OUT_MAP.keySet(); + for (String string : strings) { + Map map = Constant.TIME_OUT_MAP.get(string); + int timeOut = (int) map.get("timeOut"); + long startTime = (long) map.get("startTime"); + long timeConsuming = (System.currentTimeMillis() - startTime) / 1000; + //如果超时时间为-1或者当前时间减开始时间小于超时时间,不处理 + if (timeOut == -1 || timeConsuming < timeOut) { + continue; + } + if (Constant.CONSUMER_MAP.containsKey(string)) { + Constant.CONSUMER_MAP.get(string).close(); + + } + if (Constant.PRODUCER_MAP.containsKey(string)) { + Constant.PRODUCER_MAP.get(string).close(); + } + try { + stringRedisTemplate.delete(string); + } catch (Exception e) { + + } + + } + Thread.sleep(30000); + } else { + log.info("当前队列为空休眠10s"); + Thread.sleep(10000); + } + } catch (Throwable throwable) { + throwable.printStackTrace(); + log.error("计时线程发生异常" + throwable.getMessage()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/service/ZookeeperNodeMonitor.java b/src/main/java/com/bfd/crawl/kafkahandler/service/ZookeeperNodeMonitor.java new file mode 100644 index 0000000..f78a1dd --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/service/ZookeeperNodeMonitor.java @@ -0,0 +1,66 @@ +package com.bfd.crawl.kafkahandler.service; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bfd.crawl.kafkahandler.config.Constant; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.springframework.beans.factory.annotation.Autowired; +import 
org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * Watches the publish node in zookeeper; when a scene is (re)published, stops
 * the corresponding consumer loop and cleans up producer/redis state.
 *
 * @author jian.mao
 * @date 2024-04-17
 */
@Component
@Slf4j
public class ZookeeperNodeMonitor {

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private CuratorFramework curatorFramework;
    @Value("${zookeeper.publish-node}")
    private String nodePath;

    @PostConstruct
    public void init() {
        try {
            // NOTE(review): NodeCache is deprecated in newer Curator versions
            // (CuratorCache); kept for compatibility with the pinned dependency.
            NodeCache nodeCache = new NodeCache(curatorFramework, nodePath);
            nodeCache.start();

            // react to publish events
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    byte[] data = nodeCache.getCurrentData().getData();
                    String nodeData = new String(data);
                    log.info("Node data changed: " + nodeData);
                    JSONObject jsonObject = JSON.parseObject(nodeData);
                    int scenesId = jsonObject.getIntValue("scenes_id");
                    // signal the consumer loop for this scene to stop
                    Constant.CONSUMER_RUNTIME_MAP.put(scenesId, false);
                    // NOTE(review): key-type mismatch kept as-is — containsKey uses the
                    // raw nodeData String while remove uses the int scenesId, so the
                    // remove can never hit the entry found by containsKey. The redis
                    // delete below also uses nodeData rather than an objectKey. Confirm
                    // intended key before changing behaviour.
                    if (Constant.PRODUCER_MAP.containsKey(nodeData)) {
                        Constant.PRODUCER_MAP.get(nodeData).close();
                        Constant.PRODUCER_MAP.remove(scenesId);
                    }
                    try {
                        stringRedisTemplate.delete(nodeData);
                    } catch (Exception e) {
                        // Fix: log instead of silently swallowing; delete stays best-effort.
                        log.error("删除Redis任务key失败:{}", e.getMessage());
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ==== file: src/main/java/com/bfd/crawl/kafkahandler/util/DataUtil.java ====
package com.bfd.crawl.kafkahandler.util;

import java.util.Map;

import lombok.extern.slf4j.Slf4j;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.bfd.crawl.kafkahandler.config.Constant;

+/** + * @author:jinming + * @className:DataUtil + * @version:1.0 + * @description: 获取dataValue的值 + * @Date:2023/11/1 9:54 + */ +@Slf4j +public class DataUtil { + /** + * @param key 传入的key + * @param dataMap 数据map + * @return 根据传入的参数进行判断解析,返回正确的dataValue + */ + public static Object getValue(String key, Map dataMap) { + try { + //公式为空直接就返回 + if (key.equals(Constant.EMPTY)) { + return Constant.EMPTY; + } + if (!key.contains(Constant.NOT_KEY)) { + return key; + } + Object dataValue; + String isJson = "#json#"; + if (key.contains(isJson)) { + //进行第一次拆分,获取#json#前面的部分 + String[] keySplit = key.split(isJson); + String firstDataKey = keySplit[0]; + String[] firstDataKeySplit = firstDataKey.split(":"); + //取出前半部分对应的JSON数据并转换为JSONObject + String dataJson = (String) dataMap.get(firstDataKeySplit[0]); + JSONObject dataJsonObject = JSON.parseObject(dataJson); + //根据key的后半部分取出对应JSONObject中的值 + String firstDataKeyJson = (String) JSONPath.eval(dataJsonObject, firstDataKeySplit[1]); + String secDataKey = keySplit[1]; + JSONObject firstDataJsonObject = JSON.parseObject(firstDataKeyJson); + dataValue = JSONPath.eval(firstDataJsonObject, secDataKey); + return dataValue; + } + String[] keySplit = key.split(":"); + String jsonPath = keySplit[1]; + String dataJson = (String) dataMap.get(keySplit[0]); + JSONObject dataJsonObject = JSON.parseObject(dataJson); + dataValue = JSONPath.eval(dataJsonObject, jsonPath); + return dataValue; + } catch (Exception e) { + // TODO: handle exception + log.error("jsonpath公式取值异常,", e); + return null; + } + + } +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/util/QueueUtil.java b/src/main/java/com/bfd/crawl/kafkahandler/util/QueueUtil.java new file mode 100644 index 0000000..0293c34 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/util/QueueUtil.java @@ -0,0 +1,16 @@ +package com.bfd.crawl.kafkahandler.util; + +import java.util.concurrent.LinkedBlockingDeque; + +/** + * @author:jinming + * 
@className:QueueUtil + * @version:1.0 + * @description: + * @Date:2023/7/13 15:00 + */ +public class QueueUtil { + public static LinkedBlockingDeque taskQueue = new LinkedBlockingDeque(); + + public static LinkedBlockingDeque sendQueue = new LinkedBlockingDeque(); +} \ No newline at end of file diff --git a/src/main/java/com/bfd/crawl/kafkahandler/util/StringUtil.java b/src/main/java/com/bfd/crawl/kafkahandler/util/StringUtil.java new file mode 100644 index 0000000..7354c70 --- /dev/null +++ b/src/main/java/com/bfd/crawl/kafkahandler/util/StringUtil.java @@ -0,0 +1,147 @@ +package com.bfd.crawl.kafkahandler.util; + + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; + +import java.security.MessageDigest; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author jinming + * @version 1.0 + * @className StringUtile + * @Date 2022/1/21 11:46 + */ +@Slf4j +public class StringUtil { + public static boolean hasValue(String str) { + return str != null && !"".equals(str.trim()); + } + + public static String getRegexGroup(String regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Pattern p = Pattern.compile(regex); + Matcher m = p.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + } + + return resultStr; + } + + /** + * + * @param str1 原始字符串1 + * @param str2 原始字符串2 + * @return 并集后的新字符串 + */ + public static String stringUnion(String str1, String str2) { + // 将两个字符串的字符合并到一个集合中,自动去除重复字符 + Set unionSet = new HashSet<>(); + + for (char c : str1.toCharArray()) { + unionSet.add(c); + } + + for (char c : str2.toCharArray()) { + unionSet.add(c); + } + + // 将集合转换为字符串 + StringBuilder result = new StringBuilder(); + for (char c : unionSet) { + result.append(c); + } + + return result.toString(); + } + + + + // 递归函数来插入值 + public static void insertValue(JSONObject jsonObject, String[] path, String value) { + if 
(path.length == 0) { + return; + } + String key = path[0]; + + if (path.length == 1) { + jsonObject.put(key, value); + } else { + JSONObject subObject = jsonObject.getJSONObject(key); + if (subObject == null) { + subObject = new JSONObject(); + jsonObject.put(key, subObject); + } + + String[] newPath = new String[path.length - 1]; + System.arraycopy(path, 1, newPath, 0, newPath.length); + insertValue(subObject, newPath, value); + } + } + + public static Set getEmailAddress(String message) { + Set emailList = new HashSet<>(); + Pattern pattern = Pattern.compile("\\w+\\.?\\w+\\@\\w+\\.\\w+"); + Matcher m = pattern.matcher(message); + while (m.find()) { + emailList.add(m.group(0)); + } + return emailList; + } + + public static String getMd5(String string) { + try { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] bs = md5.digest(string.getBytes("UTF-8")); + StringBuilder sb = new StringBuilder(40); + for (byte x : bs) { + if ((x & 0xff) >> 4 == 0) { + sb.append("0").append(Integer.toHexString(x & 0xff)); + } else { + sb.append(Integer.toHexString(x & 0xff)); + } + } + return sb.toString(); + } catch (Exception e) { + //LOG.error("获取md5异常", e); + return "nceaform" + System.currentTimeMillis(); + } + } + + public static String removeAllHtmlTags(String str) { + return hasValue(str) ? str.replaceAll("<[^<>]+?>", "") : ""; + } + + public static String getRegexGroup(Pattern regex, String str, int id) { + String resultStr = ""; + if (hasValue(str)) { + Matcher m = regex.matcher(str); + if (m.find()) { + resultStr = m.group(id); + } + } + + if ("".equals(resultStr)) { + log.error(regex + " parser error!"); + } + + return resultStr; + } + + public static String getStrByPattern(String str, String regex) { + Pattern pattern = Pattern.compile(regex); + Matcher m = pattern.matcher(str); + return m.find() ? 
m.group(0) : "";
    }

}

# ==== file: src/main/resources/application.yml ====
server:
  port: 13188
spring:
  application:
    name: 过滤器
  boot:
    admin:
      client:
        health:
          timeout: 10s
        url: http://172.16.12.55:8001
        instance:
          service-base-url: http://10.10.144.49:9080
  kafka:
    bootstrap-servers: 172.18.1.146:9092,172.18.1.147:9092,172.18.1.148:9092
    producer:
      retries: 3
      acks: all
      batch-size: 4096
      buffer-memory: 102476800
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        max.request.size: 47185920

  redis:
    host: 172.18.1.157
    port: 6379
    timeout: 10000
    database: 6
    jedis:
      pool:
        max-active: 8 # maximum active connections in the pool (was mojibake "????" comment)
        max-wait: 800 # max wait (ms) for a connection when the pool is exhausted
        max-idle: 8 # maximum idle connections kept in the pool
        min-idle: 2 # minimum idle connections kept in the pool
logging:
  file:
    path: ./logs
thread:
  handler: 1
  send: 1

zookeeper:
  connection-string: 172.18.1.146:2181,172.18.1.147:2181,172.18.1.148:2181
  publish-node: /analyze
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: always

send:
  topic: analyze12321

# ==== file: src/main/resources/logback-spring.xml ====
# NOTE(review): the XML element tags of this logback-spring.xml were destroyed
# by the dump (tag-stripping); only the text content below survived. The
# original file must be recovered from the repository — do not hand-reconstruct.
# %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n
# true
# ${logging.level}
# ${logging.file.path}/kafkaHandler.log
# ${logging.file.path}/kafkaHandler.log.%d{yyyy-MM-dd}
# 3
# %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %line %-5level %logger{50} - %msg%n
# UTF-8

# ==== file: src/test/java/com/bfd/crawl/kafkahandler/KafkaHandlerApplicationTests.java ====
package com.bfd.crawl.kafkahandler;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class KafkaHandlerApplicationTests {

    // smoke test: the Spring context must start
    @Test
    void contextLoads() {
    }

}