add: webhook

This commit is contained in:
h z
2025-03-10 09:39:59 +00:00
parent d106831879
commit 5718d41102
12 changed files with 289 additions and 6 deletions

View File

@@ -3,7 +3,17 @@
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<Nullable>disable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="7.1.1" />
</ItemGroup>
<ItemGroup>
<Reference Include="Microsoft.Extensions.Options">
<HintPath>..\..\..\..\.nuget\packages\microsoft.extensions.options\9.0.2\lib\net9.0\Microsoft.Extensions.Options.dll</HintPath>
</Reference>
</ItemGroup>
</Project>

View File

@@ -1,5 +0,0 @@
namespace Alchegos.Core;
public class Class1
{
}

View File

@@ -0,0 +1,6 @@
namespace Alchegos.Core.Attributes;
[AttributeUsage(AttributeTargets.Class)]
public class AutoRegister : Attribute
{
}

View File

@@ -0,0 +1,35 @@
using System.Threading.Channels;
using RabbitMQ.Client;
namespace Alchegos.Core.DataStructures;
public class Exchange
{
public string Name { get; set; }
public string ExchangeType { get; set; }
public bool Durable { get; set; }
public bool AutoDelete { get; set; }
public IDictionary<string, object> Arguments { get; set; }
public HashSet<MessageQueue> Queues { get; set; } = new();
public async Task CreateAsync(IChannel channel)
{
await channel.ExchangeDeclareAsync(Name, ExchangeType, Durable, AutoDelete, Arguments);
foreach (MessageQueue queue in Queues)
{
await channel.QueueDeclareAsync(
queue: queue.Name,
durable: queue.Durable,
exclusive: queue.Exclusive,
autoDelete: queue.AutoDelete,
arguments: queue.Arguments
);
await channel.QueueBindAsync(
queue: queue.Name,
exchange: Name,
routingKey: queue.Name,
arguments: queue.Arguments
);
}
}
}

View File

@@ -0,0 +1,10 @@
namespace Alchegos.Core.DataStructures;
public class MessageQueue
{
public string Name { get; set; }
public bool Exclusive { get; set; }
public bool AutoDelete { get; set; }
public bool Durable { get; set; }
public IDictionary<string, object> Arguments { get; set; }
}

42
src/GlobalRegistry.cs Normal file
View File

@@ -0,0 +1,42 @@
using System.Reflection;
using Alchegos.Core.Attributes;
using Alchegos.Core.DataStructures;
namespace Alchegos.Core;
public class GlobalRegistry
{
private static readonly Lock _lock = new Lock();
private GlobalRegistry()
{
}
private static readonly Lazy<GlobalRegistry> BackingInstance = new(() => new GlobalRegistry());
public static GlobalRegistry Instance {
get
{
lock (_lock)
{
return BackingInstance.Value;
}
}
}
public void Start()
{
Assembly assembly = Assembly.GetExecutingAssembly();
IEnumerable<Type> types = assembly
.GetTypes()
.Where(t => t.IsClass && t.GetCustomAttributes(typeof(AutoRegister), false).Length > 0);
foreach (Type t in types)
{
MethodInfo registerMethod = t.GetMethod("Register", BindingFlags.Static | BindingFlags.Public);
registerMethod?.Invoke(null, null);
}
}
public HashSet<Exchange> Exchanges { get; set; } = new();
}

View File

@@ -0,0 +1,64 @@
using Alchegos.Core.Attributes;
using Alchegos.Core.DataStructures;
using RabbitMQ.Client;
namespace Alchegos.Core.Registers;
[AutoRegister]
public static class ExchangeRegister
{
private static readonly HashSet<string> WebhookEvents = [
"create",
"delete",
"fork",
"push",
"issue_assign",
"issue_label",
"issue_milestone",
"issue_comment",
"pull_request",
"pull_request_assign",
"pull_request_label",
"pull_request_milestone",
"pull_request_comment",
"pull_request_review_approved",
"pull_request_review_rejected",
"pull_request_review_comment",
"pull_request_sync",
"pull_request_review_request",
"wiki",
"repository",
"release",
"package",
"status",
"schedule"
];
public static void Register()
{
Exchange giteaExchange = new Exchange
{
Name = "gitea.webhook.exchange",
ExchangeType = ExchangeType.Topic,
AutoDelete = false,
Durable = true,
Queues = new()
};
foreach (string webhookEvent in WebhookEvents)
{
giteaExchange.Queues.Add(new MessageQueue
{
Name = webhookEvent,
Exclusive = false,
AutoDelete = false,
Durable = true
});
}
GlobalRegistry.Instance.Exchanges =
[
giteaExchange
];
}
}

View File

@@ -0,0 +1,6 @@
namespace Alchegos.Core.Services.RabbitMQ;
public interface IRabbitPublisher
{
Task PublishAsync(string exchange, string routingKey, string message);
}

View File

@@ -0,0 +1,8 @@
using RabbitMQ.Client;
namespace Alchegos.Core.Services.RabbitMQ;
public interface IRabbitService
{
Task<IChannel> GetChannelAsync();
}

View File

@@ -0,0 +1,10 @@
namespace Alchegos.Core.Services.RabbitMQ;
public class RabbitConnectionOptions
{
public string HostName { get; set; } = "localhost";
public string UserName { get; set; } = "guest";
public string Password { get; set; } = "guest";
public int Port { get; set; } = 5672;
}

View File

@@ -0,0 +1,28 @@
using System.Text;
using RabbitMQ.Client;
namespace Alchegos.Core.Services.RabbitMQ;
public class RabbitPublisher(IRabbitService service) : IRabbitPublisher
{
private IRabbitService RabbitService { get; } = service;
private readonly SemaphoreSlim _lock = new(1, 1);
public async Task PublishAsync(string exchange, string routingKey, string message)
{
ReadOnlyMemory<byte> body = Encoding.UTF8.GetBytes(message).AsMemory();
await _lock.WaitAsync();
try
{
IChannel channel = await RabbitService.GetChannelAsync();
await channel.BasicPublishAsync(
exchange: exchange,
routingKey: routingKey,
body: body
);
}
finally
{
_lock.Release();
}
}
}

View File

@@ -0,0 +1,69 @@
using Alchegos.Core.DataStructures;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace Alchegos.Core.Services.RabbitMQ;
public class RabbitService : IAsyncDisposable, IRabbitService
{
private readonly SemaphoreSlim _initLock = new (1, 1);
private Lazy<Task> LazyInit { get; init; }
public async Task EnsureInitializedAsync()
{
if (Connection is not null)
return;
await _initLock.WaitAsync();
try
{
if (Connection is null)
await LazyInit.Value;
}
finally
{
_initLock.Release();
}
}
private IConnection Connection { get; set; }
public async Task<IChannel> GetChannelAsync()
{
await EnsureInitializedAsync();
return await Connection.CreateChannelAsync();
}
public RabbitService(IOptions<RabbitConnectionOptions> options)
{
ConnectionFactory = new()
{
HostName = options.Value.HostName,
Port = options.Value.Port,
UserName = options.Value.UserName,
Password = options.Value.Password,
};
LazyInit = new (InitAsync);
}
private ConnectionFactory ConnectionFactory { get; set; }
private async Task InitAsync()
{
if (Connection is null)
{
Connection = await ConnectionFactory.CreateConnectionAsync();
IChannel channel = await Connection.CreateChannelAsync();
foreach (Exchange exchange in GlobalRegistry.Instance.Exchanges)
await exchange.CreateAsync(channel);
}
}
public async ValueTask DisposeAsync()
{
if (Connection is not null)
{
await Connection.CloseAsync();
await Connection.DisposeAsync();
Connection = null;
}
}
}