Flynk.Net.Services.Transmit.Grpc.Client
1.1.0
Prefix Reserved
See the version list below for details.
dotnet add package Flynk.Net.Services.Transmit.Grpc.Client --version 1.1.0
NuGet\Install-Package Flynk.Net.Services.Transmit.Grpc.Client -Version 1.1.0
<PackageReference Include="Flynk.Net.Services.Transmit.Grpc.Client" Version="1.1.0" />
<PackageVersion Include="Flynk.Net.Services.Transmit.Grpc.Client" Version="1.1.0" />
<PackageReference Include="Flynk.Net.Services.Transmit.Grpc.Client" />
paket add Flynk.Net.Services.Transmit.Grpc.Client --version 1.1.0
#r "nuget: Flynk.Net.Services.Transmit.Grpc.Client, 1.1.0"
#:package Flynk.Net.Services.Transmit.Grpc.Client@1.1.0
#addin nuget:?package=Flynk.Net.Services.Transmit.Grpc.Client&version=1.1.0
#tool nuget:?package=Flynk.Net.Services.Transmit.Grpc.Client&version=1.1.0
Flynk.Net.Services.Transmit.Grpc
A high-performance gRPC-based messaging library that provides both low-level transmission capabilities and a high-level RPC client with routing, typed serialization, and service discovery features.
Table of Contents
- Overview
- Installation
- Architecture
- Quick Start
- TransmissionClient
- RpcClient
- Advanced Usage
- Testing
- Best Practices
Overview
This library provides two main client implementations:
- TransmissionClient: Low-level gRPC client for raw message transmission
- RpcClient: High-level client with routing, typed serialization/deserialization, and automatic service discovery
Key Features
- ✅ Bidirectional streaming communication
- ✅ Type-safe request/response handling
- ✅ Automatic serialization/deserialization (JSON)
- ✅ Route-based message handling
- ✅ Built-in service discovery via Info endpoint
- ✅ Echo endpoint for testing connectivity
- ✅ Extensible handler system
- ✅ Integration with Flynk.Common.Utils.Response
Installation
Prerequisites
- .NET 8.0 or later
- NuGet packages:
Flynk.Common.Utils(for Response types)Google.ProtobufGrpc.AspNetCoreGrpc.Net.ClientSystem.Text.Json
Package Installation
dotnet add package Flynk.Net.Services.Transmit.Grpc
Architecture
┌─────────────────┐ ┌─────────────────┐
│ RpcClient │────────▶│TransmissionClient│
└─────────────────┘ └─────────────────┘
│ │
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Route Handlers │ │ gRPC Proto │
└─────────────────┘ └─────────────────┘
Quick Start
Basic RpcClient Usage
using Flynk.Net.Services.Transmit.Grpc.Client;
using Microsoft.Extensions.Logging;
// Create logger
var logger = new LoggerFactory().CreateLogger<RpcClient>();
// Initialize client
using var client = new RpcClient(
address: "http://localhost:5000",
clientId: "my-service",
logger: logger
);
// Register a route handler
client.RegisterRoute<UserRequest, UserResponse>("user.create",
async (request, metadata) =>
{
// Process the request
var user = new UserResponse
{
Id = Guid.NewGuid().ToString(),
Name = request.Name
};
return Response<UserResponse>.OK(user);
});
// Start listening for incoming messages
client.StartListening();
// Send a typed request to another service
var response = await client.SendAsync<UserRequest, UserResponse>(
route: "user.create",
request: new UserRequest { Name = "John Doe" },
recipientId: "target-service"
);
if (response.IsOK)
{
Console.WriteLine($"User created: {response.Payload.Id}");
}
TransmissionClient
The TransmissionClient provides low-level message transmission capabilities.
Basic Usage
using var client = new TransmissionClient("my-service-001", "http://localhost:5000", logger);
// Send raw bytes
byte[] data = Encoding.UTF8.GetBytes("Hello World");
var response = await client.SendAsync(
messageType: "greeting",
payload: data,
recipientId: "target"
);
// Echo test (returns the same payload)
var echoResponse = await client.Echo(
messageType: "test",
payload: data
);
// Receive messages (streaming)
client.StartReceiving(
recipientId: "my-service",
onMessageReceived: (msg) =>
{
Console.WriteLine($"Received: {msg.MessageType}");
},
messageTypes: new List<string> { "greeting", "notification" }
);
Echo Method
The Echo method is useful for testing connectivity and payload integrity:
var testData = Encoding.UTF8.GetBytes("Test message");
var echoResponse = await client.Echo("test.echo", testData);
// Verify the payload is identical
Assert.Equal(testData, echoResponse.Payload.ToByteArray());
RpcClient
The RpcClient wraps TransmissionClient and provides high-level RPC functionality.
Route Registration
// Method 1: Inline handler with lambda
client.RegisterRoute<CreateOrderRequest, CreateOrderResponse>("order.create",
async (request, metadata) =>
{
var order = await ProcessOrder(request);
return Response<CreateOrderResponse>.OK(order);
});
// Method 2: Custom handler class
public class OrderHandler : TypedRouteHandler<CreateOrderRequest, CreateOrderResponse>
{
protected override async Task<Response<CreateOrderResponse>> HandleTypedAsync(
CreateOrderRequest request,
Dictionary<string, string> metadata)
{
// Custom processing logic
var order = await ProcessOrder(request);
return Response<CreateOrderResponse>.OK(order);
}
}
client.RegisterHandler("order.create", new OrderHandler());
Service Discovery with Info
The RpcClient automatically registers a system.info route that provides metadata about the running service:
// Query another service's info
var infoResponse = await client.Info("target-service");
if (infoResponse.IsOK)
{
foreach (var kvp in infoResponse.Payload)
{
Console.WriteLine($"{kvp.Key}: {kvp.Value}");
}
}
// Default info includes:
// - ClientId
// - MachineName
// - OSVersion
// - ProcessId
// - CLRVersion
// - ApplicationName
// - StartTime
// - WorkingDirectory
// - RegisteredRoutes
Customizing Service Info
Override the GetApplicationInfo method to add custom metadata:
public class MyRpcClient : RpcClient
{
public MyRpcClient(string address, string clientId, ILogger<RpcClient> logger)
: base(address, clientId, logger)
{
}
protected override async Task<Dictionary<string, string>> GetApplicationInfo(
Dictionary<string, string> request,
Dictionary<string, string> metadata)
{
var info = await base.GetApplicationInfo(request, metadata);
// Add custom metadata
info["Version"] = "1.0.0";
info["Environment"] = "Production";
info["DatabaseConnection"] = "Connected";
info["CustomMetric"] = GetCustomMetric();
return info;
}
}
Advanced Usage
Error Handling
client.RegisterRoute<ProcessRequest, ProcessResponse>("process.data",
async (request, metadata) =>
{
try
{
// Validate request
if (string.IsNullOrEmpty(request.Data))
{
return Response<ProcessResponse>.FAILED("Data is required");
}
var result = await ProcessData(request.Data);
return Response<ProcessResponse>.OK(result);
}
catch (ValidationException ex)
{
return Response<ProcessResponse>.FAILED($"Validation error: {ex.Message}");
}
catch (Exception ex)
{
logger.LogError(ex, "Processing failed");
return Response<ProcessResponse>.FAILED("Internal server error");
}
});
Custom JSON Serialization Options
var jsonOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = true,
Converters = { new JsonStringEnumConverter() }
};
var client = new RpcClient(
address: "http://localhost:5000",
clientId: "my-service",
logger: logger,
jsonOptions: jsonOptions
);
Bidirectional Communication
// Service A
var serviceA = new RpcClient("http://localhost:5000", "service-a", logger);
serviceA.RegisterRoute<PingRequest, PingResponse>("ping",
async (request, metadata) =>
{
// Send a pong back to the sender
var pongResponse = await serviceA.SendAsync<PongRequest, PongResponse>(
"pong",
new PongRequest { Message = "Pong from A" },
recipientId: metadata["sender"]
);
return Response<PingResponse>.OK(new PingResponse { Message = "Ping received" });
});
serviceA.StartListening();
// Service B
var serviceB = new RpcClient("http://localhost:5000", "service-b", logger);
serviceB.RegisterRoute<PongRequest, PongResponse>("pong",
async (request, metadata) =>
{
return Response<PongResponse>.OK(new PongResponse { Message = "Pong received" });
});
serviceB.StartListening();
// Initiate communication
var pingResponse = await serviceB.SendAsync<PingRequest, PingResponse>(
"ping",
new PingRequest { Message = "Ping from B" },
recipientId: "service-a"
);
Filtering Messages
// Only listen to specific routes
var routes = new List<string> { "order.create", "order.update", "order.delete" };
client.StartListening(routes);
// With TransmissionClient - filter by message types and metadata
transmissionClient.StartReceiving(
recipientId: "my-service",
onMessageReceived: HandleMessage,
messageTypes: new List<string> { "order", "payment" },
filters: new Dictionary<string, string>
{
{ "priority", "high" },
{ "region", "us-west" }
}
);
Testing
Unit Testing with Mock Handlers
[Fact]
public async Task OrderHandler_ShouldProcessOrderCorrectly()
{
// Arrange
var logger = new Mock<ILogger<RpcClient>>().Object;
var client = new RpcClient("http://localhost:5000", "test", logger);
var orderProcessed = false;
client.RegisterRoute<CreateOrderRequest, CreateOrderResponse>("order.create",
async (request, metadata) =>
{
orderProcessed = true;
return Response<CreateOrderResponse>.OK(new CreateOrderResponse
{
OrderId = "123",
Status = "Created"
});
});
// Act
var response = await client.SendAsync<CreateOrderRequest, CreateOrderResponse>(
"order.create",
new CreateOrderRequest { ProductId = "ABC", Quantity = 5 },
recipientId: "test"
);
// Assert
Assert.True(orderProcessed);
Assert.True(response.IsOK);
Assert.Equal("123", response.Payload.OrderId);
}
Integration Testing
public class TestRequest
{
public string Name { get; set; } = string.Empty;
}
public class TestResponse
{
public string Message { get; set; } = string.Empty;
}
[Fact]
public async Task Services_ShouldCommunicateSuccessfully()
{
// Setup server
var server = new RpcClient("http://localhost:5000", "server", logger);
server.RegisterRoute<TestRequest, TestResponse>("test.route",
async (request) =>
{
return Response<TestResponse>.OK(new TestResponse
{
Message = $"Processed: {request.Name}"
});
});
server.StartListening();
// Setup client
var client = new RpcClient("http://localhost:5000", "client", logger);
// Test communication
var response = await client.RequestAsync<TestRequest, TestResponse>(
"test.route",
new TestRequest { Name = "test" },
recipientId: "server"
);
Assert.True(response.IsOK);
Assert.Equal("Processed: test", response.Payload.Message);
// Cleanup
server.StopListening();
server.Dispose();
client.Dispose();
}
Best Practices
1. Resource Management
Always dispose of clients properly:
using var client = new RpcClient(address, clientId, logger);
// ... use client
// Automatically disposed at end of scope
2. Route Naming Convention
Use hierarchical dot notation for routes:
// Good
"user.create"
"user.update"
"user.delete"
"order.process"
"payment.authorize"
// Avoid
"CreateUser"
"UPDATE_USER"
"delete-user"
3. Error Handling
Always handle potential failures:
var response = await client.SendAsync<Request, Response>(route, request, recipientId);
if (response.IsOK)
{
// Handle success
ProcessResponse(response.Payload);
}
else
{
// Handle failure
logger.LogError($"Request failed: {response.ErrorMessage}");
}
4. Logging
Use structured logging for better debugging:
logger.LogInformation("Sending request to {Route} for {RecipientId}",
route, recipientId);
5. Timeout Configuration
Configure appropriate timeouts for long-running operations:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
// Pass cts.Token to async operations
6. Service Discovery
Use the Info endpoint for health checks and service discovery:
public async Task<bool> IsServiceHealthy(string serviceId)
{
try
{
var info = await client.Info(serviceId);
return info.IsOK;
}
catch
{
return false;
}
}
Troubleshooting
Common Issues
Connection Refused
- Verify the server is running
- Check firewall settings
- Ensure correct port number
Serialization Errors
- Verify JSON structure matches expected types
- Check for null reference issues
- Ensure all properties are serializable
Route Not Found
- Verify route is registered before listening starts
- Check for typos in route names
- Ensure recipient service is listening
Memory Leaks
- Always dispose clients when done
- Stop receiving before disposal
- Avoid capturing references in handlers
Contributing
Contributions are welcome! Please ensure:
- All tests pass
- Code follows existing style
- New features include tests
- Documentation is updated
License
This software is licensed under the Flynk Freeware License.
Support
For issues or questions, please contact the development team or create an issue in the repository.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Flynk.Common.Utils (>= 6.2.2)
- Google.Protobuf (>= 3.32.1)
- Grpc.Net.Client (>= 2.71.0)
- Microsoft.Extensions.Logging (>= 8.0.1)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.2)
- MongoDB.Bson (>= 3.5.0)
- System.Text.Json (>= 8.0.5)
- System.Threading.Channels (>= 8.0.0)
-
net9.0
- Flynk.Common.Utils (>= 6.2.2)
- Google.Protobuf (>= 3.32.1)
- Grpc.Net.Client (>= 2.71.0)
- Microsoft.Extensions.Logging (>= 9.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.0)
- MongoDB.Bson (>= 3.5.0)
- System.Text.Json (>= 9.0.0)
- System.Threading.Channels (>= 9.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.