Skip to content

Commit

Permalink
MqttClientExtensions: Using ArrayPool makes PublishStringAsync reach …
Browse files Browse the repository at this point in the history
…0 allocations.
  • Loading branch information
xljiulang committed Nov 21, 2024
1 parent e2ee3f8 commit f1593ff
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions Source/MQTTnet/MqttClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public static Task<MqttClientPublishResult> PublishSequenceAsync(
ArgumentNullException.ThrowIfNull(mqttClient);
ArgumentNullException.ThrowIfNull(topic);

var applicationMessage = new MqttApplicationMessageBuilder().WithTopic(topic)
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithRetainFlag(retain)
.WithQualityOfServiceLevel(qualityOfServiceLevel)
Expand All @@ -66,18 +67,43 @@ public static Task<MqttClientPublishResult> PublishBinaryAsync(
return mqttClient.PublishSequenceAsync(topic, new ReadOnlySequence<byte>(payload), qualityOfServiceLevel, retain, cancellationToken);
}

public static Task<MqttClientPublishResult> PublishStringAsync(
public static async Task<MqttClientPublishResult> PublishStringAsync(
this IMqttClient mqttClient,
string topic,
string payload = null,
string payload = default,
MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce,
bool retain = false,
CancellationToken cancellationToken = default)
{
var payloadBuffer = payload == null ? ReadOnlyMemory<byte>.Empty : Encoding.UTF8.GetBytes(payload);
return mqttClient.PublishBinaryAsync(topic, payloadBuffer, qualityOfServiceLevel, retain, cancellationToken);
var builder = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithRetainFlag(retain)
.WithQualityOfServiceLevel(qualityOfServiceLevel);

byte[] buffer = null;
if (!string.IsNullOrEmpty(payload))
{
var byteCount = Encoding.UTF8.GetByteCount(payload);
buffer = ArrayPool<byte>.Shared.Rent(byteCount);
Encoding.UTF8.GetBytes(payload, buffer);
builder.WithPayload(buffer.AsMemory(0, byteCount));
}

try
{
var applicationMessage = builder.Build();
return await mqttClient.PublishAsync(applicationMessage, cancellationToken);
}
finally
{
if (buffer != null)
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}


public static Task ReconnectAsync(this IMqttClient client, CancellationToken cancellationToken = default)
{
if (client.Options == null)
Expand Down

0 comments on commit f1593ff

Please sign in to comment.