added wide debug area for stoppage listener purposes.
This commit is contained in:
parent
04fff13fef
commit
6e59a4759f
@ -7,6 +7,7 @@
|
||||
</PropertyGroup>
|
||||
<ItemGroup Label="Nocr">
|
||||
<PackageVersion Include="Nocr.TextMatcher.Async.Api.Contracts" />
|
||||
<PackageVersion Include="prometheus-net" Version="8.2.1" />
|
||||
</ItemGroup>
|
||||
<ItemGroup Label="Telegram">
|
||||
<PackageVersion Include="WTelegramClient" Version="3.7.1" />
|
||||
@ -29,4 +30,4 @@
|
||||
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftVersion)" />
|
||||
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftVersion)" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
</Project>
|
||||
@ -7,6 +7,7 @@
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
|
||||
<PackageReference Include="Nocr.TextMatcher.Async.Api.Contracts" />
|
||||
<PackageReference Include="prometheus-net" />
|
||||
<PackageReference Include="WTelegramClient" />
|
||||
<PackageReference Include="Rebus" />
|
||||
<PackageReference Include="Rebus.ServiceProvider" />
|
||||
|
||||
@ -0,0 +1,24 @@
|
||||
using Prometheus;
|
||||
|
||||
public static class TelegramListenerMetrics
|
||||
{
|
||||
public static readonly Counter UpdatesReceived = Metrics.CreateCounter(
|
||||
"telegram_updates_received_total",
|
||||
"Total number of updates received from Telegram"
|
||||
);
|
||||
|
||||
public static readonly Counter UpdatesProcessed = Metrics.CreateCounter(
|
||||
"telegram_updates_processed_total",
|
||||
"Total number of updates successfully processed"
|
||||
);
|
||||
|
||||
public static readonly Counter UpdatesErrors = Metrics.CreateCounter(
|
||||
"telegram_updates_errors_total",
|
||||
"Total number of errors while processing updates"
|
||||
);
|
||||
|
||||
public static readonly Gauge LastUpdateTimestamp = Metrics.CreateGauge(
|
||||
"telegram_last_update_timestamp",
|
||||
"Timestamp of the last received update"
|
||||
);
|
||||
}
|
||||
@ -1,18 +1,24 @@
|
||||
using TL;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
|
||||
|
||||
public sealed class UpdateHandler : IUpdateHandler
|
||||
{
|
||||
private readonly ILogger<UpdateHandler> _logger;
|
||||
private readonly ITelegramClientContainer _clientContainer;
|
||||
private readonly TelegramRegistry _telegramRegistry;
|
||||
private readonly INewMessageHandler _newMessageHandler;
|
||||
private readonly IEditMessageHandler _editMessageHandler;
|
||||
|
||||
public UpdateHandler(ITelegramClientContainer clientContainer, TelegramRegistry telegramRegistry,
|
||||
public UpdateHandler(
|
||||
ILogger<UpdateHandler> logger,
|
||||
ITelegramClientContainer clientContainer,
|
||||
TelegramRegistry telegramRegistry,
|
||||
INewMessageHandler newMessageHandler,
|
||||
IEditMessageHandler editMessageHandler)
|
||||
{
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_clientContainer = clientContainer ?? throw new ArgumentNullException(nameof(clientContainer));
|
||||
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
|
||||
_newMessageHandler = newMessageHandler ?? throw new ArgumentNullException(nameof(newMessageHandler));
|
||||
@ -21,65 +27,73 @@ public sealed class UpdateHandler : IUpdateHandler
|
||||
|
||||
public async Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var (client, users, chats) =
|
||||
(_clientContainer.Client, _telegramRegistry.Users, _telegramRegistry.Chats);
|
||||
try
|
||||
{
|
||||
var (client, users, chats) =
|
||||
(_clientContainer.Client, _telegramRegistry.Users, _telegramRegistry.Chats);
|
||||
|
||||
updates.CollectUsersChats(users, chats);
|
||||
if (updates is UpdateShortMessage usm && !users.ContainsKey(usm.user_id))
|
||||
(await client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats(
|
||||
users, chats);
|
||||
else if (updates is UpdateShortChatMessage uscm &&
|
||||
(!users.ContainsKey(uscm.from_id) ||
|
||||
!chats.ContainsKey(uscm.chat_id)))
|
||||
(await client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats(
|
||||
users,
|
||||
chats);
|
||||
foreach (var update in updates.UpdateList)
|
||||
switch (update)
|
||||
{
|
||||
case UpdateNewMessage unm:
|
||||
await _newMessageHandler.Handle(unm.message);
|
||||
break;
|
||||
case UpdateEditMessage uem:
|
||||
await _editMessageHandler.Handle(uem.message);
|
||||
break;
|
||||
// Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases
|
||||
case UpdateDeleteChannelMessages udcm:
|
||||
Console.WriteLine(
|
||||
$"{udcm.messages.Length} message(s) deleted in {chats.Chat(udcm.channel_id)}");
|
||||
break;
|
||||
case UpdateDeleteMessages udm:
|
||||
Console.WriteLine($"{udm.messages.Length} message(s) deleted");
|
||||
break;
|
||||
case UpdateUserTyping uut:
|
||||
Console.WriteLine($"{users.User(uut.user_id)} is {uut.action}");
|
||||
break;
|
||||
case UpdateChatUserTyping ucut:
|
||||
Console.WriteLine(
|
||||
$"{ucut.from_id.Peer(users, chats)} is {ucut.action} in {chats.Chat(ucut.chat_id)}");
|
||||
break;
|
||||
case UpdateChannelUserTyping ucut2:
|
||||
Console.WriteLine(
|
||||
$"{ucut2.from_id.Peer(users, chats)} is {ucut2.action} in {chats.Chat(ucut2.channel_id)}");
|
||||
break;
|
||||
case UpdateChatParticipants { participants: ChatParticipants cp }:
|
||||
Console.WriteLine(
|
||||
$"{cp.participants.Length} participants in {chats.Chat(cp.chat_id)}");
|
||||
break;
|
||||
case UpdateUserStatus uus:
|
||||
Console.WriteLine(
|
||||
$"{users.User(uus.user_id)} is now {uus.status.GetType().Name[10..]}");
|
||||
break;
|
||||
case UpdateUserName uun:
|
||||
Console.WriteLine(
|
||||
$"{users.User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}");
|
||||
break;
|
||||
case UpdateUser uu:
|
||||
Console.WriteLine($"{users.User(uu.user_id)} has changed infos/photo");
|
||||
break;
|
||||
default:
|
||||
Console.WriteLine(update.GetType().Name);
|
||||
break; // there are much more update types than the above example cases
|
||||
}
|
||||
updates.CollectUsersChats(users, chats);
|
||||
if (updates is UpdateShortMessage usm && !users.ContainsKey(usm.user_id))
|
||||
(await client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats(
|
||||
users, chats);
|
||||
else if (updates is UpdateShortChatMessage uscm &&
|
||||
(!users.ContainsKey(uscm.from_id) ||
|
||||
!chats.ContainsKey(uscm.chat_id)))
|
||||
(await client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats(
|
||||
users,
|
||||
chats);
|
||||
foreach (var update in updates.UpdateList)
|
||||
switch (update)
|
||||
{
|
||||
case UpdateNewMessage unm:
|
||||
await _newMessageHandler.Handle(unm.message);
|
||||
break;
|
||||
case UpdateEditMessage uem:
|
||||
await _editMessageHandler.Handle(uem.message);
|
||||
break;
|
||||
// Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases
|
||||
case UpdateDeleteChannelMessages udcm:
|
||||
Console.WriteLine(
|
||||
$"{udcm.messages.Length} message(s) deleted in {chats.Chat(udcm.channel_id)}");
|
||||
break;
|
||||
case UpdateDeleteMessages udm:
|
||||
Console.WriteLine($"{udm.messages.Length} message(s) deleted");
|
||||
break;
|
||||
case UpdateUserTyping uut:
|
||||
Console.WriteLine($"{users.User(uut.user_id)} is {uut.action}");
|
||||
break;
|
||||
case UpdateChatUserTyping ucut:
|
||||
Console.WriteLine(
|
||||
$"{ucut.from_id.Peer(users, chats)} is {ucut.action} in {chats.Chat(ucut.chat_id)}");
|
||||
break;
|
||||
case UpdateChannelUserTyping ucut2:
|
||||
Console.WriteLine(
|
||||
$"{ucut2.from_id.Peer(users, chats)} is {ucut2.action} in {chats.Chat(ucut2.channel_id)}");
|
||||
break;
|
||||
case UpdateChatParticipants { participants: ChatParticipants cp }:
|
||||
Console.WriteLine(
|
||||
$"{cp.participants.Length} participants in {chats.Chat(cp.chat_id)}");
|
||||
break;
|
||||
case UpdateUserStatus uus:
|
||||
Console.WriteLine(
|
||||
$"{users.User(uus.user_id)} is now {uus.status.GetType().Name[10..]}");
|
||||
break;
|
||||
case UpdateUserName uun:
|
||||
Console.WriteLine(
|
||||
$"{users.User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}");
|
||||
break;
|
||||
case UpdateUser uu:
|
||||
Console.WriteLine($"{users.User(uu.user_id)} has changed infos/photo");
|
||||
break;
|
||||
default:
|
||||
Console.WriteLine(update.GetType().Name);
|
||||
break; // there are much more update types than the above example cases
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Critical error in HandleUpdate");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
|
||||
using Prometheus;
|
||||
using TL;
|
||||
using TL.Methods;
|
||||
using WTelegram;
|
||||
@ -17,6 +18,9 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
|
||||
private readonly ITelegramClientContainer _telegramClientContainer;
|
||||
private readonly TelegramRegistry _telegramRegistry;
|
||||
|
||||
private DateTime _lastActivityTime = DateTime.UtcNow;
|
||||
private readonly TimeSpan _activityTimeout = TimeSpan.FromMinutes(10);
|
||||
|
||||
public UpdateListenerBackgroundService(
|
||||
ILogger<UpdateListenerBackgroundService> logger,
|
||||
IServiceProvider serviceProvider,
|
||||
@ -35,35 +39,72 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
|
||||
{
|
||||
Helpers.Log = (l, s) => System.Diagnostics.Debug.WriteLine(s);
|
||||
Client? client = null;
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
if (client == null)
|
||||
try
|
||||
{
|
||||
_telegramRegistry.Clear();
|
||||
_telegramClientContainer.Reset();
|
||||
_telegramClientContainer.Initialize();
|
||||
if (client == null || _lastActivityTime < DateTime.UtcNow - _activityTimeout)
|
||||
{
|
||||
if (client != null)
|
||||
{
|
||||
_logger.LogWarning("No activity detected for {Timeout}. Reconnecting...", _activityTimeout);
|
||||
client.OnUpdate -= HandleUpdates;
|
||||
client.Dispose();
|
||||
}
|
||||
|
||||
client = _telegramClientContainer.Client;
|
||||
client.OnUpdate += HandleUpdates;
|
||||
var my = await client.LoginUserIfNeeded();
|
||||
_telegramRegistry.SetMy(my);
|
||||
_telegramRegistry.Clear();
|
||||
_telegramClientContainer.Reset();
|
||||
_telegramClientContainer.Initialize();
|
||||
|
||||
_logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})",
|
||||
_telegramRegistry.My.username ??
|
||||
_telegramRegistry.My.first_name + " " + _telegramRegistry.My.last_name,
|
||||
_telegramRegistry.My.id);
|
||||
client = _telegramClientContainer.Client;
|
||||
client.OnUpdate += HandleUpdates;
|
||||
|
||||
await _telegramRegistry.Update(client);
|
||||
var my = await client.LoginUserIfNeeded();
|
||||
_telegramRegistry.SetMy(my);
|
||||
|
||||
_logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})",
|
||||
_telegramRegistry.My.username ??
|
||||
_telegramRegistry.My.first_name + " " + _telegramRegistry.My.last_name,
|
||||
_telegramRegistry.My.id);
|
||||
|
||||
await _telegramRegistry.Update(client);
|
||||
|
||||
_lastActivityTime = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in main loop. Restarting...");
|
||||
client?.Dispose();
|
||||
client = null;
|
||||
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
|
||||
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
|
||||
|
||||
client?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleUpdates(UpdatesBase updates)
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var updateHandler = scope.ServiceProvider.GetRequiredService<IUpdateHandler>();
|
||||
await updateHandler.HandleUpdate(updates);
|
||||
TelegramListenerMetrics.UpdatesReceived.Inc();
|
||||
TelegramListenerMetrics.LastUpdateTimestamp.SetToCurrentTimeUtc();
|
||||
|
||||
try
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var updateHandler = scope.ServiceProvider.GetRequiredService<IUpdateHandler>();
|
||||
await updateHandler.HandleUpdate(updates);
|
||||
|
||||
TelegramListenerMetrics.UpdatesProcessed.Inc();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
TelegramListenerMetrics.UpdatesErrors.Inc();
|
||||
_logger.LogError(ex, "Error handling updates.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -25,6 +25,8 @@ public class Startup
|
||||
services.AddAppServices(Configuration);
|
||||
|
||||
services.Configure<RebusRabbitMqOptions>(Configuration.GetSection(nameof(RebusRabbitMqOptions)));
|
||||
services.AddHealthChecks()
|
||||
.AddCheck<TelegramListenerHealthCheck>("telegram_listener");
|
||||
services.AddRebus((builder, ctx) =>
|
||||
builder.Transport(t =>
|
||||
{
|
||||
@ -51,5 +53,7 @@ public class Startup
|
||||
var bus = app.ApplicationServices.GetRequiredService<IBus>();
|
||||
// TODO: BackgroundService
|
||||
bus.Advanced.Topics.Subscribe(Constants.RoutingKeys.Subscriptions).GetAwaiter().GetResult();
|
||||
|
||||
app.UseHealthChecks("/health");
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||
using Nocr.TelegramListener.AppServices.UpdateListeners;
|
||||
using TL;
|
||||
|
||||
public class TelegramListenerHealthCheck(
|
||||
ILogger<TelegramListenerHealthCheck> logger,
|
||||
ITelegramClientContainer clientContainer
|
||||
) : IHealthCheck
|
||||
{
|
||||
private readonly ITelegramClientContainer _clientContainer = clientContainer;
|
||||
private readonly ILogger<TelegramListenerHealthCheck> _logger = logger;
|
||||
|
||||
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!_clientContainer.Initialized)
|
||||
{
|
||||
return HealthCheckResult.Unhealthy("Telegram client not initialized");
|
||||
}
|
||||
|
||||
var me = await _clientContainer.Client.Users_GetFullUser(_clientContainer.Client.User);
|
||||
if (me == null)
|
||||
{
|
||||
return HealthCheckResult.Unhealthy("Cannot get self info from Telegram");
|
||||
}
|
||||
|
||||
return HealthCheckResult.Healthy($"Connected as {me?.full_user.about}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Health check failed");
|
||||
return HealthCheckResult.Unhealthy("Error communicating with Telegram", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user