Analog der Warteschlange.Peek() für die BlockingCollection beim hören der Konsum von IEnumerable<T>
Ich bin mit Rohrleitungen Muster Implementierung zu entkoppeln Nachrichten, die Verbraucher von einem Hersteller zu vermeiden, slow-Verbraucher-Problem.
Im Falle einer Ausnahme auf eine message-Verarbeitung Bühne [1]
es verloren und nicht entsandt, um eine andere Dienst - /Schicht [2]
. Wie kann ich damit umgehen, wie Problem in [3]
also die Nachricht, nicht verloren gehen, und was Ihnen wichtig ist! Reihenfolge der Nachrichten wird nicht gemischt werden, so dass Ober-Dienst/Schicht werden Nachrichten in der Reihenfolge, wie Sie kamen. Ich habe eine Idee, die beinhaltet eine andere intermediate Queue
aber es scheint Komplex? Leider BlockingCollection<T>
nicht auszusetzen, Analog Queue.Peek()
Methode, so kann ich nur Lesen, nächsten verfügbaren Nachricht und im Falle der erfolgreichen Verarbeitung Dequeue()
private BlockingCollection<IMessage> messagesQueue;
//TPL Task does following:
//Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in
messagesQueue.GetConsumingEnumerable(cancellation))
{
const int maxRetries = 3;
int retriesCounter = 0;
bool isSent = false;
//On this point a message already is removed from messagesQueue
while (!isSent && retriesCounter++ <= maxRetries)
{
try
{
//[1] Preprocess a message
//[2] Dispatch to an other service/layer
clientProxyCallback.SendMessage(cachedMessage);
isSent = true;
}
catch(Exception exception)
{
//[3]
//logging
if (!isSent && retriesCounter < maxRetries)
{
Thread.Sleep(NSeconds);
}
}
if (!isSent && retriesCounter == maxRetries)
{
//just log, message is lost on this stage!
}
}
}
BEARBEITEN: habe Vergessen zu sagen, das ist IIS gehosteten WCF-Dienst die Nachrichten verteilt zurück zur Silverlight-client, WCF-Proxy via client-callback-Vertrag.
EDIT2: Unten ist, wie würde ich dies tun, mit Peek()
, Bin ich etwas fehlt?
bool successfullySent = true;
try
{
var item = queue.Peek();
PreProcessItem(item);
SendItem(item);
}
catch(Exception exception)
{
successfullySent = false;
}
finally
{
if (successfullySent)
{
//just remove already sent item from the queue
queue.Dequeue();
}
}
EDIT3: Sicherlich kann ich verwenden alten Stil Ansatz mit while-Schleife bool flag, Queue
und AutoResetEvent
, aber ich Frage mich, ob das selbe ist auch mit BlockingCollection
und GetConsumingEnumerable()
ich denke, Einrichtung wie Peek
wäre
sehr hilfreich bei Verwendung zusammen mit aufwändig Durchlaufen werden, da sonst alle Pipeline-pattern-Umsetzung Beispiele neue Sachen wie BlockingCollection
und GetConsumingEnumerable()
sieht nicht haltbar und ich haben gelangen Sie zurück zum alten Konzept.
- Erwarten Sie, dass die exception nicht geworfen werden, wenn Sie es erneut versuchen? Wenn ja, einfach eine while-Schleife, bis keine exception geworfen. Wenn nicht, müssen Sie entscheiden, was zu tun ist, wenn ein Element nicht verarbeitet werden kann. Vielleicht pass eine Fehlermeldung an die nächste Schicht?
- Dies ist der server, WCF-service, welches Versand-Nachricht an einen client-proxy kann es sein, dass, wenn Verbindung fehlgeschlagen, und in 5 Sekunden-client Verbindung zu einem Dienst und bereit zum empfangen von Nachrichten,
while
Schleife ist, was mache ich jetzt mit N Wiederholungen senden, muss ich erwähnen, es ist aber immer die Frage, welches Magische Zahl für reconectRetries und timeout zu wählen - Es klingt wie Sie sind auf dem Weg zu erfinden, message queues. Haben Sie sich überlegt mit so etwas wie MSMQ oder RabbitMQ?
- Dies ist eine schwere Technik, Im nicht sicher, dass dies der beste Weg zu gehen, für den WCF service?! aber muss zugeben es ist sehr robust und zuverlässig
- Und warum denkst du, dass mit so etwas wie
Peek()
würde Ihnen helfen, vermeiden Sie diese magischen zahlen? - fertig stackoverflow.com/a/13583279/659190
- sehen
EDIT2
- Das bedeutet, dass, wenn es einen permanenten Fehler, du wirst niemals Fortschritte Vergangenheit Element. Wenn es das ist, was Sie wollen, Sie können einfach verwenden, unbegrenzte Anzahl von Wiederholungen, Sie brauchen nicht
Peek()
für, die.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Sollten Sie überlegen, intermediate queue.
BlockingCollection<T>
kann nicht "peek" - Artikel wegen seiner Natur - es kann mehr als ein Verbraucher. Einer von Ihnen kann peek ein Element, und kann eine andere nehmen es somit zunächst wird man versuchen, nehmen Element, das schon genommen wurde.First()
Erweiterung Methode fürIEnumerable<T>
, die nicht entfernen Sie ein Element aus der Blockierung Auflistung.Take()
aber ohne entfernen des Elements).Als Dennis sagt in seinem Kommentar,
BlockingCollection<T>
bietet eine blockierende wrapper auf jede Implementierung desIProducerConsumerCollection<T>
- Schnittstelle.Wie Sie sehen können,
IProducerConsumerCollection<T>
per design nicht definierenPeek<T>
oder andere Methoden zu implementieren. Dies bedeutet, dassBlockingCollection<T>
nicht, wie es steht, bieten eine analouge zuPeek
.Wenn Sie erwägen, diese großartige reduziert die concurrencey Probleme geschaffen, indem Sie das Dienstprogramm trade-off eines
Peek
Umsetzung. Wie kann man konsumieren, ohne in Anspruch genommen? ZuPeek
gleichzeitig müsstest du den Kopf von der Sammlung bis in diePeek
Vorgang abgeschlossen wurde, das ich und die Designer vonBlockingCollection<T>
Sicht als sub-optimal. Ich denke, es wäre auch chaotisch und schwierig zu implementieren, erfordert eine Art von Einweg-peek Kontext.Wenn Sie verbrauchen eine Nachricht und der Verbrauch ausfällt Sie haben zu behandeln es. Man könnte hinzufügen, es zu einem anderen Fehler-Warteschlange erneut hinzufügen, um die normale Verarbeitung in die Warteschlange für ein furture versuchen Sie es erneut oder melden Sie sich einfach, seine Fehler für die Nachwelt oder eine andere Aktion abhängig von Ihrem Kontext.
Wenn Sie nicht wollen, zu konsumieren, die Nachrichten gleichzeitig, dann gibt es keine Notwendigkeit zu verwenden
BlockingCollection<T>
da brauchen Sie nicht gleichzeitigen Verbrauch. Könnten SieConcurrentQueue<T>
direkt, erhalten Sie immer noch von Synchronizität aus fügt, und Sie könnenTryPeek<T>
sicher, da Sie die Kontrolle über einen einzelnen Verbraucher. Wenn Konsum fehlschlägt, Sie zu stoppen könnte der Verbrauch mit ein infinite retry loop in Ihr den Wunsch, obwohl, ich schlage vor, dies erfordert einige design gedacht.BlockingCollection
fügt eine weitere potentielle nutzen, nämlich, dassTryTake()
warten, bis ein Element verfügbar ist. Wenn wir wollen, dass dasBlockingCollection
weil das so ist, können wir noch peek mitAny()
, richtig? (Unter der Annahme einer einzigen Konsum-thread.)BlockingCollection<T>
ist ein wrapper umIProducerConsumerCollection<T>
, die mehr allgemein als z.B.ConcurrentQueue
und gibt dem Implementierer die Freiheit, nicht zu müssen, implementieren Sie eine(Try)Peek
Methode.Jedoch, Sie können immer rufen Sie
TryPeek
auf die zugrunde liegende Warteschlange direkt:Beachten Sie jedoch, dass Sie müssen nicht ändern Sie Ihre Warteschlange über
useOnlyForPeeking
, sonstblockingCollection
wird verwirrt und kann werfenInvalidOperationException
s an Sie, aber ich wäre überrascht, wenn der Aufruf der nicht-ändernTryPeek
auf diese gleichzeitige Daten-Struktur wäre ein Problem.Könnten Sie
ConcurrentQueue<T>
statt, es hatTryDequeue()
Methode.Also, keine Notwendigkeit zu prüfen, einen Blick zuerst.
TryDequeue()
ist thread-sicher:Soweit ich das verstanden habe es gibt nur dann false, wenn die Warteschlange leer ist,:
http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx
UPDATE
Vielleicht die einfachste option wäre die Verwendung
Aufgabenplaner
Klasse. Mit ihm können Sie wickeln Sie alle Ihre Aufgaben zur Verarbeitung in die Warteschlange Elemente und vereinfachen die Umsetzung der synchronisation.BlockingCollection<T>
ist ein wrapper umConcurrentQueue<T>
standardmäßig. Es gibt einen Unterschied zwischen Ihnen:BlockingCollection<T>
erlaubt, zu warten, bis es verfügbar sein wird, Objekte in der Warteschlange, in der Erwägung, dassConcurrentQueue<T>
noch nicht diese Funktionalität.TryDequeue()
nur ermöglicht die überprüfung, ob ein Element aus der Warteschlange entfernt werden, es ist wieInt32.TryParse()
- funktioniert ohne eine Ausnahme für den Fall des Scheiterns"Tries to remove and return the object at the beginning of the concurrent queue."
Es gibt nur dann false, wenn die Warteschlange leer ist.Queue.Dequeue()
: es bedeutet nicht, werfenInvalidOperationException
wenn die queue leer ist und nur false zurück, nichts mehr. Wie würden Sie also empfehlen es in meinem Fall? Vor allem listenign in die Warteschlange? Ich weiß nicht, wiewhile(queue.TryDequeue(), tiemOut)
SchleifePeek()
Methode. Sie brauchen sicher einige zusätzliche Synchronisations-Objekt, das ich verwenden würde, ein Ereignis, ManualResetEvent, vielleicht...