Verwenden von Async IO zur Verarbeitung von Observablen bei Verwendung von Reactive Extensions (Rx) - c #, asynchron, system.reactive

Wie kann ich Async-IO (z. B. WriteAsyn, SaveAsync) bei der Verarbeitung von Nachrichten aus einem beobachtbaren Stream verwenden?

Ich verwende derzeit Rx, um vom Client-Socket gelesene Nachrichten als Observable zu streamen. Eine vereinfachte Version, die Erweiterungsmethoden verwendet, wäre:

var messagesFromClient = socket.ReadChunksAsObservable()
.ScanChunksIntoFrames()
.MapFrameToString()
.DoLogString()
.FilterEmptyStrings()
.MapToMessageObject();

Jetzt habe ich Schwierigkeiten, diese Nachrichten in meinem Repository zu speichern, da mein Repository nur asynchrone, nicht blockierende TPL-Methoden aufweist, die Async-IO (z. await dbConnection.SaveAsync(...))

Nach dem, was ich gelesen und getestet habe, ist es nicht möglich, etwas zu tun wie:

messagesFromClient.Subscribe(async message=>{
await myRepo.SaveAsync();
});

Wo kann ich diese asynchronen IO-Operationen verwenden? Sollte ich diese als Nebenwirkungen behandeln? Irgendwelche Beispiele?

Antworten:

1 für die Antwort № 1

Sie können ein erstellen Observable von einem async Methode mit Observable.FromAsync, dann Flatmap messagesFromClient zu diesem Observablen:

messagesFromClient
.SelectMany(message => Observable.FromAsync(() => myRepo.SaveAsync(message)))
.Subscribe();

Verwandte Fragen