From c086c35b803856b80803d0fa7823c7928762e2fd Mon Sep 17 00:00:00 2001 From: ruberoid Date: Tue, 13 May 2025 18:35:34 +0400 Subject: [PATCH] Added update manager. --- .../Handlers/IUpdateHandler.cs | 2 +- .../Implementation/EditMessageHandler.cs | 34 +++++--- .../Implementation/NewMessageHandler.cs | 75 +++++++++------- .../Handlers/Implementation/UpdateHandler.cs | 86 ++++++------------- .../ITelegramClientContainer.cs | 2 + .../TelegramClientContainer.cs | 21 ++--- .../UpdateListenerBackgroundService.cs | 7 +- .../appsettings.k8s.json | 2 +- 8 files changed, 104 insertions(+), 125 deletions(-) diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/IUpdateHandler.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/IUpdateHandler.cs index 573ba85..0a94c32 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/IUpdateHandler.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/IUpdateHandler.cs @@ -4,5 +4,5 @@ namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers; public interface IUpdateHandler { - Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default); + Task HandleUpdate(Update update, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/EditMessageHandler.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/EditMessageHandler.cs index 21c7e5c..3315c6c 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/EditMessageHandler.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/EditMessageHandler.cs @@ -18,20 +18,28 @@ public sealed class EditMessageHandler : IEditMessageHandler public Task Handle(MessageBase messageBase) { _logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID); - switch (messageBase) + + try { - case Message m: - _logger.LogInformation("{From} in {Chat} > {MessageText}", - m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author, - m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - m.message); - break; - case MessageService ms: - _logger.LogInformation("{From} in {Chat} > [{Action}]", - ms.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - ms.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - ms.action.GetType().Name[13..]); - break; + switch (messageBase) + { + case Message m: + _logger.LogInformation("{From} in {Chat} > {MessageText}", + m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author, + m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + m.message); + break; + case MessageService ms: + _logger.LogInformation("{From} in {Chat} > [{Action}]", + ms.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + ms.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + ms.action.GetType().Name[13..]); + break; + } + } + catch (Exception ex) + { + _logger.LogError("Error executing {Handler} for message {MessageId}\n{Exception}", nameof(EditMessageHandler), messageBase.ID, ex); } return Task.CompletedTask; diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs index 4917736..6547406 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/NewMessageHandler.cs @@ -24,42 +24,51 @@ public sealed class NewMessageHandler : INewMessageHandler public async Task Handle(MessageBase messageBase) { - _logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID); - switch (messageBase) + _logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(NewMessageHandler), messageBase.ID); + + try { - case Message m: - if (string.IsNullOrWhiteSpace(m.message)) - break; + switch (messageBase) + { + case Message m: + if (string.IsNullOrWhiteSpace(m.message)) + break; - _logger.LogInformation("{From} in {Chat} > {MessageText}", - m.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - m.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - m.message); - var chatUserName = m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats).Split("@").Last(); - if (string.IsNullOrWhiteSpace(chatUserName)) - { - _logger.LogWarning("Failed to get chat user name for chat {ChatId}", m.peer_id.ID); - break; - } + _logger.LogInformation("{From} in {Chat} > {MessageText}", + m.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + m.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + m.message); + var chatUserName = m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats).Split("@").Last(); + if (string.IsNullOrWhiteSpace(chatUserName)) + { + _logger.LogWarning("Failed to get chat user name for chat {ChatId}", m.peer_id.ID); + break; + } - var @event = new MessageReceived - { - MessageId = messageBase.ID, - // Для каналов from = null - FromId = m.From?.ID ?? m.Peer.ID, - From = (m.From ?? m.Peer).Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - ChatUsername = chatUserName, - Text = m.message, - OccuredDateTime = _dateProvider.UtcNow - }; - await _bus.Advanced.Topics.Publish("nocr.telegram.listener", @event); - break; - case MessageService ms: - _logger.LogInformation("{From} in {Chat} > [{Action}]", - ms.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - ms.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), - ms.action.GetType().Name[13..]); - break; + var @event = new MessageReceived + { + MessageId = messageBase.ID, + // Для каналов from = null + FromId = m.From?.ID ?? m.Peer.ID, + From = (m.From ?? m.Peer).Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + ChatUsername = chatUserName, + Text = m.message, + OccuredDateTime = _dateProvider.UtcNow + }; + + await _bus.Advanced.Topics.Publish("nocr.telegram.listener", @event); + break; + case MessageService ms: + _logger.LogInformation("{From} in {Chat} > [{Action}]", + ms.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + ms.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats), + ms.action.GetType().Name[13..]); + break; + } + } + catch (Exception ex) + { + _logger.LogError("Error executing {Handler} for message {MessageId}\n{Exception}", nameof(NewMessageHandler), messageBase.ID, ex); } } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs index b556a57..532454d 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs @@ -11,6 +11,8 @@ public sealed class UpdateHandler : IUpdateHandler private readonly INewMessageHandler _newMessageHandler; private readonly IEditMessageHandler _editMessageHandler; + private string Peer(Peer peer) => _clientContainer.Manager.UserOrChat(peer)?.ToString() ?? $"Peer {peer?.ID}"; + public UpdateHandler( ILogger logger, ITelegramClientContainer clientContainer, @@ -25,70 +27,19 @@ public sealed class UpdateHandler : IUpdateHandler _editMessageHandler = editMessageHandler ?? throw new ArgumentNullException(nameof(editMessageHandler)); } - public async Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default) + public async Task HandleUpdate(Update update, CancellationToken cancellationToken = default) { try { - var (client, users, chats) = - (_clientContainer.Client, _telegramRegistry.Users, _telegramRegistry.Chats); + switch (update) + { + case UpdateNewMessage unm: await HandleMessage(unm.message); break; + case UpdateEditMessage uem: await HandleMessage(uem.message, true); break; - updates.CollectUsersChats(users, chats); - if (updates is UpdateShortMessage usm && !users.ContainsKey(usm.user_id)) - (await client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats( - users, chats); - else if (updates is UpdateShortChatMessage uscm && - (!users.ContainsKey(uscm.from_id) || - !chats.ContainsKey(uscm.chat_id))) - (await client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats( - users, - chats); - foreach (var update in updates.UpdateList) - switch (update) - { - case UpdateNewMessage unm: - await _newMessageHandler.Handle(unm.message); - break; - case UpdateEditMessage uem: - await _editMessageHandler.Handle(uem.message); - break; - // Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases - case UpdateDeleteChannelMessages udcm: - Console.WriteLine( - $"{udcm.messages.Length} message(s) deleted in {chats.Chat(udcm.channel_id)}"); - break; - case UpdateDeleteMessages udm: - Console.WriteLine($"{udm.messages.Length} message(s) deleted"); - break; - case UpdateUserTyping uut: - Console.WriteLine($"{users.User(uut.user_id)} is {uut.action}"); - break; - case UpdateChatUserTyping ucut: - Console.WriteLine( - $"{ucut.from_id.Peer(users, chats)} is {ucut.action} in {chats.Chat(ucut.chat_id)}"); - break; - case UpdateChannelUserTyping ucut2: - Console.WriteLine( - $"{ucut2.from_id.Peer(users, chats)} is {ucut2.action} in {chats.Chat(ucut2.channel_id)}"); - break; - case UpdateChatParticipants { participants: ChatParticipants cp }: - Console.WriteLine( - $"{cp.participants.Length} participants in {chats.Chat(cp.chat_id)}"); - break; - case UpdateUserStatus uus: - Console.WriteLine( - $"{users.User(uus.user_id)} is now {uus.status.GetType().Name[10..]}"); - break; - case UpdateUserName uun: - Console.WriteLine( - $"{users.User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}"); - break; - case UpdateUser uu: - Console.WriteLine($"{users.User(uu.user_id)} has changed infos/photo"); - break; - default: - Console.WriteLine(update.GetType().Name); - break; // there are much more update types than the above example cases - } + default: + _logger.LogInformation($"Update of type `{update.GetType().Name}` fired."); + break; + } } catch (Exception ex) { @@ -96,4 +47,19 @@ public sealed class UpdateHandler : IUpdateHandler throw; } } + + private async Task HandleMessage(MessageBase messageBase, bool edit = false) + { + if (edit) _logger.LogInformation("(Edit): "); + switch (messageBase) + { + case Message m: + _logger.LogInformation($"{Peer(m.from_id) ?? m.post_author} in {Peer(m.peer_id)}> {m.message}"); + await _newMessageHandler.Handle(m); + break; + case MessageService ms: + _logger.LogInformation($"{Peer(ms.from_id)} in {Peer(ms.peer_id)} [{ms.action.GetType().Name[13..]}]"); + break; + } + } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/ITelegramClientContainer.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/ITelegramClientContainer.cs index 5e61279..da744e1 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/ITelegramClientContainer.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/ITelegramClientContainer.cs @@ -6,6 +6,8 @@ public interface ITelegramClientContainer { Client Client { get; } + UpdateManager Manager { get; set; } + public bool Initialized { get; } public void Initialize(); diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/TelegramClientContainer.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/TelegramClientContainer.cs index afd3894..3938603 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/TelegramClientContainer.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/TelegramClientContainer.cs @@ -1,24 +1,22 @@ using Microsoft.Extensions.Options; +using TL; using WTelegram; namespace Nocr.TelegramListener.AppServices.UpdateListeners; -public sealed class TelegramClientContainer : ITelegramClientContainer, IDisposable +public sealed class TelegramClientContainer(IOptions options) : ITelegramClientContainer, IDisposable { private Client? _client; - private readonly WTelegramClientOptions _options; - public Client Client => _client ?? throw new InvalidOperationException("Client not initialized yet"); + private readonly WTelegramClientOptions _options = options.Value ?? throw new ArgumentNullException(nameof(options)); + + public Client Client => _client ?? throw new InvalidOperationException("Client not initialized yet."); public bool Initialized { get; private set; } + public required UpdateManager Manager { get; set; } private readonly object _syncRoot = new(); - public TelegramClientContainer(IOptions options) - { - _options = options.Value ?? throw new ArgumentNullException(nameof(options)); - } - public void Initialize() { lock (_syncRoot) @@ -42,7 +40,7 @@ public sealed class TelegramClientContainer : ITelegramClientContainer, IDisposa } } - private string ConfigureWTelegramClient(string what) + private string? ConfigureWTelegramClient(string what) { switch (what) { @@ -54,10 +52,7 @@ public sealed class TelegramClientContainer : ITelegramClientContainer, IDisposa case "verification_code": Console.Write("Code: "); return Console.ReadLine(); - //case "first_name": return "Dmitry"; // if sign-up is required - //case "last_name": return "Charushnikov"; // if sign-up is required - //case "password": return ""; // if user has enabled 2FA - default: return null; // let WTelegramClient decide the default config + default: return null; } } diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs index e8da26f..8396d16 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs @@ -49,7 +49,6 @@ public sealed class UpdateListenerBackgroundService : BackgroundService if (client != null) { _logger.LogWarning("No activity detected for {Timeout}. Reconnecting...", _activityTimeout); - client.OnUpdates -= HandleUpdates; client.Dispose(); } @@ -58,7 +57,7 @@ public sealed class UpdateListenerBackgroundService : BackgroundService _telegramClientContainer.Initialize(); client = _telegramClientContainer.Client; - client.OnUpdates += HandleUpdates; + _telegramClientContainer.Manager = client.WithUpdateManager(HandleUpdates); var my = await client.LoginUserIfNeeded(); _telegramRegistry.SetMy(my); @@ -88,7 +87,7 @@ public sealed class UpdateListenerBackgroundService : BackgroundService } } - private async Task HandleUpdates(UpdatesBase updates) + private async Task HandleUpdates(Update update) { TelegramListenerMetrics.UpdatesReceived.Inc(); TelegramListenerMetrics.LastUpdateTimestamp.SetToCurrentTimeUtc(); @@ -97,7 +96,7 @@ public sealed class UpdateListenerBackgroundService : BackgroundService { using var scope = _serviceProvider.CreateScope(); var updateHandler = scope.ServiceProvider.GetRequiredService(); - await updateHandler.HandleUpdate(updates); + await updateHandler.HandleUpdate(update); TelegramListenerMetrics.UpdatesProcessed.Inc(); } diff --git a/src/Nocr.TelegramListener.Host/appsettings.k8s.json b/src/Nocr.TelegramListener.Host/appsettings.k8s.json index aada430..dede7c2 100644 --- a/src/Nocr.TelegramListener.Host/appsettings.k8s.json +++ b/src/Nocr.TelegramListener.Host/appsettings.k8s.json @@ -1,7 +1,7 @@ { "Serilog": { "MinimumLevel": { - "Default": "Information" + "Default": "Debug" }, "WriteTo": [ {