Again changes listener service to updateManager one.

This commit is contained in:
ruberoid 2025-05-13 20:49:12 +04:00
parent c086c35b80
commit 09814ad249
4 changed files with 43 additions and 69 deletions

View File

@ -6,25 +6,14 @@ namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
public sealed class UpdateHandler : IUpdateHandler
{
private readonly ILogger<UpdateHandler> _logger;
private readonly ITelegramClientContainer _clientContainer;
private readonly TelegramRegistry _telegramRegistry;
private readonly INewMessageHandler _newMessageHandler;
private readonly IEditMessageHandler _editMessageHandler;
private string Peer(Peer peer) => _clientContainer.Manager.UserOrChat(peer)?.ToString() ?? $"Peer {peer?.ID}";
public UpdateHandler(
ILogger<UpdateHandler> logger,
ITelegramClientContainer clientContainer,
TelegramRegistry telegramRegistry,
INewMessageHandler newMessageHandler,
IEditMessageHandler editMessageHandler)
INewMessageHandler newMessageHandler)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_clientContainer = clientContainer ?? throw new ArgumentNullException(nameof(clientContainer));
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
_newMessageHandler = newMessageHandler ?? throw new ArgumentNullException(nameof(newMessageHandler));
_editMessageHandler = editMessageHandler ?? throw new ArgumentNullException(nameof(editMessageHandler));
}
public async Task HandleUpdate(Update update, CancellationToken cancellationToken = default)
@ -54,12 +43,9 @@ public sealed class UpdateHandler : IUpdateHandler
switch (messageBase)
{
case Message m:
_logger.LogInformation($"{Peer(m.from_id) ?? m.post_author} in {Peer(m.peer_id)}> {m.message}");
_logger.LogInformation($"{m.from_id} in {m.peer_id}> {m.message}");
await _newMessageHandler.Handle(m);
break;
case MessageService ms:
_logger.LogInformation($"{Peer(ms.from_id)} in {Peer(ms.peer_id)} [{ms.action.GetType().Name[13..]}]");
break;
}
}
}

View File

@ -6,8 +6,6 @@ public interface ITelegramClientContainer
{
Client Client { get; }
UpdateManager Manager { get; set; }
public bool Initialized { get; }
public void Initialize();

View File

@ -13,8 +13,7 @@ public sealed class TelegramClientContainer(IOptions<WTelegramClientOptions> opt
public Client Client => _client ?? throw new InvalidOperationException("Client not initialized yet.");
public bool Initialized { get; private set; }
public required UpdateManager Manager { get; set; }
private readonly object _syncRoot = new();
public void Initialize()

View File

@ -16,94 +16,85 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
private readonly ILogger<UpdateListenerBackgroundService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly ITelegramClientContainer _telegramClientContainer;
private readonly TelegramRegistry _telegramRegistry;
private DateTime _lastActivityTime = DateTime.UtcNow;
private readonly TimeSpan _activityTimeout = TimeSpan.FromMinutes(10);
private UpdateManager? _updateManager;
private readonly INewMessageHandler _newMessageHandler;
public UpdateListenerBackgroundService(
ILogger<UpdateListenerBackgroundService> logger,
IServiceProvider serviceProvider,
ITelegramClientContainer telegramClientContainer,
TelegramRegistry telegramRegistry)
INewMessageHandler newMessageHandler)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_telegramClientContainer =
telegramClientContainer ?? throw new ArgumentNullException(nameof(telegramClientContainer));
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
_newMessageHandler = newMessageHandler ?? throw new ArgumentNullException(nameof(newMessageHandler));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Helpers.Log = (l, s) => System.Diagnostics.Debug.WriteLine(s);
Client? client = null;
while (!stoppingToken.IsCancellationRequested)
{
try
{
if (client == null || _lastActivityTime < DateTime.UtcNow - _activityTimeout)
if (_telegramClientContainer.Client != null)
{
if (client != null)
{
_logger.LogWarning("No activity detected for {Timeout}. Reconnecting...", _activityTimeout);
client.Dispose();
}
_telegramRegistry.Clear();
_logger.LogWarning($"Liveness timeout exceed. Resetting WTelegram client..");
_telegramClientContainer.Reset();
_telegramClientContainer.Initialize();
client = _telegramClientContainer.Client;
_telegramClientContainer.Manager = client.WithUpdateManager(HandleUpdates);
var my = await client.LoginUserIfNeeded();
_telegramRegistry.SetMy(my);
_logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})",
_telegramRegistry.My.username ??
_telegramRegistry.My.first_name + " " + _telegramRegistry.My.last_name,
_telegramRegistry.My.id);
await _telegramRegistry.Update(client);
_lastActivityTime = DateTime.UtcNow;
}
_telegramClientContainer.Initialize();
_updateManager = _telegramClientContainer.Client.WithUpdateManager(HandleUpdates);
var my = await _telegramClientContainer.Client.LoginUserIfNeeded();
_logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})",
my.username ??
my.first_name + " " + my.last_name,
my.id);
var dialogs = await _telegramClientContainer.Client.Messages_GetAllDialogs();
dialogs.CollectUsersChats(_updateManager.Users, _updateManager.Chats);
await Task.Delay(TimeSpan.FromHours(2), stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in main loop. Restarting...");
client?.Dispose();
client = null;
_telegramClientContainer.Reset();
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
continue;
}
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
client?.Dispose();
}
}
private async Task HandleUpdates(Update update)
{
TelegramListenerMetrics.UpdatesReceived.Inc();
TelegramListenerMetrics.LastUpdateTimestamp.SetToCurrentTimeUtc();
_logger.LogInformation($"Update of type: {update.GetType().Name} received.");
try
switch (update)
{
using var scope = _serviceProvider.CreateScope();
var updateHandler = scope.ServiceProvider.GetRequiredService<IUpdateHandler>();
await updateHandler.HandleUpdate(update);
TelegramListenerMetrics.UpdatesProcessed.Inc();
}
catch (Exception ex)
{
TelegramListenerMetrics.UpdatesErrors.Inc();
_logger.LogError(ex, "Error handling updates.");
case UpdateNewMessage unm: await HandleMessage(unm.message); break;
case UpdateEditMessage uem: await HandleMessage(uem.message, true); break;
default: break;
}
}
private Task HandleMessage(MessageBase messageBase, bool edit = false)
{
switch (messageBase)
{
case Message m: _newMessageHandler.Handle(m); break;
}
return Task.CompletedTask;
}
}