
FusionInsight HD V100R002C60SPC200 Product Description 06

Spark

Basic Concept

Overview

Spark is a distributed computing framework based on memory. In iterative computation scenarios, data being processed is kept in memory, providing performance 10 to 100 times higher than that of MapReduce. Spark can use the Hadoop Distributed File System (HDFS), enabling users to quickly switch from MapReduce to Spark. Spark provides a one-stop data analysis capability and supports stream processing in small batches, offline batch processing, Structured Query Language (SQL) queries, and data mining. Users can seamlessly combine these functions in a single application.

Features of Spark are as follows:

  • Improves data processing capability through in-memory distributed computing and a Directed Acyclic Graph (DAG) execution engine, delivering performance 10 to 100 times that of MapReduce.
  • Provides multiple development interfaces (Scala, Java, and Python) and dozens of highly abstracted operators, making it much easier to build distributed data processing applications.
  • Builds data processing stacks using SQL, Streaming, MLlib, and GraphX to provide one-stop data processing capabilities.
  • Fits into the Hadoop ecosystem: Spark applications can run on Standalone, Mesos, or YARN; can access multiple data sources such as HDFS, HBase, and Hive; and MapReduce applications can be smoothly migrated to Spark.
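
As a quick illustration of the programming model, the following is a minimal word-count sketch written in Scala. The application name and HDFS paths are placeholders; adapt them to the actual cluster.

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // The master is normally supplied when the application is submitted (for example, through spark-submit).
    val conf = new SparkConf().setAppName("WordCount")   // placeholder application name
    val sc = new SparkContext(conf)

    // Read a text file from HDFS, count word occurrences, and write the result back to HDFS.
    val counts = sc.textFile("hdfs://hacluster/tmp/input.txt")     // placeholder input path
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://hacluster/tmp/wordcount-output") // placeholder output path

    sc.stop()
  }
}
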
Architecture

Figure 2-32 describes the Spark architecture and Table 2-10 lists the Spark modules.

Figure 2-32 Spark architecture
Table 2-10 Basic concept description

Module

Description

Cluster Manager

Indicates the cluster manager, which manages all resources in the cluster. Spark supports multiple cluster managers, including Mesos, YARN, and the Standalone cluster manager delivered with Spark. By default, Huawei Spark clusters use the YARN mode.

Application

Indicates a Spark application, which consists of one Driver Program and multiple Executors.

Deploy Mode

Indicates the deployment mode, which can be cluster or client. In cluster mode, the Driver runs on a node inside the cluster. In client mode, the Driver runs on a client outside the cluster.

Driver Program

Indicates the main process of a Spark application. It runs the main() function of the application and creates SparkContext. It parses the application, generates Stages, and schedules Tasks to Executors. Usually, SparkContext represents the Driver Program.

Executor

Indicates a process that runs on a Worker Node to start Tasks and to manage and process the data used by the application. A Spark application generally consists of multiple Executors. Each Executor receives instructions from the Driver and executes one or more Tasks.

Worker Node

Indicates the node that starts and manages Executors and resources.

Job

Consists of multiple concurrent Tasks. One Action operation (for example, collect) maps to one job.

Stage

Indicates a collection of Tasks split from the DAG. One Job consists of multiple Stages.

Task

Carries the computation unit of the service logic. It is the minimum working unit that can be executed on the Spark platform. An application is divided into multiple Tasks based on the execution plan and the amount of computation.

Principle

Figure 2-33 describes the application running architecture of Spark.

  1. An application runs in the cluster as a collection of processes, coordinated by the Driver.
  2. To run an application, the Driver connects to the cluster manager (such as Standalone, Mesos, or YARN) to apply for Executor resources and start ExecutorBackend. The cluster manager schedules resources between applications. Meanwhile, the Driver schedules the DAG, divides it into Stages, and generates Tasks for the application.
  3. Spark then sends the application code (defined in the JAR or Python files passed to SparkContext) to the Executors.
  4. After all Tasks are finished, the user application stops running.
Figure 2-33 Spark application running architecture

Spark uses the Master and Worker mode, as shown in Figure 2-34. A user submits an application on the Spark client, the scheduler divides the job into multiple Tasks, and the Tasks are sent to the Workers for execution. Each Worker reports the computation results to the Driver (Master), and the Driver aggregates the results and returns them to the client.

Figure 2-34 Spark Master and Worker nodes

Pay attention to the following information about this architecture:

  • Applications are isolated from each other.

    Each application has its own Executor processes, which can run tasks in multiple threads. Each Driver schedules its own tasks independently, and Executors of different applications run in different JVMs; that is, different applications use different Executors.

  • Different Spark applications do not share data unless the data is stored in an external storage system such as HDFS.
  • Because the Driver program schedules tasks in the cluster, you are advised to deploy it close to the Worker nodes, for example, on the same network as the Worker nodes.

There are two deploy modes that can be used to launch Spark applications on YARN.

  • In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application.
  • In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
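
The deploy mode is chosen when the application is submitted. For example (a sketch only; the class name and JAR path are placeholders):

# yarn-cluster mode: the Driver runs in the ApplicationMaster on the cluster.
./bin/spark-submit --class com.example.MyApp --master yarn-cluster /opt/client/myapp.jar

# yarn-client mode: the Driver runs in the client process.
./bin/spark-submit --class com.example.MyApp --master yarn-client /opt/client/myapp.jar
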
Spark Streaming Principle

Spark Streaming is a real-time computing framework built on top of Spark that extends the capability to process massive streaming data.

Computation Process

When a Spark Streaming application starts (that is, when the Driver starts), the associated StreamingContext (the basis of all streaming functions) uses SparkContext to start a Receiver as a long-running task. The Receiver receives streaming data and stores it in Spark memory for processing. The following figure shows the data transfer life cycle.

Figure 2-35 Data transfer life cycle
  1. Receive data (blue arrow).

    The Receiver divides the data stream into a series of blocks and stores them in Executor memory. In addition, if the write-ahead log (WAL) function is enabled, the Receiver also writes the data into WALs in a fault-tolerant file system.

  2. Notify Driver (green arrow).

    Metadata in Receiver is sent to StreamingContext in Driver. The metadata includes:

    • Block reference ID used to locate the data position in the Executor memory.
    • Offset information of the block data in the logs (if the WAL function is enabled).
  3. Process data (red arrow).

    At each batch interval, StreamingContext uses the received blocks to generate Resilient Distributed Datasets (RDDs) and the corresponding jobs. StreamingContext executes the jobs by processing the blocks in Executor memory.

  4. Periodically set checkpoints (orange arrow).

    To meet fault tolerance requirements, StreamingContext periodically sets checkpoints and stores them in another file system.
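
The following is a minimal Spark Streaming sketch in Scala that corresponds to this life cycle. The socket host, port, batch interval, and checkpoint path are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingSketch")
    // StreamingContext is the basis of all streaming functions; the batch interval here is 10 seconds.
    val ssc = new StreamingContext(conf, Seconds(10))
    // Checkpoint directory in a fault-tolerant file system (placeholder path).
    ssc.checkpoint("hdfs://hacluster/tmp/streaming-checkpoint")

    // A receiver-based input stream; the Receiver stores incoming blocks in Executor memory.
    val lines = ssc.socketTextStream("localhost", 9999)
    // For each batch, StreamingContext generates RDDs and jobs from the received blocks.
    lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}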

Fault Tolerance

Spark and its RDD abstraction are designed to seamlessly handle failures of any Worker node in the cluster. Because Spark Streaming is built on top of Spark, its Worker nodes have the same fault tolerance capability. However, Spark Streaming applications usually run for a long time, so Spark must also be able to recover user-defined applications from failures of the Driver (the main process that coordinates all Workers). Making the Spark Driver fault tolerant is challenging because the Driver may be any user application implemented in any computation mode. However, Spark Streaming has an internal computation architecture: it periodically executes the same Spark computation on each batch of data. This architecture allows it to periodically save checkpoints to reliable storage and recover from them when the Driver restarts.

For source data such as files, the Driver recovery mechanism can achieve zero data loss because all data is stored in a fault-tolerant file system such as HDFS or S3. However, for data sources such as Kafka and Flume, some received data is buffered in memory before being processed, and such data may be lost because of the distributed operation mode of Spark applications. When the Driver fails, all Executors running under the cluster manager are terminated together with the data in their memory. Data received by Spark Streaming from sources such as Kafka and Flume is stored in Executor memory before being processed, and this data cannot be recovered even if the Driver restarts. The write-ahead log (WAL) function was added to Spark Streaming to avoid such data loss.

WAL is often used in databases and file systems to ensure the persistence of data operations. The mechanism is to first record an operation in a persistent log and then apply the operation to the data. If the system fails while applying the operation, it can recover by reading the log and re-applying the recorded operation. The following describes how WAL is used to ensure the persistence of received data:

The Receiver receives data from data sources such as Kafka and runs as a long-running task in an Executor. It also acknowledges the received data if the data source supports acknowledgment. The received data is stored in Executor memory, and the Driver delivers tasks to the Executor for processing.

After WAL is enabled, all received data is also written to log files in the fault-tolerant file system, so the received data is not lost even if Spark Streaming fails. In addition, the Receiver acknowledges received data only after the data has been written to the logs, so data that is cached but not yet logged can be sent again by the data source after the Driver restarts. These two mechanisms together ensure zero data loss: all data is either recovered from the logs or re-sent by the data source.

To enable the WAL function, perform the following operations:
  • Set streamingContext.checkpoint to configure the checkpoint directory, which is an HDFS file path used to store streaming checkpoints and WALs.
  • Set the spark.streaming.receiver.writeAheadLog.enable parameter of Spark Streaming to true (default value: false).
After WAL is enabled, all Receivers gain the ability to recover from reliably stored received data. You are advised to disable the in-memory replication of received blocks, because the fault-tolerant file system used by WAL already replicates the data. (A configuration sketch is provided after the following note.)
NOTE:

The data receiving throughput is lowered after WAL is enabled because all data is written into the fault-tolerant file system. As a result, the write throughput of the file system and the network bandwidth for data replication may become potential bottlenecks. To solve this problem, you can create more Receivers to increase the degree of data receiving parallelism, or use better hardware to improve the throughput of the fault-tolerant file system.
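
The following is a hedged configuration sketch of the two settings described above; the application name and checkpoint path are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write-ahead log (default: false).
val conf = new SparkConf()
  .setAppName("WalEnabledApp")                                    // placeholder application name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The checkpoint directory stores both streaming checkpoints and WALs (placeholder path).
ssc.checkpoint("hdfs://hacluster/tmp/streaming-checkpoint")

// Because the WAL already persists received data to the fault-tolerant file system,
// a non-replicated storage level (for example, StorageLevel.MEMORY_AND_DISK_SER)
// can be passed to receiver-based input streams to avoid double replication.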

Recovery Process

When a failed Driver is restarted, the computation is recovered as follows:

Figure 2-36 Computation recovery process
  1. Restart computation (orange arrow).

    The checkpoint information is used to restart the Driver, reconstruct SparkContext, and restart the Receivers.

  2. Recover metadata blocks (green arrow).

    All essential metadata blocks must be recovered to ensure that the process continues.

  3. Relaunch unfinished jobs (red arrow).

    The recovered metadata is used to generate RDDs and the corresponding jobs for the batches whose processing was left unfinished by the failure.

  4. Read block data stored in logs (blue arrow).

    Block data is directly read from WALs during execution of the preceding jobs, and therefore all essential data reliably stored in logs is recovered.

  5. Resend unconfirmed data (purple arrow).

    Data that was cached but not yet written to the logs at the time of failure is re-sent by the data source, because the Receiver has not acknowledged it.

Therefore, by using WALs and reliable Receiver, Spark Streaming can avoid input data loss caused by Driver failures.

SparkSQL Principle

SparkSQL

Figure 2-37 SparkSQL and DataFrame

Spark SQL is a module for processing structured data. In a Spark application, SQL statements or DataFrame APIs can be seamlessly used to query structured data.

Spark SQL and DataFrame also provide a universal method for accessing multiple data sources, such as Hive, Avro, Parquet, ORC, JSON, and JDBC, and allow data to be exchanged between these sources. Spark SQL reuses the Hive frontend processing logic and metadata processing module, so existing Hive data can be queried directly with Spark SQL.

In addition, Spark SQL provides API, CLI, and JDBC interfaces, allowing diverse client access.

DataFrame

A DataFrame is a structured, distributed dataset organized into named columns, similar to a table in a relational database or a data frame in R/Python. DataFrame is a basic concept in Spark SQL and can be created from multiple sources, such as a structured data set, a Hive table, an external database, or an RDD.

The entry point of Spark SQL is the SQLContext class (or one of its subclasses). Creating an SQLContext requires a SparkContext object as a constructor parameter. SQLContext has a subclass, HiveContext, which additionally provides the HiveQL parser, Hive UDFs, and the ability to read existing Hive data. However, HiveContext does not depend on a running Hive service; it only relies on the Hive class library.

With SQLContext and its subclasses, you can create the basic Spark SQL dataset, DataFrame. DataFrame provides diverse upstream programming interfaces and is compatible with many downstream data sources, such as Parquet, JSON, Hive, databases, and HBase, all of which can be read by using a uniform syntax.
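
A minimal Scala sketch of creating a DataFrame from a JSON data source and querying it with both the DataFrame API and SQL follows. The file path, column names, and table name are placeholders.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SparkSqlSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkSqlSketch"))
    // SQLContext is the Spark SQL entry point and is constructed from a SparkContext.
    val sqlContext = new SQLContext(sc)

    // Create a DataFrame from a structured data source (placeholder path and columns).
    val people = sqlContext.read.json("hdfs://hacluster/tmp/people.json")

    // Query with the DataFrame API...
    people.select("name", "age").show()

    // ...or register the DataFrame as a temporary table and query it with SQL.
    people.registerTempTable("people")
    sqlContext.sql("SELECT name FROM people WHERE age > 20").show()

    sc.stop()
  }
}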

CLI and JDBCServer

In addition to programming APIs, Spark SQL provides CLI and JDBC interfaces. The spark-shell and spark-sql scripts provide the CLI for debugging, and JDBCServer provides a JDBC interface through which external JDBC requests are sent directly to compute and parse structured data.

Basic Concepts
  • RDD

    Resilient Distributed Dataset (RDD) is a core concept of Spark. It is a read-only, partitionable distributed dataset. Part or all of this dataset can be cached in memory and reused between computations.

    RDD Generation

    • An RDD can be generated from a Hadoop file system, such as the Hadoop Distributed File System (HDFS), or from other Hadoop-compatible storage systems.
    • A new RDD can be transformed from a parent RDD.
    • An RDD can be created from a collection of data.

    RDD Storage

    • Users can select different storage levels to store an RDD for reuse. (There are 11 storage levels to store an RDD.)
    • By default, an RDD is stored in memory. When memory is insufficient, the RDD spills to disk.
  • RDD Dependency

    RDD dependencies include narrow dependencies and wide dependencies.

    Figure 2-38 RDD dependency
    • Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.
    • Wide dependency: partitions of the child RDD depend on all partitions of the parent RDD because of shuffle operations.

    Narrow dependencies facilitate optimization. Logically, each RDD operator is a fork/join process: the computation is forked to each partition and, after the computation is complete, the results are joined before the fork/join of the next operator begins. Directly translating the RDD graph into a physical implementation in this way is inefficient for two reasons: first, every RDD (even intermediate results) must be materialized in memory or storage, which costs time and space; second, the partitions can be joined only after the computation of all partitions is complete, so a single slow partition slows down the entire join. If the partitions of the child RDD narrowly depend on the partitions of the parent RDD, the two fork/join processes can be combined; if a sequence of consecutive operators is connected by narrow dependencies, multiple fork/join processes can be combined, reducing waiting time and improving performance. This is called pipeline optimization in Spark.
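
    As a sketch of how this works in practice, assume an existing SparkContext sc and a placeholder input path:

    // map and filter are narrow dependencies: each child partition depends on one
    // parent partition, so Spark pipelines them into a single stage without
    // materializing the intermediate RDDs.
    val cleaned = sc.textFile("hdfs://hacluster/tmp/input.txt")   // placeholder path
      .map(_.toLowerCase)
      .filter(_.contains("error"))

    // reduceByKey introduces a wide dependency: child partitions depend on all
    // parent partitions, so a shuffle is inserted and a new stage begins here.
    val counts = cleaned.map((_, 1)).reduceByKey(_ + _)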

  • Transformation and Action (RDD Operations)

    Operations on an RDD include transformation (the return value is an RDD) and action (the return value is not an RDD). Figure 2-39 shows the process. Transformations are lazy: the transformation from one RDD to another is not executed immediately; Spark only records it. The real computation starts only when an action is invoked. The action either returns results or writes the RDD data to the storage system, and it is the driving force for Spark to start the computation.

    Figure 2-39 RDD operation

    The data and operation model of RDDs differs from that of ordinary Scala collections, as shown in the following example.

    val file = sc.textFile("hdfs://...")
    val errors = file.filter(_.contains("ERROR"))
    errors.cache()
    errors.count()
    1. The textFile operator reads log files from the HDFS and returns file (as an RDD).
    2. The filter operator filters rows with ERROR and assigns them to errors (a new RDD). The filter operator is a transformation.
    3. The cache operator caches errors for future use.
    4. The count operator returns the number of rows of errors. The count operator is an action.
    Transformation includes the following types:
    • The RDD elements are regarded as simple elements:

      The input and output have a one-to-one relationship, and the partition structure of the result RDD remains unchanged, for example, map.

      The input and output have a one-to-many relationship, and the partition structure of the result RDD remains unchanged, for example, flatMap (one element becomes a sequence of multiple elements and is then flattened into multiple elements).

      The input and output have a one-to-one relationship, but the partition structure of the result RDD changes, for example, union (two RDDs are combined into one, and the number of partitions becomes the sum of the partitions of the two RDDs) and coalesce (the number of partitions is reduced).

      Operators that select some elements from the input, such as filter, distinct (duplicate elements are removed), subtract (only elements that exist in this RDD but not in the other RDD are retained), and sample (samples are taken).

    • The RDD elements are regarded as Key-Value pairs.

      One-to-one calculation on a single RDD, such as mapValues (which retains the partitioning of the source RDD, unlike map).

      Sorting a single RDD, such as sort and partitionBy (consistent partitioning, which is important for locality optimization).

      Restructuring and reducing a single RDD based on key, such as groupByKey and reduceByKey.

      Joining and restructuring two RDDs based on key, such as join and cogroup.

      NOTE:

      The latter three types of operations involve sorting and are called shuffle operations.

    Action is classified into the following types:

    • Generate scalar values, such as count (returns the number of elements in the RDD), reduce and fold/aggregate (return aggregated scalar values), and take (returns the first several elements).
    • Generate a Scala collection, such as collect (returns all elements in the RDD as a Scala collection) and lookup (returns all values corresponding to a key).
    • Write data to the storage, such as saveAsTextFile (which corresponds to the preceding textFile).
    • Checkpointing, such as checkpoint. When the lineage is long (which occurs frequently in graph computation), it takes a long time to re-execute the whole sequence after a fault. In this case, checkpoint is used to write the current data to stable storage, as shown in the sketch after this list.
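
    A hedged sketch of the checkpoint usage mentioned above, assuming an existing SparkContext sc; the checkpoint directory is a placeholder:

    // The checkpoint directory must be in a reliable file system such as HDFS (placeholder path).
    sc.setCheckpointDir("hdfs://hacluster/tmp/rdd-checkpoint")

    // Many iterations build up a long lineage.
    var rdd = sc.parallelize(1 to 1000000)
    for (_ <- 1 to 100) {
      rdd = rdd.map(_ + 1)
    }

    // checkpoint marks the RDD to be written to stable storage; the write happens when an
    // action first materializes the RDD, after which its lineage is truncated.
    rdd.checkpoint()
    rdd.count()
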
  • Shuffle

    Shuffle is a specific phase in the MapReduce framework between the Map phase and the Reduce phase. If the output of Map is to be used by Reduce, the output must be hashed by key and distributed to each Reducer. This process is called Shuffle. Because Shuffle involves disk reads and writes as well as network transmission, its performance directly affects the operation efficiency of the entire program.

    The following figure describes the entire process of the MapReduce algorithm.

    Figure 2-40 Algorithm process

    Shuffle is the bridge that connects the Map and Reduce sides of the data flow. The following describes the implementation of Shuffle in Spark.

    Shuffle divides a Spark Job into multiple Stages. The preceding Stages contain one or more ShuffleMapTasks, and the last Stage contains one or more ResultTasks.

  • Spark Application Structure

    The Spark application structure includes the initial SparkContext and the main program.

    • Initial SparkContext: constructs the operation environment of the Spark application.

      Construct the SparkContext object, for example (a concrete sketch follows this list):

      new SparkContext(master, appName, [SparkHome], [jars])

      Parameter description:

      master: indicates the connection string. The connection modes include local, yarn-cluster, and yarn-client.

      appName: indicates the application name.

      SparkHome: indicates the directory where Spark is installed in the cluster.

      jars: indicates the code and dependency packages of the application.

    • Main program: processes data.

    For details about submitting applications, see https://spark.apache.org/docs/1.5.1/submitting-applications.html.
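
    For example, a sketch of constructing the context with these parameters (all values are placeholders):

      val sc = new SparkContext(
        "yarn-client",                    // master: connection string (placeholder)
        "MyApp",                          // appName (placeholder)
        "/opt/client/Spark/spark",        // SparkHome: Spark installation directory (placeholder)
        List("/opt/client/myapp.jar"))    // jars: application code and dependencies (placeholder)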

  • Spark shell Command

    The basic Spark shell command supports submitting Spark applications. The command is as follows:

    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      ... # other options
      <application-jar> \
      [application-arguments]

    Parameter description:

    --class: indicates the main class of the Spark application.

    --master: indicates the master to which the Spark application connects, such as yarn-client or yarn-cluster.

    application-jar: indicates the path of the JAR package of the Spark application.

    application-arguments: indicates the parameter required to submit the Spark application. (This parameter can be empty.)
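
    For example, a submission in yarn-cluster mode might look as follows (the class name, JAR path, and arguments are placeholders):

    ./bin/spark-submit \
      --class com.example.MyApp \
      --master yarn-cluster \
      /opt/client/myapp.jar \
      arg1 arg2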

  • Spark JobHistory Server

    The Spark web UI is used to monitor the details of each phase of a running or historical Spark job and to display logs, which helps users develop, configure, and optimize jobs at a finer granularity.

HA Overview

Background

A Thrift JDBC service similar to HiveServer2 is implemented in Spark. It can be accessed through Beeline and JDBC interfaces.

The Spark JDBC HA feature is implemented based on the Thrift Server of the open source community.

Implementation

The following figure shows the basic principle of Spark Thrift Server HA.

Figure 2-41 Spark Thrift Server HA

Upon startup, each Thrift Server process registers its information with ZooKeeper and takes part in an election. The Thrift Server elected as the active node opens ports to listen for client requests and writes its host and port information into ZooKeeper so that clients can use it for connection. The Thrift Server elected as the standby node monitors the status of the active Thrift Server in ZooKeeper and takes over when the active Thrift Server fails.

The client connection is as follows:

Clients can use Beeline to connect to the Thrift Server in HA mode. The difference between a connection string used in HA mode and one used in non-HA mode is that ip:port is replaced with ha-cluster in HA mode. Table 2-11 lists the parameters involved.

Table 2-11 Client parameter list

Parameter

Description

Default Value

spark.thriftserver.ha.enabled

Indicates whether to connect to the Thrift Server in HA mode. Set this parameter to true and change host:port in the connection string to ha-cluster.

false

spark.thriftserver.zookeeper.dir

Path where the Thrift Server saves metadata on ZooKeeper. The configuration must be the same as that on the server.

/thriftserver

spark.deploy.zookeeper.url

URL of ZooKeeper. The configuration must be the same as that on the server.

N/A

spark.thriftserver.retry.times

Maximum number of attempts to connect to the server.

5

spark.thriftserver.retry.wait.time

Interval between two attempts to connect to the server, in seconds.

10

Parameters listed in Table 2-11 must be set in the hive-site.xml file in classpath on the client. An example is as follows:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<configuration>
    <property>
        <name>spark.thriftserver.ha.enabled</name>
        <value>true</value>
    </property>
</configuration>

The spark.deploy.zookeeper.url parameter can be replaced by zk.quorum in the connection string. An example is as follows:

!connect jdbc:hive2://ha-cluster/default;zk.quorum=spark25:2181,spark26:2181,spark27:2181

Other usage modes are the same as those in non-HA mode. For details, see https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients.

Relationship with Other Components

Relationship Between Spark and HDFS

Data computed by Spark can come from multiple data sources, such as local files and HDFS. Most of the data computed by Spark comes from HDFS, which can read data at a large scale for parallel computation. After being computed, the data can be stored in HDFS.

Spark involves Driver and Executor. Driver schedules tasks and Executor runs tasks.

Figure 2-42 describes the file reading process.

Figure 2-42 File reading process
The file reading process is as follows:
  1. The Driver interconnects with HDFS to obtain the information about File A.
  2. HDFS returns the detailed block information about this file.
  3. The Driver sets the degree of parallelism based on the amount of block data and creates multiple tasks to read the blocks of this file.
  4. Executors run the tasks and read the blocks as partitions of a Resilient Distributed Dataset (RDD).

Figure 2-43 describes the file writing process.

Figure 2-43 File writing process
The file writing process is as follows:
  1. The Driver creates a directory where the file is to be written.
  2. Based on the RDD distribution, the Driver computes the number of tasks needed to write the data and sends these tasks to the Executors.
  3. The Executors run these tasks and write the RDD data to the directory created in step 1.
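
A Scala sketch that exercises both paths, assuming an existing SparkContext sc; the paths are placeholders:

// Reading: the Driver obtains the block information of the file from HDFS and creates
// multiple read tasks based on that information; Executors read the blocks into an RDD.
val logs = sc.textFile("hdfs://hacluster/tmp/app-logs")   // placeholder path

// Writing: the number of write tasks follows the RDD partitioning; Executors write
// their partitions into the target directory created by the Driver.
logs.filter(_.contains("ERROR")).saveAsTextFile("hdfs://hacluster/tmp/error-logs")   // placeholder path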
Relationship Between Spark and YARN

Spark computing and scheduling can be implemented in YARN mode: Spark uses the computing resources provided by YARN clusters and runs tasks in a distributed way. Spark on YARN has two modes: yarn-cluster and yarn-client.

  • yarn-cluster mode

    Figure 2-44 describes the operation framework.

    Figure 2-44 Spark on yarn-cluster operation framework

    Spark on yarn-cluster implementation process:

    1. The client generates the Application information, and then sends the information to ResourceManager.
    2. ResourceManager allocates a Container (ApplicationMaster) to the Spark application and then starts the Driver on the Container node.
    3. ApplicationMaster applies for resources from ResourceManager to run Executor.

      ResourceManager allocates Containers to ApplicationMaster. ApplicationMaster then communicates with the related NodeManagers and starts Executors on the obtained Containers. After an Executor starts, it registers with the Driver and applies for tasks.

    4. Driver allocates tasks to Executor.
    5. Executor executes tasks and reports the operating status to Driver.
  • yarn-client mode

    Figure 2-45 describes the operation framework.

    Figure 2-45 Spark on yarn-client operation framework

    Spark on yarn-client implementation process:

    NOTE:

    In yarn-client mode, the Driver is deployed and runs on the client.

    1. The client sends a Spark application request to ResourceManager, and ResourceManager returns the results, including information such as the ApplicationId and the upper and lower limits of available resources. The client then packages all the information required to start ApplicationMaster and sends it to ResourceManager.
    2. After receiving the request, ResourceManager finds a proper node for ApplicationMaster and starts it on this node. ApplicationMaster is a role in YARN, and the corresponding process name in Spark is ExecutorLauncher.
    3. Based on the resource requirements of each task, ApplicationMaster applies to ResourceManager for a series of Containers to run the tasks.
    4. After receiving the list of newly allocated Containers from ResourceManager, ApplicationMaster sends requests to the related NodeManagers to start the Containers.

      ResourceManager allocates Containers to ApplicationMaster. ApplicationMaster then communicates with the related NodeManagers and starts Executors on the obtained Containers. After an Executor starts, it registers with the Driver and applies for tasks.

      NOTE:

      Running Containers are not suspended to release resources.

    5. The Driver allocates tasks to the Executors. The Executors execute the tasks and report their operating status to the Driver.