From 6e59a4759fb87cb0de4fd3e02ce4e6f874db6afd Mon Sep 17 00:00:00 2001 From: ruberoid Date: Sun, 11 May 2025 13:53:58 +0400 Subject: [PATCH] added wide debug area for stoppage listener purposes. --- Directory.Packages.props | 3 +- .../Nocr.TelegramListener.AppServices.csproj | 1 + .../TelegramListenerMetrics.cs | 24 ++++ .../Handlers/Implementation/UpdateHandler.cs | 134 ++++++++++-------- .../UpdateListenerBackgroundService.cs | 75 +++++++--- .../Infrastructure/Startup.cs | 4 + .../TelegramListenerHealthCheck.cs | 36 +++++ 7 files changed, 199 insertions(+), 78 deletions(-) create mode 100644 src/Nocr.TelegramListener.AppServices/TelegramListenerMetrics.cs create mode 100644 src/Nocr.TelegramListener.Host/Infrastructure/TelegramListenerHealthCheck.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 8aea8da..9cbcbbb 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -7,6 +7,7 @@ + @@ -29,4 +30,4 @@ - + \ No newline at end of file diff --git a/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj b/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj index d062521..b49fafe 100644 --- a/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj +++ b/src/Nocr.TelegramListener.AppServices/Nocr.TelegramListener.AppServices.csproj @@ -7,6 +7,7 @@ + diff --git a/src/Nocr.TelegramListener.AppServices/TelegramListenerMetrics.cs b/src/Nocr.TelegramListener.AppServices/TelegramListenerMetrics.cs new file mode 100644 index 0000000..bf9b91a --- /dev/null +++ b/src/Nocr.TelegramListener.AppServices/TelegramListenerMetrics.cs @@ -0,0 +1,24 @@ +using Prometheus; + +public static class TelegramListenerMetrics +{ + public static readonly Counter UpdatesReceived = Metrics.CreateCounter( + "telegram_updates_received_total", + "Total number of updates received from Telegram" + ); + + public static readonly Counter UpdatesProcessed = Metrics.CreateCounter( + "telegram_updates_processed_total", + "Total number of updates successfully processed" + ); + + public static readonly Counter UpdatesErrors = Metrics.CreateCounter( + "telegram_updates_errors_total", + "Total number of errors while processing updates" + ); + + public static readonly Gauge LastUpdateTimestamp = Metrics.CreateGauge( + "telegram_last_update_timestamp", + "Timestamp of the last received update" + ); +} \ No newline at end of file diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs index 33c122e..b556a57 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/Handlers/Implementation/UpdateHandler.cs @@ -1,18 +1,24 @@ using TL; +using Microsoft.Extensions.Logging; namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers; public sealed class UpdateHandler : IUpdateHandler { + private readonly ILogger _logger; private readonly ITelegramClientContainer _clientContainer; private readonly TelegramRegistry _telegramRegistry; private readonly INewMessageHandler _newMessageHandler; private readonly IEditMessageHandler _editMessageHandler; - public UpdateHandler(ITelegramClientContainer clientContainer, TelegramRegistry telegramRegistry, + public UpdateHandler( + ILogger logger, + ITelegramClientContainer clientContainer, + TelegramRegistry telegramRegistry, INewMessageHandler newMessageHandler, IEditMessageHandler editMessageHandler) { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _clientContainer = clientContainer ?? throw new ArgumentNullException(nameof(clientContainer)); _telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry)); _newMessageHandler = newMessageHandler ?? throw new ArgumentNullException(nameof(newMessageHandler)); @@ -21,65 +27,73 @@ public sealed class UpdateHandler : IUpdateHandler public async Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default) { - var (client, users, chats) = - (_clientContainer.Client, _telegramRegistry.Users, _telegramRegistry.Chats); + try + { + var (client, users, chats) = + (_clientContainer.Client, _telegramRegistry.Users, _telegramRegistry.Chats); - updates.CollectUsersChats(users, chats); - if (updates is UpdateShortMessage usm && !users.ContainsKey(usm.user_id)) - (await client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats( - users, chats); - else if (updates is UpdateShortChatMessage uscm && - (!users.ContainsKey(uscm.from_id) || - !chats.ContainsKey(uscm.chat_id))) - (await client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats( - users, - chats); - foreach (var update in updates.UpdateList) - switch (update) - { - case UpdateNewMessage unm: - await _newMessageHandler.Handle(unm.message); - break; - case UpdateEditMessage uem: - await _editMessageHandler.Handle(uem.message); - break; - // Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases - case UpdateDeleteChannelMessages udcm: - Console.WriteLine( - $"{udcm.messages.Length} message(s) deleted in {chats.Chat(udcm.channel_id)}"); - break; - case UpdateDeleteMessages udm: - Console.WriteLine($"{udm.messages.Length} message(s) deleted"); - break; - case UpdateUserTyping uut: - Console.WriteLine($"{users.User(uut.user_id)} is {uut.action}"); - break; - case UpdateChatUserTyping ucut: - Console.WriteLine( - $"{ucut.from_id.Peer(users, chats)} is {ucut.action} in {chats.Chat(ucut.chat_id)}"); - break; - case UpdateChannelUserTyping ucut2: - Console.WriteLine( - $"{ucut2.from_id.Peer(users, chats)} is {ucut2.action} in {chats.Chat(ucut2.channel_id)}"); - break; - case UpdateChatParticipants { participants: ChatParticipants cp }: - Console.WriteLine( - $"{cp.participants.Length} participants in {chats.Chat(cp.chat_id)}"); - break; - case UpdateUserStatus uus: - Console.WriteLine( - $"{users.User(uus.user_id)} is now {uus.status.GetType().Name[10..]}"); - break; - case UpdateUserName uun: - Console.WriteLine( - $"{users.User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}"); - break; - case UpdateUser uu: - Console.WriteLine($"{users.User(uu.user_id)} has changed infos/photo"); - break; - default: - Console.WriteLine(update.GetType().Name); - break; // there are much more update types than the above example cases - } + updates.CollectUsersChats(users, chats); + if (updates is UpdateShortMessage usm && !users.ContainsKey(usm.user_id)) + (await client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats( + users, chats); + else if (updates is UpdateShortChatMessage uscm && + (!users.ContainsKey(uscm.from_id) || + !chats.ContainsKey(uscm.chat_id))) + (await client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats( + users, + chats); + foreach (var update in updates.UpdateList) + switch (update) + { + case UpdateNewMessage unm: + await _newMessageHandler.Handle(unm.message); + break; + case UpdateEditMessage uem: + await _editMessageHandler.Handle(uem.message); + break; + // Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases + case UpdateDeleteChannelMessages udcm: + Console.WriteLine( + $"{udcm.messages.Length} message(s) deleted in {chats.Chat(udcm.channel_id)}"); + break; + case UpdateDeleteMessages udm: + Console.WriteLine($"{udm.messages.Length} message(s) deleted"); + break; + case UpdateUserTyping uut: + Console.WriteLine($"{users.User(uut.user_id)} is {uut.action}"); + break; + case UpdateChatUserTyping ucut: + Console.WriteLine( + $"{ucut.from_id.Peer(users, chats)} is {ucut.action} in {chats.Chat(ucut.chat_id)}"); + break; + case UpdateChannelUserTyping ucut2: + Console.WriteLine( + $"{ucut2.from_id.Peer(users, chats)} is {ucut2.action} in {chats.Chat(ucut2.channel_id)}"); + break; + case UpdateChatParticipants { participants: ChatParticipants cp }: + Console.WriteLine( + $"{cp.participants.Length} participants in {chats.Chat(cp.chat_id)}"); + break; + case UpdateUserStatus uus: + Console.WriteLine( + $"{users.User(uus.user_id)} is now {uus.status.GetType().Name[10..]}"); + break; + case UpdateUserName uun: + Console.WriteLine( + $"{users.User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}"); + break; + case UpdateUser uu: + Console.WriteLine($"{users.User(uu.user_id)} has changed infos/photo"); + break; + default: + Console.WriteLine(update.GetType().Name); + break; // there are much more update types than the above example cases + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Critical error in HandleUpdate"); + throw; + } } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs b/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs index 0b78622..8f0cdd3 100644 --- a/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs +++ b/src/Nocr.TelegramListener.AppServices/UpdateListeners/UpdateListenerBackgroundService.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Nocr.TelegramListener.AppServices.UpdateListeners.Handlers; +using Prometheus; using TL; using TL.Methods; using WTelegram; @@ -17,6 +18,9 @@ public sealed class UpdateListenerBackgroundService : BackgroundService private readonly ITelegramClientContainer _telegramClientContainer; private readonly TelegramRegistry _telegramRegistry; + private DateTime _lastActivityTime = DateTime.UtcNow; + private readonly TimeSpan _activityTimeout = TimeSpan.FromMinutes(10); + public UpdateListenerBackgroundService( ILogger logger, IServiceProvider serviceProvider, @@ -35,35 +39,72 @@ public sealed class UpdateListenerBackgroundService : BackgroundService { Helpers.Log = (l, s) => System.Diagnostics.Debug.WriteLine(s); Client? client = null; + while (!stoppingToken.IsCancellationRequested) { - if (client == null) + try { - _telegramRegistry.Clear(); - _telegramClientContainer.Reset(); - _telegramClientContainer.Initialize(); + if (client == null || _lastActivityTime < DateTime.UtcNow - _activityTimeout) + { + if (client != null) + { + _logger.LogWarning("No activity detected for {Timeout}. Reconnecting...", _activityTimeout); + client.OnUpdate -= HandleUpdates; + client.Dispose(); + } - client = _telegramClientContainer.Client; - client.OnUpdate += HandleUpdates; - var my = await client.LoginUserIfNeeded(); - _telegramRegistry.SetMy(my); + _telegramRegistry.Clear(); + _telegramClientContainer.Reset(); + _telegramClientContainer.Initialize(); - _logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})", - _telegramRegistry.My.username ?? - _telegramRegistry.My.first_name + " " + _telegramRegistry.My.last_name, - _telegramRegistry.My.id); + client = _telegramClientContainer.Client; + client.OnUpdate += HandleUpdates; - await _telegramRegistry.Update(client); + var my = await client.LoginUserIfNeeded(); + _telegramRegistry.SetMy(my); + + _logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})", + _telegramRegistry.My.username ?? + _telegramRegistry.My.first_name + " " + _telegramRegistry.My.last_name, + _telegramRegistry.My.id); + + await _telegramRegistry.Update(client); + + _lastActivityTime = DateTime.UtcNow; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in main loop. Restarting..."); + client?.Dispose(); + client = null; + await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken); + continue; } - await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); + await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken); + + client?.Dispose(); } } private async Task HandleUpdates(UpdatesBase updates) { - using var scope = _serviceProvider.CreateScope(); - var updateHandler = scope.ServiceProvider.GetRequiredService(); - await updateHandler.HandleUpdate(updates); + TelegramListenerMetrics.UpdatesReceived.Inc(); + TelegramListenerMetrics.LastUpdateTimestamp.SetToCurrentTimeUtc(); + + try + { + using var scope = _serviceProvider.CreateScope(); + var updateHandler = scope.ServiceProvider.GetRequiredService(); + await updateHandler.HandleUpdate(updates); + + TelegramListenerMetrics.UpdatesProcessed.Inc(); + } + catch (Exception ex) + { + TelegramListenerMetrics.UpdatesErrors.Inc(); + _logger.LogError(ex, "Error handling updates."); + } } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs b/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs index 989ffb7..57c99d3 100644 --- a/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs +++ b/src/Nocr.TelegramListener.Host/Infrastructure/Startup.cs @@ -25,6 +25,8 @@ public class Startup services.AddAppServices(Configuration); services.Configure(Configuration.GetSection(nameof(RebusRabbitMqOptions))); + services.AddHealthChecks() + .AddCheck("telegram_listener"); services.AddRebus((builder, ctx) => builder.Transport(t => { @@ -51,5 +53,7 @@ public class Startup var bus = app.ApplicationServices.GetRequiredService(); // TODO: BackgroundService bus.Advanced.Topics.Subscribe(Constants.RoutingKeys.Subscriptions).GetAwaiter().GetResult(); + + app.UseHealthChecks("/health"); } } \ No newline at end of file diff --git a/src/Nocr.TelegramListener.Host/Infrastructure/TelegramListenerHealthCheck.cs b/src/Nocr.TelegramListener.Host/Infrastructure/TelegramListenerHealthCheck.cs new file mode 100644 index 0000000..ae904bf --- /dev/null +++ b/src/Nocr.TelegramListener.Host/Infrastructure/TelegramListenerHealthCheck.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Nocr.TelegramListener.AppServices.UpdateListeners; +using TL; + +public class TelegramListenerHealthCheck( + ILogger logger, + ITelegramClientContainer clientContainer + ) : IHealthCheck +{ + private readonly ITelegramClientContainer _clientContainer = clientContainer; + private readonly ILogger _logger = logger; + + public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) + { + try + { + if (!_clientContainer.Initialized) + { + return HealthCheckResult.Unhealthy("Telegram client not initialized"); + } + + var me = await _clientContainer.Client.Users_GetFullUser(_clientContainer.Client.User); + if (me == null) + { + return HealthCheckResult.Unhealthy("Cannot get self info from Telegram"); + } + + return HealthCheckResult.Healthy($"Connected as {me?.full_user.about}"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Health check failed"); + return HealthCheckResult.Unhealthy("Error communicating with Telegram", ex); + } + } +} \ No newline at end of file