Add rebus (#1)

Reviewed-on: #1
Co-authored-by: Sergey Nazarov <insight.appdev@gmail.com>
Co-committed-by: Sergey Nazarov <insight.appdev@gmail.com>
This commit is contained in:
Sergey Nazarov 2024-03-20 18:23:57 +00:00 committed by nazarovsa
parent ffad0d2add
commit 0343dc4c5d
9 changed files with 75 additions and 7 deletions

View File

@ -9,6 +9,12 @@
<PackageVersion Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
<PackageVersion Include="WTelegramClient" Version="3.7.1" />
</ItemGroup>
<ItemGroup Label="Rebus">
<PackageVersion Include="Rebus" Version="8.2.2" />
<PackageVersion Include="Rebus.ServiceProvider" Version="10.1.0" />
<PackageVersion Include="Rebus.RabbitMq" Version="9.0.1" />
<PackageVersion Include="Rebus.Serilog" Version="8.0.0" />
</ItemGroup>
<ItemGroup Label="Serilog">
<PackageVersion Include="Serilog" Version="3.1.1" />
<PackageVersion Include="Serilog.AspNetCore" Version="8.0.1" />

View File

@ -4,6 +4,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
<PackageReference Include="WTelegramClient" />
<PackageReference Include="Rebus" />
</ItemGroup>
<ItemGroup>

View File

@ -1,29 +1,48 @@
using Microsoft.Extensions.Logging;
using Nocr.TelegramListener.Async.Api.Contracts.Events;
using Nocr.TelegramListener.Core.Dates;
using Rebus.Bus;
using TL;
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
public sealed class NewMessageHandler : INewMessageHandler
{
private readonly IBus _bus;
private readonly ILogger<NewMessageHandler> _logger;
private readonly TelegramRegistry _telegramRegistry;
private readonly ICurrentDateProvider _dateProvider;
public NewMessageHandler(ILogger<NewMessageHandler> logger, TelegramRegistry telegramRegistry)
public NewMessageHandler(IBus bus, ILogger<NewMessageHandler> logger, TelegramRegistry telegramRegistry,
ICurrentDateProvider dateProvider)
{
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
_dateProvider = dateProvider ?? throw new ArgumentNullException(nameof(dateProvider));
}
public Task Handle(MessageBase messageBase)
public async Task Handle(MessageBase messageBase)
{
_logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID);
switch (messageBase)
{
case Message m:
if (!string.IsNullOrWhiteSpace(m.message))
break;
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author,
m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
var @event = new MessageReceived
{
From = m.from_id.ID,
ChatId = m.peer_id.ID,
Text = m.message,
OccuredDateTime = _dateProvider.UtcNow
};
await _bus.Publish(@event);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
@ -32,7 +51,5 @@ public sealed class NewMessageHandler : INewMessageHandler
ms.action.GetType().Name[13..]);
break;
}
return Task.CompletedTask;
}
}

View File

@ -2,7 +2,15 @@
namespace Nocr.TelegramListener.Async.Api.Contracts.Events;
public sealed class TextUpdateReceived : IEvent
public sealed class MessageReceived : IEvent
{
public Guid Id { get; } = Guid.NewGuid();
public string Text { get; set; }
public long From { get; set; }
public long ChatId { get; set; }
public DateTimeOffset OccuredDateTime { get; set; }
}

View File

@ -1,6 +1,10 @@
using Microsoft.Extensions.Options;
using Nocr.TelegramListener.AppServices;
using Nocr.TelegramListener.AppServices.UpdateListeners;
using Nocr.TelegramListener.Async.Api.Contracts.Events;
using Nocr.TelegramListener.Core.Dates;
using Rebus.Config;
using Rebus.Routing.TypeBased;
using Rebus.Serialization.Json;
namespace Nocr.TelegramListener.Host.Infrastructure;
@ -16,11 +20,27 @@ public class Startup
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ICurrentDateProvider, DefaultCurrentDateProvider>();
services.AddAppServices(Configuration);
services.Configure<RebusRabbitMqOptions>(Configuration.GetSection(nameof(RebusRabbitMqOptions)));
services.AddRebus((builder, ctx) =>
builder.Transport(t =>
t.UseRabbitMq(ctx.GetRequiredService<IOptions<RebusRabbitMqOptions>>().Value.ConnectionString,
ctx.GetRequiredService<IOptions<RebusRabbitMqOptions>>().Value.InputQueueName)
.DefaultQueueOptions(queue => queue.SetDurable(true))
.ExchangeNames("nocr.direct", "nocr.topics"))
.Serialization(s => s.UseSystemTextJson())
.Logging(l => l.Serilog()));
}
public void Configure(IApplicationBuilder app)
{
}
public sealed class RebusRabbitMqOptions
{
public string ConnectionString { get; set; }
public string InputQueueName { get; set; }
}
}

View File

@ -8,6 +8,13 @@
<PackageReference Include="Serilog.AspNetCore" />
</ItemGroup>
<ItemGroup Label="Rebus">
<PackageReference Include="Rebus" />
<PackageReference Include="Rebus.ServiceProvider" />
<PackageReference Include="Rebus.RabbitMq" />
<PackageReference Include="Rebus.Serilog" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Nocr.TelegramListener.AppServices\Nocr.TelegramListener.AppServices.csproj" />
</ItemGroup>

View File

@ -8,5 +8,8 @@
}
}
]
},
"RebusRabbitMqOptions": {
"ConnectionString": "amqp://admin:admin@localhost:5672/"
}
}

View File

@ -21,5 +21,8 @@
}
}
]
},
"RebusRabbitMqOptions": {
"ConnectionString": "amqp://admin:admin@nocr-rabbitmq:5672/"
}
}

View File

@ -8,5 +8,8 @@
"System.Net.Http.HttpClient": "Warning"
}
}
},
"RebusRabbitMqOptions": {
"InputQueueName": "nocr.telegram.listener.queue"
}
}