Added update manager.

This commit is contained in:
ruberoid 2025-05-13 18:35:34 +04:00
parent 0861d48f5f
commit c086c35b80
8 changed files with 104 additions and 125 deletions

View File

@ -4,5 +4,5 @@ namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
public interface IUpdateHandler
{
Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default);
Task HandleUpdate(Update update, CancellationToken cancellationToken = default);
}

View File

@ -18,20 +18,28 @@ public sealed class EditMessageHandler : IEditMessageHandler
public Task Handle(MessageBase messageBase)
{
_logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID);
switch (messageBase)
try
{
case Message m:
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author,
m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
break;
switch (messageBase)
{
case Message m:
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author,
m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
break;
}
}
catch (Exception ex)
{
_logger.LogError("Error executing {Handler} for message {MessageId}\n{Exception}", nameof(EditMessageHandler), messageBase.ID, ex);
}
return Task.CompletedTask;

View File

@ -24,42 +24,51 @@ public sealed class NewMessageHandler : INewMessageHandler
public async Task Handle(MessageBase messageBase)
{
_logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID);
switch (messageBase)
_logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(NewMessageHandler), messageBase.ID);
try
{
case Message m:
if (string.IsNullOrWhiteSpace(m.message))
break;
switch (messageBase)
{
case Message m:
if (string.IsNullOrWhiteSpace(m.message))
break;
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
var chatUserName = m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats).Split("@").Last();
if (string.IsNullOrWhiteSpace(chatUserName))
{
_logger.LogWarning("Failed to get chat user name for chat {ChatId}", m.peer_id.ID);
break;
}
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
var chatUserName = m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats).Split("@").Last();
if (string.IsNullOrWhiteSpace(chatUserName))
{
_logger.LogWarning("Failed to get chat user name for chat {ChatId}", m.peer_id.ID);
break;
}
var @event = new MessageReceived
{
MessageId = messageBase.ID,
// Для каналов from = null
FromId = m.From?.ID ?? m.Peer.ID,
From = (m.From ?? m.Peer).Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ChatUsername = chatUserName,
Text = m.message,
OccuredDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish("nocr.telegram.listener", @event);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
break;
var @event = new MessageReceived
{
MessageId = messageBase.ID,
// Для каналов from = null
FromId = m.From?.ID ?? m.Peer.ID,
From = (m.From ?? m.Peer).Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ChatUsername = chatUserName,
Text = m.message,
OccuredDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish("nocr.telegram.listener", @event);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
break;
}
}
catch (Exception ex)
{
_logger.LogError("Error executing {Handler} for message {MessageId}\n{Exception}", nameof(NewMessageHandler), messageBase.ID, ex);
}
}
}

View File

@ -11,6 +11,8 @@ public sealed class UpdateHandler : IUpdateHandler
private readonly INewMessageHandler _newMessageHandler;
private readonly IEditMessageHandler _editMessageHandler;
private string Peer(Peer peer) => _clientContainer.Manager.UserOrChat(peer)?.ToString() ?? $"Peer {peer?.ID}";
public UpdateHandler(
ILogger<UpdateHandler> logger,
ITelegramClientContainer clientContainer,
@ -25,70 +27,19 @@ public sealed class UpdateHandler : IUpdateHandler
_editMessageHandler = editMessageHandler ?? throw new ArgumentNullException(nameof(editMessageHandler));
}
public async Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default)
public async Task HandleUpdate(Update update, CancellationToken cancellationToken = default)
{
try
{
var (client, users, chats) =
(_clientContainer.Client, _telegramRegistry.Users, _telegramRegistry.Chats);
switch (update)
{
case UpdateNewMessage unm: await HandleMessage(unm.message); break;
case UpdateEditMessage uem: await HandleMessage(uem.message, true); break;
updates.CollectUsersChats(users, chats);
if (updates is UpdateShortMessage usm && !users.ContainsKey(usm.user_id))
(await client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats(
users, chats);
else if (updates is UpdateShortChatMessage uscm &&
(!users.ContainsKey(uscm.from_id) ||
!chats.ContainsKey(uscm.chat_id)))
(await client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats(
users,
chats);
foreach (var update in updates.UpdateList)
switch (update)
{
case UpdateNewMessage unm:
await _newMessageHandler.Handle(unm.message);
break;
case UpdateEditMessage uem:
await _editMessageHandler.Handle(uem.message);
break;
// Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases
case UpdateDeleteChannelMessages udcm:
Console.WriteLine(
$"{udcm.messages.Length} message(s) deleted in {chats.Chat(udcm.channel_id)}");
break;
case UpdateDeleteMessages udm:
Console.WriteLine($"{udm.messages.Length} message(s) deleted");
break;
case UpdateUserTyping uut:
Console.WriteLine($"{users.User(uut.user_id)} is {uut.action}");
break;
case UpdateChatUserTyping ucut:
Console.WriteLine(
$"{ucut.from_id.Peer(users, chats)} is {ucut.action} in {chats.Chat(ucut.chat_id)}");
break;
case UpdateChannelUserTyping ucut2:
Console.WriteLine(
$"{ucut2.from_id.Peer(users, chats)} is {ucut2.action} in {chats.Chat(ucut2.channel_id)}");
break;
case UpdateChatParticipants { participants: ChatParticipants cp }:
Console.WriteLine(
$"{cp.participants.Length} participants in {chats.Chat(cp.chat_id)}");
break;
case UpdateUserStatus uus:
Console.WriteLine(
$"{users.User(uus.user_id)} is now {uus.status.GetType().Name[10..]}");
break;
case UpdateUserName uun:
Console.WriteLine(
$"{users.User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}");
break;
case UpdateUser uu:
Console.WriteLine($"{users.User(uu.user_id)} has changed infos/photo");
break;
default:
Console.WriteLine(update.GetType().Name);
break; // there are much more update types than the above example cases
}
default:
_logger.LogInformation($"Update of type `{update.GetType().Name}` fired.");
break;
}
}
catch (Exception ex)
{
@ -96,4 +47,19 @@ public sealed class UpdateHandler : IUpdateHandler
throw;
}
}
private async Task HandleMessage(MessageBase messageBase, bool edit = false)
{
if (edit) _logger.LogInformation("(Edit): ");
switch (messageBase)
{
case Message m:
_logger.LogInformation($"{Peer(m.from_id) ?? m.post_author} in {Peer(m.peer_id)}> {m.message}");
await _newMessageHandler.Handle(m);
break;
case MessageService ms:
_logger.LogInformation($"{Peer(ms.from_id)} in {Peer(ms.peer_id)} [{ms.action.GetType().Name[13..]}]");
break;
}
}
}

View File

@ -6,6 +6,8 @@ public interface ITelegramClientContainer
{
Client Client { get; }
UpdateManager Manager { get; set; }
public bool Initialized { get; }
public void Initialize();

View File

@ -1,24 +1,22 @@
using Microsoft.Extensions.Options;
using TL;
using WTelegram;
namespace Nocr.TelegramListener.AppServices.UpdateListeners;
public sealed class TelegramClientContainer : ITelegramClientContainer, IDisposable
public sealed class TelegramClientContainer(IOptions<WTelegramClientOptions> options) : ITelegramClientContainer, IDisposable
{
private Client? _client;
private readonly WTelegramClientOptions _options;
public Client Client => _client ?? throw new InvalidOperationException("Client not initialized yet");
private readonly WTelegramClientOptions _options = options.Value ?? throw new ArgumentNullException(nameof(options));
public Client Client => _client ?? throw new InvalidOperationException("Client not initialized yet.");
public bool Initialized { get; private set; }
public required UpdateManager Manager { get; set; }
private readonly object _syncRoot = new();
public TelegramClientContainer(IOptions<WTelegramClientOptions> options)
{
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
}
public void Initialize()
{
lock (_syncRoot)
@ -42,7 +40,7 @@ public sealed class TelegramClientContainer : ITelegramClientContainer, IDisposa
}
}
private string ConfigureWTelegramClient(string what)
private string? ConfigureWTelegramClient(string what)
{
switch (what)
{
@ -54,10 +52,7 @@ public sealed class TelegramClientContainer : ITelegramClientContainer, IDisposa
case "verification_code":
Console.Write("Code: ");
return Console.ReadLine();
//case "first_name": return "Dmitry"; // if sign-up is required
//case "last_name": return "Charushnikov"; // if sign-up is required
//case "password": return ""; // if user has enabled 2FA
default: return null; // let WTelegramClient decide the default config
default: return null;
}
}

View File

@ -49,7 +49,6 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
if (client != null)
{
_logger.LogWarning("No activity detected for {Timeout}. Reconnecting...", _activityTimeout);
client.OnUpdates -= HandleUpdates;
client.Dispose();
}
@ -58,7 +57,7 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
_telegramClientContainer.Initialize();
client = _telegramClientContainer.Client;
client.OnUpdates += HandleUpdates;
_telegramClientContainer.Manager = client.WithUpdateManager(HandleUpdates);
var my = await client.LoginUserIfNeeded();
_telegramRegistry.SetMy(my);
@ -88,7 +87,7 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
}
}
private async Task HandleUpdates(UpdatesBase updates)
private async Task HandleUpdates(Update update)
{
TelegramListenerMetrics.UpdatesReceived.Inc();
TelegramListenerMetrics.LastUpdateTimestamp.SetToCurrentTimeUtc();
@ -97,7 +96,7 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
{
using var scope = _serviceProvider.CreateScope();
var updateHandler = scope.ServiceProvider.GetRequiredService<IUpdateHandler>();
await updateHandler.HandleUpdate(updates);
await updateHandler.HandleUpdate(update);
TelegramListenerMetrics.UpdatesProcessed.Inc();
}

View File

@ -1,7 +1,7 @@
{
"Serilog": {
"MinimumLevel": {
"Default": "Information"
"Default": "Debug"
},
"WriteTo": [
{