From 07eced36568aaa3d130e3795a549fc1ce5d41672 Mon Sep 17 00:00:00 2001 From: "steve.gao" Date: Tue, 10 Dec 2024 14:51:21 +0800 Subject: [PATCH] initial --- README.md | 33 +++++ kafka_utils.js | 75 ++++++++++ server.js | 413 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ upload_files.js | 99 ++++++++++++++ utils.js | 22 +++ 5 files changed, 642 insertions(+) create mode 100644 README.md create mode 100644 kafka_utils.js create mode 100644 server.js create mode 100644 upload_files.js create mode 100644 utils.js diff --git a/README.md b/README.md new file mode 100644 index 0000000..db2ef16 --- /dev/null +++ b/README.md @@ -0,0 +1,33 @@ +# whatsapp-wppconnect + +whatsapp采集机器人: 监控目标账号的群组聊天信息 +- 项目框架:`https://github.com/wppconnect-team/wppconnect` + +环境安装配置: +- 环境配置:nodejs 18.0+ 目前测试稳定 +- 项目依赖安装:`npm i --save @wppconnect-team/wppconnect` +- nodejs推送kakfa依赖: `npm install kafkajs` +- api接口框架安装:`npm install express --save` +- 服务器管理nodejs项目:`npm install pm2 -g` + +部署情况: +- 部署服务器: +- 推送kafka: + - brokers: + - topic: `whatsapp_test1` + + +支持功能: +- 监听聊天信息:支持文本类和媒体类("chat", "image", "video", "sticker", "ptt", "document", "audio") + - 文本类:"chat" + - 纯文本 + - 外部链接:引用链接缩略图、引用标题、引用介绍 等内容 + - 媒体类:"image", "video", "sticker", "ptt", "document", "audio" +- 监听群组的状态:现在只监听被移除的状态 + +- 支持接口: + - 加群 + - 获取用户头像 + - 获取群组信息头像 + - 获取群组信息(包含部分群成员) + - 获取当前群组信息 \ No newline at end of file diff --git a/kafka_utils.js b/kafka_utils.js new file mode 100644 index 0000000..043cfb4 --- /dev/null +++ b/kafka_utils.js @@ -0,0 +1,75 @@ +const {Kafka} = require("kafkajs"); +const brokers = [] // TODO:测试kafka + +/** + * + * 生成kafka 的生产者 + * + **/ +function create_kafka_client(){ + // 初始化kafka服务 + const kafka = new Kafka({ + clientId: 'my-app', + brokers: brokers + }) + const producer = kafka.producer(); // 启动生产者 + console.log("初始化kafka 成功"); + return producer; +} + +// 发送 消息 关于推送消息 会过滤为undefined的key ***** +const push_data = async(producer, message, topic) => { + await producer.connect(); + // console.log("推送数据为:", message); + let res = await producer.send({ + topic: topic, + messages: [ + {value: JSON.stringify(message)} + ], + }); + if(res){ + console.log("推送成功!"); + } +} + + + +// var message = { +// group_id: '120363043137963504@g.us', +// sender_name: 'Última HORA 505', +// sender_id: '50582068410@c.us', +// message_id: 'false_120363043137963504@g.us_08290A24DFDD4E24A5C5DDEC2158AFD7_50582068410@c.us', +// reply_to_msg_id: undefined, +// message_text: 'https://www.facebook.com/share/v/3UZiZm8NC1vnL6h3/?mibextid=oFDknk', +// media: 'http://172.18.1.180:9980/group17/default/20240325/11/26/3/thubm.jpeg', +// datetime: '2024-03-25 03:26:20', +// type: 'chat', +// mimetype: undefined, +// subtype: 'url', +// link_title: '#ULTIMA_HORA 🚨🚨🚨🇳🇮 En Matagalpa, está cayendo un fuerte aguacero... De que zona de ese departamento lo reportan? #Matagalpa #Noticiero #Sucesos... | By Última HORA 505Facebook', +// description: '#ULTIMA_HORA 🚨🚨🚨🇳🇮 En Matagalpa, está cayendo un fuerte aguacero... De que zona de ese departamento lo reportan? #Matagalpa #Noticiero #Sucesos...' +// } +// +// +//{ +// group_id: '120363031969802116@g.us', +// sender_name: 'José Ángel', +// sender_id: '50586212116@c.us', +// message_id: 'false_120363031969802116@g.us_7B01725DAA5C59CBFD5B8EB15187A82D_50586212116@c.us', +// reply_to_msg_id: undefined, +// message_text: undefined, +// media: 'error', +// datetime: '2024-03-28 05:06:44', +// type: 'video', +// mimetype: 'video/mp4', +// subtype: undefined +//} +// +// push_data(create_kafka_client(), message, "whatsapp_test1"); + + + +module.exports = { + push_data, + create_kafka_client +} diff --git a/server.js b/server.js new file mode 100644 index 0000000..155ef25 --- /dev/null +++ b/server.js @@ -0,0 +1,413 @@ +/** + * pm2 管理启动 + * 启动: pm2 start server + * 停止:pm2 stop server + * + **/ + +const wppconnect = require('@wppconnect-team/wppconnect'); +const bodyParser = require("body-parser"); +const express = require("express"); +const {upload_files, saveToLocal} = require("./upload_files"); +const utils = require("./utils"); +const {push_data, create_kafka_client} = require("./kafka_utils"); +const topic_message = "whatsapp_test1"; +// const topic_status = "whatsapp_test2"; +const user_iphone = '8615201683893@c.us'; // 当前登陆账号手机 用于判断被踢的是不是自己 + +// 封装部分日志打印内容 新增日期显示 +{ + const newLog = function () { + process.stdout.write(`${utils.timestampToTime(new Date() / 1000)} `) + arguments.callee.oLog.apply(this, arguments); + }; + const newError = function () { + process.stdout.write(`${utils.timestampToTime(new Date() / 1000)} `) + arguments.callee.oError.apply(this, arguments); + }; + newLog.oLog = console.log; + newError.oError = console.error; + console.log = newLog; + console.error = newError; +} + +// 初始化express服务 +const app = express(); +const PORT = 3000; +app.use(bodyParser.json()); +app.use(bodyParser.urlencoded({extended: true})); + + +// 反复登陆可能是网络问题 + +wppconnect.create({ + session: 'olive', + statusFind: (statusSession, session) => { + console.log('Status Session: ', statusSession); //return isLogged || notLogged || browserClose || qrReadSuccess || qrReadFail || autocloseCalled || desconnectedMobile || deleteToken + console.log('Session name: ', session); + }, + headless: "new", + disableWelcome:true, + browserArgs: [ + '--disable-setuid-sandbox', + '--no-sandbox', + '--safebrowsing-disable-auto-update', + '--disable-features=LeakyPeeker' + ], + puppeteerOptions: { + args: ["--no-sandbox", '--disable-setuid-sandbox'], + } + }).then((client) => start(client)).catch((error) => console.log(error)); + +async function _download_file(client, message_id, mine, file_name){ + let response = undefined; + try{ + var base64 = await client.downloadMedia(message_id); + response = await upload_files(base64, mine, file_name); + }catch(error){ + console.error("[onMessage] 下载异常: ", error); + response = "protocolTimeout"; + } + return response; +} + + +/** + * + * 下载文件流程 + **/ +async function download_file(client, message_id, message_type, mine){ + console.log("下载文件流程: ", message_type, mine); + let response = {media_data: undefined, local_data: undefined}; + switch(message_type){ + case "image": + var file_name = "image.jpg"; + response = await _download_file(client, message_id, mine, file_name); // 新增下载方法 + break; + case "video": + var file_name = "video.mp4"; + response = await _download_file(client,message_id, mine, file_name); // 新增下载方法 + break; + case "ptt": // opus + var file_name = "audio.opus"; + response = await _download_file(client,message_id, mine, file_name); // 新增下载方法 + break; + case "sticker": // webp + var file_name = "sticker.webp"; + response = await _download_file(client,message_id, mine, file_name); // 新增下载方法 + + break; + case "audio": + var file_name = mine.replace("/", "."); + response = await _download_file(client,message_id, mine, file_name); // 新增下载方法 + + break; + case "document": + var file_name = mine.replace("/", "."); + response = await _download_file(client,message_id, mine, file_name); // 新增下载方法 + break; + } + +// if(response === "error"){ +// let result = saveToLocal(base64, mine); +// console.error("上传失败: 保存到本地 ", result); +// return {media_data: response, local_data: result} +// } + + return response; +} + + + +/** + * 服务启动入口 + **/ +function start(client) { + + // 获得kafka对象的生产者 + const producer = create_kafka_client(); + + // 监控消息部分 + // ---------------------------------------------------------- + client.onMessage(async (message) => { + console.log("[onMessage] 原始数据:", message); + // 拦截消息类型 + let chats_type = ["chat", "image", "video", "sticker", "ptt", "document", "audio"] + // 拦截群组信息类型 + let other_type = ["remove"] + console.log("[onMessage] 原始数据类型:", message.type); + if(message.isGroupMsg === true && chats_type.indexOf(message.type) > -1){ + let data = { // subtype + group_id: message.chatId, // 没有群组名称 + sender_name: message.sender.pushname, + sender_id: message.sender.id, + message_id :message.id, + reply_to_msg_id: message.quotedMsgId, // 这个不一定有这个字段 表示为回复内容 + message_text: undefined, + media: undefined, + datetime: utils.timestampToTime(message.timestamp), // 修改时间戳为 yymmdd hhmmss + type: message.type, + mimetype: message.mimetype, + subtype: message.subtype + } + + // 下载资源 + let {media_data, local_data} = await download_file(client, message.id, message.type, message.mimetype); + data["media"] = media_data; + data["local_data"] = local_data; // 失败的话下载到本地 + + // 是否有资源 影响对应的字段 + if(media_data){ // 有资源文件的 + data["message_text"] = message.caption; + if(media_data === "protocolTimeout"){ // 如果是异常状态则修改为undefined + data["media"] = undefined; + console.log("下载失败 舍弃本条数据: ", data); + return; + } + }else{ + data["message_text"] = message.content; + } + + if(message.subtype === "url"){ // chat下的url 表示为引用有外部链接 + // message.thumbnail // 缩略图 base64 + media_data = await upload_files(message.thumbnail, "image/jpeg", "thubm.jpeg"); + console.log("[onMessage] 下载相关链接缩略图") + data["media"] = media_data; + data["link_title"] = message.title; + data["description"] = message.description; + } + + console.log("[onMessage] 封装数据:", data); + + try{ // 推送 消息 + await push_data(producer, data, topic_message); + }catch(error){ + console.log("[onMessage] 推送异常: ", error); + } + + }else if(message.type === "gp2" && other_type.indexOf(message.subtype) > -1 && message.recipients.indexOf(user_iphone) > -1){ + // 被踢出群 且还得是自己 别人被踢也能监听到 + let data = { + group_id: message.chatId, + sender_name: message.sender.pushname, + sender_id: message.sender.id, + subtype: message.subtype + } + + let group_data = await client.getContact(message.chatId); + console.log("[onMessage] 被踢群 ", group_data) + data["group_title"] = group_data["name"]; + + try{ // 推送 消息 + await push_data(producer, data, topic_message); + }catch(error){ + console.error("[onMessage] 推送异常: ", error.message); + } + + console.log("[onMessage] kick out of group", data); + } + + }); + + // 接口部分 + // ---------------------------------------------------------- + + // 初始化接口 获取所有群组信息 + app.get('/whatsapp/index', async (req, res) => { + console.log("[index] 获取所有接口信息"); + let result = {code: 200, data: undefined}; // template + try{ + let message = await client.listChats({onlyGroups: true}); // 获取的是聊天框里的所有聊天 即使被踢了 + result["data"] = message + }catch(error){ + console.log(error); + result = {code: 500, data: error.message} + } + res.send(result); + }); + + + + +// http://127.0.0.1:3000/whatsapp/join_group?link=https://chat.whatsapp.com/G1pxOV1USEAGmlvhzYGS8k + // 加群接口 + app.get("/whatsapp/join_group", async (req, res) => { + // 被踢的群就没有办法再加、已经加过的群返回群id {"id": ""} + // TODO: 需要区分不同失败的异常 收集 + // 1. 需要认证的群,返回: + // 2. 提交认证后,返回: already-exists + // 3. 提交认证后且入群,返回 conflict + let result = {code: 200, data: undefined}; // template + let join_link = req.query.link; + console.log("[join_group] 需要添加的群链接为:", join_link); + + try{ + + let data = await client.joinGroup(join_link); + result["data"] = data; + + }catch(error){ + console.log(error); + // 测试error + result = {code: 500, data: error.message} + + if(error.message === "conflict"){ // 针对加完群需要批准的 过滤异常 + let data = await client.getGroupInfoFromInviteLink(join_link); + console.log("[join_group] 通过邀请码反查数据:", data); + result["code"] = 203 + result["data"] = {id: data.id}; + } + + } + + console.log("[join_group] result:", result); + res.send(result); + }); + + + // 获取联系人信息 + app.get("/whatsapp/get_profile", async (req, res) => { + let result = {code: 200, data: undefined}; // template + + try{ + let chat_id = req.query.userId; // 593968824284@c.us + console.log("[get_profile] 需要获得用户id:", chat_id); + let data = await client.getProfilePicFromServer(chat_id); + result["data"] = data; + }catch(error){ + console.log(error); + result = {code: 500, data: error.message} + } + + console.log("[get_profile] result:", result); + res.send(result); + }); + + + // 检查在线状态 + app.get("/whatsapp/check_online", async (req, res) => { + let result = {code: 200, data: undefined}; // template + + try{ + console.log("[check_online] 检查在线状态"); + let data = await client.isOnline(); + result["data"] = data; + }catch(error){ + console.log(error); + result = {code: 500, data: error.message} + } + + console.log("[check_online] result:", result); + res.send(result); + }); + + + // 获得某个聊天的详情介绍 包括头像图片 这个获取群组信息的最全 包含群成员****** + app.get('/whatsapp/get_chat', async (req, res) => { + let result = {code: 200, data: undefined}; // template + + try{ + let chat_id = req.query.chatId; // 593968824284@c.us + console.log("[get_chat] 需要获得内容的chatId:", chat_id); + let data = await client.getChatById(chat_id); + result["data"] = data; + }catch(error){ + console.log(error); + result = {code: 500, data: error.message} + } + console.log("[get_chat] result:", result); + res.send(result); + }); + + + // 获得某个群组 + app.get('/whatsapp/get_contact', async (req, res) => { + let result = {code: 200, data: undefined}; // template + + try{ + let chat_id = req.query.contactId; // 593968824284@c.us + let invoke_id = req.query.inviteCode; + console.log("[get_contact] 需要获得群组内容:", chat_id, invoke_id); + let data = await client.getContact(chat_id); + console.log("[get_contact] data: ", data) +// console.log(data.getProfilePicFromServer()) + result["data"] = data; + if(invoke_id){ // 如果存在邀请码则拼接其他数据 + let data1 = await client.getGroupInfoFromInviteLink(invoke_id); + data1.profilePicThumbObj = data.profilePicThumbObj; // 合并字段 + result["data"] = data1; + } + }catch(error){ + console.log(error); + result = {code: 500, data: error.message} + } + + console.log("[get_contact] result:", result); + res.send(result); + }); + + // 邀请码反查群信息 + app.get('/whatsapp/invoke', async (req, res) => { + let result = {code: 200, data: undefined}; // template + + try{ + let chat_id = req.query.inviteCode; // 593968824284@c.us + console.log("[invoke] 需要获得群组内容:", chat_id); + let data = await client.getGroupInfoFromInviteLink(chat_id); + result["data"] = data; + }catch(error){ + console.log(error); + result = {code: 500, data: error.message} + } + + console.log("[invoke] result:", result); + res.send(result); + }); + + app.post('/whatsapp/send', async (req, res) => { + console.log("enter send!"); + let result = {code: 200, data: undefined}; // template + + + try{ + let body = req.body; // 获取消息内容以及类型 120363335479943723@g.us + let message_type = body["message_type"]; + let content = body["content"] + let chat_id = body["chat_id"]; + var data = "" + if(message_type === "text"){ // 文本 + data = await client.sendText(chat_id, content); // 指定目标群组 + 内容 + }else if(message_type === "file"){ // 文件 content 内容支持 本地文件/ data:text/plain;base64 + data = await client.sendFile(chat_id, content); // 指定目标群组 + 内容 + }else if(message_type === "image"){ // 图片 content 内容支持 本地文件/link + data = await client.sendImage(chat_id, content); + }else{ + data = ""; + } + console.log("[invoke] data => ", data); + console.log("[invoke] 发送内容的群组id为:", chat_id, content); + result["data"] = "发送成功"; // 由于没有发送成功的回执消息 + }catch(error){ + console.log(error); + result = {code: 500, data: error.message} + } + + console.log("[invoke] result:", result); + res.send(result); + + }); + + + // 开起服务 + app.listen(PORT, '0.0.0.0', () => console.log("开启服务,端口3000", new Date().toString())); + + +} + + + + + + + + diff --git a/upload_files.js b/upload_files.js new file mode 100644 index 0000000..4b01d16 --- /dev/null +++ b/upload_files.js @@ -0,0 +1,99 @@ +'use strict' +const axios = require('axios'); +const fs = require('fs'); +const url = ""; // TODO: 上传路径修改 +const dir = "/opt/crawl/whatsapp/whatsapp-wppconnect/sources/" + + + +/** + * base64数据流转化为 适合formdata的格式 + **/ +function base64ToBlob(base64, mime) { + let raw; + if(base64.indexOf(";base64,") > -1){ // 如果存在base64这个前缀的话 + const parts = base64.split(';base64,'); + const contentType = parts[0].split(':')[1]; + // console.log(parts[1]); + raw = atob(parts[1]); + }else{ + raw = atob(base64); + } + + const rawLength = raw.length; + const uInt8Array = new Uint8Array(rawLength); + + for (let i = 0; i < rawLength; ++i) { + uInt8Array[i] = raw.charCodeAt(i); + } + return new Blob([uInt8Array], {type: mime}); +} + + +/** +* 如果失败则下载到本地 +**/ +function saveToLocal(base64, mime){ + var file_name = "".concat(new Date()* 1000, "_".concat(mime.replace("/", "."))) + file_name = dir + file_name; + + let videoData; + if(base64.indexOf(";base64,") > -1){ // 如果存在base64这个前缀的话 + const parts = base64.split(';base64,'); + const contentType = parts[0].split(':')[1]; + videoData = Buffer.from(parts[1], 'base64'); + }else{ + videoData = Buffer.from(base64, 'base64'); + } + + fs.writeFile(file_name, videoData, 'base64', (err) => { + if (err) { + console.error(err); + console.error("视频保存失败: ", base64); + return; + } + console.log('视频已保存为 ' + file_name); + }); + + return file_name; +} + + +async function upload_files(base64, mime, file_name){ + // 创建一个Blob对象 + const blob = base64ToBlob(base64, mime); + // 创建FormData对象并添加文件 + const formData = new FormData(); + formData.append('file', blob, file_name); // 'image.png'是文件名 + + const headers = { + 'Content-Type': 'multipart/form-data' + } + + // 这个包可以 + try{ + let res = await axios.post(url, formData, { + headers:headers, + timeout: 20000 // 设置超时时间 + }) + console.log("上传成功:", res["data"]); +// return res["data"]; + return {media_data: res["data"], local_data: undefined}; + }catch(error){ + console.error('上传失败:', error); + let result = saveToLocal(base64, mime); + console.error("上传失败: 保存到本地 ", result); + return {media_data: "error", local_data: result} +// return "error"; + } + + // .then(response => console.log(response.data)) + // .catch(error => console.error('Error:', error)); +} + + +module.exports = { + upload_files, + saveToLocal +} + diff --git a/utils.js b/utils.js new file mode 100644 index 0000000..6595e81 --- /dev/null +++ b/utils.js @@ -0,0 +1,22 @@ + +/** + * + * 时间戳转化 datetime格式 + **/ +function timestampToTime(timestamp) { + var date = new Date(timestamp * 1000);//时间戳为10位需*1000,时间戳为13位的话不需乘1000 + var Y = date.getFullYear() + '-'; + var M = (date.getMonth()+1 < 10 ? '0'+(date.getMonth()+1):date.getMonth()+1) + '-'; + var D = (date.getDate()< 10 ? '0'+date.getDate():date.getDate())+ ' '; + var h = (date.getHours() < 10 ? '0'+date.getHours():date.getHours())+ ':'; + var m = (date.getMinutes() < 10 ? '0'+date.getMinutes():date.getMinutes()) + ':'; + var s = date.getSeconds() < 10 ? '0'+date.getSeconds():date.getSeconds(); + return Y+M+D+h+m+s; +} + +// console.log(timestampToTime(1711016965)); + + +module.exports = { + timestampToTime +} \ No newline at end of file