Introduction
Data-intensive computing applications increasingly rely on MapReduce (Dean & Ghemawat, 2008) platforms such as Hadoop (Bialecki, Cafarella, Cutting, & O’Malley, 2005), Dryad (Isard, Budiu, Yu, Birrell, & Fetterly, 2007), Hive (Thusoo et al., 2009), and Pig (Olston, Reed, Srivastava, Kumar, & Tomkins, 2008) to achieve scalability. Systems such as Hive and Pig provide structured data processing primitives and limited automatic optimization techniques reminiscent of relational database systems. In these systems, a script describes a set of desired data operations (and their relationships) in a high-level language; the script is then compiled into a MapReduce workflow whose execution is coordinated over a Hadoop cluster.
Each MapReduce cycle implements the functionality of a subset of the operations given in the script. For each such cycle, input is read from the Hadoop distributed file system (HDFS) and partitioned across a set of slave nodes that act as mappers. Once the mappers finish executing their function, a shuffle phase sorts and partitions the intermediate map output and stores it on local disks. The reducer (slave) nodes then contact the mappers and transfer their assigned partitions. When the reducer tasks complete, the output is saved back to HDFS, from which the next map phase may read its input. Hence, besides the amount of original input data, the amount of data materialized, sorted, and transferred over the network during the shuffle phase has a significant impact on the overall processing cost (Dittrich et al., 2010; Rao, Ramakrishnan, Silberstein, Ovsiannikov, & Reeves, 2012). While some initial operations such as filtering reduce the original size of the data, others, such as relational joins, are state-producing: their outputs can be larger than their inputs. Consequently, managing intermediate results is very important for join-intensive workloads.
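To make the per-cycle costs concrete, the following minimal sketch expresses one such MR cycle as a plain Hadoop job in Java. The record layout (tab-separated tuples keyed on their first column) and the pass-through reducer logic are hypothetical stand-ins for whatever operators a cycle actually evaluates; the point is where data is read, shuffled, and rewritten.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SingleCycle {
  // Map: read a tuple from HDFS and emit it keyed by its join/grouping column.
  // Everything emitted here is sorted, spilled to local disk, and shipped to
  // reducers during the shuffle -- the dominant per-cycle cost.
  public static class TupleMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void map(Object key, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t"); // assumed tab-separated
      ctx.write(new Text(fields[0]), line);          // key on first column
    }
  }

  // Reduce: all tuples sharing a key arrive together; the output goes back to
  // HDFS, from which the next cycle (if any) must re-read it.
  public static class TupleReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> tuples, Context ctx)
        throws IOException, InterruptedException {
      for (Text t : tuples) {
        ctx.write(key, t); // placeholder for the cycle's actual operator logic
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "one-mr-cycle");
    job.setJarByClass(SingleCycle.class);
    job.setMapperClass(TupleMapper.class);
    job.setReducerClass(TupleReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // HDFS input
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // HDFS output
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```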
When processing graph or semi-structured data using relational frameworks, data is typically captured as “thin” relations, i.e., (node, edge, node) triples for graph-structured data, or (Subject, Property, Object) triples in Semantic Web parlance. The lack of uniform structure in such data makes it harder to model as “fatter” relations representing a collection of common attributes. This fine-grained model results in the need for multiple join operations to reassemble related data: for example, a single product's label, property, and feature values are spread across several triples sharing the same subject, which must be rejoined to reconstruct the complete product record.
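The sketch below (hypothetical data, plain in-memory Java) illustrates the thin-relation model. In a relational engine, reassembling the fat record would be expressed as one self-join of the triple relation per attribute, all on the subject column; here the result of those joins is simulated by grouping on subject.

```java
import java.util.*;

// Sketch: thin (subject, property, object) triples, with each attribute of
// an entity stored as a separate row. Hypothetical data for illustration.
public class ThinRelations {
  record Triple(String subject, String property, String object) {}

  public static void main(String[] args) {
    List<Triple> triples = List.of(
        new Triple("product1", "label",    "Olympus-350"),
        new Triple("product1", "property", "waterproof"),
        new Triple("product1", "feature",  "optical-zoom"));

    // Simulate the effect of one self-join per attribute by collecting all
    // (property, object) pairs of each subject into a single "fat" record.
    Map<String, Map<String, String>> fat = new HashMap<>();
    for (Triple t : triples) {
      fat.computeIfAbsent(t.subject(), s -> new HashMap<>())
         .put(t.property(), t.object());
    }
    System.out.println(fat); // {product1={label=Olympus-350, ...}}
  }
}
```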
Systems such as Pig and Hive translate such queries into execution plans in which each cycle processes a set of joins on the same column. In a graph-oriented view, such joins form a star structure and are sometimes called star-joins. Join SJ1 in Figure 1 is a star-join between three relations that reassembles tuples related to a product's label, property, and features, respectively. Join J1' links SJ1 with another star subquery, SJ2. Thus, the MapReduce execution plan for this query has 3 MR cycles. In general, for a query with n star subqueries, such plans have one MR cycle for each star-join and (n - 1) MR cycles to join the star subqueries. For graph-oriented data queries it is not unusual for n to be significantly larger than 2, which leads to longer data processing workflows. Given the I/O and network transfer costs associated with each cycle, longer workflows are inherently expensive. Further, the costs compound across cycles for workflows involving multiple state-producing (join) operations.
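As a rough illustration of how a single MR cycle can evaluate a star-join such as SJ1, the sketch below uses the standard reduce-side join pattern: mappers key each triple by its subject, so the reducer receives every attribute of a subject together and can assemble the star in one pass. The tab-separated "subject, property, object" layout is an assumption, the driver boilerplate (as in the earlier sketch) is omitted, and a full plan would chain one such job per star-join plus the (n - 1) linking joins.

```java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a reduce-side star-join over thin triples (hypothetical layout:
// tab-separated "subject<TAB>property<TAB>object" lines).
public class StarJoin {
  public static class SubjectMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void map(Object key, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] spo = line.toString().split("\t");
      // Key by subject so all triples of one star land on the same reducer.
      ctx.write(new Text(spo[0]), new Text(spo[1] + "\t" + spo[2]));
    }
  }

  public static class StarReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text subject, Iterable<Text> props, Context ctx)
        throws IOException, InterruptedException {
      // All (property, object) pairs of this subject are co-located here, so
      // the star (e.g., label + property + feature) is assembled in a single
      // cycle rather than one join job per attribute.
      StringBuilder star = new StringBuilder();
      for (Text p : props) star.append(p).append('|');
      ctx.write(subject, new Text(star.toString()));
    }
  }
}
```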