主币种设置

记账模块是我们项目的核心模块,也是用户使用最多的模块,因此这个模块的东西比较多,我们要分为多个部分编写代码。

一、需求

币种设置的需求涉及到了我们前面编写的代码,我们来具体看一下需求。

编号
需求
说明

1

主币种设置

1. 用户可修改主币种;2. 在注册新用户成功后,为用户设置主币种为人民币

二、功能编写

在主币种设置这个需求中我们需要增加一个配置表Config ,这个表用来存储用户的配置,就目前来说我们用这个表存储用户设置的主币种。同时我们也需要增加用来操作配置表的Controller ConfigController,这个Controller 中目前只需要两个Action QueryUpdate。最后我们要在SysUserController中新增一个根据用户Id获取用户信息的Action QueryUserInfo,这个Action 返回的不仅包括SysUser表中的信息,还会返回用户的配置信息(到目前为止)。这些类和方法在这里就不列出了,大家按前面所说的自己动手编写代码实现业务功能,然后对比一下我写的代码。 这里我们讲解一下在注册新用户成功后,为用户设置主币种为人民币这个需求怎么实现。看到这个需求估计大部分人会觉得直接在Register Action 中增加设置主币种的代码就可以了,如果你也是这么想的那就错了。用户注册功能和设置主币种的关系不是很大,我们可以在注册时让系统设置主币种,也可以让用户自己手动设置,但是根据需求来看我们只能采用第一种方法,因此我们要需解决这么一个问题:在保持Register Action 单一职责的情况下,实现在注册时自动设置主币种为人民币。我相信已经有一部分读者想到可以使用通知来实现。是的没错,要实现这个功能我们可以使用通知的方式,也就是说当用户注册成功后Register Action会发送一条通知,告诉Config增加一条用户主币种。实现通知的功能我们可以选择的方法很多:基于事件机制共享对象使用消息队列等,在这里我们为了项目后期的扩展性就选用消息队列 来实现通知的功能,并且选目前在开发领域用的最多的MQ软件**RabbitMQ **。下面我们就一起来看一下如何实现需求吧。

2.1 安装 RabbitMQ

由于现在还处于开发阶段,因此可以把RabbitMQ安装到本地。接下来我们一起将RabbitMQ安装到Docker中吧。打开命令行工具,输入如下命令就可以吧RabbitMQ安装到我们本地了:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq

Tip:对于Docker不熟悉的同学,请先去Docker官网或者前往我的专栏Docker极简教程学习

2.2 实现需求

RabbitMQ 安装完了,我们现在就来开发需求。

  1. 在项目中安装RabbitMQ官方的.NET包RabbitMQ.Client

    dotnet add package RabbitMQ.Client
  2. 在项目中新建一个文件夹MQ,在这个文件夹下创建RabbitMQConnection类,这个类是用来初始化连接,以及释放链接资源,代码如下:

    using RabbitMQ.Client;
    using SporeAccounting.MQ.Model;
    
    namespace SporeAccounting.MQ;
    
    /// <summary>
    /// RabbitMQ连接类
    /// </summary>
    public class RabbitMQConnection : IDisposable
    {
        /// <summary>
        /// 连接
        /// </summary>
        private readonly IConnection _connection;
    
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="options"></param>
        public RabbitMQConnection(RabbitMQOptions options)
        {
            var factory = new ConnectionFactory
            {
                HostName = options.HostName,
                Port = options.Port,
                VirtualHost = options.VirtualHost,
                UserName = options.UserName,
                Password = options.Password
            };
            _connection = factory.CreateConnectionAsync().Result;
        }
    
        /// <summary>
        /// 创建通道
        /// </summary>
        /// <returns></returns>
        public async Task<IChannel> CreateChannel()
        {
            return await _connection.CreateChannelAsync();
        }
    
        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose() => _connection.Dispose();
    }

    这段代码实现了一个 RabbitMQConnection 类,用于管理与 RabbitMQ 的连接和通道创建。它接收一个包含 RabbitMQ 配置的 RabbitMQOptions 对象作为参数,通过 ConnectionFactory 初始化连接。配置包括主机名、端口、虚拟主机、用户名和密码等信息。在构造函数中,利用 CreateConnectionAsync().Result 创建同步连接实例 _connection。此外,该类提供了 CreateChannel 方法,通过调用 _connection.CreateChannelAsync() 异步生成消息通道,方便与 RabbitMQ 进行通信。为了防止资源泄漏,RabbitMQConnection 实现了 IDisposable 接口,在调用 Dispose 方法时释放 _connection 占用的资源。这个类封装了连接和通道管理的逻辑,适合在需要频繁访问 RabbitMQ 的场景中使用,提高代码的可维护性和复用性。

  3. 然后我们新建发布类RabbitMQPublisher代码如下:

    using RabbitMQ.Client;
    
    namespace SporeAccounting.MQ;
    
    /// <summary>
    /// RabbitMQ发布者类
    /// </summary>
    public class RabbitMQPublisher
    {
        /// <summary>
        /// RabbitMQ连接
        /// </summary>
        private readonly RabbitMQConnection _connection;
    
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="connection"></param>
        public RabbitMQPublisher(RabbitMQConnection connection)
        {
            _connection = connection;
        }
    
        /// <summary>
        /// 发布消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        public async System.Threading.Tasks.Task Publish<T>(string queue, string routingKey, T message)
        {
            try
            {
                await using var channel = await _connection.CreateChannel();
                await channel.QueueDeclareAsync(queue, durable: true,false,false,null);
                var body = System.Text.Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(message));
                await channel.BasicPublishAsync(exchange: string.Empty, routingKey: routingKey, body: body);
            }
            catch
            {
                throw;
            }
        }
    }

    这段代码实现了一个 RabbitMQPublisher 类,用于向 RabbitMQ 消息队列发布消息。类中通过依赖注入的方式,接受一个 RabbitMQConnection 对象用于管理与 RabbitMQ 的连接。其核心功能集中在 Publish 方法中,该方法负责将消息发送到指定的队列。调用时,方法首先通过 _connection.CreateChannel 异步创建一个消息通道,并确保通道在使用完毕后释放资源。接着调用 QueueDeclareAsync 方法声明队列,确保目标队列存在,并将队列设置为持久化。然后,将传入的消息字符串转换为 UTF-8 字节数组以符合 RabbitMQ 的消息格式要求。最终,通过 BasicPublishAsync 方法发送消息到指定的路由键和队列。此类有效地封装了消息发布的逻辑,提供了简洁的接口来进行队列操作和消息发送,适合构建发布订阅模式的生产者端代码。通过异步编程模式,它可以在处理大量并发消息时保持高性能和资源利用率。

  4. 接着我们编写订阅类RabbitMQSubscriber,代码如下:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    
    namespace SporeAccounting.MQ
    {
        public class RabbitMQSubscriberService
        {
            private readonly RabbitMQConnection _connection;
            private readonly ILogger<RabbitMQSubscriberService> _logger;
    
            public RabbitMQSubscriberService(RabbitMQConnection connection, ILogger<RabbitMQSubscriberService> logger)
            {
                _connection = connection;
                _logger = logger;
            }
    
            /// <summary>
            /// 订阅消息队列
            /// </summary>
            /// <typeparam name="T">消息类型</typeparam>
            /// <param name="queue">队列名称</param>
            /// <param name="routingKey">路由键</param>
            /// <param name="onMessage">处理消息的逻辑</param>
            /// <returns></returns>
            public async System.Threading.Tasks.Task SubscribeAsync<T>(string queue, string routingKey, Action<T> onMessage)
            {
                var channel = await _connection.CreateChannel();
    
                // 声明队列
                await channel.QueueDeclareAsync(queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
    
                // 创建消费者
                var consumer = new AsyncEventingBasicConsumer(channel);
    
                // 绑定接收事件
                consumer.ReceivedAsync += async (sender, @event) =>
                {
                    try
                    {
                        var body = @event.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
    
                        _logger.LogInformation($"Message received from queue '{queue}': {message}");
    
                        // 反序列化并调用处理逻辑
                        var deserializedMessage = System.Text.Json.JsonSerializer.Deserialize<T>(message);
                        onMessage(deserializedMessage);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, $"Error processing message from queue '{queue}'.");
                    }
    
                    await System.Threading.Tasks.Task.CompletedTask;
                };
    
                // 开始消费队列
                await channel.BasicConsumeAsync(queue: queue, autoAck: true, consumer: consumer);
                _logger.LogInformation($"Subscribed to queue '{queue}' with routing key '{routingKey}'.");
            }
        }
    }

    这段代码定义了一个名为 RabbitMQSubscriber 的类,用于实现 RabbitMQ 的订阅功能。类中包含一个 _connection 字段,表示与 RabbitMQ 的连接,通过依赖注入的方式传入 RabbitMQConnection 对象并在构造函数中初始化。核心方法是 Subscribe,用于订阅指定队列的消息。调用时需要传入队列名称、路由键以及一个处理消息的回调函数 onMessage。在方法内部,先通过 _connection 创建一个通道,然后声明队列以确保其存在。随后,实例化 AsyncEventingBasicConsumer 作为消息消费者,并在其 ReceivedAsync 事件中注册逻辑:每当接收到消息时,将消息体从字节数组解码为字符串,并通过 onMessage 回调执行自定义处理逻辑。最后,通过 BasicConsumeAsync 启动对队列的监听,启用消息自动确认模式,完成订阅过程。这种设计使得消息处理逻辑可动态配置,适用于异步场景。

  5. 最后我们创建基于 RabbitMQ 的后台服务 RabbitMQBackgroundService ,这个服务用于将全部订阅置于后台。

    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using SporeAccounting.Models;
    using SporeAccounting.MQ.Message.Model;
    using SporeAccounting.Server.Interface;
    
    namespace SporeAccounting.MQ
    {
        /// <summary>
        /// RabbitMQBackgroundService
        /// </summary>
        public class RabbitMQBackgroundService : IHostedService
        {
            private readonly RabbitMQSubscriberService _subscriberService;
            private readonly ILogger<RabbitMQBackgroundService> _logger;
            private readonly IServiceProvider _serviceProvider;
    
            public RabbitMQBackgroundService(RabbitMQSubscriberService subscriberService,
                ILogger<RabbitMQBackgroundService> logger,
                IServiceProvider serviceProvider)
            {
                _subscriberService = subscriberService;
                _logger = logger;
                _serviceProvider = serviceProvider;
            }
    
            public async System.Threading.Tasks.Task StartAsync(CancellationToken cancellationToken)
            {
                _logger.LogInformation("Starting RabbitMQ subscription service...");
    
                // 配置多个队列订阅
                await _subscriberService.SubscribeAsync<MainCurrency>("SetMainCurrency", "SetMainCurrency",
                    (mainCurrency) =>
                    {
                        using var scope = _serviceProvider.CreateScope();
                        var configService = scope.ServiceProvider.GetRequiredService<IConfigServer>();
                        configService.Add(new Config()
                        {
                            Id = Guid.NewGuid().ToString(),
                            UserId = mainCurrency.UserId,
                            Value = mainCurrency.Currency,
                            ConfigTypeEnum = ConfigTypeEnum.Currency,
                            CreateDateTime = DateTime.Now,
                            CreateUserId = mainCurrency.UserId
                        });
                    });
            }
    
            public System.Threading.Tasks.Task StopAsync(CancellationToken cancellationToken)
            {
                _logger.LogInformation("Stopping RabbitMQ subscription service...");
                return System.Threading.Tasks.Task.CompletedTask;
            }
        }
    }

    这段代码实现了一个基于 RabbitMQ 的后台服务,继承自 IHostedService,用于管理服务的启动和停止。 在 StartAsync 方法中,服务通过 RabbitMQSubscriberService.SubscribeAsync 订阅 SetMainCurrency 队列的消息。收到消息后,回调方法会使用 IServiceProvider 创建作用域,从中获取 IConfigServer 服务,将消息内容(如用户 ID 和货币类型)存储为配置数据。处理逻辑包括生成 Config 实体并调用 Add 方法保存到存储。在 StopAsync 方法中,仅记录服务停止日志。

  6. 我们将前面编写的类注入到项目中,代码如下:

    builder.Services.AddSingleton(new RabbitMQOptions
    {
        HostName = configurationManager["RabbitMQ:Host"],
        Port = int.Parse(configurationManager["RabbitMQ:Port"]),
        UserName = configurationManager["RabbitMQ:UserName"],
        Password = configurationManager["RabbitMQ:Password"],
        VirtualHost = configurationManager["RabbitMQ:VirtualHost"],
    });
    builder.Services.AddSingleton<RabbitMQConnection>();
    builder.Services.AddSingleton<RabbitMQPublisher>();
    // 注册通用订阅服务
    builder.Services.AddSingleton<RabbitMQSubscriberService>();
    // 注册后台服务,用于启动订阅
    builder.Services.AddHostedService<RabbitMQBackgroundService>();

    这段代码通过依赖注入配置 RabbitMQ 服务,以支持消息队列的发布与订阅功能。首先,RabbitMQOptions 使用 AddSingleton 注册为单例,从 configurationManager 加载 RabbitMQ 的连接配置信息,包括主机名、端口、用户名、密码和虚拟主机等。接着,RabbitMQConnection 被注册为单例,用于管理与 RabbitMQ 的连接。RabbitMQPublisher 用于发布消息,RabbitMQSubscriberService 提供通用的订阅逻辑。这些服务为消息队列的核心功能提供支持。此外,RabbitMQBackgroundService 作为后台服务被注册,应用启动时会自动启动,用于初始化订阅功能,监听队列并处理消息。

  7. 接下来我们在Register Action 中添加发布设置主币种消息的代码,更新后的代码如下:

    /// <summary>
    /// 注册
    /// </summary>
    /// <param name="sysUserViewModel"></param>
    /// <returns></returns>
    [HttpPost]
    [Route("Register")]
    public ActionResult<ResponseData<bool>> Register(SysUserViewModel sysUserViewModel)
    {
       try
       {
           var role = _sysRoleServer.QueryByName("Consumer");
           SysUser sysUser = _mapper.Map<SysUser>(sysUserViewModel);
           sysUser.Salt = Guid.NewGuid().ToString("N");
           sysUser.Password = HashPasswordWithSalt(sysUser.Password, sysUser.Salt);
           sysUser.CreateUserId = sysUser.Id;
           sysUser.CreateDateTime = DateTime.Now;
           sysUser.RoleId = role.Id;
           _sysUserServer.Add(sysUser);
           //发布设置主币种消息
           MainCurrency mainCurrency = new MainCurrency()
           {
               UserId = sysUser.Id,
               Currency = "e7b3e54d-dbf3-432e-b6fb-b251ffa844b6"
           };
           _ = _rabbitMqPublisher.Publish<MainCurrency>("SetMainCurrency", "SetMainCurrency", mainCurrency);
           return Ok(new ResponseData<bool>(HttpStatusCode.OK, "", false));
       }
       catch (Exception ex)
       {
           return Ok(new ResponseData<bool>(HttpStatusCode.InternalServerError, "服务端异常", false));
       }
    }

三、总结

这篇文章我们实现了在注册时设置主币种的功能。我们之所以使用消息队列实现是因为注册功能和设置主币种属于两个业务,因此我们需要将它们分离出来,同时即使设置主币种操作失败了也不影响注册功能。 这里只列出了核心的类和方法,其他的方法以及和Config相关的操作请大家自己动手来实现一下,完成后和我的代码对比一下看看有什么不同。

Last updated