Berechnen Sie die Unterschiede zwischen succesive Datensätze in Hadoop mit Hive-Abfragen
Habe ich eine Hive-Tabelle, die enthält Daten, die der Kunde fordert.
Einfachheit halber betrachten, es hat 2 Spalten, erste Spalte enthält die Kunden-ID und die zweite Spalte enthält den Zeitstempel des Aufrufs (unix-timestamp).
Ich kann eine Abfrage an diese Tabelle zu finden, die alle Aufrufe für jeden Kunden:
SELECT * FROM mytable SORT BY customer_id, call_time;
Ist das Ergebnis:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
Ist es möglich, das erstellen einer Hive-Abfrage zurückgibt, für jeden Kunden, beginnend ab dem zweiten Aufruf, das Zeitintervall zwischen zwei succesive Anrufe?
Für das obige Beispiel, die Abfrage zurückgeben soll:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
Ich habe versucht, passen Sie die Lösungen aus der sql-Lösung, aber ich bin stecken mit dem Bienenkorb Einschränkungen: es akzeptiert Unterabfragen nur in AUS und joins enthalten muss, nur Gleichheit.
Danke.
EDIT1:
Habe ich versucht, ein Hive UDF-Funktion:
public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;
public String evaluate(String customerId, LongWritable callTime) {
long callTimeValue = callTime.get();
String timeDifference = null;
if (customerId.equals(previousCustomerId)) {
timeDifference = new Long(callTimeValue - previousCallTime).toString();
}
previousCustomerId = customerId;
previousCallTime = callTimeValue;
return timeDifference;
}}
und verwenden Sie es mit dem Namen "delta".
Aber es scheint (aus den Protokollen und Ergebnis), es ist verwendet bei der KARTE Zeit. 2 Probleme die sich daraus ergeben:
Erste: Die Daten der Tabelle muss sortiert werden, indem Sie Kunden-ID und timestamp-VOR der Verwendung dieser Funktion. Die Abfrage:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
funktioniert nicht, weil die Sortierung der Artikel erfolgt bei der REDUZIERUNG der Zeit, lange nach meiner Funktion verwendet wird.
Kann ich die Tabelle Sortieren Daten, bevor Sie die Funktion, aber ich bin nicht zufrieden mit diesem, weil es ist ein Aufwand, ich hoffe zu vermeiden.
Zweite: Im Falle einer verteilten Hadoop-Konfiguration werden die Daten aufgeteilt unter den verfügbaren job-Tracker. So, glaube ich, wird es mehrere Instanzen dieser Funktion, eine für jeden mapper, so ist es möglich, die gleichen Kundendaten split zwischen 2 Mapper. In diesem Fall werde ich verlieren, Kunden anrufen, die nicht akzeptabel ist.
Ich weiß nicht, wie dieses Problem zu lösen. Ich weiß, dass zu VERBREITEN, INDEM sichergestellt wird, dass alle Daten mit einem bestimmten Wert gesendet wird, um die gleiche reducer (so sicherzustellen, dass die ART funktioniert, wie erwartet), weiß jemand ob es etwas ähnliches für den mapper?
Nächsten I-plan zu Folgen libjack ' s Vorschlag, eine Reduzierung Skript. Diese "Berechnung" ist notwendig, zwischen einigen anderen hive-Abfragen, so will ich versuchen alles Hive bietet, bevor Sie zu einem anderen Werkzeug, wie vorgeschlagen, durch Balaswamy vaddeman.
EDIT2:
Begann ich zu untersuchen, die benutzerdefinierten Skripts Lösung. Aber, auf der ersten Seite des Kapitels 14 in der Programmierung mit Hive-Buch (dieses Kapitel zeigt die benutzerdefinierten Skripts), fand ich den folgenden Absatz:
Streaming ist in der Regel weniger effizient als die Kodierung der vergleichbaren UDFs oder
InputFormat Objekte. Serialisieren und Deserialisieren von Daten zu übergeben, die es in und
aus dem Rohr ist relativ ineffizient. Es ist auch schwieriger zu Debuggen die ganze
Programm in einer einheitlichen Art und Weise. Es ist jedoch nützlich für schnelles prototyping
und für die Wiederverwendung von vorhandenen code, der nicht in Java geschrieben. Für Hive
Benutzer, die nicht wollen, um das schreiben von Java-code, kann es eine sehr effektive
Ansatz.
So war es klar, dass die benutzerdefinierte Skripts ist nicht die beste Lösung in Bezug auf die Effizienz.
Aber wie sollte ich meine UDF-Funktion, aber stellen Sie sicher, dass es funktioniert wie erwartet in einem verteilten Hadoop-Konfiguration? Ich fand die Antwort auf diese Frage in der UDF-Internals-Abschnitt der Sprache Manual UDF wiki-Seite. Wenn ich Schreibe, meine Abfrage:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
durchgeführt, ist auf die Zeit VERRINGERN und zu VERTEILEN, DURCH und SORTIEREN Konstrukte garantieren, dass alle Datensätze vom gleichen Kunden bearbeitet werden, die von den gleichen reducer, in der Reihenfolge der Anrufe.
So dass die oben genannten UDF und diese Abfrage konstruieren, die mein problem lösen.
(Sorry für die nicht das hinzufügen von links, aber ich bin nicht erlaubt, es zu tun, weil ich nicht genug Ruf-Punkte)
- Ich denke, dies ist sehr ähnlich zu diese Frage, antwortete ich mit einer custom map/reduce im hive. Sie würde nur noch um die entsprechende reduzieren Skript.
- Ich weiß nicht, wie das im Bienenstock, aber es ist cascading-api, dies zu tun.es gibt so genannte Puffer, die in cascading.docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html
Du musst angemeldet sein, um einen Kommentar abzugeben.
Es ist eine alte Frage, aber für die Zukunft Verweise, Schreibe ich hier eine weitere Aussage:
Hive Windowing Funktionen ermöglicht, um den vorherigen /nächsten Werten in der Abfrage.
Einen ähnlichen code der Abfrage kann sein :
SELECT customer_id, LAG(call_time, 1, 0) OVER (PARTITION BY customer_id, UM DURCH call_time) - call_time FROM mytable;
Können Sie die explizite
MAP-REDUCE
mit anderen Programmiersprache wie Java oder Python.Wo emittieren von anzeigen
{cutomer_id,call_time}
und in reducer erhalten Sie{customer_id,list{time_stamp}}
und in reducer Sortieren Sie diese Zeitstempel und kann die Daten verarbeiten.Vielleicht jemand trifft auf eine ähnliche Anforderung, die Lösung, die ich fand, ist die folgende:
1) Erstellen Sie eine benutzerdefinierte Funktion:
2) Erstellen Sie eine jar mit dieser Funktion. Nehmen wir an, die jarname ist myjar.jar.
3) Kopieren Sie die jar, um die Maschine mit Bienenkorb. Nehme an, es ist in /tmp
4) Definieren Sie die benutzerdefinierte Funktion in Hive:
5) Führen Sie die Abfrage:
Bemerkungen:
ein. Ich vermutete, dass der call_time Spalte speichert Daten als bigint. In Fall ist es string, in-Prozess-Funktion rufen wir es als string (wie tun wir mit den "customerId"), dann analysiert es zu Lange
b. Ich habe mich entschieden, eine UDTF statt UDF, denn auf diese Weise generiert es alle Daten, die er braucht. Ansonsten (mit UDF), die generierten Daten müssen gefiltert werden, um überspringen von NULL-Werten. Also, mit der UDF-Funktion (DeltaComputerUDF) beschrieben, in der ersten Bearbeitung des ursprünglichen Beitrags, wird die Abfrage:
c. Beide Funktionen (UDF und UDTF) arbeiten als gewünscht, egal die Reihenfolge der Zeilen innerhalb der Tabelle (so ist es nicht erforderlich, dass die Tabelle die Daten sortiert nach Kunden-id und rufen Sie die Zeit, bevor Sie delta-Funktionen)