Le langage HiveApache Hive est une infrastructure d’entrepôt de donnée intégrée sur Hadoop permettant l'analyse, le requétage SQL de données

Introduction

Hive est un moteur SQL qui travaille sur des fichiers textes présents sur hdfs. Hive permet de définir la notion de table, de colonne, etc.

Lancement de Hive

Lancer l'invite de commandes hive

Pour lancer une invite de commande hive, on lance la commande suivante :

hive

On peut lancer une commande particulière en lancant :

hive -e 'commande'

Pour lancer un script on utilise :

hive -f script.hql

Paramétrage d'un script

Pour passer un paramètre à un programme hive, on utilise l'option -hiveconf :

hive -hiveconf "variable=valeur" -f ./script.hql

Dans le script hive, pour accéder au contenu de la variable, on utilise ${hiveconf:variable}

Gestion des logs

Pour envoyer les logs du traitement vers un fichier texte, on utilise les deux properties hive.log.dir et hive.log.file de la manière suivante :

hive -hiveconf hive.log.dir=/etc/dossier -hiveconf hive.log.file=fichier.txt -f script.hql 
Fonctionnement de hive

Hive ne réorganise pas les données, il se contente de créer un catalogue de table associé à un ensemble de fichiers. Ce catalogue se nomme le metastore hive.

Metastore
Création et suppression des objets hive

Les opérations de création et de suppression des objets hive sont très rapides, en effet, les données ne sont ni consultées ni écrites. Seules le metastore de hive est impacté par ces opérations. Le temps de traitement est donc très faible.

Les databases

Pour créer une database :

CREATE DATABASE nom_database;

Pour utiliser une database, on utilise la commande suivante :

USE nom_database;

Pour supprimer une database, on utilise la commande DROP. On peut utiliser l'option IF EXISTS pour ne pas afficher de message d'erreur si la database n'existe pas. Pour supprimer une database non vide en supprimant les tables qu'elle contient, on utilise l'option CASCADE comme décrit ci-dessous :

DROP DATABASE nom_database;
DROP DATABASE IF EXISTS nom_database;
DROP DATABASE IF EXISTS nom_database CASCADE;

Les tables

Tables internes et externes

Il existe deux catégories de tables : Les tables internes et les tables externes. La différence n'est pas très grande :

  • les managed tables : la gestion de ces tables est exclusivement faite par hive, un drop de la table supprimera le fichiers texte associés. Il n'est pas possible d'insérer des fichiers texte dans la table en utilisant un autre outil.
  • Les tables externes sont des tables dont le contenu n'est pas géré par hive. Un drop table supprimera le référencement de la table dans le metastore mais ne supprimera pas les données. Si la table est recréée, les données seront de nouveau présentes. Cette solution permet d'insérer des données dans une table hive en utilisant un outil externe.

Les instructions pour créer les tables sont :

CREATE TABLE table_interne <colonnes>;
CREATE EXTERNAL TABLE table_externe <colonnes> LOCATION /user/tables/t1;

Définition des colonnes

Pour définir les colonnes, on leur donne un nom et un type en respectant l'ordre d'appartion de la colonne dans le fichier

Les types de colonnes sont :

Types de colonnes hive
Type Description Exemple
int entier stocké sur 32 bit 10
long entier stocké sur 64 bit 10
float réel stocké sur 32 bit 14.2
double réel stocké sur 64 bit 14.2
string chaine de caracatères chaine
date date yyyy-mm-dd 2015-12-30
timestamp date yyyy-mm-dd hh:mm:ss 2015-12-30 11:35:20

Pour définir les colonnes, on utilise le code suivant :

-- Création d'une table interne
CREATE TABLE tailles (
 nom string,
 prenom string,
 taille int
);
-- Création d'une table externe
CREATE EXTERNAL TABLE tailles (
 nom string,
 prenom string,
 taille int
)
LOCATION /user/tables/tailles;

Définir le format de stockage

Comme nous l'avons vu précédement, les tables hive correspondent à des fichiers texte formatés en colonnes. Nous pouvons définir le format de stockage en utilisant :

  • ROW FORMAT : pour préciser que les information sont stockées en ligne
  • DELIMITED FIELDS : pour préciser que les données sont délimitées par un champ
  • TERMINATED BY <sep> pour préciser le caractère utilisé pour séparer les champs. Attention, certains caractères doivent être échappés.

Nous aurons alors l'instruction suivante :

CREATE EXTERNAL TABLE tailles (
 nom string,
 prenom string,
 taille int
)
LOCATION /user/tables/tailles
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;';

Ignorer les premières lignes

Il est possibles d'ignorer automatiquement la première lignes d'un fichier en ajoutant l'option tblproperties("skip.header.line.count"='0');

CREATE EXTERNAL TABLE tailles (
 nom string,
 prenom string,
 taille int
)
LOCATION /user/tables/tailles
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;'
tblproperties("skip.header.line.count"='1');

Création d'une partition

Il est possible de partitionner les tables suivant une colonne. Les partitions correspondront à des sous dossiers du dossier contenant la table.

Créer une partition augmentera la durée des traitements d'insertion mais fera diminuer la durée des traitements pour lesquels on regroupe ou on ne séléctionne qu'une partie des données en fonction de la variables qui aura servi à créer la partition.

Pour créer une partition selon la taille :

CREATE EXTERNAL TABLE tailles (
 nom string,
 prenom string,
) 
PARTITIONED BY (taille int) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;' 
LOCATION /user/tables/tailles
tblproperties("skip.header.line.count"='1');
Chargement de données dans une table hive

Chargement direct avec load data

Le chargement direct consiste à déplacer un fichier de données vers le répertoire contenant les données associée à une table hive. De cette manière, les données seront très rapidement mobilisables. Cette opération est très rapide (un déplacement ne touche pas au contenu des fichiers).

On peut a l'aide de l'instruction LOAD DATA insérer directement des données dans une table hive. Le fichier en entrée devra avoir le format spécifié au moment de la création de la table hive.

LOAD DATA INPATH '/user/cloudera/tailles.txt' 
INTO TABLE tailles;

Si la table a été partitionnée, la colonne selon laquelle la table a été partitionnée de devra pas apparaitre dans le fichier en entrée, la partition sera précisée lors du load de la manière suivante :

LOAD DATA INPATH '/user/cloudera/tailles.txt' 
INTO TABLE tailles
PARTITION (taille=180);

Il n'est donc pas possible d'insérer avec load dans une table partitionnée des données pour lesquelles la varialbe servant à créer la partition n'est pas définie en dehors du fichier. Pour cela, il y a deux solutions :

  • Découper au préalable le fichier en aurant de fichiers que de valeurs différente que la variable servant à faire les partition et daire autant de load que de fichiers.
  • Utiliser une commande INSERT INTO après avoir enregistré les données dans une table temporaire.

Chargement de données à l'aide d'un INSERT

On peut insérer des données dans une table à l'aide d'une requête INSERT. Ce traitement est plus long que le load et nécéssite d'avoir chargé les données dans une table hive au préalable, en revanche, les possibilités de traitement des données sont plus grandes.

Par exemple, si nous avons une table temp_tailles qui contient les colonnes : nom, prénom, taille, poids, nous pouvons réaliser un insert de la manière suivante :

INSERT INTO tailles
SELECT nom, prenom, taille
FROM temp_tailles
WHERE NOT taille=0;

Si la table a été partitionnée :

INSERT INTO tailles 
PARTITION(taille)
SELECT nom, prenom, taille
FROM temp_tailles
WHERE NOT taille=0;
Explorer les databases, les tables et les partitions

Pour lister les databases on utilise :

SHOW DATABASES;

Pour se positionner sur une database, on utilise :

USE db1;

Pour visualiser les tables et les partitions :

SHOW TABLES;
SHOW PARTITIONS tailles;
Interrogation des données

Une fois les objets créés et renseignés, l'interrogation des données se fait grâce au langage SQL.

Exemple de jointure

SELECT *
FROM tailles t 
JOIN poids p 
ON p.nom = t.nom 
AND p.prenom = t.prenom
WHERE t.taille > 150;

Exemple d'agrégation

SELECT taille, count(*)
FROM tailles
GROUP BY taille;
Fonctions définies par l'utilisateur

Définition

Les fonctions définies par l'utilisateur ou UDF sont des programmes java qui sont utilisés par hive pour créer de nouvelles fonctions de calcul sur les lignes comme par exemple CONCAT().

Dans cette partie, nous allons utiliser une fonction qui normalise les chaines de caractères de la manière suivante :

  • La chaine de caractère commencera pas une majuscule et toutes les autres lettres seront des minuscules
  • La chaine de caractère sera tronquée à partir d'un cacatère dont l'index est fournit en paramètre

Création de l'udf

>Mise en place du projet

Pour créer un udaf hive, le programme java devra contenir les dépendances suivantes :

  • org.apache.hadoop.hadoop-core
  • org.apache.hive.hive-exec

Attention, dans le livrable final, il ne faut pas réempaqueter les librairies pig et hadoop. L'environnement d'exécution du cluster hadoop les contient déjà ainsi que les libraires les plus fréquentes : log4j par exemple. Le scope à utiliser est donc le scope provided.

Le pom.xml pourra contenir les dépendances suivantes :

<dependency>
	<groupId>org.apache.hive</groupId>
	<artifactId>hive-exec</artifactId>
	<version>1.2.1</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-core</artifactId>
	<version>1.0.3</version>
	<scope>provided</scope>
</dependency>

Ecriture de l'udf

Pour créer l'udf, il faut étendre la classe org.apache.hadoop.hive.ql.udf.generic.GenericUDF et implémenter les méthodes

  • initialize(ObjectInspector[] arguments) : qui est exécuter lors de la déclaration de la méthode dans hive
  • evaluate(DeferredObject[] arguments) : qui sert à effectuer l'opération de transformation
  • getDisplayString(String[] children) : qui sert à afficher un descriptif du traitement
public class MiseAuFormatChaine extends GenericUDF {

    private StringObjectInspector chaineInspector;
    private IntObjectInspector tailleInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("L'appel de l'udf prend deux paramètres : la chaine à transformer, la taille à partir de laquelle on tronque");
        }
        ObjectInspector a = arguments[0];
        ObjectInspector b = arguments[1];
        if (!(a instanceof StringObjectInspector) || !(b instanceof IntObjectInspector)) {
            throw new UDFArgumentException("Le premier argument doit être une chaine / Le second argument doit être un int");
        }
        this.chaineInspector = (StringObjectInspector) a;
        this.tailleInspector = (IntObjectInspector) b;
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        String chaine = chaineInspector.getPrimitiveJavaObject(arguments[0].get());
        int tailleMax = (int)tailleInspector.getPrimitiveJavaObject(arguments[1].get());
        String res=chaine.toLowerCase();
        res=String.valueOf(chaine.charAt(0)).toUpperCase()+res.substring(1);
        if(res.length()>tailleMax){
            res=res.substring(0, tailleMax);
        }
        return res;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "UDF de mise au format des chaines de caractères";
    }
}

Appel de l'udf dans hive

ADD JAR udf-mise-au-format-whith-dependencies.jar;
CREATE TEMPORARY FUNCTION maf as 'fr.MiseAuFormatChaine';
SELECT maf(colonne, 50) FROM table;
Fonctions d'agrégation définies par l'utilisateur

Définition

Les fonctions d'agrégation définies par l'utilisateur ou UDAF sont des programmes java qui sont utilisés par hive pour créer de nouvelles fonctions d'agrégation (comme count() ou sum()).

Il existe deux moyens de créer des fonctions d'agrégation : simple ou générique. Les fonctions simples sont peu performantes et terme de temps d'exécution et très limitées fonctionnellement, nous ne présenterons donc que les génériques. Ces fonctions sont un peu complexes.

Création de l'udaf

Nous allons créer une UDAF produit(n) qui réalisera le produit des nombres passés en paramètre.

Mise en place du projet

Pour créer un udaf hive, le programme java devra contenir les dépendances suivantes :

  • org.apache.hadoop.hadoop-core
  • org.apache.hive.hive-exec

Attention, dans le livrable final, il ne faut pas réempaqueter les librairies pig et hadoop. L'environnement d'exécution du cluster hadoop les contient déjà ainsi que les libraires les plus fréquentes : log4j par exemple. Le scope à utiliser est donc le scope provided.

Le pom.xml pourra contenir les dépendances suivantes :

<dependency>
	<groupId>org.apache.hive</groupId>
	<artifactId>hive-exec</artifactId>
	<version>1.2.1</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-core</artifactId>
	<version>1.0.3</version>
	<scope>provided</scope>
</dependency>

Ecriture de l'udaf

Dans un premier temps, il faut créer une classe qui étend AbstractGenericUDAFResolver. Il faut alors implémenter une méthode getEvaluator qui retourne une instance de GenericUDAFEvaluator. Cette méthode prend en entrée des paramètres qui nous permettent d'aiguiller vers un evaluateur qui peut être variable. A priori, nous n'avons rien n'a faire dans cette méthode si ce n'est de vérifier que les paramètres sont corrects.

La classe à crée est la suivante :

@Description(name = "produit", value = "_FUNC_(n) - Retourne le produit")
public class ProduitUDAFResolver extends AbstractGenericUDAFResolver {

	@Override
	public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
		int nbArg = parameters.length;
		if (nbArg != 1) {
			throw new UDFArgumentTypeException(
			nbArg - 1, "nombre d'arguments incorrect");
		}
		ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
		if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
			throw new UDFArgumentTypeException(0, "type non primitif");
		}
		PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
		if (poi.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT) {
			throw new UDFArgumentTypeException(0, "type incorrect");
		}
		return ProduitUDAFEvaluator.getInstance();
	}

}

GenericUDAFEvaluator possède 4 modes d'exécution en fonction de l'étape de l'agrégation :

  • PARTIAL1 : cette étape commence avec les données au format originales et se termine lors du premier niveau d'agrégation : les résultats sont calculés sur chaque datanode.
  • PARTIAL2 : cette étape commence avec les données agrégées sur chaque datanode et se termine lorsqu'il ne reste plus que deux résultats à agréger.
  • FINAL : cette étape commence avec deux donées agrégées partiellement et se termine avec le résutat final du traitement.
  • COMPLETE : mode utilisé pour une exécution sur un seul datanode : il n'y a pas de résultat partiel.

Pour créer l'évaluateur, nous devons disposer d'une instance d'AgregationBuffer qui réalisera le calcul et qui enregistrera le résultat.

Dans l'évaluateur, nous devrons créer les méthodes suivantes :

  • AggregationBuffer getNewAggregationBuffer() : méthode permettant de récupérer une nouvelle instance de la classe qui effectue le calcul : la classe héritant de AgregationBuffer
  • void reset(AggregationBuffer agg) : méthode permettant de remettre à 0 le résultat teporaire de la classe héritant de AgregationBuffer
  • void iterate(AggregationBuffer agg, Object[] parameters) : Cette méthode sera appelée sur chaque entrée. Les données sont enregistrées dans le deuxième paramètre. Dans cette méthode, nous devons mettre à jour l'aggregationBuffer qui nous est fournit en paramètre en utilisant le deuxième paramètre.
  • Object terminatePartial(AggregationBuffer agg) : cette méthode sert à obtenir un résultat partiel sur un datanode. Pour générer ce résultat partiel, on se base sur l'AggregationBuffer qui est passé en paramètre.
  • void merge(AggregationBuffer agg, Object partial) : cette méthode permet d'agréger deux résultats partiels : le premier est contenu dans agg et le second est fournit sous forme d'un object. L'aggrégation consiste à mettre à jour l'AggregationBuffer,
  • Object terminate(AggregationBuffer agg) : C'est la méthode qui fournit le résultat final : à partir d'un AggregationBuffer, on donne le résultat.

En plus de toutes ces méthode abstraites à ré-implémenter, il est quasiment toujours utile d'imprlémenter la méthode ci-dessous :

  • ObjectInspector init(Mode m, ObjectInspector[] parameters) : Cette méthode permet d'initialiser le traitement, elle est appelée avant toutes les étapes qui modifient un aggregationBuffer et sert à récupérer les ObjectInspector qui vont parser les paramètres. On peut déterminer de quels paramètres il s'agit (données brutes ou résultat partiel) en fonction du mode. Si le mode est PARTIAL1 ou COMPLETE alors il s'agit des données brutes. Le retour de cette méthode est l'objectInspector permettant de parser le résultat.

Nous devons, pour écrire l'evaluateur comprendre comment fonctionne l'agrégation de données dans hive. Le diagramme ci-dessous illustre l'appel des différentes méthodes de l'évaluateur lors de la réalisation d'une agrégation :

Fonctionnement de l'aggrégation sur plusieurs serveurs

Allez, si vous êtes arrivés ici, vous avez fait le plus dur. Nous allons dans un même temps écrire l'agregationBuffer et l'évaluateur. Cest deux classes sont étroitement liées : l'agregationBuffer ne sert qu'à stocker un résultat temporaire : il n'y a aucune méthode à réimplémenter sur cette classe.

public class ProduitBuffer extends AbstractAggregationBuffer {

	private int produit;

	public ProduitBuffer() {
		init();
	}

	public void init() {
		this.produit = 1;
	}

	public void multiplier(int nombre) {
		produit *= nombre;
	}

	public int getProduit() {
		return produit;
	}
}

Et enfin, nous pouvons écrire l'évaluateur :

public class ProduitUDAFEvaluator extends GenericUDAFEvaluator {

	private static final ProduitUDAFEvaluator instance = new ProduitUDAFEvaluator();

	private PrimitiveObjectInspector parseurEntree;
	private PrimitiveObjectInspector parseurPartiel;

	private ProduitUDAFEvaluator() {
		super();
	}

	@Override
	public ObjectInspector init(Mode m, ObjectInspector[] p) throws HiveException {
		super.init(m, p);
		if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
			parseurEntree = (PrimitiveObjectInspector) p[0];
		}
		else {
			parseurPartiel = (PrimitiveObjectInspector) p[0];
		}
		return ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorOptions.JAVA);
	}

	@Override
	public AggregationBuffer getNewAggregationBuffer() throws HiveException {
		return new ProduitBuffer();
	}

	@Override
	public void reset(AggregationBuffer agg) throws HiveException {
		((ProduitBuffer) agg).init();
	}

	@Override
	public void iterate(AggregationBuffer agg, Object[] p) throws HiveException {
		ProduitBuffer buffer = (ProduitBuffer) agg;
		Object entree = parseurEntree.getPrimitiveJavaObject(p[0]);
		int nombre = Integer.parseInt(entree.toString());
		buffer.multiplier(nombre);
	}

	@Override
	public Object terminatePartial(AggregationBuffer agg) throws HiveException {
		ProduitBuffer buffer = (ProduitBuffer) agg;
		return buffer.getProduit();
	}

	@Override
	public void merge(AggregationBuffer agg, Object partial) throws HiveException {
		ProduitBuffer buffer = (ProduitBuffer) agg;
		Object resTemp = parseurPartiel.getPrimitiveJavaObject(partial);
		int resultatTemporaire = Integer.parseInt(resTemp.toString());
		buffer.multiplier(resultatTemporaire);
	}

	@Override
	public Object terminate(AggregationBuffer agg) throws HiveException {
		ProduitBuffer buffer = (ProduitBuffer) agg;
		return buffer.getProduit();
	}

	public static ProduitUDAFEvaluator getInstance() {
		return instance;
	}
}

Appel de l'udaf dans hive

Pour utiliser la fonction dans hive, il faut déclarer le resolver en lui donndant un nom en tant que fonction temporaire. On utilise ensuite cette fonction comme dans l'exemple suivant :

CREATE TEMPORARY FUNCTION produit AS 'fr.exemple.ProduitUDAFResolver';
SELECT produit(n1) FROM data;
SELECT type, produit(n1) FROM data GROUP BY type;