Langage PIGPig est une plateforme haut niveau pour la création de programme MapReduce utilisé avec Hadoop

Introduction

Pig se rapproche d'un ETL où l'on part d'un ou plusieurs flux de données que l'on transforme étape par étape jusqu'à atteindre le résultat souhaité. Les différentes étapes de la transformation sont exprimés dans un langage procédural (Pig Latin).

Conçu pour ingérer à peu près tout et n'importe quoi (d'où son nom), Pig n'impose aucune contrainte sur le format des données à traiter.

Lancement de Pig

Modes de lancement

Il existe deux manières de lancer pig :

  • En mode local, les fichiers de données doivent être localisés dans le même répertoire que le répertoire d’exécution du programme Pig. Dans ce cas, utiliser -x local.
  • En mode distribué, les fichiers de données doivent d’abord être déposés sur le système de fichiers distribué HDFS. Dans ce cas, utiliser -x mapreduce.

Lancer l'invite de commandes grunt

Pour lancer pig, ouvrir un terminal et lancer la commande :

pig -x mapreduce

Pour lancer pig en local :

pig -x local

Lancer un script pig

Pour lancer un script on utilise la commande suivante

pig -x mapreduce ./instructions.pig

Paramétrage d'un script

On peut passer des paramètres à un script pig lors du lancement en utilisant l'option -p :

pig -x mapreduce -p "var=valeur" ./instructions.pig

Dans le script pig, on accède au contenu de la variable en utilisant : '$var'

On peut préciser un chemin vers un fichier de paramètres à utiliser et le nom du script à exécuter:

pig -x mapreduce -param_file '/param.txt' ./instructions.pig

Le fichier de paramètres contient des variables définies en ligne sous la forme variable=valeur.

Gestion des logs

Pour rediriger les logs de pig, on utilise l'option -l <chemin_log>

pig -x mapreduce -l "$dossier/log_pig.txt" ./instructions.pig
Chargement des données

Dans cette partie, nous allons utiliser un fichier de test qui contient les informations suivantes :

COMTE;FROMAGE;10;8.5
VIN JAUNE;VIN;5;16
MORBIER;FROMAGE;7;18

Ce fichier est enregistré sous /etc/produits.txt

Fonction load

Pour charger un fichier, on utilise la fonction load. Si aucun argument n'est fournit, pig considère que le fichier en entrée est un fichier délimité par des tabulations.

data = LOAD '/etc/produits.txt';

Types de données

Les champs peuvent être typés et nommés et typés à l'aide de la commande as. Les types de données sont les suivantes

Types de colonnes Pig
TypeDescriptionExemple
intentier stocké sur 32 bit10
longentier stocké sur 64 bit10
floatréel stocké sur 32 bit14.2
doubleréel stocké sur 64 bit14.2
chararraychaine de caracatèreschaine
tupleun ensemble de valeurs(19,2)
bagune collection de tuples{(19,2), (18,1)}
mapun ensemble clé#valeur[open#apache]

Pour typer et nommer les données, on utilise le code suivant

data = LOAD '/etc/produits.txt' AS (produit:chararray, type:chararray, quantite:int, prix:float);

Formats de fichiers

L'option using permet de définir la facon dont les colonnes sont séparées. Sont présentés ci-dessous quelques formats mais il en existe d'autres.

Fichier délimité : PigStorage

PigStorage indique que le fichier à utiliser est un fichier délimité par un séparateur passé en argument. Par défaut, la tabulation est utilisée.

L'exemple ci-dessous permet de lire un fichier csv

data = LOAD '/etc/produits.txt' USING PigStorage(';');

Sous forme de tuples : PigDump

PigDump indique que le fichier est enregistré sous forme de tuples (col1,col2,col2)

data = LOAD '/etc/produits.txt' USING PigDump();

Données non structurées : TextLoader

Si les données ne présentent pas de scructure particulière, on peut utiliser un TextLoader qui enregistre les données sous forme d'une seule colonne :

data = LOAD '/etc/produits.txt' USING TextLoader();

Format personnalisé

Il est possible de définir des formats de stockage grâce à un programme java. Cette possibilité sera traitée dans une autre partie.

Table hive

Il est possible de charger une table hive avec pig, pour celà, il faut lancer pig avec l'option -useHCatalog :

pig -useHCatalog ./lanceur.pig

Le loader s'écrit alors :

data = LOAD 'db.table' USING org.apache.hive.hcatalog.pig.HCatLoader();

Réorganiser les colonnes : generate

On peut accéder aux champs via leur numéro :

b = FOREACH data GENERATE $1, $0;
DUMP b;

Dans la mesure ou les données ont été nommées, on peut modifier l'ordre d'apparition des champs sur la ligne :

b = FOREACH data GENERATE produit, prix;
DUMP b;
Créer un fichier à partir d'une relation

Fonction store

La fonction store permet de créer une fichier texte à partir d'une relation. Cette fonction s'utilise comme dans l'exemple suivant :

STORE data INTO '/user/cloudera/data';

Format de stockage

Les formats de stockage sont les mêmes que dans la partie sur le chargement des données. On écrira ainsi la ligne suivante pour enregsitrer des données au format csv :

STORE data INTO '/user/cloudera/data' USING PigStorage(';');
Affichage des données

Afficher le contenu d'un fichier : dump

La commande dump permet d'afficher le contenu d'une variable :

DUMP data;

Afficher la structure d'un élément : describe

La commande describe permet de donner la structure de l'élement passé en paramètre :

DESCRIBE data;
data: {produit:chararray,type:chararray,quantite:int,prix:float}
Opérateurs relationnels

Filtrer les données : filter

L'instruction filter est équivalente à une clause WHERE. Elle permet de ne sélectionner qu'une partie des données en fonction d'un critère.

Par exemple, pour conserver les données dont le prix est supérieur à 10 :

c1 = FILTER data BY prix > 10;
c2 = FILTER data BY prix == 10;
c3 = FILTER data BY NOT (prix == 10);
c4 = FILTER data BY (prix > 10) OR (prix < 5);

Trier les données : order

L'instruction order permet de trier les données en fonction d'un critère. On peut préciser desc pour que le tri soit fait par ordre décroissant.

d = ORDER data BY prix DESC;

Grouper des données : group

Réaliser un groupe

Créer un groupe signifie regrouper des variables selon les valeurs d'un champ. Le format du groupe est donc un tuple constitué de l'identifiant du groupe (group) et d'un tuple de tuples contenant les données associées.

Pour grouper des données pour lesquelles un champ est identique, on utilise la commande group :

e = GROUP data BY type;

Le résultat est le suivant :

(VIN,{(VIN JAUNE,VIN,5,16.0)})
(FROMAGE,{(MORBIER,FROMAGE,7,18.0),(COMTE,FROMAGE,10,8.5)})

Réorganiser le groupe

f = FOREACH e GENERATE group, data.produit, data.quantite, data.prix;

Le résultat est le suivant :

(VIN,{(VIN JAUNE)},{(5)},{(16.0)})
(FROMAGE,{(MORBIER),(COMTE)},{(7),(10)},{(18.0),(8.5)})

Récupérer des données

Pour travailler sur les tuples, on utilise la commande flatten :

g = FOREACH f GENERATE FLATTEN($1);

Le résultat est le suivant :

(VIN JAUNE)
(MORBIER)
(COMTE)

Compter le nombre d'éléments d'un groupe

prodByGroup = FOREACH d GENERATE group, COUNT($1);

Découper des données : split

La commande split permet de créer plusieurs relations en fonction de certaines conditions. Par exemple, le code suivant permet de créer deux relations en fonction des valeurs de la variable type.

SPLIT data INTO h IF type == 'FROMAGE', i IF type == 'VIN';

Réaliser des jointures : join

Pour cet exemple, nous disposons du fichier suivant :

VIN;ALCCOL
FROMAGE;ALIMENTAIRE

La jointure se réalise de la manière suivante

ref = LOAD '/etc/referentiel.txt' USING PigStorage(';') AS (type:chararray, famille:chararray);
k = JOIN data BY type, ref BY type;

Le dump de k donnera :

(VIN JAUNE,VIN,5,16.0,VIN,ALCCOL)
(MORBIER,FROMAGE,7,18.0,FROMAGE,ALIMENTAIRE)
(COMTE,FROMAGE,10,8.5,FROMAGE,ALIMENTAIRE)
UDF java pour définir un chargeur

Définition

Les UDFs (User Defined Fonctions) sont un ensemble de fonctions implémentant un traitement précis allant du calcul mathématique à la transformation de données. Les fonctions Pig comme COUNT, AVG, MAX, MIN sont en fait des UDFs déjà embarqués dans Pig. Il est possible d'étendre les fonctions de bases en développant des UDFs en Java.

Création d'une UDF en java

Mise en place du projet

Pour créer un udf pig, le programme java devra contenir les dépendances suivantes :

  • org.apache.hadoop.hadoop-core
  • org.apache.pig.pig

Afin de pouvoir tester unitairement les programmes, on peut utiliser la librairie suivante :

  • org.apache.pig.pigunit

Attention, dans le livrable final, il ne faut pas réempaqueter les librairies pig et hadoop. L'environnement d'exécution du cluster hadoop les contient déjà ainsi que les libraires les plus fréquentes : log4j par exemple. Le scope à utiliser est donc le scope provided.

Le pom.xml pourra contenir les dépendances suivantes :

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-core</artifactId>
	<version>1.0.3</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.pig</groupId>
	<artifactId>pig</artifactId>
	<version>0.12.1</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.pig</groupId>
	<artifactId>pigunit</artifactId>
	<version>0.12.1</version>
	<scope>test</scope>
</dependency>

Ecriture du programme

Pour créer une UDF, il suffit d'étendre la classe LoadFund et d'implémenter les méthodes :

  • getNext() : Tuple est la méthode qui doit renvoyer le tuple associé à chaque ligne du fichier en entrée
  • prepareToRead(recordReader, pigSplit) : void est la méthode qui prend en entrée les informations permettant d'effectuer le découpage. Le recordReader sera à conserver dans la classe, c'est la classe qui permettra de récupérer la ligne à traiter. PigSplit contient des informations sur le traitement.
  • setLocation(location, job) : void est la méthode qui communique au loader la destination du fichier à traiter. Pour cela, il suffit d'utiliser FileInputFormat.setInputPaths(job, location);
  • getInputFormat() : InputFormat est la méthode permettant de récupérer le format de l'entrée.
public class Chargeur extends LoadFunc {

	/** Premier paramètre du traitement*/
	private String param1;

	/** Second paramètre du traitement */
	private String param2;

	/**Service qui permet d'accéder à la ligne du fichier */
	private RecordReader<?, ?> reader = null;

	/** Service permettant de créer un Tuple à partir d'une liste */
	private TupleFactory factory = TupleFactory.getInstance();

	public Chargeur(String param1, String param2) {
		this.param1 = param1;
		this.param2 = param2;
	}

	@Override
	public InputFormat<?, ?> getInputFormat() throws IOException {
		Object res = null;
		String className = TextInputFormat.class.getName();
		try {
			res = PigContext.resolveClassName(className).newInstance();
		}
		catch (InstantiationException | IllegalAccessException e) {
			e.printStackTrace();
		}
		return (InputFormat<?, ?>) res;
	}

	@Override
	public void setLocation(String l, Job job) throws IOException {
		FileInputFormat.setInputPaths(job, l);
	}

	@Override
	public void prepareToRead(RecordReader r, PigSplit ps) throws IOException {
		this.reader = r;
	}

	@Override
	public Tuple getNext() throws IOException {
		Tuple res = null;
		try {
			if (!reader.nextKeyValue()) {
				reader.close();
				return null;
			}
			Text value = (Text) reader.getCurrentValue();
			String line = value.toString();
			List<String> champs = new ArrayList<String>();
			/*
			 * TODO il faut construire la liste des colonnes à retourner
			 */
			if (champs != null) {
				res = factory.newTuple(champs);
			}
		}
		catch (InterruptedException | IntegrationException e) {
			throw new ExecException(e.getMessage(), 202, PigException.REMOTE_ENVIRONMENT, e);
		}
		return res;
	}
}

Génération de l'exécutable

Le livrable doit être constitué d'un unique jar. Il faudra donc empaqueter les dépendances dans ce jar.

Pour cela, on peut utiliser le build suivant :

<plugin>
	<artifactId>maven-assembly-plugin</artifactId>
	<version>2.2.1</version>
	<executions>
		<execution>
			<phase>package</phase>
			<goals>
				<goal>single</goal>
			</goals>
		</execution>
	</executions>
	<configuration>
		<descriptorRefs>
			<descriptorRef>
				jar-with-dependencies
			</descriptorRef>
		</descriptorRefs>
	</configuration>
</plugin>

Appel d'une UDF dans Pig

Pour appeler une UDF pig, il faut d'abord ajouter le jar que l'on a créé précédement dans le classpath de pig) à l'aide de REGISTER et ensuite, on fait appel à la classe que l'on a créée pour charger le fichier/

REGISTER '/home/julien/udf_pig/udf-jar-with-dependencies.jar';
data = LOAD '$entree' USING fr.insee.Chargeur('p1', 'p2');
Appel de pig via Java

Les dépendances

Il est possible d'exécuter des commandes pig via un code java, pour celà, il faut ajouter au classapth du projet les librairies hadoop-common et pig. Ces librairies ne doivent pas être générées dans le livrable du projet.

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>2.6.0</version>
	<scope>provided</scope>
	<exclusions>
		<exclusion>
			<artifactId>jdk.tools</artifactId>
			<groupId>jdk.tools</groupId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>org.apache.pig</groupId>
	<artifactId>pig</artifactId>
	<version>0.10.0</version>
	<scope>provided</scope>
</dependency>

Utilisation de la classe PigServer

La classe PigServer permet d'exécuter des commandes pig. Ci-dessous un exemple d'utilisation de cette classe

PigContext context = new PigContext(ExecType.MAPREDUCE, new Properties());
try {
	PigServer pigServer = new PigServer(context);
	pigServer.registerQuery("data = LOAD '/etc/produits.txt' USING PigStorage('|')");
}
	catch (IOException e) {
	e.printStackTrace();
}

Lancement de java

Les librairies hadoop et ne doivent pas faire partie de la livraison du programme, le classpath devrai être modifié afin d'inclure les librairies déjà présentes sur le serveur dans les dossiers suivants :

  • /usr/lib/hadoop/client
  • /usr/lib/hadoop
  • /usr/lib/pig

Le dossier /etc/hadoop/conf devra également être ajouté au classpath.

dosier_lib="/lib"
dossier_properties="/properties"

function creer_classpath {
	classpath=
	for jar in $1/*.jar
	do
		classpath="$classpath:$jar"
	done
	echo $classpath
}
classpath="$(creer_classpath $dossier_lib)"
hadoop_classpath="$(creer_classpath /usr/lib/hadoop/client)"
hadoop_classpath="$hadoop_classpath:$(creer_classpath /usr/lib/hadoop)"
hadoop_classpath="$hadoop_classpath:/etc/hadoop/conf"
hadoop_classpath="$hadoop_classpath:/usr/lib/pig"

java7 -cp "$hadoop_classpath:$classpath" -Dproperties.path=$dossier_properties package.Lanceur