
Pig – a quick intro
Pig is MapReduce simplified. It is a combination of the Pig compiler and Pig Latin, a scripting language designed to ease the development of distributed applications that analyze large volumes of data. We will refer to the whole entity as Pig.
High-level code written in Pig Latin gets compiled into sequences of MapReduce Java code that are amenable to parallelization. Pig Latin makes the data the central concept of any program written in it. It is based on the dataflow paradigm, which works on a stream of data that is passed through a sequence of instructions, each of which processes the data. This programming style is analogous to how electrical signals flow through circuits or water flows through pipes.
This dataflow paradigm is in stark contrast to the control flow paradigm, which works on a stream of instructions and operates on external data. In a traditional program, conditional execution, jumps, and procedure calls change the stream of instructions to be executed.
Processing statements in Pig Latin consist of operators, which take inputs and emit outputs. The inputs and outputs are structured data expressed as bags, maps, tuples, and scalar values. A Pig program resembles a dataflow graph, where the directed edges are the paths along which data flows and the nodes are operators (such as FILTER, GROUP, and JOIN) that process the data. In Pig Latin, each statement executes as soon as all of its input data is available, in contrast to a traditional program, where a statement executes as soon as it is encountered.
A programmer writes code using a set of standard data-processing Pig operators, such as JOIN, FILTER, GROUP BY, ORDER BY, and UNION. These are then translated into MapReduce jobs. Pig itself does not have the capability to run these jobs and it delegates this work to Hadoop. Hadoop acts as an execution engine for these MapReduce jobs.
It is imperative to understand that Pig is not a general-purpose programming language with all the bells and whistles that come with one. For example, it lacks the concepts of control flow and scope resolution, and has only minimal support for variables, which many developers are accustomed to in traditional languages. This limitation can be overcome by using User Defined Functions (UDFs), which are Pig's extensibility feature.
For a deeper understanding, refer to the Apache website at http://pig.apache.org/docs/r0.11.0/ to understand the intricacies of the syntax, usage, and other language features.
Understanding the rationale of Pig
Pig Latin is designed as a dataflow language to address the following limitations of MapReduce:
- The MapReduce programming model tightly couples the computation into a map phase, a shuffle phase, and a reduce phase. This is not appropriate for real-world applications that do not fit this pattern, such as tasks with a different flow like joins or n-phase processing. Some real-world data pipelines also require additional coordination code to combine separate MapReduce phases and manage the intermediate results between pipeline stages. All of this steepens the learning curve for new developers trying to understand the computation.
- Complex workarounds have to be implemented in MapReduce even for the simplest of operations like projection, filtering, and joins.
- MapReduce code is difficult to develop, maintain, and reuse, sometimes running to an order of magnitude more code than the corresponding code written in Pig.
- It is difficult to perform optimizations in MapReduce because of its implementation complexity.
Pig Latin brings the double advantage of a SQL-like declarative style and the power of a procedural programming model akin to MapReduce, through its various extensibility features.
Pig supports nested data and enables complex data types to be embedded as fields of a table. The support for a nested data model makes data modeling more intuitive, since it is closer to how data actually exists than to the way a database models it in the first normal form. The nested data model also reflects how the data is stored on disk and enables users to write custom UDFs more intuitively.
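As a brief sketch of what this nested model looks like in a script, the following hypothetical LOAD statement declares a field that is itself a bag of tuples, which can then be processed directly without first flattening the data into a normalized form:
-- Hypothetical file in which the third field is a bag of (product, price) tuples
purchases = LOAD 'purchases.txt' AS (custid:int, city:chararray, items:bag{t:tuple(product:chararray, price:double)});
-- Operate on the nested bag directly: count the items and total their price per record
summary = FOREACH purchases GENERATE custid, COUNT(items) AS num_items, SUM(items.price) AS total_price;
DUMP summary;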
Pig supports the creation of user-defined functions, which carry out specialized data processing tasks; almost all aspects of programming in Pig are extensible using UDFs. What this implies is that a programmer can customize Pig Latin functions such as grouping, filtering, and joining by extending the EvalFunc class. You can also customize load/store capabilities by extending LoadFunc or StoreFunc. Chapter 2, Data Ingest and Egress Patterns, has examples showing Pig's extensibility.
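The way a UDF is wired into a script is sketched below; the jar file myudfs.jar and the class com.example.pig.ToUpper are hypothetical placeholders for an evaluation function built by extending EvalFunc:
-- Register the jar that contains the custom UDF (hypothetical path and class)
REGISTER 'myudfs.jar';
DEFINE ToUpper com.example.pig.ToUpper();
names = LOAD 'names.txt' AS (name:chararray);
-- Apply the custom evaluation function to every record
upper_names = FOREACH names GENERATE ToUpper(name);
DUMP upper_names;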
Pig has a special operator, ILLUSTRATE, to help the Big Data developer develop code quickly using sample data. The sample data resembles the real data as closely as possible and fully illustrates the semantics of the program. This example data evolves automatically as the program grows in complexity, and this systematic example data can help in detecting errors and their sources early.
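A typical invocation looks like the following sketch; the relation names and the input file are hypothetical:
visits = LOAD 'visits.txt' AS (ip:chararray, url:chararray, dt:chararray);
grouped = GROUP visits BY url;
counts = FOREACH grouped GENERATE group AS url, COUNT(visits) AS hits;
-- Show a small, automatically chosen sample of data as it flows through each step
ILLUSTRATE counts;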
One other advantage of using Pig is that there is no need to perform an elaborate data import process prior to parsing the data into tuples, as in conventional database management systems. This means that if you have a data file, Pig Latin queries can be run on it directly, without importing it; the data can be accessed and queried in any format as long as it can be read by Pig as tuples. We do not need to import the data the way we would with a database, for example, by importing a CSV file into the database before querying it. You still need to provide a function that parses the content of the file into tuples.
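For instance, a comma-separated file can be queried in place with nothing more than a load function that tells Pig how to turn each line into a tuple; the file and fields below are hypothetical:
-- Read the raw CSV file directly; PigStorage parses each line into a tuple
sales = LOAD 'sales.csv' USING PigStorage(',') AS (region:chararray, product:chararray, amount:double);
-- Query it immediately, with no import step
east_sales = FILTER sales BY region == 'east';
DUMP east_sales;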
Understanding the relevance of Pig in the enterprise
In current enterprises, the Big Data processing cycle is remarkable for its complexity and differs widely from a traditional data processing cycle. Data collected from a variety of sources is loaded into a target platform; a base-level analysis is then performed, and discovery happens by applying a metadata layer to the data. This results in the creation of a data structure or schema for the content, in order to discover the context and relevance of the data. Once the data structure is applied, the data is integrated, transformed, aggregated, and prepared for analysis. This reduced and structured dataset is used for reporting and ad hoc analytics. The result of the process is what provides insights into the data and any associated context (based on the business rules processed). Hadoop can be used as a processing and storage framework at each of these stages.
The following diagram shows a typical Big Data processing flow:

Big Data in the enterprise
The role of Pig as per the preceding diagram is as follows:
- In the collect phase, Pig is used to interface with data acquired from multiple sources, including real-time systems, near-real-time systems, and batch-oriented applications. Another way to use Pig is to process the data through an upstream knowledge discovery platform and store only the required subset of the output rather than the whole dataset.
- Pig is used in the data discovery stage, where Big Data is first analyzed and then processed. It is in this stage that Big Data is prepared for integration with the structured analytical platforms or the data warehouse. The discovery and analysis stage consists of tagging, classification, and categorization of data that closely resembles the subject area, and it results in the creation of a data model definition or metadata. This metadata is the key to deciphering the eventual value of Big Data through analytical insights.
- Pig is used in the data processing phase, where the context of the data is processed to explore the relevance of the data within the unstructured environment; this relevance facilitates the application of appropriate metadata and master data to the Big Data. The biggest advantage of this kind of processing is the ability to process the same data for multiple contexts, and then look for patterns within each result set for further data mining and data exploration. For example, consider the word "cold"; its context has to be ascertained correctly based on usage, semantics, and other relevant information, as the word can relate to the weather or to a common disease. After getting the correct context for the word, further master data related to either the weather or common diseases can be applied to the data.
- In the processing phase, Pig can also be used to perform data integration right after the contextualization of data, by cleansing and standardizing Big Data with metadata, master data, and semantic libraries. This is where the data is linked with the enterprise dataset. There are many techniques to link the data between structured and unstructured datasets using metadata and master data. This process is the first important step in converting and integrating unstructured and raw data into a structured format. This is the stage where the power of Pig is used extensively for data transformation and to augment the existing enterprise data warehouse by offloading high-volume, low-value data and workloads from the expensive enterprise data warehouse.
- As the processing phase in an enterprise is bound by tight SLAs, Pig's predictability and its capability to integrate with other systems make it well suited for regularly scheduled data cleansing, transformation, and reporting workloads.
Pig scores in situations where incoming data is not cleansed and normalized. It gracefully handles situations where data schemas are unknown until runtime or are inconsistent. Pig's procedural language model and schema-less approach offer much more flexibility and efficiency in data access, so that data scientists can build research models on the raw data to quickly test a theory.
Pig is typically used in situations where the solution can be expressed as a Directed Acyclic Graph (DAG), involving a combination of Pig's standard relational operations (join, aggregation, and so on) and custom processing code via UDFs written in Java or a scripting language. This implies that if you have a very complex chain of tasks, where the output of each job feeds as an input to the next job, Pig makes this process of chaining the jobs easy to accomplish.
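The following sketch, with hypothetical inputs, shows such a chain; each relation feeds the next, and Pig plans the whole graph as a sequence of MapReduce jobs:
logs = LOAD 'access_logs' AS (ip:chararray, url:chararray, bytes:long);
pages = LOAD 'page_catalog' AS (url:chararray, category:chararray);
-- Step 1: keep only the large responses
big = FILTER logs BY bytes > 10000L;
-- Step 2: join the filtered logs with the page catalog
joined = JOIN big BY url, pages BY url;
-- Step 3: aggregate the joined data by category
grouped = GROUP joined BY category;
traffic = FOREACH grouped GENERATE group AS category, SUM(joined.bytes) AS total_bytes;
-- Step 4: store the final output; Pig executes all the steps as one DAG of jobs
STORE traffic INTO 'traffic_by_category';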
Pig is useful in Big Data workloads where there is one very large dataset and processing includes constantly adding small new pieces of data that change the state of the large dataset. Pig excels at combining the newly arrived data so that the whole dataset is not reprocessed; only the delta of the data, together with the previously computed results over the large dataset, is processed efficiently. Pig provides operators that make this incremental processing of data feasible in a reasonable amount of time.
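One way to express this pattern in Pig Latin is sketched below, under the assumption that the previously computed aggregates have already been stored and only the newly arrived records need to be re-aggregated before the two are combined; all relation and file names are hypothetical:
-- Previously computed aggregates (the results over the large dataset)
old_counts = LOAD 'daily_counts' AS (day:chararray, hits:long);
-- Only the newly arrived delta of raw records
new_logs = LOAD 'new_logs' AS (ip:chararray, day:chararray);
new_grouped = GROUP new_logs BY day;
new_counts = FOREACH new_grouped GENERATE group AS day, COUNT(new_logs) AS hits;
-- Combine the old results with the delta instead of reprocessing everything
combined = UNION old_counts, new_counts;
regrouped = GROUP combined BY day;
updated = FOREACH regrouped GENERATE group AS day, SUM(combined.hits) AS hits;
STORE updated INTO 'daily_counts_updated';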
Beyond the previously mentioned traditional use cases, Pig has the inherent advantage of requiring far less development time to write and optimize code than Java MapReduce does. Pig is a better choice when performing optimization by hand is tedious. Pig's extensibility, through which you can integrate your existing executables and UDFs with Pig Latin scripts, enables even faster development cycles.
Working of Pig – an overview
In this section, an example Pig script is dissected thoroughly and explained to illustrate the language features of Pig. Before that, the following subsection helps you get Pig into action very quickly by installing and configuring it.
The primary prerequisite for Pig to work in a Hadoop cluster is to maintain Hadoop version compatibility, which in essence means that Pig 0.11.0 works with Hadoop versions 0.20.X, 1.X, 0.23.X, and 2.X. This is done by pointing HADOOP_HOME to the directory of the appropriate Hadoop installation. The following table summarizes the Pig versus Hadoop version compatibility:

Pig core is written in Java and it works across operating systems. Pig's shell, which executes the commands from the user, is a bash script and requires a UNIX system. Pig can also be run on Windows using Cygwin and Perl packages.
Java 1.6 is also mandatory for Pig to run. Optionally, the following can be installed on the same machine: Python 2.5, JavaScript 1.7, Ant 1.7, and JUnit 4.5. Python and JavaScript are for writing custom UDFs. Ant and JUnit are for builds and unit testing, respectively. Pig can be executed with different versions of Hadoop by setting HADOOP_HOME to point to the directory where we have installed Hadoop. If HADOOP_HOME is not set, Pig will run with the embedded version by default, which is currently Hadoop 1.0.0.
The following table summarizes the prerequisites for installing Pig (we have considered major versions of Pig until 0.9.1):

Pig is typically installed on a machine that is not part of the Hadoop cluster. This can be a developer's machine that has connectivity to the Hadoop cluster. Such a machine is called a gateway or edge machine.
The installation of Pig is a straightforward process. Download Pig from your favorite distribution site, be it Apache, Cloudera, or Hortonworks, and follow the instructions in the installation guide specific to the distribution. These instructions generally involve untarring the tarball in a directory of your choice and setting the only required configuration, the JAVA_HOME property, to the location that contains the Java distribution.
To verify if Pig was indeed installed correctly, try the command $ pig -help.
Pig can be run in two modes: local and MapReduce.
- The local mode: To run Pig in the local mode, install it on a machine where Pig runs against your local file system. The -x local flag is used to denote the local mode ($ pig -x local ...). The result of this command is the Pig shell called Grunt, where you can execute command lines and scripts. The local mode is useful when a developer wants to prototype, debug, or use small data to quickly perform a proof of concept locally and then apply the same code on a Hadoop cluster (the MapReduce mode).
$ pig -x local
... - Connecting to ...
grunt>
Tip
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
- The MapReduce mode: This mode is used when you need to access a Hadoop cluster and run the application on it. This is the default mode, and you can specify it using the -x flag ($ pig or $ pig -x mapreduce). The result of this command is the Pig shell called Grunt, where you can execute commands and scripts.
$ pig -x mapreduce
... - Connecting to ...
grunt>
You can also use the following command instead of the previous one:
$ pig
... - Connecting to ...
grunt>
It is important to understand that in both the local and MapReduce modes, Pig does the parsing, checking, compiling, and planning locally. Only the job execution is done on the Hadoop cluster in the MapReduce mode and on the local machine in the local mode. This implies that parallelism cannot be observed in the local mode.
In both the local and MapReduce modes, Pig can be run interactively as well as in the batch mode. Running Pig interactively implies executing each command on the Grunt shell, while running it in the batch mode implies executing a combination of commands in a script file (called a Pig script) on the Grunt shell.
Here is a quick example of the interactive mode:
grunt> raw_logs_Jul = LOAD 'NASA_access_logs/Jul/access_log_Jul95' USING ApacheCommonLogLoader AS (jaddr, jlogname, juser, jdt, jmethod, juri, jproto, jstatus, jbytes);
grunt> jgrpd = GROUP raw_logs_Jul BY DayExtractor(jdt);
grunt> DESCRIBE jgrpd;
Please note that in the previous example, each of the Pig expressions is specified on the Grunt shell. Here is an example of batch mode execution:
grunt> exec pigexample.pig
In the previous example, a Pig script (pigexample.pig) is created first and then executed on the Grunt shell. Pig scripts can also be executed outside the Grunt shell at the command prompt. The following is the method to do it:
$ pig <filename>.pig (MapReduce mode)
You can also use the following code line instead of the previous one:
$ pig -x local <filename>.pig (local mode)
This section covers a quick introduction to the use case. Log data is generated by nearly every web-based software application. Applications log all events into logfiles, along with the timestamps at which the events occurred. These events may include changes to system configurations, access device information, information on user activity and access locations, alerts, transactional information, error logs, and failure messages. The value of the data in logfiles is realized through the use of Big Data processing technologies, and it is consistently used across industry verticals to understand and track application or service behavior. This can be done by finding patterns, errors, or suboptimal user experience, thereby converting invisible log data into useful performance insights. These insights can be leveraged across the enterprise, with use cases providing both operational and business intelligence.
The Pig Latin script in the following Code listing section loads two months' logfiles, analyzes the logs, and finds the number of unique hits for each day of the month. The analysis results in two relations: one for July and the other for August. These two relations are joined on the day of the month to produce an output where we can compare the number of visits by day for each month (for example, the number of visits on the first of July versus the number of visits on the first of August).
The following is the complete code listing:
-- Register the jar file to be able to use the UDFs in it
REGISTER 'your_path_to_piggybank/piggybank.jar';

/* Assign aliases ApacheCommonLogLoader, DayMonExtractor, DayExtractor
   to the CommonLogLoader and DateExtractor UDFs */
DEFINE ApacheCommonLogLoader org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();
DEFINE DayMonExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('dd/MMM/yyyy:HH:mm:ss Z','dd-MMM');
DEFINE DayExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('dd-MMM','dd');

/* Load July and August logs using the alias ApacheCommonLogLoader
   into the relations raw_logs_Jul and raw_logs_Aug */
raw_logs_Jul = LOAD '/user/cloudera/pdp/datasets/logs/NASA_access_logs/Jul/access_log_Jul95' USING ApacheCommonLogLoader AS (jaddr, jlogname, juser, jdt, jmethod, juri, jproto, jstatus, jbytes);
raw_logs_Aug = LOAD '/user/cloudera/pdp/datasets/logs/NASA_access_logs/Aug/access_log_Aug95' USING ApacheCommonLogLoader AS (aaddr, alogname, auser, adt, amethod, auri, aproto, astatus, abytes);

-- Group the two relations by date
jgrpd = GROUP raw_logs_Jul BY DayMonExtractor(jdt);
DESCRIBE jgrpd;
agrpd = GROUP raw_logs_Aug BY DayMonExtractor(adt);
DESCRIBE agrpd;

-- Count the number of unique visits for each day in July
jcountd = FOREACH jgrpd
{
  juserIP = raw_logs_Jul.jaddr;
  juniqIPs = DISTINCT juserIP;
  GENERATE FLATTEN(group) AS jdate, COUNT(juniqIPs) AS jcount;
}

-- Count the number of unique visits for each day in August
acountd = FOREACH agrpd
{
  auserIP = raw_logs_Aug.aaddr;
  auniqIPs = DISTINCT auserIP;
  GENERATE FLATTEN(group) AS adate, COUNT(auniqIPs) AS acount;
}

-- Display the schema of the relations jcountd and acountd
DESCRIBE jcountd;
DESCRIBE acountd;

/* Join the relations containing count of unique visits in July and August
   where a match is found for the day of the month */
joind = JOIN jcountd BY DayExtractor(jdate), acountd BY DayExtractor(adate);

/* Filter by removing the records where the count is less than 2600 */
filterd = FILTER joind BY jcount > 2600 and acount > 2600;

/* Debugging operator to understand how the data passes through FILTER
   and gets transformed */
ILLUSTRATE filterd;

/* Sort the relation by date, PARALLEL specifies the number of reducers to be 5 */
srtd = ORDER filterd BY jdate, adate PARALLEL 5;

-- Limit the number of output records to be 5
limitd = LIMIT srtd 5;

/* Store the contents of the relation into a file in the directory
   unique_hits_by_month on HDFS */
STORE limitd into '/user/cloudera/pdp/output/unique_hits_by_month';
As an illustration, we will be using logs of two months of web requests to a web server at NASA. These logs were collected from July 1 to 31, 1995 and from August 1 to 31, 1995. The following is a description of the fields in the files:
- Hostname (or the Internet address) that initiates the request, for example, 109.172.181.143 in the next code snippet.
- Logname, which is empty in this dataset and is represented by – in the next code snippet.
- The user, which is empty in this dataset and is represented by – in the next code snippet.
- The timestamp, which is in the DD/MMM/YYYY HH:MM:SS format. In the next code snippet, the time zone is -0400, for example, [02/Jul/1995:00:12:01 -0400].
- The HTTP request, which is given in quotes, for example, GET /history/xxx/ HTTP/1.0 in the next code snippet.
- The HTTP response reply code, which is 200 in the next code snippet.
- The number of bytes returned in the response, which is 6545 in the next code snippet.
The snippet of the logfile is as follows:
109.172.181.143 - - [02/Jul/1995:00:12:01 -0400] "GET /history/xxx/ HTTP/1.0" 200 6545