using MongoDB.Driver.Core.Configuration;
using MySqlConnector;
using OfficeOpenXml.FormulaParsing.Excel.Functions.Logical;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using static SKIT.FlurlHttpClient.Wechat.Api.Models.ComponentTCBCreateCloudBaseRunServerVersionRequest.Types;
using static SKIT.FlurlHttpClient.Wechat.TenpayV3.Models.CreateRefundDomesticRefundRequest.Types.Amount.Types;

namespace Admin.NET.Core;

/// <summary>
/// Scheduled job that reads unprocessed rows from the <c>webhook_events</c> table,
/// stores each one as a <see cref="SysWechatUserMapCarts"/> record through the
/// application's repository, and then flags the row as processed.
/// </summary>
[JobDetail("job_orderSync", Description = "同步用户已支付订单", GroupName = "default", Concurrent = false)]
public class DynamicOrdersJob : IJob
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger _logger;

    // Connection string of the external MySQL database that holds webhook_events.
    // NOTE(review): credentials are hard-coded in source; they should be moved to
    // configuration / a secret store. Left unchanged here to avoid altering deployment.
    private const string ConnectionString = "DataSource=192.168.104.103; Port=3306;SslMode=None;Database=crmeb; Uid=root; Pwd=mysql_Fz7crZ;CharSet=utf8mb4;";

    // Query for all events that have not been processed yet. The LEFT JOIN means
    // es.title is NULL whenever no eb_special row matches the cart id.
    private const string PendingEventsSql =
        " SELECT we.event_id, we.order_id, we.unionid, we.cart_id, we.phone, we.amount, es.title" +
        " FROM webhook_events we" +
        " LEFT JOIN eb_special es ON we.cart_id = es.id" +
        " WHERE we.processed = FALSE";

    private const string MarkProcessedSql =
        "UPDATE webhook_events SET processed = TRUE WHERE event_id = @eventId";

    // Not referenced anywhere in this class; kept as declared in the original.
    private static readonly HttpClient HttpClient = new();

    /// <summary>
    /// Creates the job.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to open a DI scope per event.</param>
    /// <param name="loggerFactory">Factory used to create the system-category logger.</param>
    public DynamicOrdersJob(IServiceProvider serviceProvider, ILoggerFactory loggerFactory)
    {
        _serviceProvider = serviceProvider;
        _logger = loggerFactory.CreateLogger(CommonConst.SysLogCategoryName);
    }

    /// <summary>
    /// Job entry point: synchronizes all pending events, then writes a status line
    /// to the console (in red) and to the logger.
    /// </summary>
    /// <param name="context">Scheduler context (not used).</param>
    /// <param name="stoppingToken">Signals that the job should stop.</param>
    public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken)
    {
        await CheckAndSendWebhooksAsync(stoppingToken);

        string msg = $"【{DateTime.Now}】同步用户已支付订单...";
        var originColor = Console.ForegroundColor;
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine(msg);
        Console.ForegroundColor = originColor;
        _logger.LogInformation(msg);
    }

    /// <summary>
    /// Loads every unprocessed event and, for each one, stores the order and marks
    /// the event as processed when storing succeeded.
    /// </summary>
    /// <param name="cancellationToken">Checked between events and passed to the read query.</param>
    private async Task CheckAndSendWebhooksAsync(CancellationToken cancellationToken = default)
    {
        // Read all rows first so the reader/connection is released before any
        // further database work is done for the individual events.
        List<PendingEvent> pendingEvents = await LoadPendingEventsAsync(cancellationToken);

        foreach (PendingEvent pending in pendingEvents)
        {
            // Cancellation is only honoured between events so that a stored order
            // is always followed by its "processed" update.
            cancellationToken.ThrowIfCancellationRequested();

            if (await SendWebhookAsync(BuildCart(pending)))
            {
                await MarkEventAsProcessedAsync(pending.EventId);
            }
        }
    }

    /// <summary>
    /// Runs the pending-events query and materializes the result.
    /// </summary>
    /// <param name="cancellationToken">Passed to the connection, command and reader calls.</param>
    /// <returns>One entry per unprocessed <c>webhook_events</c> row.</returns>
    private static async Task<List<PendingEvent>> LoadPendingEventsAsync(CancellationToken cancellationToken)
    {
        var pendingEvents = new List<PendingEvent>();

        await using var connection = new MySqlConnection(ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var command = new MySqlCommand(PendingEventsSql, connection);
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);

        while (await reader.ReadAsync(cancellationToken))
        {
            pendingEvents.Add(new PendingEvent(
                reader.GetInt32("event_id"),
                reader.GetString("order_id"),
                // unionid / phone are treated as optional by SendWebhookAsync, and
                // title is NULL when the LEFT JOIN finds no match, so none of these
                // may be read with a plain GetString (it throws on NULL).
                ReadNullableString(reader, "unionid"),
                reader.GetString("cart_id"),
                ReadNullableString(reader, "phone"),
                reader.GetString("amount"),
                ReadNullableString(reader, "title")));
        }

        return pendingEvents;
    }

    /// <summary>
    /// Reads a string column, returning <c>null</c> instead of throwing when the value is NULL.
    /// </summary>
    private static string ReadNullableString(MySqlDataReader reader, string column)
    {
        int ordinal = reader.GetOrdinal(column);
        return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
    }

    /// <summary>
    /// Maps a pending event to the entity that is stored for it. Timestamps are taken
    /// at the moment the event is processed; audit user names are fixed to "system".
    /// </summary>
    private static SysWechatUserMapCarts BuildCart(PendingEvent pending)
    {
        return new SysWechatUserMapCarts()
        {
            Amount = pending.Amount,
            cartId = pending.CartId,
            CreateTime = DateTime.Now,
            CreateUserName = "system",
            Unionid = pending.Unionid,
            IsDelete = false,
            OrderId = pending.OrderId,
            cartName = pending.CartName,
            Phone = pending.Phone,
            UpdateTime = DateTime.Now,
            UpdateUserName = "system",
        };
    }

    /// <summary>
    /// Stores the order information for one event.
    /// </summary>
    /// <param name="carts">Order record to store; its phone may be filled in from the matching WeChat user.</param>
    /// <returns>
    /// <c>true</c> when a non-deleted record for the order already exists or the insert
    /// succeeded; otherwise <c>false</c>.
    /// </returns>
    private async Task<bool> SendWebhookAsync(SysWechatUserMapCarts carts)
    {
        using var serviceScope = _serviceProvider.CreateScope();
        var services = serviceScope.ServiceProvider;

        // NOTE(review): the generic arguments of both service lookups were lost in the
        // source this was restored from; SqlSugarRepository<T> is assumed — confirm.
        var cartRepository = services.GetRequiredService<SqlSugarRepository<SysWechatUserMapCarts>>();

        // An order that is already stored counts as handled. Returning false here (as
        // before) left the event unprocessed forever, so it was re-read on every run.
        if (await cartRepository.IsAnyAsync(e => e.OrderId == carts.OrderId && e.IsDelete == false))
        {
            return true;
        }

        // No phone on the event: fall back to the mobile number of the WeChat user
        // with the same union id, if there is one.
        if (string.IsNullOrWhiteSpace(carts.Phone) && !string.IsNullOrWhiteSpace(carts.Unionid))
        {
            // NOTE(review): entity type assumed to be SysWechatUser (has UnionId/Mobile) — confirm.
            var userRepository = services.GetRequiredService<SqlSugarRepository<SysWechatUser>>();
            var wxUser = await userRepository.GetFirstAsync(e => e.UnionId == carts.Unionid);
            if (wxUser is not null)
            {
                carts.Phone = wxUser.Mobile;
            }
        }

        return await cartRepository.InsertAsync(carts);
    }

    /// <summary>
    /// Sets <c>processed = TRUE</c> on the given <c>webhook_events</c> row.
    /// </summary>
    /// <param name="eventId">Value of the row's <c>event_id</c> column.</param>
    private async Task MarkEventAsProcessedAsync(int eventId)
    {
        await using var connection = new MySqlConnection(ConnectionString);
        await connection.OpenAsync();

        await using var command = new MySqlCommand(MarkProcessedSql, connection);
        command.Parameters.AddWithValue("@eventId", eventId);
        await command.ExecuteNonQueryAsync();
    }

    /// <summary>
    /// Column values of one unprocessed <c>webhook_events</c> row.
    /// <c>Unionid</c>, <c>Phone</c> and <c>CartName</c> are null when the column is NULL.
    /// </summary>
    private sealed record PendingEvent(
        int EventId,
        string OrderId,
        string Unionid,
        string CartId,
        string Phone,
        string Amount,
        string CartName);
}