Compare commits

..

No commits in common. "main" and "feature/reconnect_and_wide_log" have entirely different histories.

36 changed files with 382 additions and 538 deletions

View File

@ -2,15 +2,12 @@
**/.env
**/.git
**/.gitignore
**/.gitattributes
**/.gitmodules
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/.idea
**/.fleet
**/*.*proj.user
**/*.dbmdl
**/*.jfm
@ -22,43 +19,7 @@
**/node_modules
**/npm-debug.log
**/obj
**/packages
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
# Test artifacts
**/TestResults/
**/test-results/
**/tests/
**/*Tests/
**/*.Tests/
**/*Test/
**/*.Test/
# CI/CD
**/.drone.yml
**/_deploy/
# Temporary and log files
**/*.tmp
**/*.log
**/*.swp
**/*.swo
**/~*
# OS files
**/.DS_Store
**/Thumbs.db
# NuGet packages
**/*.nupkg
**/*.snupkg
# Development config files
**/appsettings.Development.json
**/appsettings.*.json.example
**/.nocr.env
**/.nocr.env.example
**/launchSettings.json
README.md

4
.gitignore vendored
View File

@ -230,8 +230,6 @@ Thumbs.db
.idea/
# custom
# Ignore environment-specific appsettings, but allow .example files
/**/**/appsettings.*.json
!/**/**/appsettings.*.json.example
/**/**/appsettings.protected.json
**/**/deployment.yml

View File

@ -6,16 +6,15 @@
<MicrosoftVersion>8.0.0</MicrosoftVersion>
</PropertyGroup>
<ItemGroup Label="Nocr">
<PackageVersion Include="Nocr.TextMatcher.Async.Api.Contracts" />
<PackageVersion Include="prometheus-net" Version="8.2.1" />
<PackageVersion Include="prometheus-net.AspNetCore" Version="8.2.1" />
<PackageVersion Include="Serilog.Enrichers.Thread" Version="4.0.0" />
</ItemGroup>
<ItemGroup Label="Telegram">
<PackageVersion Include="WTelegramClient" Version="4.3.4" />
<PackageVersion Include="WTelegramClient" Version="3.7.1" />
</ItemGroup>
<ItemGroup Label="Rebus">
<PackageVersion Include="Rebus" Version="8.8.0" />
<PackageVersion Include="Rebus.ServiceProvider" Version="10.3.0" />
<PackageVersion Include="Rebus" Version="8.2.2" />
<PackageVersion Include="Rebus.ServiceProvider" Version="10.1.0" />
<PackageVersion Include="Rebus.RabbitMq" Version="9.0.1" />
<PackageVersion Include="Rebus.Serilog" Version="8.0.0" />
</ItemGroup>

View File

@ -4,11 +4,9 @@
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
<PackageReference Include="Nocr.TextMatcher.Async.Api.Contracts" VersionOverride="0.7.*" />
<PackageReference Include="Nocr.TextMatcher.Async.Api.Contracts" />
<PackageReference Include="prometheus-net" />
<PackageReference Include="WTelegramClient" />
<PackageReference Include="Rebus" />

View File

@ -3,7 +3,6 @@ using Microsoft.Extensions.DependencyInjection;
using Nocr.TelegramListener.AppServices.TextMatches;
using Nocr.TelegramListener.AppServices.UpdateListeners;
using Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
using Nocr.TelegramListener.AppServices.UpdateListeners.Publishers;
using Rebus.Config;
namespace Nocr.TelegramListener.AppServices;
@ -24,15 +23,9 @@ public static class ServiceCollectionExtensions
services.AddScoped<IUpdateHandler, UpdateHandler>();
services.AddScoped<INewMessageHandler, NewMessageHandler>();
services.AddScoped<IEditMessageHandler, EditMessageHandler>();
// Message event publishers
services.AddScoped<NewMessagePublisher>();
services.AddScoped<EditedMessagePublisher>();
services.AddScoped<MessageEventPublisherFactory>();
services.AddSingleton<ITelegramClientContainer, TelegramClientContainer>();
services.AddSingleton<TelegramRegistry>();
return services;
}
}

View File

@ -4,5 +4,5 @@ namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
public interface INewMessageHandler
{
Task Handle(MessageBase messageBase, bool isEdit = false);
Task Handle(MessageBase messageBase);
}

View File

@ -4,5 +4,5 @@ namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
public interface IUpdateHandler
{
Task HandleUpdate(Update update, CancellationToken cancellationToken = default);
Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default);
}

View File

@ -17,29 +17,21 @@ public sealed class EditMessageHandler : IEditMessageHandler
public Task Handle(MessageBase messageBase)
{
_logger.LogDebug("Executing {Handler} for message {MessageId}.", nameof(EditMessageHandler), messageBase.ID);
try
_logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID);
switch (messageBase)
{
switch (messageBase)
{
case Message m:
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author,
m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
break;
}
}
catch (Exception ex)
{
_logger.LogError("Error executing {Handler} for message {MessageId}\n{Exception}.", nameof(EditMessageHandler), messageBase.ID, ex);
case Message m:
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats) ?? m.post_author,
m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.from_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
break;
}
return Task.CompletedTask;

View File

@ -1,64 +1,65 @@
using Microsoft.Extensions.Logging;
using Nocr.TelegramListener.AppServices.UpdateListeners.Publishers;
using Nocr.TelegramListener.Async.Api.Contracts.Events;
using Nocr.TelegramListener.Core.Dates;
using Rebus.Bus;
using TL;
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
public sealed class NewMessageHandler : INewMessageHandler
{
private readonly IBus _bus;
private readonly ILogger<NewMessageHandler> _logger;
private readonly TelegramRegistry _telegramRegistry;
private readonly MessageEventPublisherFactory _publisherFactory;
private readonly ICurrentDateProvider _dateProvider;
public NewMessageHandler(
ILogger<NewMessageHandler> logger,
TelegramRegistry telegramRegistry,
MessageEventPublisherFactory publisherFactory)
public NewMessageHandler(IBus bus, ILogger<NewMessageHandler> logger, TelegramRegistry telegramRegistry,
ICurrentDateProvider dateProvider)
{
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
_publisherFactory = publisherFactory ?? throw new ArgumentNullException(nameof(publisherFactory));
_dateProvider = dateProvider ?? throw new ArgumentNullException(nameof(dateProvider));
}
public async Task Handle(MessageBase messageBase, bool isEdit = false)
public async Task Handle(MessageBase messageBase)
{
_logger.LogDebug("Executing {Handler} for message {MessageId}, isEdit={IsEdit}.", nameof(NewMessageHandler), messageBase.ID, isEdit);
try
_logger.LogDebug("Executing {Handler} for message {MessageId}", nameof(EditMessageHandler), messageBase.ID);
switch (messageBase)
{
switch (messageBase)
{
case Message m:
if (string.IsNullOrWhiteSpace(m.message))
break;
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
var chatUserName = m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats)?.Split("@").Last();
if (string.IsNullOrWhiteSpace(chatUserName))
{
_logger.LogWarning("Failed to get chat user name for chat {ChatId}.", m.peer_id.ID);
break;
}
var publisher = _publisherFactory.GetPublisher(isEdit);
await publisher.PublishAsync(m, chatUserName);
case Message m:
if (string.IsNullOrWhiteSpace(m.message))
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
_logger.LogInformation("{From} in {Chat} > {MessageText}",
m.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
m.message);
var chatUserName = m.peer_id.Peer(_telegramRegistry.Users, _telegramRegistry.Chats).Split("@").Last();
if (string.IsNullOrWhiteSpace(chatUserName))
{
_logger.LogWarning("Failed to get chat user name for chat {ChatId}", m.peer_id.ID);
break;
}
}
catch (Exception ex)
{
_logger.LogError("Error executing {Handler} for message {MessageId}\n{Exception}.", nameof(NewMessageHandler), messageBase.ID, ex);
}
var @event = new MessageReceived
{
MessageId = messageBase.ID,
// Для каналов from = null
FromId = m.From?.ID ?? m.Peer.ID,
From = (m.From ?? m.Peer).Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ChatUsername = chatUserName,
Text = m.message,
OccuredDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish("nocr.telegram.listener", @event);
break;
case MessageService ms:
_logger.LogInformation("{From} in {Chat} > [{Action}]",
ms.From?.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.Peer.Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ms.action.GetType().Name[13..]);
break;
}
}
}

View File

@ -3,57 +3,97 @@ using Microsoft.Extensions.Logging;
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
public sealed class UpdateHandler(
ILogger<UpdateHandler> logger,
INewMessageHandler newMessageHandler
) : IUpdateHandler
public sealed class UpdateHandler : IUpdateHandler
{
private readonly ILogger<UpdateHandler> _logger = logger
?? throw new ArgumentNullException(nameof(logger));
private readonly INewMessageHandler _newMessageHandler = newMessageHandler
?? throw new ArgumentNullException(nameof(newMessageHandler));
private readonly ILogger<UpdateHandler> _logger;
private readonly ITelegramClientContainer _clientContainer;
private readonly TelegramRegistry _telegramRegistry;
private readonly INewMessageHandler _newMessageHandler;
private readonly IEditMessageHandler _editMessageHandler;
public async Task HandleUpdate(Update update, CancellationToken cancellationToken = default)
public UpdateHandler(
ILogger<UpdateHandler> logger,
ITelegramClientContainer clientContainer,
TelegramRegistry telegramRegistry,
INewMessageHandler newMessageHandler,
IEditMessageHandler editMessageHandler)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_clientContainer = clientContainer ?? throw new ArgumentNullException(nameof(clientContainer));
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
_newMessageHandler = newMessageHandler ?? throw new ArgumentNullException(nameof(newMessageHandler));
_editMessageHandler = editMessageHandler ?? throw new ArgumentNullException(nameof(editMessageHandler));
}
public async Task HandleUpdate(UpdatesBase updates, CancellationToken cancellationToken = default)
{
TelegramListenerMetrics.UpdatesReceived.Inc();
TelegramListenerMetrics.LastUpdateTimestamp.Set(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
try
{
switch (update)
{
case UpdateNewMessage unm:
await HandleMessage(unm.message);
break;
case UpdateEditMessage uem:
await HandleMessage(uem.message, true);
break;
default:
_logger.LogInformation("Update of type `{UnsupportedUpdateName}` fired.", update.GetType().Name);
_logger.LogDebug("{update}", update);
break;
}
var (client, users, chats) =
(_clientContainer.Client, _telegramRegistry.Users, _telegramRegistry.Chats);
TelegramListenerMetrics.UpdatesProcessed.Inc();
updates.CollectUsersChats(users, chats);
if (updates is UpdateShortMessage usm && !users.ContainsKey(usm.user_id))
(await client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats(
users, chats);
else if (updates is UpdateShortChatMessage uscm &&
(!users.ContainsKey(uscm.from_id) ||
!chats.ContainsKey(uscm.chat_id)))
(await client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats(
users,
chats);
foreach (var update in updates.UpdateList)
switch (update)
{
case UpdateNewMessage unm:
await _newMessageHandler.Handle(unm.message);
break;
case UpdateEditMessage uem:
await _editMessageHandler.Handle(uem.message);
break;
// Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases
case UpdateDeleteChannelMessages udcm:
Console.WriteLine(
$"{udcm.messages.Length} message(s) deleted in {chats.Chat(udcm.channel_id)}");
break;
case UpdateDeleteMessages udm:
Console.WriteLine($"{udm.messages.Length} message(s) deleted");
break;
case UpdateUserTyping uut:
Console.WriteLine($"{users.User(uut.user_id)} is {uut.action}");
break;
case UpdateChatUserTyping ucut:
Console.WriteLine(
$"{ucut.from_id.Peer(users, chats)} is {ucut.action} in {chats.Chat(ucut.chat_id)}");
break;
case UpdateChannelUserTyping ucut2:
Console.WriteLine(
$"{ucut2.from_id.Peer(users, chats)} is {ucut2.action} in {chats.Chat(ucut2.channel_id)}");
break;
case UpdateChatParticipants { participants: ChatParticipants cp }:
Console.WriteLine(
$"{cp.participants.Length} participants in {chats.Chat(cp.chat_id)}");
break;
case UpdateUserStatus uus:
Console.WriteLine(
$"{users.User(uus.user_id)} is now {uus.status.GetType().Name[10..]}");
break;
case UpdateUserName uun:
Console.WriteLine(
$"{users.User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}");
break;
case UpdateUser uu:
Console.WriteLine($"{users.User(uu.user_id)} has changed infos/photo");
break;
default:
Console.WriteLine(update.GetType().Name);
break; // there are much more update types than the above example cases
}
}
catch (Exception ex)
{
TelegramListenerMetrics.UpdatesErrors.Inc();
_logger.LogError(ex, "Critical error in HandleUpdate.");
_logger.LogError(ex, "Critical error in HandleUpdate");
throw;
}
}
private async Task HandleMessage(MessageBase messageBase, bool edit = false)
{
_logger.LogDebug("HandleMessage called for {MessageType} with ID {MessageId}, edit={Edit}",
messageBase.GetType().Name, messageBase.ID, edit);
switch (messageBase)
{
case Message m:
_logger.LogDebug("({Edit}) from `{From}` in `{In}`: {Message}", edit ? "E" : "N", m.from_id, m.peer_id, m.message);
await _newMessageHandler.Handle(m, isEdit: edit);
break;
}
}
}

View File

@ -1,44 +0,0 @@
using Microsoft.Extensions.Logging;
using Nocr.TelegramListener.Async.Api.Contracts.Events;
using Nocr.TelegramListener.Core.Dates;
using Rebus.Bus;
using TL;
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Publishers;
public sealed class EditedMessagePublisher : IMessageEventPublisher
{
private readonly IBus _bus;
private readonly ILogger<EditedMessagePublisher> _logger;
private readonly TelegramRegistry _telegramRegistry;
private readonly ICurrentDateProvider _dateProvider;
public EditedMessagePublisher(
IBus bus,
ILogger<EditedMessagePublisher> logger,
TelegramRegistry telegramRegistry,
ICurrentDateProvider dateProvider)
{
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
_dateProvider = dateProvider ?? throw new ArgumentNullException(nameof(dateProvider));
}
public async Task PublishAsync(Message message, string chatUsername)
{
var @event = new MessageEdited
{
MessageId = message.ID,
// Для каналов from = null
FromId = message.From?.ID ?? message.Peer.ID,
From = (message.From ?? message.Peer).Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ChatUsername = chatUsername,
Text = message.message,
OccuredDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish("nocr.telegram.listener", @event);
_logger.LogDebug("Published MessageEdited for message {MessageId}.", message.ID);
}
}

View File

@ -1,16 +0,0 @@
using TL;
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Publishers;
/// <summary>
/// Interface for publishing message events to the message bus
/// </summary>
public interface IMessageEventPublisher
{
/// <summary>
/// Publishes a message event based on the message content
/// </summary>
/// <param name="message">The Telegram message</param>
/// <param name="chatUsername">Username of the chat</param>
Task PublishAsync(Message message, string chatUsername);
}

View File

@ -1,25 +0,0 @@
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Publishers;
/// <summary>
/// Factory for creating appropriate message event publishers based on message type
/// </summary>
public sealed class MessageEventPublisherFactory
{
private readonly NewMessagePublisher _newMessagePublisher;
private readonly EditedMessagePublisher _editedMessagePublisher;
public MessageEventPublisherFactory(
NewMessagePublisher newMessagePublisher,
EditedMessagePublisher editedMessagePublisher)
{
_newMessagePublisher = newMessagePublisher ?? throw new ArgumentNullException(nameof(newMessagePublisher));
_editedMessagePublisher = editedMessagePublisher ?? throw new ArgumentNullException(nameof(editedMessagePublisher));
}
/// <summary>
/// Gets the appropriate publisher based on whether the message is an edit
/// </summary>
/// <param name="isEdit">True if the message is an edit, false if it's a new message</param>
/// <returns>The appropriate publisher instance</returns>
public IMessageEventPublisher GetPublisher(bool isEdit) => isEdit ? _editedMessagePublisher : _newMessagePublisher;
}

View File

@ -1,44 +0,0 @@
using Microsoft.Extensions.Logging;
using Nocr.TelegramListener.Async.Api.Contracts.Events;
using Nocr.TelegramListener.Core.Dates;
using Rebus.Bus;
using TL;
namespace Nocr.TelegramListener.AppServices.UpdateListeners.Publishers;
public sealed class NewMessagePublisher : IMessageEventPublisher
{
private readonly IBus _bus;
private readonly ILogger<NewMessagePublisher> _logger;
private readonly TelegramRegistry _telegramRegistry;
private readonly ICurrentDateProvider _dateProvider;
public NewMessagePublisher(
IBus bus,
ILogger<NewMessagePublisher> logger,
TelegramRegistry telegramRegistry,
ICurrentDateProvider dateProvider)
{
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_telegramRegistry = telegramRegistry ?? throw new ArgumentNullException(nameof(telegramRegistry));
_dateProvider = dateProvider ?? throw new ArgumentNullException(nameof(dateProvider));
}
public async Task PublishAsync(Message message, string chatUsername)
{
var @event = new MessageReceived
{
MessageId = message.ID,
// Для каналов from = null
FromId = message.From?.ID ?? message.Peer.ID,
From = (message.From ?? message.Peer).Peer(_telegramRegistry.Users, _telegramRegistry.Chats),
ChatUsername = chatUsername,
Text = message.message,
OccuredDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish("nocr.telegram.listener", @event);
_logger.LogDebug("Published MessageReceived for message {MessageId}.", message.ID);
}
}

View File

@ -1,21 +1,24 @@
using Microsoft.Extensions.Options;
using TL;
using WTelegram;
namespace Nocr.TelegramListener.AppServices.UpdateListeners;
public sealed class TelegramClientContainer(IOptions<WTelegramClientOptions> options) : ITelegramClientContainer, IDisposable
public sealed class TelegramClientContainer : ITelegramClientContainer, IDisposable
{
private Client? _client;
private readonly WTelegramClientOptions _options;
private readonly WTelegramClientOptions _options = options.Value ?? throw new ArgumentNullException(nameof(options));
public Client Client => _client ?? throw new InvalidOperationException("Client not initialized yet.");
public Client Client => _client ?? throw new InvalidOperationException("Client not initialized yet");
public bool Initialized { get; private set; }
private readonly object _syncRoot = new();
public TelegramClientContainer(IOptions<WTelegramClientOptions> options)
{
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
}
public void Initialize()
{
lock (_syncRoot)
@ -39,7 +42,7 @@ public sealed class TelegramClientContainer(IOptions<WTelegramClientOptions> opt
}
}
private string? ConfigureWTelegramClient(string what)
private string ConfigureWTelegramClient(string what)
{
switch (what)
{
@ -51,7 +54,10 @@ public sealed class TelegramClientContainer(IOptions<WTelegramClientOptions> opt
case "verification_code":
Console.Write("Code: ");
return Console.ReadLine();
default: return null;
//case "first_name": return "Dmitry"; // if sign-up is required
//case "last_name": return "Charushnikov"; // if sign-up is required
//case "password": return ""; // if user has enabled 2FA
default: return null; // let WTelegramClient decide the default config
}
}

View File

@ -6,15 +6,11 @@ public static class TelegramObjectExtensions
{
public static string User(this IDictionary<long, User> dictionary, long id) =>
dictionary.TryGetValue(id, out var user) ? user.ToString() : $"User {id}";
public static string Chat(this IDictionary<long, ChatBase> dictionary, long id) =>
dictionary.TryGetValue(id, out var chat) ? chat.ToString() ?? $"Null chat {id}" : $"Chat {id}";
public static string? Peer(this Peer? peer, IDictionary<long, User> users, IDictionary<long, ChatBase> chats) => peer switch
{
null => null,
PeerUser user => users.User(user.user_id),
PeerChat or PeerChannel => chats.Chat(peer.ID),
_ => $"Peer {peer.ID}"
};
public static string Chat(this IDictionary<long, ChatBase> dictionary, long id) =>
dictionary.TryGetValue(id, out var chat) ? chat.ToString() : $"Chat {id}";
public static string Peer(this Peer peer, IDictionary<long, User> users, IDictionary<long, ChatBase> chats) => peer is null ? null
: peer is PeerUser user ? users.User(user.user_id)
: peer is PeerChat or PeerChannel ? chats.Chat(peer.ID) : $"Peer {peer.ID}";
}

View File

@ -6,7 +6,7 @@ namespace Nocr.TelegramListener.AppServices.UpdateListeners;
public sealed class TelegramRegistry
{
public User? My { get; private set; }
public User My { get; private set; }
public ConcurrentDictionary<long, User> Users = new();
public ConcurrentDictionary<long, ChatBase> Chats = new();

View File

@ -1,9 +1,12 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Nocr.TelegramListener.AppServices.UpdateListeners.Handlers;
using Prometheus;
using TL;
using TL.Methods;
using WTelegram;
namespace Nocr.TelegramListener.AppServices.UpdateListeners;
@ -15,7 +18,8 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
private readonly ITelegramClientContainer _telegramClientContainer;
private readonly TelegramRegistry _telegramRegistry;
private DateTimeOffset _activityStartTime = DateTimeOffset.UtcNow;
private DateTime _lastActivityTime = DateTime.UtcNow;
private readonly TimeSpan _activityTimeout = TimeSpan.FromMinutes(10);
public UpdateListenerBackgroundService(
ILogger<UpdateListenerBackgroundService> logger,
@ -35,49 +39,72 @@ public sealed class UpdateListenerBackgroundService : BackgroundService
{
Helpers.Log = (l, s) => System.Diagnostics.Debug.WriteLine(s);
Client? client = null;
while (!stoppingToken.IsCancellationRequested)
{
if (DateTimeOffset.UtcNow - TimeSpan.FromHours(2) > _activityStartTime)
try
{
_logger.LogInformation("Timeout exceeded.");
_activityStartTime = DateTimeOffset.UtcNow;
if (client == null || _lastActivityTime < DateTime.UtcNow - _activityTimeout)
{
if (client != null)
{
_logger.LogWarning("No activity detected for {Timeout}. Reconnecting...", _activityTimeout);
client.OnUpdate -= HandleUpdates;
client.Dispose();
}
_telegramRegistry.Clear();
_telegramClientContainer.Reset();
_telegramClientContainer.Initialize();
client = _telegramClientContainer.Client;
client.OnUpdate += HandleUpdates;
var my = await client.LoginUserIfNeeded();
_telegramRegistry.SetMy(my);
_logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})",
_telegramRegistry.My.username ??
_telegramRegistry.My.first_name + " " + _telegramRegistry.My.last_name,
_telegramRegistry.My.id);
await _telegramRegistry.Update(client);
_lastActivityTime = DateTime.UtcNow;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in main loop. Restarting...");
client?.Dispose();
client = null;
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
continue;
}
if (client == null)
{
_telegramRegistry.Clear();
_telegramClientContainer.Reset();
_telegramClientContainer.Initialize();
client = _telegramClientContainer.Client;
client.OnUpdates += HandleUpdates;
var my = await client.LoginUserIfNeeded();
_telegramRegistry.SetMy(my);
_logger.LogInformation("Telegram client is logged-in as {Username} (id {Id})",
_telegramRegistry.My?.username ??
_telegramRegistry.My?.first_name + " " + _telegramRegistry.My?.last_name,
_telegramRegistry.My?.id);
}
await _telegramRegistry.Update(client);
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
client?.Dispose();
}
}
private async Task HandleUpdates(UpdatesBase updates)
{
using var scope = _serviceProvider.CreateScope();
var updateHandler = scope.ServiceProvider.GetRequiredService<IUpdateHandler>();
TelegramListenerMetrics.UpdatesReceived.Inc();
TelegramListenerMetrics.LastUpdateTimestamp.SetToCurrentTimeUtc();
_logger.LogDebug("Received {UpdateType} with {Count} updates", updates.GetType().Name, updates.UpdateList.Length);
foreach (var update in updates.UpdateList)
try
{
_logger.LogDebug("Processing update {UpdateType} (ID: {UpdateId})", update.GetType().Name, update.GetType().GetProperty("ID")?.GetValue(update) ?? "N/A");
await updateHandler.HandleUpdate(update);
using var scope = _serviceProvider.CreateScope();
var updateHandler = scope.ServiceProvider.GetRequiredService<IUpdateHandler>();
await updateHandler.HandleUpdate(updates);
TelegramListenerMetrics.UpdatesProcessed.Inc();
}
catch (Exception ex)
{
TelegramListenerMetrics.UpdatesErrors.Inc();
_logger.LogError(ex, "Error handling updates.");
}
}
}

View File

@ -5,11 +5,11 @@ namespace Nocr.TelegramListener.AppServices.UpdateListeners;
/// </summary>
public sealed class WTelegramClientOptions
{
public required string ApiId { get; set; }
public string ApiId { get; set; }
public required string ApiHash { get; set; }
public string ApiHash { get; set; }
public required string PhoneNumber { get; set; }
public string PhoneNumber { get; set; }
public string? FirstName { get; set; }

View File

@ -1,39 +0,0 @@
namespace Nocr.TelegramListener.Async.Api.Contracts.Events;
/// <summary>
/// Событие редактирования сообщения в Telegram
/// </summary>
public sealed class MessageEdited : IEvent
{
public Guid Id { get; } = Guid.NewGuid();
/// <summary>
/// Текст сообщения после редактирования
/// </summary>
public string Text { get; set; } = default!;
/// <summary>
/// Идентификатор отправителя
/// </summary>
public long FromId { get; set; }
/// <summary>
/// Имя или username отправителя
/// </summary>
public string? From { get; set; }
/// <summary>
/// Идентификатор сообщения
/// </summary>
public long MessageId { get; set; }
/// <summary>
/// Username чата отправителя
/// </summary>
public string ChatUsername { get; set; } = default!;
/// <summary>
/// Дата редактирования сообщения
/// </summary>
public DateTimeOffset OccuredDateTime { get; set; }
}

View File

@ -34,7 +34,7 @@ public abstract class RepeatableBackgroundService<THandler, TOptions> : Backgrou
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Failed to process.");
_logger.LogCritical(ex, "Failed to process...");
await Task.Delay(_options.ExceptionInterval, stoppingToken);
}
}

View File

@ -2,11 +2,11 @@ namespace Nocr.TelegramListener.Core.Options;
public sealed class RebusRabbitMqOptions
{
public required string ConnectionString { get; set; }
public string ConnectionString { get; set; }
public required string InputQueueName { get; set; }
public string InputQueueName { get; set; }
public required string DirectExchangeName { get; set; }
public string DirectExchangeName { get; set; }
public required string TopicsExchangeName { get; set; }
public string TopicsExchangeName { get; set; }
}

View File

@ -1,34 +1,21 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
# Install curl for healthcheck
RUN apt-get update && \
apt-get install -y --no-install-recommends curl && \
rm -rf /var/lib/apt/lists/*
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
# Install CA certificates for custom NuGet source
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates && \
update-ca-certificates && \
rm -rf /var/lib/apt/lists/*
# Copy nuget.config from build context root
# Must be copied to submodule root before building:
# - CI: pipeline copies it with "cp nuget.config telegram-listener/"
# - Docker Compose: uses additional_contexts but file must also be copied
COPY nuget.config /root/.nuget/NuGet/NuGet.Config
COPY . .
# Publish directly (restore + build + publish in one step)
RUN dotnet nuget add source -n "musk.fun.nocr" "https://gitea.musk.fun/api/packages/nocr/nuget/index.json"
RUN dotnet restore "src/Nocr.TelegramListener.Host/Nocr.TelegramListener.Host.csproj"
WORKDIR "/src/src/Nocr.TelegramListener.Host"
RUN dotnet build "Nocr.TelegramListener.Host.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "Nocr.TelegramListener.Host.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=build /app/publish .
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "Nocr.TelegramListener.Host.dll"]

View File

@ -7,31 +7,17 @@ public class HostBuilderFactory<TStartup> where TStartup : class
public IHostBuilder CreateHostBuilder(string[] args, string? baseDirectory = null)
{
var builder = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(host => { host.UseStartup<TStartup>(); })
.ConfigureAppConfiguration((context, configurationBuilder) =>
.ConfigureAppConfiguration((_, configurationBuilder) =>
{
if (!string.IsNullOrWhiteSpace(baseDirectory))
configurationBuilder.SetBasePath(baseDirectory);
// Configuration priority (low to high):
// 1. appsettings.json (already loaded by CreateDefaultBuilder)
// 2. appsettings.{Environment}.json (already loaded)
// 3. User Secrets (already loaded in Development)
// 4. Environment Variables (already loaded) ← highest priority
// Debug mode: log configuration on startup
if (Environment.GetEnvironmentVariable("NOCR_DEBUG_MODE") == "true")
{
Console.WriteLine("[NOCR_DEBUG] Configuration debug mode enabled");
Console.WriteLine($"[NOCR_DEBUG] Environment: {context.HostingEnvironment.EnvironmentName}");
Console.WriteLine($"[NOCR_DEBUG] Base Path: {baseDirectory ?? configurationBuilder.Build().GetValue<string>("ContentRoot") ?? "default"}");
}
configurationBuilder.AddJsonFile(".secrets/appsettings.protected.json", true);
})
.ConfigureWebHostDefaults(host => { host.UseStartup<TStartup>(); })
.UseSerilog((ctx, logBuilder) =>
{
logBuilder.ReadFrom.Configuration(ctx.Configuration)
.Enrich.With<ShortSourceContextEnricher>()
.Enrich.WithThreadId()
.Enrich.FromLogContext();
});

View File

@ -1,16 +0,0 @@
using Serilog.Core;
using Serilog.Events;
public class ShortSourceContextEnricher : ILogEventEnricher
{
public void Enrich(LogEvent logEvent, ILogEventPropertyFactory propertyFactory)
{
if (logEvent.Properties.TryGetValue("SourceContext", out var sourceContextValue))
{
var fullName = sourceContextValue.ToString().Trim('"');
var shortName = fullName.Split('.').Last();
var shortContext = propertyFactory.CreateProperty("ShortSourceContext", shortName);
logEvent.AddPropertyIfAbsent(shortContext);
}
}
}

View File

@ -1,10 +1,8 @@
using Microsoft.Extensions.Options;
using Nocr.TelegramListener.AppServices;
using Nocr.TelegramListener.AppServices.UpdateListeners;
using Nocr.TelegramListener.Async.Api.Contracts.Events;
using Nocr.TelegramListener.Core.Dates;
using Nocr.TelegramListener.Core.Options;
using Prometheus;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Routing.TypeBased;
@ -23,12 +21,6 @@ public class Startup
public void ConfigureServices(IServiceCollection services)
{
// Debug mode: log loaded configuration
if (Environment.GetEnvironmentVariable("NOCR_DEBUG_MODE") == "true")
{
LogConfigurationDebug();
}
services.AddSingleton<ICurrentDateProvider, DefaultCurrentDateProvider>();
services.AddAppServices(Configuration);
@ -39,6 +31,14 @@ public class Startup
builder.Transport(t =>
{
var rebusOptions = ctx.GetRequiredService<IOptions<RebusRabbitMqOptions>>().Value;
Console.WriteLine("*** DEBUG ***");
Console.WriteLine(Environment.GetEnvironmentVariable("RebusRabbitMqOptions__ConnectionString"));
Console.WriteLine(rebusOptions.ConnectionString);
Console.WriteLine("*** DEBUG ***");
t.UseRabbitMq(rebusOptions.ConnectionString, rebusOptions.InputQueueName)
.DefaultQueueOptions(queue => queue.SetDurable(true))
.ExchangeNames(rebusOptions.DirectExchangeName, rebusOptions.TopicsExchangeName);
@ -48,53 +48,12 @@ public class Startup
.Routing(r => r.TypeBased()));
}
private void LogConfigurationDebug()
{
Console.WriteLine("=== [NOCR_DEBUG] Configuration Values ===");
var rebusOptions = Configuration.GetSection(nameof(RebusRabbitMqOptions));
Console.WriteLine($"[NOCR_DEBUG] RebusRabbitMqOptions:");
Console.WriteLine($"[NOCR_DEBUG] ConnectionString: {MaskConnectionString(rebusOptions["ConnectionString"])}");
Console.WriteLine($"[NOCR_DEBUG] InputQueueName: {rebusOptions["InputQueueName"]}");
Console.WriteLine($"[NOCR_DEBUG] DirectExchangeName: {rebusOptions["DirectExchangeName"]}");
Console.WriteLine($"[NOCR_DEBUG] TopicsExchangeName: {rebusOptions["TopicsExchangeName"]}");
var wtOptions = Configuration.GetSection(nameof(WTelegramClientOptions));
Console.WriteLine($"[NOCR_DEBUG] WTelegramClientOptions:");
Console.WriteLine($"[NOCR_DEBUG] ApiId: {wtOptions["ApiId"]}");
Console.WriteLine($"[NOCR_DEBUG] ApiHash: {MaskSecret(wtOptions["ApiHash"])}");
Console.WriteLine($"[NOCR_DEBUG] PhoneNumber: {MaskSecret(wtOptions["PhoneNumber"])}");
Console.WriteLine("=== [NOCR_DEBUG] End Configuration ===");
}
private static string MaskConnectionString(string? value)
{
if (string.IsNullOrEmpty(value)) return "(empty)";
// Mask password in connection string: amqp://user:pass@host -> amqp://user:***@host
var masked = System.Text.RegularExpressions.Regex.Replace(value, @"(?<=://)([^:]+):([^@]+)(?=@)", m => $"{m.Groups[1].Value}:***");
return masked;
}
private static string MaskSecret(string? value)
{
if (string.IsNullOrEmpty(value)) return "(empty)";
if (value.Length <= 4) return "***";
return $"{value.Substring(0, 2)}...{value.Substring(value.Length - 2)}";
}
public void Configure(IApplicationBuilder app)
{
var bus = app.ApplicationServices.GetRequiredService<IBus>();
// TODO: BackgroundService
bus.Advanced.Topics.Subscribe(Constants.RoutingKeys.Subscriptions).GetAwaiter().GetResult();
app.UseHttpMetrics();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapHealthChecks("/health");
endpoints.MapMetrics();
});
app.UseHealthChecks("/health");
}
}

View File

@ -29,7 +29,7 @@ public class TelegramListenerHealthCheck(
}
catch (Exception ex)
{
_logger.LogError(ex, "Health check failed.");
_logger.LogError(ex, "Health check failed");
return HealthCheckResult.Unhealthy("Error communicating with Telegram", ex);
}
}

View File

@ -6,9 +6,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="prometheus-net.AspNetCore" />
<PackageReference Include="Serilog.AspNetCore" />
<PackageReference Include="Serilog.Enrichers.Thread" />
</ItemGroup>
<ItemGroup Label="Rebus">

View File

@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.002.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Nocr.TelegramListener.Host", "Nocr.TelegramListener.Host.csproj", "{CFF9C09C-D157-4B97-87B4-74008ACA2429}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{CFF9C09C-D157-4B97-87B4-74008ACA2429}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CFF9C09C-D157-4B97-87B4-74008ACA2429}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CFF9C09C-D157-4B97-87B4-74008ACA2429}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CFF9C09C-D157-4B97-87B4-74008ACA2429}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4FC53B3E-B8EC-4E59-ADE5-80ABAD60F371}
EndGlobalSection
EndGlobal

View File

@ -0,0 +1,15 @@
{
"Serilog": {
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "[{Level:u3}] {Timestamp:MM-dd HH:mm:ss} {TraceId} {SourceContext:l} {Message:lj}{NewLine}{Exception}"
}
}
]
},
"RebusRabbitMqOptions": {
"ConnectionString": "amqp://admin:admin@localhost:5672/"
}
}

View File

@ -1,17 +0,0 @@
{
// This file provides an example configuration for local VS Code debugging.
// Copy this file to appsettings.Development.json and fill in your actual values.
// Alternatively, use environment variables or User Secrets for sensitive data.
"RebusRabbitMqOptions": {
// RabbitMQ connection string for local development
// Format: amqp://username:password@hostname:port/
"ConnectionString": "amqp://admin:admin@localhost:5672/"
},
"WTelegramClientOptions": {
// Telegram API credentials - obtain from https://my.telegram.org/apps
"ApiId": "YOUR_API_ID",
"ApiHash": "YOUR_API_HASH",
"PhoneNumber": "YOUR_PHONE_NUMBER"
}
}

View File

@ -0,0 +1,28 @@
{
"Serilog": {
"MinimumLevel": {
"Default": "Information"
},
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "[{Level:u3}] {Timestamp:MM-dd HH:mm:ss} {TraceId} {SourceContext:l} {Message:lj}{NewLine}{Exception}"
}
},
{
"Name": "File",
"Args": {
"path": "/var/log/nocr/telegram-listener/telegram-listener-.log",
"outputTemplate": "[{Level:u3}] {Timestamp:dd-MM-yyyy HH:mm:ss} {TraceId} {SourceContext:l} {Message:lj}{NewLine}{Exception}",
"fileSizeLimitBytes": 104857600,
"rollingInterval": "Day",
"rollOnFileSizeLimit": true
}
}
]
},
"RebusRabbitMqOptions": {
"ConnectionString": "amqp://admin:admin@nocr-rabbitmq:5672/"
}
}

View File

@ -1,11 +0,0 @@
{
// This file provides an example configuration for Docker Compose deployment.
// Copy this file to appsettings.DockerCompose.json (or use environment variables).
"RebusRabbitMqOptions": {
// RabbitMQ connection string - use service name from docker-compose.yml
"ConnectionString": "amqp://admin:admin@nocr-rabbitmq:5672/"
}
// Note: WTelegramClientOptions should be provided via environment variables
// See docker-compose.yml or use .env file
}

View File

@ -0,0 +1,28 @@
{
"Serilog": {
"MinimumLevel": {
"Default": "Information"
},
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "[{Level:u3}] {Timestamp:MM-dd HH:mm:ss} {TraceId} {SourceContext:l} {Message:lj}{NewLine}{Exception}"
}
},
{
"Name": "File",
"Args": {
"path": "/var/log/nocr/telegram-listener/telegram-listener-.log",
"outputTemplate": "[{Level:u3}] {Timestamp:dd-MM-yyyy HH:mm:ss} {TraceId} {SourceContext:l} {Message:lj}{NewLine}{Exception}",
"fileSizeLimitBytes": 104857600,
"rollingInterval": "Day",
"rollOnFileSizeLimit": true
}
}
]
},
"RebusRabbitMqOptions": {
"ConnectionString": ""
}
}

View File

@ -7,25 +7,11 @@
"Microsoft.AspNetCore": "Error",
"System.Net.Http.HttpClient": "Warning"
}
},
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "[{Level:u3}] {Timestamp:MM-dd HH:mm:ss} [#{ThreadId}] {ShortSourceContext:l}: {Message:lj}{NewLine}{Exception}"
}
}
]
}
},
"RebusRabbitMqOptions": {
"ConnectionString": "",
"InputQueueName": "nocr.telegram.listener.queue",
"DirectExchangeName": "nocr.direct",
"TopicsExchangeName": "nocr.topics"
},
"WTelegramClientOptions": {
"ApiId": "",
"ApiHash": "",
"PhoneNumber": ""
}
}

View File

@ -0,0 +1,33 @@
{
"Serilog": {
"MinimumLevel": {
"Default": "Information"
},
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "[{Level:u3}] {Timestamp:MM-dd HH:mm:ss} {TraceId} {SourceContext:l} {Message:lj}{NewLine}{Exception}"
}
},
{
"Name": "File",
"Args": {
"path": "/var/log/nocr/telegram-listener/telegram-listener-.log",
"outputTemplate": "[{Level:u3}] {Timestamp:dd-MM-yyyy HH:mm:ss} {TraceId} {SourceContext:l} {Message:lj}{NewLine}{Exception}",
"fileSizeLimitBytes": 104857600,
"rollingInterval": "Day",
"rollOnFileSizeLimit": true
}
}
]
},
"RebusRabbitMqOptions": {
"ConnectionString": ""
},
"WTelegramClientOptions": {
"ApiId": "",
"ApiHash": "",
"PhoneNumber": ""
}
}