FusionInsight HD 6.5.0 Product Description 02

Spark2x

Basic Concept

Overview

Spark is a memory-based distributed computing framework. In iterative computation scenarios, data being processed is kept in memory, providing performance 10 to 100 times higher than that of MapReduce. Spark can use the Hadoop Distributed File System (HDFS), enabling users to quickly switch from MapReduce to Spark. Spark provides a one-stop data analysis capability and supports streaming processing in small batches, offline batch processing, structured query language (SQL) queries, and data mining. Users can seamlessly combine these functions in one application. For details about new open-source features of Spark2x, see Spark2x Open-Source New Features.

Features of Spark are as follows:

  • Improves data processing capability through distributed in-memory computing and a directed acyclic graph (DAG) execution engine, delivering performance 10 to 100 times that of MapReduce.
  • Provides multiple development interfaces (Scala/Java/Python) and dozens of highly abstracted operators, making it much easier to build distributed data processing applications.
  • Builds a data processing stack with SQL, Streaming, MLlib, and GraphX to provide one-stop data processing capabilities.
  • Fits into the Hadoop ecosystem: Spark applications can run on Standalone, Mesos, or YARN, can access multiple data sources such as HDFS, HBase, and Hive, and support smooth migration of MapReduce applications to Spark.
Architecture

Figure 2-91 describes the Spark architecture and Table 2-21 lists the Spark modules.

Figure 2-91 Spark architecture
Table 2-21 Basic concepts

  • Cluster Manager: Manages all resources in the cluster. Spark supports multiple cluster managers, including Mesos, YARN, and the Standalone cluster manager delivered with Spark. By default, Huawei Spark clusters use the YARN mode.
  • Application: A Spark application, which consists of one Driver Program and multiple Executors.
  • Deploy Mode: The deployment mode, which can be cluster or client. In cluster mode, the Driver runs on a node inside the cluster. In client mode, the Driver runs on a client outside the cluster.
  • Driver Program: The main process of a Spark application. It runs the main() function of the application and creates SparkContext. It parses the application, generates Stages, and schedules Tasks to Executors. Usually, SparkContext represents the Driver Program.
  • Executor: A process that runs on a Worker Node, starts Tasks, and manages and processes the data used by the application. A Spark application generally consists of multiple Executors. Each Executor receives instructions from the Driver and executes one or more Tasks.
  • Worker Node: A node that starts and manages Executors and resources.
  • Job: Consists of multiple concurrent Tasks. One action operation (for example, collect) maps to one Job.
  • Stage: A collection of Tasks, split from the DAG. One Job consists of multiple Stages.
  • Task: Carries the computation unit of the service logic. It is the minimum working unit that can be executed on the Spark platform. An application is divided into multiple Tasks based on the execution plan and the amount of computation.

Spark Principle

Figure 2-92 describes the application running architecture of Spark.

  1. An application runs in the cluster as a collection of processes, coordinated by the Driver.
  2. To run an application, the Driver connects to the cluster manager (such as Standalone, Mesos, or YARN) to apply for Executor resources and starts ExecutorBackend. The cluster manager schedules resources between different applications. At the same time, the Driver schedules the DAG, divides it into Stages, and generates Tasks for the application.
  3. Spark then sends the application code (the code passed to SparkContext, defined in the JAR or Python files) to the Executors.
  4. After all Tasks are finished, the user application stops running.
Figure 2-92 Spark application running architecture

Spark uses a Master-Worker mode, as shown in Figure 2-93. A user submits an application on the Spark client, the scheduler divides the job into multiple tasks, and the tasks are sent to each Worker for execution. Each Worker reports the computation results to the Driver (the Master), and the Driver aggregates the results and returns them to the client.

Figure 2-93 Spark Master and Worker nodes

Pay attention to the following information about this architecture:

  • Applications are isolated from each other.

    Each application has its own Executor processes and can execute tasks in multiple threads. Each Driver schedules its own tasks independently, and different applications run in different JVMs, that is, on different Executors.

  • Different Spark applications do not share data unless the data is stored in an external storage system such as HDFS.
  • You are advised to deploy the Driver program close to the Worker nodes, for example, on the network where the Worker nodes are located, because the Driver schedules tasks in the cluster.

There are two deploy modes that can be used to launch Spark applications on YARN.

  • In YARN-Cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application.
  • In YARN-Client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
Spark Streaming Principle

Spark Streaming is a real-time computing framework built on top of Spark, which extends the capability for processing massive streaming data. Spark supports two data processing approaches: Direct Streaming and Receiver.

Direct Streaming computing process

In the Direct Streaming approach, the Direct API is used to process data. Take the Kafka Direct API as an example: the Direct API tells each batch the Kafka offset range it will read, which is much simpler than starting a Receiver to continuously receive data from Kafka and write the data to write-ahead logs (WALs). Each batch Job then runs against data that is already available in Kafka at the corresponding offsets. The offset information can be reliably stored in the checkpoint file and read back by the application after it fails and restarts.

Figure 2-94 Data transmission through Direct Kafka API

After a failure, Spark Streaming can re-read the data from Kafka and process the same data segment again. The processing result is the same regardless of whether Spark Streaming fails, because the processing semantics are exactly-once.

The Direct API needs neither WALs nor Receivers, and it ensures that each Kafka record is received exactly once, which is more efficient. In this way, Spark Streaming and Kafka are integrated into a streaming channel with high fault tolerance, high efficiency, and ease of use. Therefore, you are advised to use Direct Streaming to compute data. A minimal sketch follows.
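
The following is a minimal sketch of the Direct Streaming approach using the Kafka Direct API from the spark-streaming-kafka-0-10 connector; the broker addresses, topic name, group ID, and batch interval are placeholders.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("DirectKafkaExample")
val ssc = new StreamingContext(conf, Seconds(5))   // placeholder batch interval

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",   // placeholder brokers
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// The Direct API computes the offset range for each batch instead of using a Receiver.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("example-topic"), kafkaParams)
)

stream.map(_.value).count().print()   // number of records in each batch

ssc.start()
ssc.awaitTermination()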

Receiver computing process

When a Spark Streaming application starts (that is, when the Driver starts), the relevant StreamingContext (the basis of all streaming functions) uses SparkContext to start the Receiver as a resident task. The Receiver receives streaming data and stores it in Spark memory for processing. The following figure shows the data transfer life cycle.

Figure 2-95 Data transfer life cycle
  1. Receive data (blue arrow).

    The Receiver divides a data stream into a series of blocks and stores them in the Executor memory. In addition, if the WAL function is enabled, the Receiver writes the data into write-ahead logs (WALs) in a fault-tolerant file system.

  2. Notify Driver (green arrow).

    Metadata in the Receiver is sent to the StreamingContext in the Driver. The metadata includes:

    • The block reference ID used to locate the data in the Executor memory.
    • The offset information of the block data in the logs (if the WAL function is enabled).
  3. Process data (red arrow).

    For each batch of data, the StreamingContext uses the blocks to generate Resilient Distributed Datasets (RDDs) and the corresponding Jobs at each batch interval. The StreamingContext executes the Jobs by processing the blocks in the Executor memory.

  4. Periodically set checkpoints (orange arrow).

    To meet fault tolerance requirements, the StreamingContext periodically sets checkpoints and stores them in another file system.

Fault Tolerance

Spark and the abstract design of its RDDs can seamlessly tolerate the failure of any worker node in the cluster. Because Spark Streaming is built on top of Spark, its worker nodes have the same fault tolerance. However, Spark Streaming applications usually run for a long time, so Spark must also be able to recover user-defined applications from failures of the Driver (the main process that coordinates all workers). Making the Spark Driver fault-tolerant is challenging because the Driver may be any user application implemented in any computation mode. However, Spark Streaming has an internal computation architecture: it periodically executes the same Spark computation on each batch of data. Such an architecture allows it to periodically store checkpoints in reliable storage and recover from them when the Driver restarts.

For source data such as files, the Driver recovery mechanism can achieve zero data loss because all data is stored in a fault-tolerant file system such as HDFS or S3. However, for data sources such as Kafka and Flume, some received data is cached in memory and has not yet been processed. Because of the distributed operation mode of Spark applications, such data may be lost: when the Driver fails, all Executors running on the Cluster Manager are terminated together with the data cached in their memory. To avoid such data loss, the write-ahead log (WAL) function has been added to Spark Streaming.

WAL is often used in databases and file systems to ensure the persistence of data operations. The WAL mechanism works as follows: an operation is first recorded in a persistent log and then performed on the data. If the system fails while performing the operation, it can recover by reading the log and re-applying the operations recorded there. The following describes how WAL is used to ensure the persistence of received data:

A Receiver is used to receive data from data sources such as Kafka. The Receiver runs as a resident task in an Executor to receive data and, if supported by the data source, acknowledges the received data. (The received data is stored in the Executor memory, and the Driver delivers tasks to the Executor for processing.)

After WAL is enabled, all received data is additionally stored in log files in a fault-tolerant file system, so the received data is not lost even if Spark Streaming fails. Besides, the Receiver acknowledges the received data only after the data has been pre-written into the logs. Data that is cached but not yet logged can be sent again by the data sources after the Driver restarts. These two mechanisms ensure zero data loss: all data is either recovered from the logs or re-sent by the data sources.

To enable the WAL function, perform the following operations (a minimal sketch follows the list):
  • Call streamingContext.checkpoint to configure the checkpoint directory, which is an HDFS file path used to store streaming checkpoints and WALs.
  • Set the spark.streaming.receiver.writeAheadLog.enable parameter of Spark Streaming to true (default value: false).
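
The following is a minimal sketch of these two settings; the application name and checkpoint path are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write-ahead log (disabled by default).
val conf = new SparkConf()
  .setAppName("WalExample")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// Checkpoint directory in HDFS, used to store streaming checkpoints and WALs.
ssc.checkpoint("hdfs://hacluster/user/example/checkpoint")
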
After WAL is enabled, all Receivers gain the ability to recover from reliably stored received data. You are advised to disable the in-memory replication of received data, because the fault-tolerant file system used by WAL already replicates the data.
NOTE:

Enabling WAL lowers the data receiving throughput, because all data is also written to the fault-tolerant file system. As a result, the write throughput of the file system and the network bandwidth for data replication may become potential bottlenecks. To solve this problem, you are advised to create more Receivers to increase the degree of receiving parallelism, or use better hardware to improve the throughput of the fault-tolerant file system.

Recovery Process

Restart a failed Driver as follows:

Figure 2-96 Computation recovery process
  1. Restart computation (orange arrow).

    Use the checkpoint information to restart the Driver, reconstruct SparkContext, and restart the Receivers.

  2. Recover metadata blocks (green arrow).

    All essential metadata blocks must be recovered to ensure that the process continues.

  3. Relaunch unfinished jobs (red arrow).

    The recovered metadata is used to generate RDDs and the corresponding Jobs for the batches whose processing was interrupted by the failure.

  4. Read block data stored in logs (blue arrow).

    Block data is read directly from the WALs when the preceding Jobs are executed, so all the essential data that was reliably stored in the logs is recovered.

  5. Resend unconfirmed data (purple arrow).

    Data that was cached but not yet written to the logs at the time of the failure is re-sent by the data sources, because the Receiver has not acknowledged it.

Therefore, by using WALs and reliable Receivers, Spark Streaming can avoid input data loss caused by Driver failures.

SparkSQL and Dataset Principle

SparkSQL

Figure 2-97 SparkSQL and DataSet

Spark SQL is a module for processing structured data. In a Spark application, SQL statements and DataSet APIs can be seamlessly used to query structured data.

Spark SQL and DataSet also provide a universal method for accessing multiple data sources such as Hive, CSV, Parquet, ORC, JSON, and JDBC data sources, and data can be exchanged between these data sources. Spark SQL reuses the Hive frontend processing logic and metadata processing module, so you can directly query existing Hive data with Spark SQL.

In addition, Spark SQL provides API, CLI, and JDBC interfaces, offering clients multiple ways to access it.

Spark SQL Native DDL/DML

In Spark 1.5, many DDL/DML commands were pushed down to and run on Hive, which coupled Spark with Hive and caused inflexibility, for example, inconsistent error reporting.

Spark2x localizes these commands: Spark SQL Native DDL/DML replaces Hive for running DDL/DML commands. This decouples Spark from Hive and makes command customization more convenient. A brief sketch follows.
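
The following is a minimal sketch of DDL/DML statements executed through Spark SQL; the table name, schema, and data are illustrative, and spark is assumed to be an existing SparkSession.

// DDL statement run through Spark SQL (a data source table).
spark.sql("CREATE TABLE IF NOT EXISTS example_tbl (id INT, name STRING) USING parquet")
// DML statements run through Spark SQL.
spark.sql("INSERT INTO example_tbl VALUES (1, 'alice')")
spark.sql("SELECT * FROM example_tbl").show()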

DataSet

A DataSet is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.

DataFrame is a structured, distributed dataset organized into named columns, which is similar to a table in a relational database or a data frame in R/Python. DataFrame is a basic concept in Spark SQL and can be created by using multiple methods, for example, from a structured dataset, a Hive table, an external database, or an RDD.

Operations available on Datasets are divided into transformations and actions.

  • Transformations are operations that produce a new Dataset.

    Examples: map, filter, select, and aggregate (groupBy).

  • Actions are operations that trigger computation and return results.

    Examples: count, show, or writing data out to file systems.

There are typically two ways to create a DataSet.

  • The most common way is by pointing Spark to some files on storage systems, using the read function available on a SparkSession.
    val people = spark.read.parquet("...").as[Person]  // Scala
    Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java
  • Datasets can also be created through transformations available on existing Datasets. For example, the following creates a new Dataset by applying a map operation on the existing one:
    val names = people.map(_.name)  // in Scala; names is a Dataset[String]
    Dataset<String> names = people.map((MapFunction<Person, String>) p -> p.name, Encoders.STRING()); // Java
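
Continuing the creation examples above, the following is a minimal Scala sketch of transformations followed by an action; it assumes that the Person class has name and age fields, which are illustrative.

val adults = people.filter(p => p.age > 18)   // transformation: builds a new Dataset lazily
val byName = adults.groupBy("name").count()   // transformation with aggregation
byName.show()                                 // action: triggers the computation and prints the result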

CLI and JDBCServer

In addition to programming APIs, Spark SQL also provides the CLI and JDBC interfaces:

  • The spark-shell and spark-sql scripts can provide the CLI for debugging.
  • JDBCServer provides the JDBC interface for directly sending external JDBC requests to compute and parse structured data.
SparkSession Principle

SparkSession is the unified API entry point in Spark2x and hence also a unified entry point for reading data. SparkSession provides a single entry point to many operations that were previously scattered across multiple classes, and it also provides accessor methods to these older classes for maximum compatibility.

A SparkSession can be created using a builder pattern. The builder will automatically reuse an existing SparkSession if one exists; and create a SparkSession if it does not exist. Configuration options set in the builder are automatically propagated over to Spark and Hadoop during I/O.

import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()
  • It can be used to execute SQL queries over data, getting the results back as a DataFrame.
    sparkSession.sql("select * from person").show
  • SparkSession can also be used to set runtime configuration options, and these options can be referenced in SQL through variable substitution.
    sparkSession.conf.set("spark.some.config", "abcd")
    sparkSession.conf.get("spark.some.config")
    sparkSession.sql("select ${spark.some.config}")
  • SparkSession also includes a catalog method that provides methods for working with the metastore (that is, the data catalog). These methods return Datasets, so you can use the same Dataset API to work with them.
    val tables = sparkSession.catalog.listTables()
    val columns = sparkSession.catalog.listColumns("myTable")
  • The underlying SparkContext can be accessed through the sparkContext method of SparkSession.
    val sparkContext = sparkSession.sparkContext
Structured Streaming Principle

Structured Streaming is a stream processing engine built on the Spark SQL engine. You can use the Dataset/DataFrame API in Scala, Java, Python, or R to express streaming aggregations, event-time windows, and stream-to-batch joins. If streaming data is incrementally and continuously produced, Spark SQL will continue to process the data and synchronize the result to the result set. The system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs.

The core idea of Structured Streaming is to treat streaming data as an unbounded, continuously growing database table. The streaming processing model is similar to the batch processing model: query operations on a static table are applied to streaming computation, and Spark uses standard SQL statements to query data from this incremental, unbounded table.

Figure 2-98 Unbounded table

Each query operation generates a Result Table. At each trigger interval, newly arrived data updates the Result Table. Whenever the Result Table is updated, the updated results are written to an external storage system.

Figure 2-99 Data processing model

Structured Streaming supports the following output modes for writing the Result Table to external storage (a minimal end-to-end sketch follows this list):
  • Complete Mode: The entire updated Result Table is written to the external storage system by the connector of that system.
  • Append Mode: At each trigger, only the rows newly appended to the Result Table are written to the external system. This mode is applicable only when existing rows in the Result Table are not expected to change.
  • Update Mode: At each trigger, only the rows updated in the Result Table since the last trigger are written to the external system. This is the difference between the Append Mode and the Update Mode.
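
The following is a minimal Structured Streaming sketch based on the standard socket source; the host, port, and output mode are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()
import spark.implicits._

// Streaming DataFrame over the unbounded input table (socket source; host/port are placeholders).
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Incremental aggregation: the Result Table holds the running word counts.
val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

// Complete mode: the whole updated Result Table is written to the sink at each trigger.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
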
Basic Concepts
  • RDD

    Resilient Distributed Dataset (RDD) is a core concept of Spark. It is a read-only, partitionable distributed dataset. Part or all of the data in an RDD can be cached in memory and reused between computations.

    RDD Generation

    • An RDD can be generated from the Hadoop file system or other storage systems compatible with Hadoop, such as the Hadoop Distributed File System (HDFS).
    • A new RDD can be transformed from a parent RDD.
    • An RDD can be converted from a collection.

    RDD Storage

    • Users can select different storage levels to store an RDD for reuse. (There are 11 storage levels.)
    • By default, an RDD is stored in memory. When the memory is insufficient, the RDD spills to disk.
  • RDD Dependency

    The RDD dependency includes the narrow dependency and wide dependency.

    Figure 2-100 RDD dependency
    • Narrow dependency: Each partition of the parent RDD is used by at most one partition of the child RDD.
    • Wide dependency: Partitions of the child RDD depend on all partitions of the parent RDD because of shuffle operations.

    Narrow dependencies facilitate optimization. Logically, each RDD operator is a fork/join process: the computation is forked to each partition, the results are joined after the computation, and then the fork/join of the next operator is performed. Directly translating the RDD graph into a physical implementation in this way is inefficient for two reasons: first, every RDD (even an intermediate result) must be materialized in memory or storage, which takes time and space; second, the join can start only after the computation of all partitions is complete, so a single slow partition slows down the entire join. If the partitions of the child RDD narrowly depend on the partitions of the parent RDD, the two fork/join processes can be combined, which optimizes the entire process. If a sequence of consecutive operators is connected by narrow dependencies, multiple fork/join processes can be combined to reduce waiting time and improve performance. This is called pipeline optimization in Spark. A minimal sketch follows.
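
    The following is a minimal sketch of pipeline optimization, assuming that sc is an existing SparkContext and the HDFS path is a placeholder: the narrow dependencies (flatMap, map) are pipelined into one stage, and the wide dependency introduced by reduceByKey starts a new stage.

    val lines = sc.textFile("hdfs://hacluster/user/example/input")   // placeholder path
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))   // narrow dependencies, pipelined
    val counts = pairs.reduceByKey(_ + _)                            // wide dependency, requires a shuffle
    counts.count()                                                   // action: starts the computation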

  • Transformation and Action (RDD Operations)

    Operations on an RDD include transformation (the return value is an RDD) and action (the return value is not an RDD). Figure 2-101 shows the process. Transformations are lazy: the transformation from one RDD to another RDD is not executed immediately; Spark only records the transformation and does not execute it. The real computation starts only when an action is invoked. An action returns results or writes the RDD data into the storage system, and it is the driving force for Spark to start the computation.

    Figure 2-101 RDD operation

    The data and operation model of RDD are quite different from those of Scala.

    val file = sc.textFile("hdfs://...")
    val errors = file.filter(_.contains("ERROR"))
    errors.cache()
    errors.count()
    1. The textFile operator reads log files from HDFS and returns file (an RDD).
    2. The filter operator filters out the rows containing ERROR and assigns them to errors (a new RDD). The filter operator is a transformation.
    3. The cache operator caches errors for future use.
    4. The count operator returns the number of rows in errors. The count operator is an action.
    Transformations include the following types:
    • The RDD elements are regarded as simple elements:

      The input and output have a one-to-one relationship, and the partition structure of the result RDD remains unchanged, for example, map.

      The input and output have a one-to-many relationship, and the partition structure of the result RDD remains unchanged, for example, flatMap (one element is mapped to a sequence of multiple elements, which is then flattened into multiple elements).

      The input and output have a one-to-one relationship, but the partition structure of the result RDD changes, for example, union (two RDDs are merged into one, and the number of partitions becomes the sum of the partitions of the two RDDs) and coalesce (the number of partitions is reduced).

      Operators that select some elements from the input, such as filter, distinct (duplicate elements are removed), subtract (only elements that exist in this RDD but not in the other RDD are retained), and sample (samples are taken).

    • The RDD elements are regarded as key-value pairs:

      Perform one-to-one calculation on a single RDD, such as mapValues (the partitioning of the source RDD is retained, which is different from map).

      Sort a single RDD, such as sort and partitionBy (consistent partitioning, which is important for local optimization).

      Restructure and reduce a single RDD by key, such as groupByKey and reduceByKey.

      Join and restructure two RDDs by key, such as join and cogroup.

      NOTE:

      The latter three kinds of operations involve sorting and are called shuffle operations.

    Actions are classified into the following types (see the sketch after this list):

    • Generate scalar values, such as count (returns the number of elements in the RDD), reduce and fold/aggregate (return a scalar value aggregated from the elements), and take (returns the first N elements).
    • Generate a Scala collection, such as collect (imports all elements of the RDD into a Scala collection) and lookup (looks up all values corresponding to a key).
    • Write data to storage, such as saveAsTextFile (which corresponds to the preceding textFile).
    • Set checkpoints, such as checkpoint. When the lineage is long (which occurs frequently in graph computation), it takes a long time to re-execute the whole sequence when a fault occurs. In this case, checkpoint is used to write the current data to stable storage.
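
    The following is a minimal sketch of key-value transformations and actions, assuming that sc is an existing SparkContext; the data and names are purely illustrative.

    val sales = sc.parallelize(Seq(("apple", 3), ("pear", 2), ("apple", 5)))
    val prices = sc.parallelize(Seq(("apple", 1.2), ("pear", 0.8)))

    val totals = sales.reduceByKey(_ + _)   // restructure and reduce a single RDD by key
    val joined = totals.join(prices)        // join two RDDs by key (a shuffle operation)

    joined.collect()                        // action: returns the results as a Scala collection
    joined.count()                          // action: returns the number of elements
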
  • Shuffle

    Shuffle is a specific phase in the MapReduce framework between the Map phase and the Reduce phase. If the output results of Map are to be used by Reduce, each output result must be hashed by key and distributed to the corresponding Reducer. This process is called Shuffle. Shuffle involves disk read/write and network transmission, so the performance of Shuffle directly affects the operating efficiency of the entire program.

    The following figure describes the entire process of the MapReduce algorithm.

    Figure 2-102 Algorithm process

    Shuffle is the bridge that connects the Map and Reduce data. The following describes the implementation of Shuffle in Spark.

    Shuffle divides a Spark Job into multiple stages. The former stages contain one or more ShuffleMapTasks, and the last stage contains one or more ResultTasks.

  • Spark Application Structure

    The Spark application structure includes the initial SparkContext and the main program.

    • Initial SparkContext: constructs the operation environment of the Spark application.

      Constructs the SparkContext object. For example:

      new SparkContext(master, appName, [SparkHome], [jars])

      Parameter description

      master: indicates the master URL string. The supported modes include local, YARN-cluster, and YARN-client.

      appName: indicates the application name.

      SparkHome: indicates the directory where Spark is installed in the cluster.

      jars: indicates the code and dependency packages of the application.

    • Main program: processes data.

    For details about submitting applications, see https://spark.apache.org/docs/2.3.2/submitting-applications.html. A minimal SparkContext construction sketch follows.
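
    The following is a minimal sketch of constructing a SparkContext with SparkConf, an alternative to the constructor above; the master, application name, and HDFS path are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    // Initial SparkContext: construct the operating environment (placeholder master and name).
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("example-app")
    val sc = new SparkContext(conf)

    // Main program: process data with the created SparkContext (placeholder HDFS path).
    println(sc.textFile("hdfs://hacluster/user/example/input").count())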

  • Spark shell Command

    The basic Spark shell command supports submitting Spark applications. The command is as follows:

    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      ... # other options
      <application-jar> \
      [application-arguments]

    Parameter description:

    --class: indicates the name of the main class of the Spark application.

    --master: indicates the master to which the Spark application links, such as YARN-client and YARN-cluster.

    application-jar: indicates the path of the JAR package of the Spark application.

    application-arguments: indicates the parameters passed to the Spark application. (This parameter can be empty.)

  • Spark JobHistory Server

    The Spark web UI is used to monitor the details of each phase of the Spark framework for running or historical Spark jobs and to display logs, helping users develop, configure, and optimize jobs at a finer granularity.

HA Overview

Multi-Active Instance Function
Background

Based on the community JDBCServer, multi-active instance HA is used to achieve high availability. In this mode, multiple JDBCServer instances coexist in the cluster, and a client can randomly connect to any one of them to perform service operations. When one or more JDBCServer instances stop working, the client can connect to another working JDBCServer instance.

Compared with active/standby HA, multi-active instance HA eliminates the following restrictions:

  • In active/standby HA, when an active/standby switchover occurs, the unavailability period cannot be controlled by JDBCServer; it depends on YARN service resources.
  • In Spark, the Thrift JDBC server, similar to HiveServer2, provides services, and users access it through Beeline and the JDBC interface.

    Therefore, the processing capability of the JDBCServer cluster depends on the single-point capability of the active server, and scalability is insufficient.

Multi-active instance HA not only prevents service interruption caused by switchover, but also enables cluster scale-out.

Implementation
The following figure shows the basic principle of multi-active instance HA of Spark JDBCServer.
Figure 2-103 Spark JDBCServer HA
  1. After a JDBCServer instance is started, it registers with ZooKeeper by writing node information in a specified directory. The node information includes the JDBCServer instance IP address, port number, version, and serial number (the information of different nodes is separated by commas).

    For example:

    [serverUri=192.168.169.84:22550;version=6.5.0;sequence=0000001244,serverUri=192.168.195.232:22550;version=6.5.0;sequence=0000001242,serverUri=192.168.81.37:22550;version=6.5.0;sequence=0000001243]
  2. To connect to JDBCServer, the client must specify a namespace, which is the directory of the JDBCServer instances in ZooKeeper. During the connection, a JDBCServer instance is randomly selected from the specified namespace. For details about the URL, see URL Connection.
  3. After the connection succeeds, the client sends SQL statements to JDBCServer.
  4. JDBCServer executes the received SQL statements and returns the results to the client.

If multi-active instance HA of Spark JDBCServer is enabled, all JDBCServer instances are independent and equivalent. When one JDBCServer instance is interrupted during an upgrade, other instances can accept connection requests from clients.

The following rules must be followed in the multi-active instance HA of Spark JDBCServer:
  • If a JDBCServer instance exits abnormally, no other instance takes over the sessions and services running on the abnormal instance.
  • When a JDBCServer process is stopped, the corresponding node is deleted from ZooKeeper.
  • The client randomly selects a server, which may result in uneven session allocation and finally in load imbalance among instances.
  • After an instance enters maintenance mode (in which it accepts no new connection requests from clients), services still running on the instance may fail when the out-of-service time arrives.
URL Connection

Multi-active instance mode

In multi-active instance mode, the client reads the contents of the ZooKeeper node and connects to the JDBCServer service. The connection strings are as follows:

  • Security mode
    • The JDBCURL of kinit authentication mode is as follows:
      jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;
      NOTE:
      • <zkNode_IP>:<zkNode_Port> indicates comma-separated URLs of ZooKeeper.

        For example, 192.168.81.37:24002,192.168.195.232:24002,192.168.169.84:24002

      • sparkthriftserver2x indicates the directory in ZooKeeper from which a random JDBCServer instance is selected for the client connection. The value must be consistent with the value of spark.thriftserver.zookeeper.namespace.

      For example, when you use the Beeline client to connect to JDBCServer, run the following command:

      sh CLIENT_HOME/spark/bin/beeline -u "jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;"

    • The JDBCURL of keytab authentication mode is as follows:
      jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;user.principal=<principal_name>;user.keytab=<path_to_keytab>
      

      <principal_name> indicates the principal of the Kerberos user, for example, test@HADOOP.COM. <path_to_keytab> indicates the path of the keytab file corresponding to <principal_name>, for example, /opt/auth/test/user.keytab.

  • Normal mode
    jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;

    For example, when you use the Beeline client to connect to JDBCServer, run the following command:

    sh CLIENT_HOME/spark/bin/beeline -u "jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;"

Non-multi-active instance mode

In non-multi-active instance mode, a client connects to a specified JDBCServer node. Compared with multi-active instance mode, the connection string does not contain the serviceDiscoveryMode and zooKeeperNamespace parameters related to ZooKeeper.

For example, when you use the Beeline client to connect to JDBCServer in non-multi-active instance mode, run the following command:

sh CLIENT_HOME/spark/bin/beeline -u "jdbc:hive2://<server_IP>:<server_Port>/;user.principal=spark2x/hadoop.hadoop.com@HADOOP.COM;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;"

NOTE:

<server_IP>:<server_Port> indicates the URL of JDBCServer.

Other operations on the JDBCServer interface are the same in multi-active instance mode and non-multi-active instance mode. Spark JDBCServer is another implementation of HiveServer2 in Hive. For details about other operations, see the Hive official website at https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients.

Multi-Tenant Function
Scenario

In the JDBCServer multi-active instance solution, JDBCServer runs in YARN-Client mode, but only one YARN resource queue is available to it. To overcome this resource limitation, the multi-tenant mode is introduced.

In multi-tenant mode, JDBCServer instances are bound to tenants. A tenant can use one or more JDBCServer instances, and a JDBCServer instance provides services for only one tenant. Different YARN queues can be configured for different users to achieve resource isolation. In addition, JDBCServer instances are started as required, preventing resource waste.

Procedure

Figure 2-104 shows the HA solution of the multi-tenant mode.

Figure 2-104 HA principle of Spark JDBCServer in multi-tenant mode
  1. When ProxyServer is started, it registers with ZooKeeper by writing node information in a specified directory. The node information includes the instance IP address, port number, version, and serial number (the information of different nodes is separated by commas).
    NOTE:

    In multi-tenant mode, the JDBCServer instance in FusionInsight refers to ProxyServer, the JDBCServer agent.

    For example:
    serverUri=192.168.169.84:22550;version=6.5.0;sequence=0000001244,serverUri=192.168.195.232:22550;version=6.5.0;sequence=0000001242,serverUri=192.168.81.37:22550;version=6.5.0;sequence=0000001243, 
  2. To connect to ProxyServer, the client must specify a namespace, which is the directory in ZooKeeper of the ProxyServer instances that you want to access. When the client connects to ProxyServer, an instance under the namespace is randomly selected for the connection. For details about the URL, see Overview of URL Connection.
  3. After the client successfully connects to ProxyServer, ProxyServer checks whether the ThriftServer (JDBCServer) of the tenant exists. If yes, Beeline connects to that ThriftServer. If no, a new ThriftServer is started in YARN-Cluster mode. After the ThriftServer starts, ProxyServer obtains its IP address and establishes the connection between Beeline and the ThriftServer.
  4. The client sends SQL statements to ProxyServer, which forwards them to the connected ThriftServer. The ThriftServer returns the results to ProxyServer, which then forwards the results to the client.

In the multi-active instance HA mode of Spark ProxyServer, all instances are independent and equivalent. If one instance is interrupted during an upgrade, other instances can accept connection requests from the client.

Overview of URL Connection

Multi-tenant mode

In multi-tenant mode, the client reads content from the ZooKeeper node and connects to ProxyServer. The connection strings are as follows:

  • Security mode
    • If Kinit authentication is enabled, the client URLs are as follows:
      jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;
      NOTE:
      • <zkNode_IP>:<zkNode_Port> indicates comma-separated URLs of ZooKeeper.

        For example, 192.168.81.37:24002,192.168.195.232:24002,192.168.169.84:24002.

      • sparkthriftserver2x indicates the ZooKeeper directory from which a random ProxyServer instance is selected for the client connection. The value must be consistent with the value of spark.thriftserver.zookeeper.namespace.

      For example, when you use the Beeline client to connect to ProxyServer, run the following command:

      sh CLIENT_HOME/spark/bin/beeline -u "jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;"

    • If Keytab authentication is enabled, the URLs are as follows:
      jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;user.principal=<principal_name>;user.keytab=<path_to_keytab>
      

      <principal_name> indicates the Kerberos principal, for example, test@HADOOP.COM. <path_to_keytab> indicates the path of the keytab file corresponding to <principal_name>, for example, /opt/auth/test/user.keytab.

  • Normal mode:
    jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;

    For example, when you connect through the Beeline client, run the following command:

    sh CLIENT_HOME/spark/bin/beeline -u "jdbc:hive2://<zkNode1_IP>:<zkNode1_Port>,<zkNode2_IP>:<zkNode2_Port>,<zkNode3_IP>:<zkNode3_Port>/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;"

Non-multi-tenant mode

In non-multi-tenant mode, a client connects to a specified JDBCServer node. Compared with multi-tenant mode, the connection string does not contain the serviceDiscoveryMode and zooKeeperNamespace parameters related to ZooKeeper.

For example, when you use the Beeline client to connect to JDBCServer in non-multi-tenant mode, run the following command:

sh CLIENT_HOME/spark/bin/beeline -u "jdbc:hive2://<server_IP>:<server_Port>/;user.principal=spark2x/hadoop.hadoop.com@HADOOP.COM;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;"

NOTE:

<server_IP>:<server_Port> indicates the URL of the specified JDBCServer node.

Other operations on the JDBCServer interface are the same in multi-tenant mode and non-multi-tenant mode. Spark JDBCServer is another implementation of HiveServer2 in Hive. For details about other operations, see the Hive official website at https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients.

Specifying Tenants

Generally, the client submitted by a user connects to the default JDBCServer of the tenant to which the user belongs. To connect the client to the JDBCServer of a specified tenant, add the --hiveconf mapreduce.job.queuename parameter.

The command for connecting through Beeline is as follows (aaa indicates the tenant name):

beeline --hiveconf mapreduce.job.queuename=aaa -u 'jdbc:hive2://192.168.39.30:24002,192.168.40.210:24002,192.168.215.97:24002;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver2x;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM;'

Relationship with Other Components

Relationship Between Spark and HDFS

Data computed by Spark can come from multiple data sources, such as local files and HDFS. Most data computed by Spark comes from HDFS, which can read data on a large scale for parallel computation. After being computed, the data can be written back to HDFS.

Spark involves Driver and Executor. Driver schedules tasks and Executor runs tasks.

Figure 2-105 describes the file reading process.

Figure 2-105 File reading process
The file reading process is as follows:
  1. The Driver interconnects with HDFS to obtain the information of File A.
  2. HDFS returns the detailed block information of this file.
  3. The Driver sets the degree of parallelism based on the amount of block data and creates multiple tasks to read the blocks of this file.
  4. The Executors run the tasks and read the blocks as parts of the Resilient Distributed Dataset (RDD).

Figure 2-106 describes the file writing process.

Figure 2-106 File writing process
The file writing process is as follows (a minimal read/write sketch follows this list):
  1. The Driver creates the directory to which the file is to be written.
  2. Based on the RDD distribution status, the number of tasks for writing the data is computed, and these tasks are sent to the Executors.
  3. The Executors run these tasks and write the RDD data to the directory created in step 1.
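
The following is a minimal sketch of Spark reading from and writing to HDFS, assuming that sc is an existing SparkContext; the HDFS paths are placeholders.

// Reading: the Driver obtains block information from HDFS and creates read tasks for the Executors.
val input = sc.textFile("hdfs://hacluster/user/example/input")

// Writing: one write task per RDD partition stores the data in the target directory.
val nonEmpty = input.filter(_.nonEmpty)
nonEmpty.saveAsTextFile("hdfs://hacluster/user/example/output")
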
Relationship Between Spark and YARN

Spark computing and scheduling can be implemented in YARN mode. Spark uses the computing resources provided by YARN clusters and runs tasks in a distributed way. Spark on YARN has two modes: yarn-cluster and yarn-client.

  • yarn-cluster mode

    Figure 2-107 describes the operation framework.

    Figure 2-107 Spark on yarn-cluster operation framework

    Spark on yarn-cluster implementation process:

    1. The client generates the application information and sends it to ResourceManager.
    2. ResourceManager allocates a Container (as the ApplicationMaster) to the Spark application and starts the Driver on the node of that Container.
    3. ApplicationMaster applies for resources from ResourceManager to run Executor.

      ResourceManager allocates a Container to ApplicationMaster. ApplicationMaster then communicates with the related NodeManager and starts an Executor on the obtained Container. After the Executor is started, it registers with the Driver and applies for tasks.

    4. Driver allocates tasks to Executor.
    5. Executor executes tasks and reports the operating status to Driver.
  • yarn-client mode

    Figure 2-108 describes the operation framework.

    Figure 2-108 Spark on yarn-client operation framework

    Spark on yarn-client implementation process:

    NOTE:

    In yarn-client mode, the Driver is deployed and runs on the client. The yarn-client mode is not compatible with old-version clients. You are advised to use the yarn-cluster mode.

    1. The client sends a Spark application request to ResourceManager, together with all the information required to start ApplicationMaster. ResourceManager then returns the results to the client, including information such as the ApplicationId and the upper and lower limits of available resources.
    2. After receiving the request, ResourceManager finds a proper node for ApplicationMaster and starts it on that node. ApplicationMaster is a role in YARN; its process name in Spark is ExecutorLauncher.
    3. Based on the resource requirements of each task, ApplicationMaster applies to ResourceManager for a series of Containers to run the tasks.
    4. After receiving the newly allocated Container list (from ResourceManager), ApplicationMaster sends information to the related NodeManager to start the Container.

      ResourceManager allocates Containers to ApplicationMaster. ApplicationMaster then communicates with the related NodeManager and starts an Executor on each obtained Container. After the Executor is started, it registers with the Driver and applies for tasks.

      NOTE:

      Running Containers are not suspended to release resources.

    5. Driver allocates tasks to Executor. Executor executes tasks and reports the operating status to Driver.

Spark2x Open-Source New Features

Overview

Compared with Spark 1.5, the new open-source features of Spark2x are as follows:
