Add message deduplication and versioning for text match notifications

This update implements a comprehensive solution to prevent duplicate notifications
when Telegram messages are edited, while maintaining a full history of changes.

Features:
- New TextMatch entity to store match history with versioning
- Database migration for TextMatches table with proper indexes
- TextMatchRepository for managing match records
- TextSubscriptionUpdated event for message update notifications
- Enhanced MessageReceivedHandler with deduplication logic:
  * First match creates version 1 and publishes TextSubscriptionMatched
  * Subsequent updates create new versions and publish TextSubscriptionUpdated
  * Skips notifications if message text hasn't changed

Technical details:
- MessageId changed from int to long to match Telegram API types
- Proper indexes on (MessageId, SubscriptionId) and UserId
- Full audit trail of message edits in database

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
ruberoid 2025-10-14 13:22:58 +04:00
parent 68cd5a0b1a
commit 78d1099bfc
11 changed files with 642 additions and 21 deletions

View File

@ -0,0 +1,24 @@
namespace Nocr.TextMatcher.AppServices.TextMatches.Repositories;
public interface ITextMatchRepository
{
/// <summary>
/// Gets the latest version of a match for a specific message and subscription
/// </summary>
Task<TextMatch?> GetLatestByMessageAndSubscription(long messageId, long subscriptionId, CancellationToken cancellationToken = default);
/// <summary>
/// Gets all matches for a specific message across all subscriptions
/// </summary>
Task<List<TextMatch>> GetByMessageId(long messageId, CancellationToken cancellationToken = default);
/// <summary>
/// Creates a new match record
/// </summary>
Task<TextMatch> Create(TextMatch match, CancellationToken cancellationToken = default);
/// <summary>
/// Gets the current version number for a message-subscription pair (0 if not exists)
/// </summary>
Task<int> GetCurrentVersion(long messageId, long subscriptionId, CancellationToken cancellationToken = default);
}

View File

@ -0,0 +1,121 @@
namespace Nocr.TextMatcher.AppServices.TextMatches;
/// <summary>
/// Represents a historical record of a text match between a message and a subscription.
/// Each update to the same message creates a new version.
/// </summary>
public sealed class TextMatch
{
public long Id { get; private set; }
/// <summary>
/// Telegram message ID
/// </summary>
public long MessageId { get; private set; }
/// <summary>
/// Text subscription that matched
/// </summary>
public long SubscriptionId { get; private set; }
/// <summary>
/// User ID who owns the subscription
/// </summary>
public long UserId { get; private set; }
/// <summary>
/// Telegram chat username where the message was posted
/// </summary>
public string ChatUsername { get; private set; }
/// <summary>
/// Message text at the time of this version
/// </summary>
public string MessageText { get; private set; }
/// <summary>
/// Author of the message
/// </summary>
public string From { get; private set; }
/// <summary>
/// Version number (1 for first match, increments with each update)
/// </summary>
public int Version { get; private set; }
/// <summary>
/// When the original message was posted in Telegram
/// </summary>
public DateTimeOffset MessageOccuredDateTime { get; private set; }
/// <summary>
/// When this match/version was created in our system
/// </summary>
public DateTimeOffset CreatedDateTime { get; private set; }
private TextMatch(
long messageId,
long subscriptionId,
long userId,
string chatUsername,
string messageText,
string from,
int version,
DateTimeOffset messageOccuredDateTime,
DateTimeOffset createdDateTime)
{
MessageId = messageId;
SubscriptionId = subscriptionId;
UserId = userId;
ChatUsername = chatUsername;
MessageText = messageText;
From = from;
Version = version;
MessageOccuredDateTime = messageOccuredDateTime;
CreatedDateTime = createdDateTime;
}
public static TextMatch Create(
long messageId,
long subscriptionId,
long userId,
string chatUsername,
string messageText,
string from,
int version,
DateTimeOffset messageOccuredDateTime,
DateTimeOffset createdDateTime)
{
if (messageId <= 0)
throw new ArgumentException("Message ID must be greater than 0", nameof(messageId));
if (subscriptionId <= 0)
throw new ArgumentException("Subscription ID must be greater than 0", nameof(subscriptionId));
if (userId <= 0)
throw new ArgumentException("User ID must be greater than 0", nameof(userId));
if (string.IsNullOrWhiteSpace(chatUsername))
throw new ArgumentException("Chat username cannot be empty", nameof(chatUsername));
if (string.IsNullOrWhiteSpace(messageText))
throw new ArgumentException("Message text cannot be empty", nameof(messageText));
if (string.IsNullOrWhiteSpace(from))
throw new ArgumentException("From cannot be empty", nameof(from));
if (version <= 0)
throw new ArgumentException("Version must be greater than 0", nameof(version));
return new TextMatch(
messageId,
subscriptionId,
userId,
chatUsername,
messageText,
from,
version,
messageOccuredDateTime,
createdDateTime);
}
}

View File

@ -1,5 +1,7 @@
using Microsoft.Extensions.Logging;
using Nocr.TelegramListener.Async.Api.Contracts.Events;
using Nocr.TextMatcher.AppServices.TextMatches;
using Nocr.TextMatcher.AppServices.TextMatches.Repositories;
using Nocr.TextMatcher.AppServices.TextSubscriptions.Repositories;
using Nocr.TextMatcher.Async.Api.Contracts;
using Nocr.TextMatcher.Core.Dates;
@ -12,17 +14,20 @@ public sealed class MessageReceivedHandler : IHandleMessages<MessageReceived>
{
private readonly ILogger<MessageReceivedHandler> _logger;
private readonly IBus _bus;
private readonly ITextSubscriptionRepository _textSubscriptionService;
private readonly ITextSubscriptionRepository _textSubscriptionRepository;
private readonly ITextMatchRepository _textMatchRepository;
private readonly ICurrentDateProvider _dateProvider;
public MessageReceivedHandler(ILogger<MessageReceivedHandler> logger,
IBus bus,
ITextSubscriptionRepository textSubscriptionRepository,
ITextMatchRepository textMatchRepository,
ICurrentDateProvider dateProvider)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
_textSubscriptionService = textSubscriptionRepository ?? throw new ArgumentNullException(nameof(textSubscriptionRepository));
_textSubscriptionRepository = textSubscriptionRepository ?? throw new ArgumentNullException(nameof(textSubscriptionRepository));
_textMatchRepository = textMatchRepository ?? throw new ArgumentNullException(nameof(textMatchRepository));
_dateProvider = dateProvider ?? throw new ArgumentNullException(nameof(dateProvider));
}
@ -30,28 +35,98 @@ public sealed class MessageReceivedHandler : IHandleMessages<MessageReceived>
{
_logger.LogInformation("Received message: {@Message}.", message);
var matches = await _textSubscriptionService.Get();
var subscriptions = await _textSubscriptionRepository.Get();
foreach (var match in matches.Where(x => x.Active))
foreach (var subscription in subscriptions.Where(x => x.Active))
{
if (match.IsMatches(message.ChatUsername, message.Text))
if (subscription.IsMatches(message.ChatUsername, message.Text))
{
_logger.LogInformation("Message {@Message} matched {@Match}.", message, match);
var @event = new TextSubscriptionMatched
_logger.LogInformation("Message {@Message} matched subscription {@Subscription}.", message, subscription);
// Check if we already have a match for this message and subscription
var existingMatch = await _textMatchRepository.GetLatestByMessageAndSubscription(
message.MessageId,
subscription.Id);
if (existingMatch == null)
{
SubscriptionId = match.Id,
SubscriptionUserId = match.UserId,
ChatUsername = match.ChatUsername,
MessageId = message.MessageId,
Rule = match.Rule,
Template = match.Template,
Text = message.Text,
From = message.From,
OccuredDateTime = message.OccuredDateTime,
PublishedDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish(Constants.RoutingKeys.MatchedSubscriptions, @event);
// First time match - create version 1 and publish TextSubscriptionMatched
var textMatch = TextMatch.Create(
messageId: message.MessageId,
subscriptionId: subscription.Id,
userId: subscription.UserId,
chatUsername: message.ChatUsername,
messageText: message.Text,
from: message.From ?? "Unknown",
version: 1,
messageOccuredDateTime: message.OccuredDateTime,
createdDateTime: _dateProvider.UtcNow);
await _textMatchRepository.Create(textMatch);
var @event = new TextSubscriptionMatched
{
SubscriptionId = subscription.Id,
SubscriptionUserId = subscription.UserId,
ChatUsername = subscription.ChatUsername,
MessageId = message.MessageId,
Rule = subscription.Rule,
Template = subscription.Template,
Text = message.Text,
From = message.From,
OccuredDateTime = message.OccuredDateTime,
PublishedDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish(Constants.RoutingKeys.MatchedSubscriptions, @event);
_logger.LogInformation("Published TextSubscriptionMatched for message {MessageId}, subscription {SubscriptionId}, version 1.",
message.MessageId, subscription.Id);
}
else
{
// Message was updated - create new version and publish TextSubscriptionUpdated
// Only create new version if text actually changed
if (existingMatch.MessageText != message.Text)
{
var newVersion = existingMatch.Version + 1;
var textMatch = TextMatch.Create(
messageId: message.MessageId,
subscriptionId: subscription.Id,
userId: subscription.UserId,
chatUsername: message.ChatUsername,
messageText: message.Text,
from: message.From ?? "Unknown",
version: newVersion,
messageOccuredDateTime: message.OccuredDateTime,
createdDateTime: _dateProvider.UtcNow);
await _textMatchRepository.Create(textMatch);
var updateEvent = new TextSubscriptionUpdated
{
SubscriptionId = subscription.Id,
SubscriptionUserId = subscription.UserId,
ChatUsername = subscription.ChatUsername,
MessageId = message.MessageId,
Rule = subscription.Rule,
Template = subscription.Template,
Text = message.Text,
From = message.From,
Version = newVersion,
OccuredDateTime = message.OccuredDateTime,
PublishedDateTime = _dateProvider.UtcNow
};
await _bus.Advanced.Topics.Publish(Constants.RoutingKeys.MatchedSubscriptions, updateEvent);
_logger.LogInformation("Published TextSubscriptionUpdated for message {MessageId}, subscription {SubscriptionId}, version {Version}.",
message.MessageId, subscription.Id, newVersion);
}
else
{
_logger.LogDebug("Message {MessageId} text unchanged, skipping duplicate notification.", message.MessageId);
}
}
}
}
}

View File

@ -0,0 +1,66 @@
using Nocr.TextMatcher.Contracts;
namespace Nocr.TextMatcher.Async.Api.Contracts;
/// <summary>
/// Event published when a previously matched message is updated (edited in Telegram)
/// </summary>
public class TextSubscriptionUpdated : IEvent
{
public Guid Id => Guid.NewGuid();
/// <summary>
/// Идентификатор матча
/// </summary>
public long SubscriptionId { get; set; }
/// <summary>
/// Идентификатор владельца матча
/// </summary>
public long SubscriptionUserId { get; set; }
/// <summary>
/// Username чата
/// </summary>
public required string ChatUsername { get; set; }
/// <summary>
/// Идентификатор сообщения
/// </summary>
public long MessageId { get; set; }
/// <summary>
/// Правило совпадения
/// </summary>
public TextSubscriptionRule Rule { get; set; }
/// <summary>
/// Шаблон совпадения
/// </summary>
public required string Template { get; set; }
/// <summary>
/// Имя или username отправителя
/// </summary>
public string? From { get; set; }
/// <summary>
/// Обновленный текст сообщения
/// </summary>
public required string Text { get; set; }
/// <summary>
/// Номер версии сообщения (2, 3, 4, ...)
/// </summary>
public int Version { get; set; }
/// <summary>
/// Дата получения оригинального сообщения слушателем
/// </summary>
public DateTimeOffset OccuredDateTime { get; set; }
/// <summary>
/// Дата публикации события в очередь
/// </summary>
public DateTimeOffset PublishedDateTime { get; set; }
}

View File

@ -0,0 +1,117 @@
// <auto-generated />
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Nocr.TextMatcher.Persistence;
#nullable disable
namespace Nocr.TextMatcher.Migrator.Migrations
{
[DbContext(typeof(TextMatcherContext))]
[Migration("20251014085918_AddTextMatchesTable")]
partial class AddTextMatchesTable
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "8.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 64);
MySqlModelBuilderExtensions.AutoIncrementColumns(modelBuilder);
modelBuilder.Entity("Nocr.TextMatcher.AppServices.TextMatches.TextMatch", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
MySqlPropertyBuilderExtensions.UseMySqlIdentityColumn(b.Property<long>("Id"));
b.Property<string>("ChatUsername")
.IsRequired()
.HasMaxLength(1024)
.HasColumnType("varchar(1024)");
b.Property<DateTimeOffset>("CreatedDateTime")
.HasColumnType("datetime(6)");
b.Property<string>("From")
.IsRequired()
.HasMaxLength(512)
.HasColumnType("varchar(512)");
b.Property<int>("MessageId")
.HasColumnType("int");
b.Property<DateTimeOffset>("MessageOccuredDateTime")
.HasColumnType("datetime(6)");
b.Property<string>("MessageText")
.IsRequired()
.HasMaxLength(4096)
.HasColumnType("varchar(4096)");
b.Property<long>("SubscriptionId")
.HasColumnType("bigint");
b.Property<long>("UserId")
.HasColumnType("bigint");
b.Property<int>("Version")
.HasColumnType("int");
b.HasKey("Id");
b.HasIndex("UserId")
.HasDatabaseName("IX_TextMatches_UserId");
b.HasIndex("MessageId", "SubscriptionId")
.HasDatabaseName("IX_TextMatches_MessageId_SubscriptionId");
b.ToTable("TextMatches", (string)null);
});
modelBuilder.Entity("Nocr.TextMatcher.AppServices.TextSubscriptions.TextSubscription", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
MySqlPropertyBuilderExtensions.UseMySqlIdentityColumn(b.Property<long>("Id"));
b.Property<bool>("Active")
.HasColumnType("tinyint(1)");
b.Property<string>("ChatUsername")
.IsRequired()
.HasMaxLength(1024)
.HasColumnType("varchar(1024)");
b.Property<DateTimeOffset>("CreatedDateTime")
.HasColumnType("datetime(6)");
b.Property<int>("Rule")
.HasColumnType("int");
b.Property<string>("Template")
.IsRequired()
.HasMaxLength(1024)
.HasColumnType("varchar(1024)");
b.Property<long>("UserId")
.HasColumnType("bigint");
b.HasKey("Id");
b.ToTable("TextSubscriptions");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,58 @@
using System;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace Nocr.TextMatcher.Migrator.Migrations
{
/// <inheritdoc />
public partial class AddTextMatchesTable : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "TextMatches",
columns: table => new
{
Id = table.Column<long>(type: "bigint", nullable: false)
.Annotation("MySql:ValueGenerationStrategy", MySqlValueGenerationStrategy.IdentityColumn),
MessageId = table.Column<long>(type: "bigint", nullable: false),
SubscriptionId = table.Column<long>(type: "bigint", nullable: false),
UserId = table.Column<long>(type: "bigint", nullable: false),
ChatUsername = table.Column<string>(type: "varchar(1024)", maxLength: 1024, nullable: false)
.Annotation("MySql:CharSet", "utf8mb4"),
MessageText = table.Column<string>(type: "varchar(4096)", maxLength: 4096, nullable: false)
.Annotation("MySql:CharSet", "utf8mb4"),
From = table.Column<string>(type: "varchar(512)", maxLength: 512, nullable: false)
.Annotation("MySql:CharSet", "utf8mb4"),
Version = table.Column<int>(type: "int", nullable: false),
MessageOccuredDateTime = table.Column<DateTimeOffset>(type: "datetime(6)", nullable: false),
CreatedDateTime = table.Column<DateTimeOffset>(type: "datetime(6)", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_TextMatches", x => x.Id);
})
.Annotation("MySql:CharSet", "utf8mb4");
migrationBuilder.CreateIndex(
name: "IX_TextMatches_MessageId_SubscriptionId",
table: "TextMatches",
columns: new[] { "MessageId", "SubscriptionId" });
migrationBuilder.CreateIndex(
name: "IX_TextMatches_UserId",
table: "TextMatches",
column: "UserId");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "TextMatches");
}
}
}

View File

@ -22,6 +22,58 @@ namespace Nocr.TextMatcher.Migrator.Migrations
MySqlModelBuilderExtensions.AutoIncrementColumns(modelBuilder);
modelBuilder.Entity("Nocr.TextMatcher.AppServices.TextMatches.TextMatch", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
MySqlPropertyBuilderExtensions.UseMySqlIdentityColumn(b.Property<long>("Id"));
b.Property<string>("ChatUsername")
.IsRequired()
.HasMaxLength(1024)
.HasColumnType("varchar(1024)");
b.Property<DateTimeOffset>("CreatedDateTime")
.HasColumnType("datetime(6)");
b.Property<string>("From")
.IsRequired()
.HasMaxLength(512)
.HasColumnType("varchar(512)");
b.Property<int>("MessageId")
.HasColumnType("int");
b.Property<DateTimeOffset>("MessageOccuredDateTime")
.HasColumnType("datetime(6)");
b.Property<string>("MessageText")
.IsRequired()
.HasMaxLength(4096)
.HasColumnType("varchar(4096)");
b.Property<long>("SubscriptionId")
.HasColumnType("bigint");
b.Property<long>("UserId")
.HasColumnType("bigint");
b.Property<int>("Version")
.HasColumnType("int");
b.HasKey("Id");
b.HasIndex("UserId")
.HasDatabaseName("IX_TextMatches_UserId");
b.HasIndex("MessageId", "SubscriptionId")
.HasDatabaseName("IX_TextMatches_MessageId_SubscriptionId");
b.ToTable("TextMatches", (string)null);
});
modelBuilder.Entity("Nocr.TextMatcher.AppServices.TextSubscriptions.TextSubscription", b =>
{
b.Property<long>("Id")

View File

@ -1,7 +1,9 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Nocr.TextMatcher.AppServices.TextMatches.Repositories;
using Nocr.TextMatcher.AppServices.TextSubscriptions.Repositories;
using Nocr.TextMatcher.Persistence.TextMatches;
using Nocr.TextMatcher.Persistence.TextSubscriptions;
namespace Nocr.TextMatcher.Persistence;
@ -12,11 +14,12 @@ public static class ServiceCollectionExtensions
{
if (services == null)
throw new ArgumentNullException(nameof(services));
if (string.IsNullOrWhiteSpace(connectionString))
throw new ArgumentNullException(nameof(connectionString));
services.AddScoped<ITextSubscriptionRepository, TextSubscriptionRepository>();
services.AddScoped<ITextMatchRepository, TextMatchRepository>();
services.AddDbContext<TextMatcherContext>(
(ctx, context) =>

View File

@ -1,5 +1,7 @@
using Microsoft.EntityFrameworkCore;
using Nocr.TextMatcher.AppServices.TextMatches;
using Nocr.TextMatcher.AppServices.TextSubscriptions;
using Nocr.TextMatcher.Persistence.TextMatches;
using Nocr.TextMatcher.Persistence.TextSubscriptions;
namespace Nocr.TextMatcher.Persistence;
@ -7,6 +9,7 @@ namespace Nocr.TextMatcher.Persistence;
public class TextMatcherContext : DbContext
{
public DbSet<TextSubscription> TextSubscriptions { get; set; }
public DbSet<TextMatch> TextMatches { get; set; }
public TextMatcherContext(DbContextOptions options)
: base(options)
@ -16,6 +19,7 @@ public class TextMatcherContext : DbContext
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.ApplyConfiguration(new TextSubscriptionConfiguration());
modelBuilder.ApplyConfiguration(new TextMatchConfiguration());
base.OnModelCreating(modelBuilder);
}

View File

@ -0,0 +1,53 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Nocr.TextMatcher.AppServices.TextMatches;
namespace Nocr.TextMatcher.Persistence.TextMatches;
public class TextMatchConfiguration : IEntityTypeConfiguration<TextMatch>
{
public void Configure(EntityTypeBuilder<TextMatch> builder)
{
builder.ToTable("TextMatches");
builder.HasKey(x => x.Id);
builder.Property(x => x.MessageId)
.IsRequired();
builder.Property(x => x.SubscriptionId)
.IsRequired();
builder.Property(x => x.UserId)
.IsRequired();
builder.Property(x => x.ChatUsername)
.IsRequired()
.HasMaxLength(1024);
builder.Property(x => x.MessageText)
.IsRequired()
.HasMaxLength(4096); // Telegram message max length
builder.Property(x => x.From)
.IsRequired()
.HasMaxLength(512);
builder.Property(x => x.Version)
.IsRequired();
builder.Property(x => x.MessageOccuredDateTime)
.IsRequired();
builder.Property(x => x.CreatedDateTime)
.IsRequired();
// Create index for fast lookups by MessageId and SubscriptionId
builder.HasIndex(x => new { x.MessageId, x.SubscriptionId })
.HasDatabaseName("IX_TextMatches_MessageId_SubscriptionId");
// Create index for user queries
builder.HasIndex(x => x.UserId)
.HasDatabaseName("IX_TextMatches_UserId");
}
}

View File

@ -0,0 +1,48 @@
using Microsoft.EntityFrameworkCore;
using Nocr.TextMatcher.AppServices.TextMatches;
using Nocr.TextMatcher.AppServices.TextMatches.Repositories;
namespace Nocr.TextMatcher.Persistence.TextMatches;
public class TextMatchRepository : ITextMatchRepository
{
private readonly TextMatcherContext _context;
public TextMatchRepository(TextMatcherContext context)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
}
public async Task<TextMatch?> GetLatestByMessageAndSubscription(long messageId, long subscriptionId, CancellationToken cancellationToken = default)
{
return await _context.TextMatches
.Where(x => x.MessageId == messageId && x.SubscriptionId == subscriptionId)
.OrderByDescending(x => x.Version)
.FirstOrDefaultAsync(cancellationToken);
}
public async Task<List<TextMatch>> GetByMessageId(long messageId, CancellationToken cancellationToken = default)
{
return await _context.TextMatches
.Where(x => x.MessageId == messageId)
.OrderBy(x => x.SubscriptionId)
.ThenByDescending(x => x.Version)
.ToListAsync(cancellationToken);
}
public async Task<TextMatch> Create(TextMatch match, CancellationToken cancellationToken = default)
{
await _context.TextMatches.AddAsync(match, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
return match;
}
public async Task<int> GetCurrentVersion(long messageId, long subscriptionId, CancellationToken cancellationToken = default)
{
var maxVersion = await _context.TextMatches
.Where(x => x.MessageId == messageId && x.SubscriptionId == subscriptionId)
.MaxAsync(x => (int?)x.Version, cancellationToken);
return maxVersion ?? 0;
}
}