Add message dispatcher (#3)

Reviewed-on: #3
Co-authored-by: Sergey Nazarov <insight.appdev@gmail.com>
Co-committed-by: Sergey Nazarov <insight.appdev@gmail.com>
This commit is contained in:
Sergey Nazarov 2024-03-28 09:51:19 +00:00 committed by nazarovsa
parent 3bca09b3b9
commit c8da9e618b
24 changed files with 302 additions and 57 deletions

View File

@ -7,9 +7,9 @@
<InsightTelegramBotVersion>0.16.0</InsightTelegramBotVersion>
</PropertyGroup>
<ItemGroup Label="Nocr">
<PackageVersion Include="Nocr.TextMatcher.Api.Contracts" Version="0.4.22" />
<PackageVersion Include="Nocr.TextMatcher.Async.Api.Contracts" Version="0.4.22" />
<PackageVersion Include="Nocr.Users.Api.Contracts" Version="0.4.22" />
<PackageVersion Include="Nocr.TextMatcher.Api.Contracts" Version="0.4.23" />
<PackageVersion Include="Nocr.TextMatcher.Async.Api.Contracts" Version="0.4.23" />
<PackageVersion Include="Nocr.Users.Api.Contracts" Version="0.4.23" />
</ItemGroup>
<ItemGroup Label="Rebus">
<PackageVersion Include="Rebus" Version="8.2.2" />
@ -30,5 +30,6 @@
<ItemGroup Label="Microsoft">
<PackageVersion Include="Microsoft.Extensions.Http" Version="$(MicrosoftVersion)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftVersion)" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftVersion)" />
</ItemGroup>
</Project>

View File

@ -1,10 +1,11 @@
using Insight.TelegramBot;
using Insight.TelegramBot.Models;
using Microsoft.Extensions.Logging;
using Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
using Telegram.Bot;
using Telegram.Bot.Types;
namespace Nocr.TelegramClient.AppServices;
namespace Nocr.TelegramClient.AppServices.Bots;
public sealed class BotClient : Bot
{

View File

@ -0,0 +1,7 @@
using Nocr.TelegramClient.Core.BackgroundJobs;
namespace Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
public interface IMessageDispatcherHandler : IRepeatableBackgroundServiceHandler
{
}

View File

@ -0,0 +1,10 @@
using Insight.TelegramBot.Models;
namespace Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
public interface IMessageDispatcherQueue
{
public void Enqueue(BotMessage message);
public bool TryDequeue(out BotMessage? message);
}

View File

@ -0,0 +1,12 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Nocr.TelegramClient.Core.BackgroundJobs;
namespace Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
public sealed class MessageDispatcherBackgroundService : RepeatableBackgroundService<IMessageDispatcherHandler, MessageDispatcherOptions>
{
public MessageDispatcherBackgroundService(ILogger<MessageDispatcherBackgroundService> logger, IOptions<MessageDispatcherOptions> options, IServiceProvider serviceProvider) : base(logger, options.Value, serviceProvider)
{
}
}

View File

@ -0,0 +1,36 @@
using Insight.TelegramBot;
using Insight.TelegramBot.Models;
using Microsoft.Extensions.Logging;
using Telegram.Bot.Types;
namespace Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
public sealed class MessageDispatcherHandler : IMessageDispatcherHandler
{
private readonly ILogger<MessageDispatcherHandler> _logger;
private readonly IBot _bot;
private readonly IMessageDispatcherQueue _queue;
public MessageDispatcherHandler(ILogger<MessageDispatcherHandler> logger, IBot bot, IMessageDispatcherQueue queue)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
}
public async Task Handle(CancellationToken cancellationToken = default)
{
if (_queue.TryDequeue(out var message))
{
switch (message)
{
case TextMessage tm:
await _bot.SendMessageAsync(tm, cancellationToken);
break;
default:
_logger.LogWarning("Dequeued message has unsupported type: {Type}", typeof(Message).FullName);
break;
}
}
}
}

View File

@ -0,0 +1,7 @@
using Nocr.TelegramClient.Core.BackgroundJobs;
namespace Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
public class MessageDispatcherOptions : RepeatableServiceOptions
{
}

View File

@ -0,0 +1,19 @@
using System.Collections.Concurrent;
using Insight.TelegramBot.Models;
namespace Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
public sealed class MessageDispatcherQueue : IMessageDispatcherQueue
{
private readonly ConcurrentQueue<BotMessage> _queue = new();
public void Enqueue(BotMessage message)
{
_queue.Enqueue(message);
}
public bool TryDequeue(out BotMessage? message)
{
return _queue.TryDequeue(out message);
}
}

View File

@ -1,18 +1,19 @@
using Insight.TelegramBot;
using Insight.TelegramBot.Handling.Handlers;
using Insight.TelegramBot.Models;
using Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
using Telegram.Bot.Types;
using Telegram.Bot.Types.Enums;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.StartMesage;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.StartMessage;
public class StartMessageHandler : IMatchingUpdateHandler<StartMessageMatcher>
{
private readonly IBot _bot;
private readonly IMessageDispatcherQueue _messageQueue;
public StartMessageHandler(IBot bot)
public StartMessageHandler(IMessageDispatcherQueue messageQueue)
{
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
_messageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue));
}
public Task Handle(Update update, CancellationToken cancellationToken = default)
@ -24,6 +25,7 @@ public class StartMessageHandler : IMatchingUpdateHandler<StartMessageMatcher>
ParseMode = ParseMode.Html
};
return _bot.SendMessageAsync(message, cancellationToken);
_messageQueue.Enqueue(message);
return Task.CompletedTask;
}
}

View File

@ -1,6 +1,6 @@
using Insight.TelegramBot.Handling.Matchers.TextMatchers;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.StartMesage;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.StartMessage;
public sealed class StartMessageMatcher : TextStartWithUpdateMatcher
{

View File

@ -1,8 +1,8 @@
using System.Text.RegularExpressions;
using Insight.TelegramBot;
using Insight.TelegramBot.Handling.Handlers;
using Insight.TelegramBot.Handling.Matchers.TextMatchers;
using Insight.TelegramBot.Models;
using Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
using Nocr.TelegramClient.AppServices.Users;
using Nocr.TextMatcher.Api.Contracts.TextMatches;
using Nocr.TextMatcher.Api.Contracts.TextMatches.Requests;
@ -10,17 +10,9 @@ using Telegram.Bot.Types;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.SubscribeMessage;
public sealed class SubscribeMessageMatcher : TextStartWithUpdateMatcher
{
public SubscribeMessageMatcher()
{
Template = "/subscribe";
}
}
public class SubscribeMessageHandler : IMatchingUpdateHandler<SubscribeMessageMatcher>
{
private readonly IBot _bot;
private readonly IMessageDispatcherQueue _messageQueue;
private readonly IUsersService _usersService;
private readonly ITextMatchesController _textMatchesController;
@ -32,9 +24,10 @@ public class SubscribeMessageHandler : IMatchingUpdateHandler<SubscribeMessageMa
new Regex(@"/subscribe (.*\B@(?=\w{5,32}\b)[a-zA-Z0-9]+(?:_[a-zA-Z0-9]+)*.*) (\d{1}) (.*)",
RegexOptions.Compiled);
public SubscribeMessageHandler(IBot bot, IUsersService usersService, ITextMatchesController textMatchesController)
public SubscribeMessageHandler(IMessageDispatcherQueue messageQueue, IUsersService usersService,
ITextMatchesController textMatchesController)
{
_bot = bot ?? throw new ArgumentNullException(nameof(bot));
_messageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue));
_usersService = usersService ?? throw new ArgumentNullException(nameof(usersService));
_textMatchesController =
textMatchesController ?? throw new ArgumentNullException(nameof(textMatchesController));
@ -47,10 +40,11 @@ public class SubscribeMessageHandler : IMatchingUpdateHandler<SubscribeMessageMa
var match = _commandRegex.Match(update.Message.Text);
if (!match.Success)
{
await _bot.SendMessageAsync(new TextMessage(telegramId)
_messageQueue.Enqueue(new TextMessage(telegramId)
{
Text = "Команда не удовлетворяет формату"
}, CancellationToken.None);
});
return;
}
var username = match.Groups[1].Value.TrimStart('@');
@ -66,9 +60,9 @@ public class SubscribeMessageHandler : IMatchingUpdateHandler<SubscribeMessageMa
Template = template,
}, cancellationToken);
await _bot.SendMessageAsync(new TextMessage(telegramId)
_messageQueue.Enqueue(new TextMessage(telegramId)
{
Text = $"Подписка создана: {matchId}"
}, CancellationToken.None);
});
}
}

View File

@ -0,0 +1,11 @@
using Insight.TelegramBot.Handling.Matchers.TextMatchers;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.SubscribeMessage;
public sealed class SubscribeMessageMatcher : TextStartWithUpdateMatcher
{
public SubscribeMessageMatcher()
{
Template = "/subscribe";
}
}

View File

@ -0,0 +1,45 @@
using Insight.TelegramBot;
using Insight.TelegramBot.Handling.Handlers;
using Insight.TelegramBot.Models;
using Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
using Nocr.TelegramClient.AppServices.Matches;
using Nocr.TelegramClient.AppServices.Users;
using Nocr.TextMatcher.Api.Contracts.TextMatches;
using Telegram.Bot.Types;
using Telegram.Bot.Types.Enums;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.SubscriptionsMessage;
public class SubscriptionsMessageHandler : IMatchingUpdateHandler<SubscriptionsMessageMatcher>
{
private readonly IMessageDispatcherQueue _messageQueue;
private readonly IUsersService _usersService;
private readonly ITextMatchesController _matchesController;
public SubscriptionsMessageHandler(IMessageDispatcherQueue messageQueue, IUsersService usersService,
ITextMatchesController matchesController)
{
_messageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue));
_usersService = usersService ?? throw new ArgumentNullException(nameof(usersService));
_matchesController = matchesController ?? throw new ArgumentNullException(nameof(matchesController));
}
public async Task Handle(Update update, CancellationToken cancellationToken = default)
{
var telegramId = update.Message.From.Id;
var user = await _usersService.GetOrCreate(telegramId, update.Message.From.Username, cancellationToken);
var subscriptions = await _matchesController.GetByUserId(user.Id, cancellationToken);
foreach (var subscription in subscriptions)
{
var textMessage = new TextMessage(telegramId)
{
Text = subscription.TextView(),
ParseMode = ParseMode.Html
};
_messageQueue.Enqueue(textMessage);
}
}
}

View File

@ -0,0 +1,11 @@
using Insight.TelegramBot.Handling.Matchers.TextMatchers;
namespace Nocr.TelegramClient.AppServices.Handlers.Messages.SubscriptionsMessage;
public class SubscriptionsMessageMatcher : TextStartWithUpdateMatcher
{
public SubscriptionsMessageMatcher()
{
Template = "/subscriptions";
}
}

View File

@ -8,7 +8,7 @@ using Nocr.Users.Api.Contracts.Users;
using Rebus.Handlers;
using Telegram.Bot.Types.Enums;
namespace Nocr.TelegramClient.AppServices.Matches;
namespace Nocr.TelegramClient.AppServices.Matches.Handlers;
public class TextMatchMatchedHandler : IHandleMessages<TextMatchMatched>
{
@ -41,29 +41,13 @@ public class TextMatchMatchedHandler : IHandleMessages<TextMatchMatched>
return;
}
var fromUsername = string.IsNullOrWhiteSpace(message.FromUsername) ? "anonymous" : message.FromUsername;
var fromUsername = string.IsNullOrWhiteSpace(message.From) ? "anonymous" : message.From;
var textMessage = new TextMessage(long.Parse(identity.Identity))
{
Text =
$"[{message.PublishedDateTime:MM.dd.yyyy HH:mm:ss}] Найдено совпадение.</br>Тип совпадения: <b>'{GetTextMatchRule((TextMatchRule)message.Rule)}'</b></br>Шаблон: <b>'{message.Template}'</b></br>{fromUsername} в @{message.ChatUsername}: <i>{message.Text}</i>",
Text = $"[{message.PublishedDateTime:MM.dd.yyyy HH:mm:ss}] Найдено совпадение.\nТип совпадения: <b>'{((TextMatchRule)message.Rule).TextView()}'</b>\nШаблон: <b>'{message.Template}'</b>\n{fromUsername} в @{message.ChatUsername}: <i>{message.Text}</i>",
ParseMode = ParseMode.Html
};
await _bot.SendMessageAsync(textMessage);
}
private string GetTextMatchRule(TextMatchRule rule)
{
switch (rule)
{
case TextMatchRule.Full:
return "Полное";
case TextMatchRule.AllWords:
return "Все слова из списка";
case TextMatchRule.AnyWord:
return "Одно слово из списка";
default:
throw new IndexOutOfRangeException(nameof(rule));
}
}
}

View File

@ -0,0 +1,32 @@
using Nocr.TextMatcher.Api.Contracts.TextMatches;
using Nocr.TextMatcher.Api.Contracts.TextMatches.Dto;
namespace Nocr.TelegramClient.AppServices.Matches;
public static class TextMatchExtensions
{
public static string TextView(this TextMatchData textMatch)
{
var activeText = textMatch.Active ? "Активна" : "Не активна";
var activeCommandText = textMatch.Active
? $"Деактивировать: /deactivate_{textMatch.Id}"
: $"Активировать: /activate_{textMatch.Id}";
var deleteCommandText = $"Удалить: /delete_subscription_{textMatch.Id}";
return $"[{textMatch.Id}] (@{textMatch.ChatUsername}) {activeText}: '{textMatch.Rule.TextView()}' > '{textMatch.Template}'\n{activeCommandText}\n{deleteCommandText}";
}
public static string TextView(this TextMatchRule rule)
{
switch (rule)
{
case TextMatchRule.Full:
return "Полное";
case TextMatchRule.AllWords:
return "Все слова из списка";
case TextMatchRule.AnyWord:
return "Одно слово из списка";
default:
throw new IndexOutOfRangeException(nameof(rule));
}
}
}

View File

@ -1,7 +1,10 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Nocr.TelegramClient.AppServices.Bots;
using Nocr.TelegramClient.AppServices.Bots.MessageDispatcher;
using Nocr.TelegramClient.AppServices.Matches;
using Nocr.TelegramClient.AppServices.Matches.Handlers;
using Nocr.TelegramClient.AppServices.Options;
using Nocr.TelegramClient.AppServices.Users;
using Nocr.TextMatcher.Api.Contracts.TextMatches;
@ -44,6 +47,11 @@ public static class ServiceCollectionExtensions
services.AddScoped<IUsersService, UsersService>();
services.Configure<MessageDispatcherOptions>(configuration.GetSection(nameof(MessageDispatcherOptions)));
services.AddSingleton<IMessageDispatcherQueue, MessageDispatcherQueue>();
services.AddScoped<IMessageDispatcherHandler, MessageDispatcherHandler>();
services.AddHostedService<MessageDispatcherBackgroundService>();
return services;
}
}

View File

@ -0,0 +1,6 @@
namespace Nocr.TelegramClient.Core.BackgroundJobs;
public interface IRepeatableBackgroundServiceHandler
{
Task Handle(CancellationToken cancellationToken = default);
}

View File

@ -0,0 +1,42 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Nocr.TelegramClient.Core.BackgroundJobs;
public abstract class RepeatableBackgroundService<THandler, TOptions> : BackgroundService
where THandler : IRepeatableBackgroundServiceHandler
where TOptions : RepeatableServiceOptions
{
private readonly ILogger<RepeatableBackgroundService<THandler, TOptions>> _logger;
private readonly TOptions _options;
private readonly IServiceProvider _serviceProvider;
protected RepeatableBackgroundService(ILogger<RepeatableBackgroundService<THandler, TOptions>> logger, TOptions options, IServiceProvider serviceProvider)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options ?? throw new ArgumentNullException(nameof(options));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _serviceProvider.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<THandler>();
await handler.Handle(stoppingToken);
await Task.Delay(_options.Interval, stoppingToken);
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Failed to process...");
await Task.Delay(_options.ExceptionInterval, stoppingToken);
}
}
}
}

View File

@ -0,0 +1,8 @@
namespace Nocr.TelegramClient.Core.BackgroundJobs;
public abstract class RepeatableServiceOptions
{
public TimeSpan Interval { get; set; } = TimeSpan.FromSeconds(60);
public TimeSpan ExceptionInterval { get; set; } = TimeSpan.FromSeconds(300);
}

View File

@ -1 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk" />
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions"/>
</ItemGroup>
</Project>

View File

@ -3,6 +3,7 @@ using Insight.TelegramBot.Hosting.DependencyInjection.Infrastructure;
using Insight.TelegramBot.Hosting.Polling.ExceptionHandlers;
using Microsoft.Extensions.Options;
using Nocr.TelegramClient.AppServices;
using Nocr.TelegramClient.AppServices.Bots;
using Nocr.TelegramClient.Core.Dates;
using Nocr.TelegramClient.Core.Options;
using Rebus.Bus;

View File

@ -13,5 +13,9 @@
"InputQueueName": "nocr.telegram.client.queue",
"DirectExchangeName": "nocr.direct",
"TopicsExchangeName": "nocr.topics"
},
"MessageDispatcherOptions": {
"Interval": "00:00:00.050",
"ExceptionInterval": "00:00:30"
}
}