Flynk.Net.Services.Transmit.Grpc.Client 1.1.0


Flynk.Net.Services.Transmit.Grpc

A high-performance gRPC-based messaging library that provides both low-level transmission capabilities and a high-level RPC client with routing, typed serialization, and service discovery features.

Overview

This library provides two main client implementations:

  1. TransmissionClient: Low-level gRPC client for raw message transmission
  2. RpcClient: High-level client with routing, typed serialization/deserialization, and automatic service discovery

Key Features

  • ✅ Bidirectional streaming communication
  • ✅ Type-safe request/response handling
  • ✅ Automatic serialization/deserialization (JSON)
  • ✅ Route-based message handling
  • ✅ Built-in service discovery via Info endpoint
  • ✅ Echo endpoint for testing connectivity
  • ✅ Extensible handler system
  • ✅ Integration with Flynk.Common.Utils.Response

Installation

Prerequisites

  • .NET 8.0 or later
  • NuGet packages:
    • Flynk.Common.Utils (for Response types)
    • Google.Protobuf
    • Grpc.AspNetCore
    • Grpc.Net.Client
    • System.Text.Json

Package Installation

dotnet add package Flynk.Net.Services.Transmit.Grpc.Client

Architecture

┌─────────────────┐         ┌─────────────────┐
│   RpcClient     │────────▶│TransmissionClient│
└─────────────────┘         └─────────────────┘
        │                            │
        │                            │
        ▼                            ▼
┌─────────────────┐         ┌─────────────────┐
│ Route Handlers  │         │   gRPC Proto    │
└─────────────────┘         └─────────────────┘
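
The diagram suggests that RpcClient maps route names to handlers and leaves byte transport to TransmissionClient. As a rough mental model only, the sketch below shows route-based dispatch over JSON payloads using nothing but the .NET base class library. MiniRouter, Register, DispatchAsync, GreetRequest, and GreetReply are hypothetical names invented for this illustration; they are not part of this package and do not describe its internals.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical stand-in for route-based dispatch; not the library's implementation.
var router = new MiniRouter();

// Register a typed handler under a route name.
router.Register<GreetRequest, GreetReply>("greet.hello",
    request => Task.FromResult(new GreetReply($"Hello, {request.Name}")));

// Simulate an incoming message: a route name plus a JSON payload as raw bytes.
byte[] incoming = JsonSerializer.SerializeToUtf8Bytes(new GreetRequest("Ada"));
byte[] outgoing = await router.DispatchAsync("greet.hello", incoming);

var reply = JsonSerializer.Deserialize<GreetReply>(outgoing);
Console.WriteLine(reply?.Message); // prints "Hello, Ada"

public record GreetRequest(string Name);
public record GreetReply(string Message);

public sealed class MiniRouter
{
    // Each route maps to a function from request bytes to response bytes.
    private readonly Dictionary<string, Func<byte[], Task<byte[]>>> _routes = new();

    public void Register<TRequest, TResponse>(string route, Func<TRequest, Task<TResponse>> handler)
        where TRequest : class
    {
        _routes[route] = async payload =>
        {
            // Deserialize the raw payload into the handler's request type.
            var request = JsonSerializer.Deserialize<TRequest>(payload)
                ?? throw new InvalidOperationException("Payload deserialized to null.");
            var response = await handler(request);
            return JsonSerializer.SerializeToUtf8Bytes(response);
        };
    }

    public Task<byte[]> DispatchAsync(string route, byte[] payload)
    {
        if (!_routes.TryGetValue(route, out var handler))
            throw new KeyNotFoundException($"No handler registered for route '{route}'.");
        return handler(payload);
    }
}
```

Keeping handlers behind a bytes-in, bytes-out delegate is one way to let the transport layer stay unaware of request and response types.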

Quick Start

Basic RpcClient Usage

using Flynk.Net.Services.Transmit.Grpc.Client;
using Microsoft.Extensions.Logging;

// Create logger
var logger = new LoggerFactory().CreateLogger<RpcClient>();

// Initialize client
using var client = new RpcClient(
    address: "http://localhost:5000",
    clientId: "my-service",
    logger: logger
);

// Register a route handler
client.RegisterRoute<UserRequest, UserResponse>("user.create", 
    async (request, metadata) =>
    {
        // Process the request
        var user = new UserResponse 
        { 
            Id = Guid.NewGuid().ToString(),
            Name = request.Name 
        };
        
        return Response<UserResponse>.OK(user);
    });

// Start listening for incoming messages
client.StartListening();

// Send a typed request to another service
var response = await client.SendAsync<UserRequest, UserResponse>(
    route: "user.create",
    request: new UserRequest { Name = "John Doe" },
    recipientId: "target-service"
);

if (response.IsOK)
{
    Console.WriteLine($"User created: {response.Payload.Id}");
}

TransmissionClient

The TransmissionClient provides low-level message transmission capabilities.

Basic Usage

using System.Text; // required for Encoding

using var client = new TransmissionClient("my-service-001", "http://localhost:5000", logger);

// Send raw bytes
byte[] data = Encoding.UTF8.GetBytes("Hello World");
var response = await client.SendAsync(
    messageType: "greeting",
    payload: data,
    recipientId: "target"
);

// Echo test (returns the same payload)
var echoResponse = await client.Echo(
    messageType: "test",
    payload: data
);

// Receive messages (streaming)
client.StartReceiving(
    recipientId: "my-service",
    onMessageReceived: (msg) => 
    {
        Console.WriteLine($"Received: {msg.MessageType}");
    },
    messageTypes: new List<string> { "greeting", "notification" }
);

Echo Method

The Echo method is useful for testing connectivity and payload integrity:

var testData = Encoding.UTF8.GetBytes("Test message");
var echoResponse = await client.Echo("test.echo", testData);

// Verify the payload is identical
Assert.Equal(testData, echoResponse.Payload.ToByteArray());
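
Outside a test project, where Assert is not available, the same integrity check can be written as a plain byte comparison. The sketch below is self-contained: PayloadsMatch is a hypothetical helper, and the echoed array stands in for the bytes that a real Echo call would return.

```csharp
using System;
using System.Text;

// Compare two payloads byte for byte; spans avoid extra allocations.
static bool PayloadsMatch(ReadOnlySpan<byte> sent, ReadOnlySpan<byte> received)
    => sent.SequenceEqual(received);

byte[] sent = Encoding.UTF8.GetBytes("Test message");

// Stand-in for echoResponse.Payload.ToByteArray(); a real call would go through the server.
byte[] echoed = (byte[])sent.Clone();

// A payload that differs by one character, to show a failed check.
byte[] corrupted = Encoding.UTF8.GetBytes("Test massage");

Console.WriteLine(PayloadsMatch(sent, echoed));    // prints "True"
Console.WriteLine(PayloadsMatch(sent, corrupted)); // prints "False"
```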

RpcClient

The RpcClient wraps TransmissionClient and provides high-level RPC functionality.

Route Registration

// Method 1: Inline handler with lambda
client.RegisterRoute<CreateOrderRequest, CreateOrderResponse>("order.create",
    async (request, metadata) =>
    {
        var order = await ProcessOrder(request);
        return Response<CreateOrderResponse>.OK(order);
    });

// Method 2: Custom handler class
public class OrderHandler : TypedRouteHandler<CreateOrderRequest, CreateOrderResponse>
{
    protected override async Task<Response<CreateOrderResponse>> HandleTypedAsync(
        CreateOrderRequest request, 
        Dictionary<string, string> metadata)
    {
        // Custom processing logic
        var order = await ProcessOrder(request);
        return Response<CreateOrderResponse>.OK(order);
    }
}

client.RegisterHandler("order.create", new OrderHandler());

Service Discovery with Info

The RpcClient automatically registers a system.info route that provides metadata about the running service:

// Query another service's info
var infoResponse = await client.Info("target-service");

if (infoResponse.IsOK)
{
    foreach (var kvp in infoResponse.Payload)
    {
        Console.WriteLine($"{kvp.Key}: {kvp.Value}");
    }
}

// Default info includes:
// - ClientId
// - MachineName
// - OSVersion
// - ProcessId
// - CLRVersion
// - ApplicationName
// - StartTime
// - WorkingDirectory
// - RegisteredRoutes

Customizing Service Info

Override the GetApplicationInfo method to add custom metadata:

public class MyRpcClient : RpcClient
{
    public MyRpcClient(string address, string clientId, ILogger<RpcClient> logger)
        : base(address, clientId, logger)
    {
    }

    protected override async Task<Dictionary<string, string>> GetApplicationInfo(
        Dictionary<string, string> request,
        Dictionary<string, string> metadata)
    {
        var info = await base.GetApplicationInfo(request, metadata);
        
        // Add custom metadata
        info["Version"] = "1.0.0";
        info["Environment"] = "Production";
        info["DatabaseConnection"] = "Connected";
        info["CustomMetric"] = GetCustomMetric();
        
        return info;
    }
}

Advanced Usage

Error Handling

client.RegisterRoute<ProcessRequest, ProcessResponse>("process.data",
    async (request, metadata) =>
    {
        try
        {
            // Validate request
            if (string.IsNullOrEmpty(request.Data))
            {
                return Response<ProcessResponse>.FAILED("Data is required");
            }
            
            var result = await ProcessData(request.Data);
            return Response<ProcessResponse>.OK(result);
        }
        catch (ValidationException ex)
        {
            return Response<ProcessResponse>.FAILED($"Validation error: {ex.Message}");
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Processing failed");
            return Response<ProcessResponse>.FAILED("Internal server error");
        }
    });

Custom JSON Serialization Options

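using System.Text.Json;
using System.Text.Json.Serialization; // required for JsonStringEnumConverter

var jsonOptions = new JsonSerializerOptions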
{
    PropertyNameCaseInsensitive = true,
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    WriteIndented = true,
    Converters = { new JsonStringEnumConverter() }
};

var client = new RpcClient(
    address: "http://localhost:5000",
    clientId: "my-service",
    logger: logger,
    jsonOptions: jsonOptions
);
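
To see what these options do to a payload, the following sketch runs the same JsonSerializerOptions through System.Text.Json directly, without any client involved. SampleOrder and SampleStatus are hypothetical types defined only for this example.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

var jsonOptions = new JsonSerializerOptions
{
    PropertyNameCaseInsensitive = true,
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    WriteIndented = true,
    Converters = { new JsonStringEnumConverter() }
};

var original = new SampleOrder { ProductId = "ABC", Status = SampleStatus.Shipped };

// Serialize: property names become camelCase and the enum is written as a string.
string json = JsonSerializer.Serialize(original, jsonOptions);
Console.WriteLine(json);

// Deserialize: differently cased property names still bind because matching is case-insensitive.
var parsed = JsonSerializer.Deserialize<SampleOrder>(
    "{\"PRODUCTID\":\"XYZ\",\"Status\":\"Pending\"}", jsonOptions);
Console.WriteLine($"{parsed?.ProductId} {parsed?.Status}");

// Hypothetical types, defined only for this sketch.
public enum SampleStatus { Pending, Shipped }

public class SampleOrder
{
    public string ProductId { get; set; } = string.Empty;
    public SampleStatus Status { get; set; }
}
```

WriteIndented = true makes payloads easier to read while debugging but also larger, so it may be worth disabling when message size matters.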

Bidirectional Communication

// Service A
var serviceA = new RpcClient("http://localhost:5000", "service-a", logger);
serviceA.RegisterRoute<PingRequest, PingResponse>("ping",
    async (request, metadata) =>
    {
        // Send a pong back to the sender
        var pongResponse = await serviceA.SendAsync<PongRequest, PongResponse>(
            "pong",
            new PongRequest { Message = "Pong from A" },
            recipientId: metadata["sender"]
        );
        
        return Response<PingResponse>.OK(new PingResponse { Message = "Ping received" });
    });
serviceA.StartListening();

// Service B
var serviceB = new RpcClient("http://localhost:5000", "service-b", logger);
serviceB.RegisterRoute<PongRequest, PongResponse>("pong",
    async (request, metadata) =>
    {
        return Response<PongResponse>.OK(new PongResponse { Message = "Pong received" });
    });
serviceB.StartListening();

// Initiate communication
var pingResponse = await serviceB.SendAsync<PingRequest, PingResponse>(
    "ping",
    new PingRequest { Message = "Ping from B" },
    recipientId: "service-a"
);

Filtering Messages

// Only listen to specific routes
var routes = new List<string> { "order.create", "order.update", "order.delete" };
client.StartListening(routes);

// With TransmissionClient - filter by message types and metadata
transmissionClient.StartReceiving(
    recipientId: "my-service",
    onMessageReceived: HandleMessage,
    messageTypes: new List<string> { "order", "payment" },
    filters: new Dictionary<string, string> 
    { 
        { "priority", "high" },
        { "region", "us-west" }
    }
);

Testing

Unit Testing with Mock Handlers

[Fact]
public async Task OrderHandler_ShouldProcessOrderCorrectly()
{
    // Arrange
    var logger = new Mock<ILogger<RpcClient>>().Object;
    using var client = new RpcClient("http://localhost:5000", "test", logger);
    
    var orderProcessed = false;
    client.RegisterRoute<CreateOrderRequest, CreateOrderResponse>("order.create",
        async (request, metadata) =>
        {
            orderProcessed = true;
            return Response<CreateOrderResponse>.OK(new CreateOrderResponse 
            { 
                OrderId = "123",
                Status = "Created"
            });
        });
    
    // Start listening so the registered handler can receive the request
    client.StartListening();

    // Act
    var response = await client.SendAsync<CreateOrderRequest, CreateOrderResponse>(
        "order.create",
        new CreateOrderRequest { ProductId = "ABC", Quantity = 5 },
        recipientId: "test"
    );
    
    // Assert
    Assert.True(orderProcessed);
    Assert.True(response.IsOK);
    Assert.Equal("123", response.Payload.OrderId);
}

Integration Testing

public class TestRequest
{
    public string Name { get; set; } = string.Empty;
}

public class TestResponse
{
    public string Message { get; set; } = string.Empty;
}

[Fact]
public async Task Services_ShouldCommunicateSuccessfully()
{
    // Setup server
    var logger = new Mock<ILogger<RpcClient>>().Object;
    var server = new RpcClient("http://localhost:5000", "server", logger);
    server.RegisterRoute<TestRequest, TestResponse>("test.route",
        async (request, metadata) =>
        {
            return Response<TestResponse>.OK(new TestResponse
            {
                Message = $"Processed: {request.Name}"
            });
        });
    server.StartListening();

    // Setup client
    var client = new RpcClient("http://localhost:5000", "client", logger);

    // Test communication
    var response = await client.SendAsync<TestRequest, TestResponse>(
        "test.route",
        new TestRequest { Name = "test" },
        recipientId: "server"
    );

    Assert.True(response.IsOK);
    Assert.Equal("Processed: test", response.Payload.Message);

    // Cleanup
    server.StopListening();
    server.Dispose();
    client.Dispose();
}

Best Practices

1. Resource Management

Always dispose of clients properly:

using var client = new RpcClient(address, clientId, logger);
// ... use client
// Automatically disposed at end of scope

2. Route Naming Convention

Use hierarchical dot notation for routes:

// Good
"user.create"
"user.update"
"user.delete"
"order.process"
"payment.authorize"

// Avoid
"CreateUser"
"UPDATE_USER"
"delete-user"

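A naming convention is easier to keep when it is checked in code, for example before a route is registered. The sketch below validates route names against a regular expression. The exact pattern (lowercase alphanumeric segments, at least two, separated by dots) is an assumption derived from the examples above, and IsConventionalRoute is a hypothetical helper, not part of the library.

```csharp
using System;
using System.Text.RegularExpressions;

// Lowercase segments separated by dots, at least two segments (e.g. "user.create").
var routePattern = new Regex(@"^[a-z][a-z0-9]*(\.[a-z][a-z0-9]*)+$", RegexOptions.CultureInvariant);

bool IsConventionalRoute(string route) => routePattern.IsMatch(route);

var candidates = new[] { "user.create", "payment.authorize", "CreateUser", "UPDATE_USER", "delete-user" };

foreach (var route in candidates)
{
    // The first two names print True; the last three print False.
    Console.WriteLine($"{route}: {IsConventionalRoute(route)}");
}
```
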
3. Error Handling

Always handle potential failures:

var response = await client.SendAsync<Request, Response>(route, request, recipientId);

if (response.IsOK)
{
    // Handle success
    ProcessResponse(response.Payload);
}
else
{
    // Handle failure
    logger.LogError("Request failed: {ErrorMessage}", response.ErrorMessage);
}

4. Logging

Use structured logging for better debugging:

logger.LogInformation("Sending request to {Route} for {RecipientId}", 
    route, recipientId);

5. Timeout Configuration

Configure appropriate timeouts for long-running operations:

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
// Pass cts.Token to async operations that accept a CancellationToken
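
This README does not show whether the client methods accept a CancellationToken. If they do not, one option is to bound the waiting time from the caller's side with Task.WaitAsync, which is available in .NET 6 and later. The sketch below uses placeholder tasks instead of real requests, and CompletesWithinAsync is a hypothetical helper.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Await any task, but give up once the timeout elapses.
static async Task<bool> CompletesWithinAsync(Task operation, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    try
    {
        // WaitAsync stops waiting when the token is cancelled;
        // it does not cancel the underlying operation itself.
        await operation.WaitAsync(cts.Token);
        return true;
    }
    catch (OperationCanceledException)
    {
        return false;
    }
}

// Stand-ins for a fast and a slow request.
Task fast = Task.CompletedTask;
Task slow = Task.Delay(TimeSpan.FromSeconds(5));

bool fastResult = await CompletesWithinAsync(fast, TimeSpan.FromMilliseconds(200));
bool slowResult = await CompletesWithinAsync(slow, TimeSpan.FromMilliseconds(200));

Console.WriteLine($"fast: {fastResult}, slow: {slowResult}"); // prints "fast: True, slow: False"
```

Because the underlying operation keeps running after the wait is abandoned, this approach limits how long the caller blocks rather than freeing server-side resources.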

6. Service Discovery

Use the Info endpoint for health checks and service discovery:

public async Task<bool> IsServiceHealthy(string serviceId)
{
    try
    {
        var info = await client.Info(serviceId);
        return info.IsOK;
    }
    catch
    {
        return false;
    }
}

Troubleshooting

Common Issues

  1. Connection Refused

    • Verify that the server is running
    • Check firewall settings
    • Ensure the address and port match the server's endpoint

  2. Serialization Errors

    • Verify that the JSON structure matches the expected types
    • Check for null values where the receiving type expects data
    • Ensure all properties are serializable

  3. Route Not Found

    • Verify that the route is registered before listening starts
    • Check for typos in route names
    • Ensure the recipient service is listening

  4. Memory Leaks

    • Always dispose clients when done
    • Stop listening or receiving before disposal
    • Avoid capturing long-lived objects in handler closures

Contributing

Contributions are welcome! Please ensure:

  • All tests pass
  • Code follows existing style
  • New features include tests
  • Documentation is updated

License

This software is licensed under the Flynk Freeware License.

Support

For issues or questions, please contact the development team or create an issue in the repository.

Compatible target frameworks

Included in the package: net8.0, net9.0.
Computed as compatible: net10.0, plus the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0.


Version Downloads Last Updated
1.5.0 93 2/16/2026
1.4.0 87 2/15/2026
1.3.2 102 1/13/2026
1.2.1 297 12/18/2025
1.1.2 287 12/16/2025
1.1.1 285 12/16/2025
1.1.0 295 12/16/2025