Das Lesen von mehreren Warteschlangen, RabbitMQ
Ich bin neu RabbitMQ. Ich möchte in der Lage sein zu handhaben, Nachrichten Lesen, ohne zu blockieren, wenn es mehrere Warteschlangen (Lesen). Alle Eingänge, wie kann ich das tun?
//Edit 1
public class Rabbit : IMessageBus
{
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();
ConnectionFactory factory = null;
IConnection connection = null;
IModel channel = null;
Subscription sub = null;
public void writeMessage( Measurement m1 ) {
byte[] body = Measurement.AltSerialize( m1 );
int msgCount = 1;
Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);
string finalQueue = publishToQueue( m1.id );
while (msgCount --> 0) {
channel.BasicPublish("amq.direct", finalQueue, null, body);
}
Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
}
public string publishToQueue(string firstQueueName) {
Console.WriteLine("Creating a queue and binding it to amq.direct");
string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
channel.QueueBind(queueName, "amq.direct", queueName, null);
Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName);
return queueName;
}
public Measurement readMessage() {
Console.WriteLine("Receiving message...");
Measurement m = new Measurement();
int i = 0;
foreach (BasicDeliverEventArgs ev in sub) {
m = Measurement.AltDeSerialize(ev.Body);
//m.id = //get the id here, from sub
if (++i == 1)
break;
sub.Ack();
}
Console.WriteLine("Done.\n");
return m;
}
public void subscribeToQueue(string queueName )
{
sub = new Subscription(channel, queueName);
}
public static string MsgSysName;
public string MsgSys
{
get
{
return MsgSysName;
}
set
{
MsgSysName = value;
}
}
public Rabbit(string _msgSys) //Constructor
{
factory = new ConnectionFactory();
factory.HostName = "localhost";
connection = factory.CreateConnection();
channel = connection.CreateModel();
//consumer = new QueueingBasicConsumer(channel);
System.Console.WriteLine("\nMsgSys: RabbitMQ");
MsgSys = _msgSys;
}
~Rabbit()
{
//observer??
connection.Dispose();
//channel.Dispose();
System.Console.WriteLine("\nDestroying RABBIT");
}
}
//Edit 2
private List<Subscription> subscriptions = new List<Subscription>();
Subscription sub = null;
public Measurement readMessage()
{
Measurement m = new Measurement();
foreach(Subscription element in subscriptions)
{
foreach (BasicDeliverEventArgs ev in element) {
//ev = element.Next();
if( ev != null) {
m = Measurement.AltDeSerialize( ev.Body );
return m;
}
m = null;
}
}
System.Console.WriteLine("No message in the queue(s) at this time.");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
subscriptions.Add(sub);
}
//Edit 3
//MessageHandler.cs
public class MessageHandler
{
//Implementation of methods for Rabbit class go here
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();
ConnectionFactory factory = null;
IConnection connection = null;
IModel channel = null;
QueueingBasicConsumer consumer = null;
private List<Subscription> subscriptions = new List<Subscription>();
Subscription sub = null;
public void writeMessage ( Measurement m1 )
{
byte[] body = Measurement.AltSerialize( m1 );
//declare a queue if it doesn't exist
publishToQueue(m1.id);
channel.BasicPublish("amq.direct", m1.id, null, body);
Console.WriteLine("\n [x] Sent to queue {0}.", m1.id);
}
public void publishToQueue(string queueName)
{
string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
channel.QueueBind(finalQueueName, "amq.direct", "", null);
}
public Measurement readMessage()
{
Measurement m = new Measurement();
foreach(Subscription element in subscriptions)
{
if( element.QueueName == null)
{
m = null;
}
else
{
BasicDeliverEventArgs ev = element.Next();
if( ev != null) {
m = Measurement.AltDeSerialize( ev.Body );
m.id = element.QueueName;
element.Ack();
return m;
}
m = null;
}
element.Ack();
}
System.Console.WriteLine("No message in the queue(s) at this time.");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
subscriptions.Add(sub);
}
public static string MsgSysName;
public string MsgSys
{
get
{
return MsgSysName;
}
set
{
MsgSysName = value;
}
}
public MessageHandler(string _msgSys) //Constructor
{
factory = new ConnectionFactory();
factory.HostName = "localhost";
connection = factory.CreateConnection();
channel = connection.CreateModel();
consumer = new QueueingBasicConsumer(channel);
System.Console.WriteLine("\nMsgSys: RabbitMQ");
MsgSys = _msgSys;
}
public void disposeAll()
{
connection.Dispose();
channel.Dispose();
foreach(Subscription element in subscriptions)
{
element.Close();
}
System.Console.WriteLine("\nDestroying RABBIT");
}
}
//App1.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
public class MainClass
{
public static void Main()
{
MessageHandler obj1 = MessageHandler("Rabbit");
System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
//Create new Measurement messages
Measurement m1 = new Measurement("q1", 2345, 23.456);
Measurement m2 = new Measurement("q2", 222, 33.33);
System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id);
System.Console.WriteLine(" Time: {0}", m1.time);
System.Console.WriteLine(" Value: {0}", m1.value);
System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id);
System.Console.WriteLine(" Time: {0}", m2.time);
System.Console.WriteLine(" Value: {0}", m2.value);
//Ask queue name and store it
System.Console.WriteLine("\nName of queue to publish to: ");
string queueName = (System.Console.ReadLine()).ToString();
obj1.publishToQueue( queueName );
//Write message to the queue
obj1.writeMessage( m1 );
System.Console.WriteLine("\nName of queue to publish to: ");
string queueName2 = (System.Console.ReadLine()).ToString();
obj1.publishToQueue( queueName2 );
obj1.writeMessage( m2 );
obj1.disposeAll();
}
}
//App2.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
public class MainClass
{
public static void Main()
{
//Asks for the message system
System.Console.WriteLine("\nEnter name of messageing system: ");
System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
string MsgSysName = (System.Console.ReadLine()).ToString();
//Declare an IMessageBus instance:
//Here, an object of the corresponding Message System
//(ex. Rabbit, Zmq, etc) is instantiated
IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
//Create a new Measurement object m
Measurement m = new Measurement();
System.Console.WriteLine("Queue name to subscribe to: ");
string QueueName1 = (System.Console.ReadLine()).ToString();
obj1.subscribeToQueue( QueueName1 );
//Read message into m
m = obj1.readMessage();
if (m != null ) {
System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
System.Console.WriteLine(" Time: {0}", m.time);
System.Console.WriteLine(" Value: {0}", m.value);
}
System.Console.WriteLine("Another queue name to subscribe to: ");
string QueueName2 = (System.Console.ReadLine()).ToString();
obj1.subscribeToQueue( QueueName2 );
m = obj1.readMessage();
if (m != null ) {
System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
System.Console.WriteLine(" Time: {0}", m.time);
System.Console.WriteLine(" Value: {0}", m.value);
}
obj1.disposeAll();
}
}
InformationsquelleAutor Demi | 2011-07-14
Du musst angemeldet sein, um einen Kommentar abzugeben.
zwei Quellen der Information:
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Sollten Sie wirklich versuchen zu verstehen, die Beispiele zuerst.
%Program Files%\RabbitMQ\DotNetClient\examples\src (basic Beispiele)
erhalten, voll funktionsfähig, Beispiele aus der Mercurial-repository (c# - Projekte).
Nützliche Operationen zu verstehen:
Re:
deine Frage-es gibt keinen Grund, warum Sie können nicht mehrere listenners. Oder Sie können abonnieren Sie n routing-Pfade mit einer listenner auf eine "exchange".
** re: non-blocking **
Einem typischen listenner verbraucht Nachrichten ein zu einer Zeit. Sie können ziehen Sie Sie aus der Warteschlange, oder Sie werden automatisch platziert werden, nahe am Verbraucher in ein 'Fenster' Mode (definiert durch quality of service-qos-Parameter). Die Schönheit der Ansatz ist, dass viel harte Arbeit für Sie erledigt (re: Zuverlässigkeit, garantierte Zustellung, etc.).
Ein wesentliches Merkmal von RabbitMQ, ist, dass wenn es ein Fehler in der Verarbeitung, dann die Nachricht ist neu Hinzugefügt, wieder in der Warteschlange (eine Fehlertoleranz-Funktion).
Müssen, um zu wissen, mehr über Sie situation.
Oft, wenn Sie einen Beitrag in der Liste, die ich oben erwähnt, Sie bekommen können, jemanden auf die Mitarbeiter an RabbitMQ. Sie sind sehr hilfreich.
Hoffe, das hilft ein wenig. Es ist viel um Ihren Kopf herum auf den ersten, aber es lohnt sich, beharren.
Q&A
finden Sie unter: http://www.rabbitmq.com/faq.html
F. Können Sie abonnieren, um mehrere Warteschlangen mit neuen Abonnements(Kanal, queueName) ?
Ja. Sie verwenden entweder einen verbindlichen Schlüssel (z.B. abc.*.hij, oder abc.#.hij, oder Sie fügen mehrere Bindungen. Erstere geht davon aus, dass Sie entwickelt haben, Ihre routing-Tasten, um eine Art Prinzip, dass für Sie Sinn macht (siehe routing-Tasten in der FAQ). Für die letztere, Sie zu binden, müssen Sie mehr als eine Warteschlange.
Umsetzung von n-Bindungen manuell.
siehe: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs
es ist nicht viel code, der sich hinter diesem Muster, so könnte man Rollen Sie Ihre eigene Abonnement-Muster, wenn wildcards sind nicht genug. Sie könnten von dieser Klasse Erben aus und fügen Sie eine weitere Methode für zusätzliche Bindungen... wahrscheinlich wird das der Arbeit oder etwas in der Nähe dieser (ungetestete).
Den AQMP spec sagt, dass mehrere manuelle Bindung sind möglich: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind
Mit einem Abonnenten Sie werden benachrichtigt, wenn eine Nachricht verfügbar ist. Ansonsten, was Sie beschreiben, ist ein pull-Schnittstelle, wo Sie ziehen Sie die Nachricht auf Anfrage. Wenn keine Nachrichten vorhanden, erhalten Sie eine null, wie Sie möchten. btw: die Notify-Methode ist wahrscheinlich bequemer.
Live-code:
diese version verwenden müssen, wild cards zu abonnieren, um mehr als eine routing-Taste
n manuelle routing-Schlüssel mit Abonnement ist Links als übung für den Leser. 😉 Ich glaube, Sie waren gelehnt, hin zu einer pull-Schnittstelle sowieso. btw: pull-Schnittstellen sind weniger effizient als diejenigen Benachrichtigen.
Hinweis: die foreach verwendet IEnumerable, IEnumerable umschließt den Fall, dass eine neue Nachricht eingetroffen durch die "Ausbeute" - Anweisung. Effektiv ist es eine Endlosschleife.
- - - - UPDATE
AMQP wurde mit der Idee halten die Anzahl der TCP-verbindungen so gering wie die Anzahl der Anwendungen, so dass heißt, Sie können mehrere Kanäle pro Verbindung.
den code in dieser Frage (edit 3) versucht, zwei Abonnenten mit einem Kanal, in der Erwägung, dass sollte es (glaube ich) werden ein Abonnent pro Kanal-pro-thread zu vermeiden, sperren Probleme. Sugestion: Verwendung einer routing-key "wildcard". Es ist möglich, zu abonnieren, um mehr als eine eindeutige queue-Namen mit der java-client, aber der .net-client nicht nach meinem wissen haben diese umgesetzt in die Abonnenten-helper-Klasse.
Wenn Sie wirklich brauchen, zwei verschiedene queue-Namen auf dem gleichen Abonnement-thread, dann sind die folgenden pull-Sequenz vorgeschlagen .net:
-- UPDATE 2:
von RabbitMQ-Liste:
"davon ausgehen, dass element.Next() ist blockierend auf eines der Abonnements.
Konnte Sie abrufen Lieferungen von jedem Abonnement mit einem timeout
Lesen Vergangenheit. Alternativ könnte man die Einrichtung einer einzigen Warteschlange zu erhalten
alle Messungen und abrufen von Nachrichten mit einem einzigen Abonnement." (Emile)
Was das bedeutet ist, dass, wenn die erste Warteschlange leer ist .Next() blockiert, wartet auf die nächste Nachricht angezeigt. D. H. der Abonnent hat eine wait-for-next-message eingebaut.
-- UPDATE 3:
unter .net, verwenden Sie die QueueingBasicConsumer für den Verbrauch von mehreren Warteschlangen.
Tatsächlich ist hier ein thread über Sie, um ein Gefühl nach Verbrauch:
Warten auf eine Einzel-RabbitMQ-Meldung mit einem timeout
-- UPDATE4:
einige mehr info auf der .QueueingBasicConsumer
Gibt es Beispiel-code hier.
http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html
Beispiel kopiert und in die Antwort mit ein paar Modifikationen (siehe //<-----).
-- UPDATE 5 : ein einfach zu bekommen, wird auf eine Warteschlange (ein langsamer, manchmal aber auch mehr praktische Methode).
Nochmals vielen Dank. Ich bearbeitet den code für abonnieren und schreib-Funktionen vor. Allerdings habe ich diesen Lauf Zeit Fehler: wenn ich ein Abonnement zu sagen, zwei Warteschlangen und versuchen, Nachrichten zu Lesen, ich kann nur ermittelt werden Nachrichten für die erste Zeit. Ich kann nicht sehen, wo ich es versaut. Können Sie schauen, ob für mich?
das dauerte einige Jagd. Ich denke, Sie sind fehlt " - Abonnements.Ack()" am Ende Ihrer Leser-Schleife? Was bedeutet 'ich habe erfolgreich verarbeitet diese Nachricht, so gib mir die nächste.' Lassen Sie mich wissen, wenn, dass war es. Sonst ist Sie genau hinschauen.
N. B., sollte am Ende der äußeren Schleife
Ich fürchte, die Fehler sind immer noch vorhanden. :/
InformationsquelleAutor
Der einfachste Weg ist die Verwendung des EventingBasicConsumer. Ich habe ein Beispiel auf meiner Website, wie es zu benutzen. RabbitMQ EventingBasicConsumer
Diese Consumer-Klasse stellt eine Empfangene Ereignis, das Sie verwenden können, und daher NICHT blockiert. Der rest des Codes im Grunde bleibt die gleiche.
InformationsquelleAutor Kelly