From 0343dc4c5dff7429dfd64f7fa68e51a55b3e2bc3 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Wed, 20 Mar 2024 18:23:57 +0000 Subject: [PATCH] Add rebus (#1) Reviewed-on: https://gitea.musk.fun/nocr/telegram-listener/pulls/1 Co-authored-by: Sergey Nazarov Co-committed-by: Sergey Nazarov --- Directory.Packages.props | 6 +++++ .../Nocr.TelegramListener.AppServices.csproj | 1 + .../Implementation/NewMessageHandler.cs | 25 ++++++++++++++++--- .../Events/TextUpdateReceived.cs | 10 +++++++- .../Infrastructure/Startup.cs | 24 ++++++++++++++++-- .../Nocr.TelegramListener.Host.csproj | 7 ++++++ .../appsettings.Development.json | 3 +++ .../appsettings.Production.json | 3 +++ .../appsettings.json | 3 +++ 9 files changed, 75 insertions(+), 7 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index dc25262..11eb8c4 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -9,6 +9,12 @@ + + + + + + diff --git a/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj b/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj index 96f2b81..a1c36c8 100644 --- a/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj +++ b/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj @@ -4,6 +4,7 @@ + diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs index 735527d..d9a4807 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs @@ -1,29 +1,48 @@ using Microsoft.Extensions.Logging; +using Nocr.TelegramListener.Async.Api.Contracts.Events; +using Nocr.TelegramListener.Core.Dates; +using Rebus.Bus; using TL; namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers; public sealed class NewMessageHandler : INewMessageHandler { + private readonly IBus _bus; private readonly ILogger _logger; private readonly TelegramRegistry _telegramRegistry; + private readonly ICurrentDateProvider _dateProvider; - public NewMessageHandler(ILogger logger, TelegramRegistry telegramRegistry) + public NewMessageHandler(IBus bus, ILogger logger, TelegramRegistry telegramRegistry, + ICurrentDateProvider dateProvider) { + _bus = bus ?? throw new ArgumentNullException(nameof(bus)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry)); + _dateProvider = dateProvider ?? throw new ArgumentNullException(nameof(dateProvider)); } - public Task Handle(MessageBase messageBase) + public async Task Handle(MessageBase messageBase) { _logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID); switch (messageBase) { case Message m: + if (!string.IsNullOrWhiteSpace(m.message)) + break; + _logger.LogInformation("{From} in {Chat} > {MessageText}", m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author, m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), m.message); + var @event = new MessageReceived + { + From = m.from_id.ID, + ChatId = m.peer_id.ID, + Text = m.message, + OccuredDateTime = _dateProvider.UtcNow + }; + await _bus.Publish(@event); break; case MessageService ms: _logger.LogInformation("{From} in {Chat} > [{Action}]", @@ -32,7 +51,5 @@ public sealed class NewMessageHandler : INewMessageHandler ms.action.GetType().Name[13..]); break; } - - return Task.CompletedTask; } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.Async.Api.Contracts/Events/TextUpdateReceived.cs b/src/Nocr.TelegramListener.Async.Api.Contracts/Events/TextUpdateReceived.cs index 4cc2a9a..f6eb77e 100644 --- a/src/Nocr.TelegramListener.Async.Api.Contracts/Events/TextUpdateReceived.cs +++ b/src/Nocr.TelegramListener.Async.Api.Contracts/Events/TextUpdateReceived.cs @@ -2,7 +2,15 @@ namespace Nocr.TelegramListener.Async.Api.Contracts.Events; -public sealed class TextUpdateReceived : IEvent +public sealed class MessageReceived : IEvent { public Guid Id { get; } = Guid.NewGuid(); + + public string Text { get; set; } + + public long From { get; set; } + + public long ChatId { get; set; } + + public DateTimeOffset OccuredDateTime { get; set; } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs b/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs index 075211f..02b9a97 100644 --- a/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs +++ b/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs @@ -1,6 +1,10 @@ +using Microsoft.Extensions.Options; using Nocr.TelegramListener.AppServices; -using Nocr.TelegramListener.AppServices.UpdateListeners; +using Nocr.TelegramListener.Async.Api.Contracts.Events; using Nocr.TelegramListener.Core.Dates; +using Rebus.Config; +using Rebus.Routing.TypeBased; +using Rebus.Serialization.Json; namespace Nocr.TelegramListener.Host.Infrastructure; @@ -16,11 +20,27 @@ public class Startup public void ConfigureServices(IServiceCollection services) { services.AddSingleton(); - services.AddAppServices(Configuration); + + services.Configure(Configuration.GetSection(nameof(RebusRabbitMqOptions))); + services.AddRebus((builder, ctx) => + builder.Transport(t => + t.UseRabbitMq(ctx.GetRequiredService>().Value.ConnectionString, + ctx.GetRequiredService>().Value.InputQueueName) + .DefaultQueueOptions(queue => queue.SetDurable(true)) + .ExchangeNames("nocr.direct", "nocr.topics")) + .Serialization(s => s.UseSystemTextJson()) + .Logging(l => l.Serilog())); } public void Configure(IApplicationBuilder app) { } + + public sealed class RebusRabbitMqOptions + { + public string ConnectionString { get; set; } + + public string InputQueueName { get; set; } + } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.Host/Nocr.TelegramListener.Host.csproj b/src/Nocr.TelegramListener.Host/Nocr.TelegramListener.Host.csproj index 9cafcba..c046fd2 100644 --- a/src/Nocr.TelegramListener.Host/Nocr.TelegramListener.Host.csproj +++ b/src/Nocr.TelegramListener.Host/Nocr.TelegramListener.Host.csproj @@ -8,6 +8,13 @@ + + + + + + + diff --git a/src/Nocr.TelegramListener.Host/appsettings.Development.json b/src/Nocr.TelegramListener.Host/appsettings.Development.json index fdf6b48..2d26fac 100644 --- a/src/Nocr.TelegramListener.Host/appsettings.Development.json +++ b/src/Nocr.TelegramListener.Host/appsettings.Development.json @@ -8,5 +8,8 @@ } } ] + }, + "RebusRabbitMqOptions": { + "ConnectionString": "amqp://admin:admin@localhost:5672/" } } diff --git a/src/Nocr.TelegramListener.Host/appsettings.Production.json b/src/Nocr.TelegramListener.Host/appsettings.Production.json index d0a7c99..d3c47a6 100644 --- a/src/Nocr.TelegramListener.Host/appsettings.Production.json +++ b/src/Nocr.TelegramListener.Host/appsettings.Production.json @@ -21,5 +21,8 @@ } } ] + }, + "RebusRabbitMqOptions": { + "ConnectionString": "amqp://admin:admin@nocr-rabbitmq:5672/" } } diff --git a/src/Nocr.TelegramListener.Host/appsettings.json b/src/Nocr.TelegramListener.Host/appsettings.json index ff9ad20..06b4ea8 100644 --- a/src/Nocr.TelegramListener.Host/appsettings.json +++ b/src/Nocr.TelegramListener.Host/appsettings.json @@ -8,5 +8,8 @@ "System.Net.Http.HttpClient": "Warning" } } + }, + "RebusRabbitMqOptions": { + "InputQueueName": "nocr.telegram.listener.queue" } }