FAQ - C50: Debugging summary for Flume consuming an external Kafka and storing data in HBase (client mode)

Published: 2017-05-18    Views: 454    Downloads: 3
Problem description

When using Flume in FusionInsight HD C50, we need to consume an external open-source Kafka data source and store the data in the HBase component. The external Kafka uses a non-secure version of ZooKeeper, while FI HD runs a secure ZooKeeper. How should the connection be configured?

Solution

Within FI HD, Flume normally connects to the HBase component in server mode. When the source is external Kafka data, however, the external Kafka uses an open-source, non-secure ZooKeeper and cannot connect to FI directly, so a Flume client must be deployed to bridge the two. The basic data flow is: external Kafka -> Flume client -> Flume server -> HBase.

Step 1: Prepare the server-side configuration file; see properties.properties.

Sample:

server.sources = avro_source

server.channels = memory_channel

server.sinks = hbase_sink

server.sources.avro_source.type = avro

server.sources.avro_source.port = 21154

server.sources.avro_source.bind = 99.12.166.19

server.sources.avro_source.threads = 4

server.sources.avro_source.ssl = false

server.sources.avro_source.keystore = /opt/huawei/Bigdata/FusionInsight-Flume-1.6.0/flume/conf/flume-sChat.jks

server.sources.avro_source.keystore-password = 9A6ACCA141D6E74081B3064D2EC2589C

server.sources.avro_source.keystore-type = JKS

server.sources.avro_source.trust-all-certs = false

server.sources.avro_source.password-key = 96947A066658903D8B59CEC609513ED40D4410F2FB367D9D8B5551B612D80372651D0F7AF4B158EDBC92FA9FC73AA945

server.sources.avro_source.truststore = /opt/huawei/Bigdata/FusionInsight-Flume-1.6.0/flume/conf/flume-sChatt.jks

server.sources.avro_source.truststore-password = 9A6ACCA141D6E74081B3064D2EC2589C

server.sources.avro_source.truststore-type = JKS

server.sources.avro_source.selector.type = replicating

server.sources.avro_source.channels = memory_channel

# the channel configuration of memory_channel

server.channels.memory_channel.type = memory

server.channels.memory_channel.capacity = 10000

server.channels.memory_channel.transactionCapacity = 100

server.channels.memory_channel.channelfullcount = 10

server.channels.memory_channel.keep-alive = 10

server.channels.memory_channel.byteCapacityBufferPercentage = 20

server.sinks.hbase_sink.type = hbase

server.sinks.hbase_sink.table = lhq_person

server.sinks.hbase_sink.columnFamily = f1

server.sinks.hbase_sink.kerberosPrincipal = flume

server.sinks.hbase_sink.kerberosKeytab = /opt/huawei/Bigdata/FusionInsight-Flume-1.6.0/flume/conf/flume.keytab

server.sinks.hbase_sink.batchSize = 100

server.sinks.hbase_sink.coalesceIncrements = true

server.sinks.hbase_sink.channel = memory_channel
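A quick way to catch wiring mistakes in a properties file like the one above is to confirm that the source and the sink both reference the same channel name. A minimal sketch, run here against a trimmed scratch copy of the config rather than the deployed file:

```shell
# Sanity check on a trimmed copy of the server config above;
# properties.properties here is a local scratch file, not the deployed one.
cat > properties.properties <<'EOF'
server.sources.avro_source.channels = memory_channel
server.sinks.hbase_sink.channel = memory_channel
EOF

# Extract the channel each side is wired to.
src_ch=$(grep 'avro_source.channels' properties.properties | awk -F' = ' '{print $2}')
sink_ch=$(grep 'hbase_sink.channel' properties.properties | awk -F' = ' '{print $2}')

if [ "$src_ch" = "$sink_ch" ]; then
    echo "channel wiring OK: $src_ch"
else
    echo "channel mismatch: source=$src_ch sink=$sink_ch"
fi
```

A mismatch here (for example, a sink pointing at a channel that no source feeds) is a common cause of events never reaching HBase.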

 

Step 2: Prepare the client-side configuration file; see client.properties.properties.

client.sources = kafka_source

client.channels = memory_channel

client.sinks = static_log_sink

# the source configuration of kafka_source

client.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource

client.sources.kafka_source.topic = dimon_tcpdump

client.sources.kafka_source.groupId = g1

client.sources.kafka_source.zookeeperConnect = 99.12.143.120:2181,99.12.141.128:2181,99.12.141.43:2181

client.sources.kafka_source.kafka.security.protocol  = SASL_PLAINTEXT

client.sources.kafka_source.kafka.auto.offset.reset = smallest

client.sources.kafka_source.batchDurationMillis = 1000

client.sources.kafka_source.batchSize = 100

client.sources.kafka_source.channels = memory_channel

# the channel configuration of memory_channel

client.channels.memory_channel.type = memory

client.channels.memory_channel.capacity = 10000

client.channels.memory_channel.transactionCapacity = 100

client.channels.memory_channel.channelfullcount = 100

client.channels.memory_channel.keep-alive = 10

client.channels.memory_channel.byteCapacityBufferPercentage = 20

client.sinks.static_log_sink.type = avro

client.sinks.static_log_sink.hostname = 99.12.166.19

client.sinks.static_log_sink.port = 21154

client.sinks.static_log_sink.batch-size = 100

client.sinks.static_log_sink.connect-timeout =  10000

client.sinks.static_log_sink.request-timeout = 20000

client.sinks.static_log_sink.ssl = false

client.sinks.static_log_sink.keystore = /opt/huawei/flume-client/fusioninsight-flume-1.6.0/conf/flume-cChat.jks

client.sinks.static_log_sink.password-key = 96947A066658903D8B59CEC609513ED40D4410F2FB367D9D8B5551B612D80372651D0F7AF4B158EDBC92FA9FC73AA945

client.sinks.static_log_sink.keystore-password = B0A7C693E96AA992DA222B886998784F

client.sinks.static_log_sink.keystore-type = JKS

client.sinks.static_log_sink.trust-all-certs = false

client.sinks.static_log_sink.truststore = /opt/huawei/flume-client/fusioninsight-flume-1.6.0/conf/flume-cChatt.jks

client.sinks.static_log_sink.truststore-password = B0A7C693E96AA992DA222B886998784F

client.sinks.static_log_sink.truststore-type = JKS

client.sinks.static_log_sink.channel = memory_channel
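The client's avro sink must point at the host and port where the server's avro source is listening. A trivial sanity check, with both pairs of values copied from the two snippets in this article:

```shell
# The client sink (client.properties.properties) must target the server
# source (properties.properties); the values below are copied from this article.
server_bind="99.12.166.19"; server_port="21154"   # server.sources.avro_source.bind/port
sink_host="99.12.166.19";   sink_port="21154"     # client.sinks.static_log_sink.hostname/port

if [ "$server_bind" = "$sink_host" ] && [ "$server_port" = "$sink_port" ]; then
    echo "client sink matches server source"
else
    echo "mismatch: fix hostname/port before starting the client"
fi
```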



Step 3: Install the Flume client.

a. Log in to the FusionInsight HD cluster, click "Service Management > Flume > Download Client", set "Client Type" to "Complete Client", and download the Flume client package.

b. Using WinSCP, upload the "FusionInsight_V100R002C60SPC200_Flume_Client.tar" file as user "user" to a directory on the node where the Flume client will be installed, for example "/opt".

Note: "user" is the account used to install and run the Flume client.


c. Unpack the package. Using PuTTY, log in as "user" to the node where the Flume client will be installed, change to the directory containing the package, for example "/opt", and unpack it locally:
  cd /opt
  tar -xvf FusionInsight_V100R002C60SPC200_Flume_Client.tar
d. Verify the package. Run sha256sum against the extracted file "FusionInsight_V100R002C60SPC200_Flume_ClientConfig.tar"; output of "OK" means the check passed. For example:
  sha256sum -c FusionInsight_V100R002C60SPC200_Flume_ClientConfig.tar.sha256

  FusionInsight_V100R002C60SPC200_Flume_ClientConfig.tar:
  OK
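The `sha256sum -c` pattern used in step d can be illustrated on a throwaway file (demo.tar below is only a stand-in for the real client tarball):

```shell
# Stand-in demonstration of checksum verification; demo.tar is not the real package.
echo "demo payload" > demo.tar
sha256sum demo.tar > demo.tar.sha256   # generate the .sha256 checksum file
sha256sum -c demo.tar.sha256           # prints "demo.tar: OK" if the file is intact
```

Any later modification of the file makes the same `-c` check report "FAILED" instead.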

e. Change to "/opt" and unpack "FusionInsight_V100R002C60SPC200_Flume_ClientConfig.tar", which produces the "FusionInsight_V100R002C60SPC200_Flume_ClientConfig" folder:
  cd /opt
  tar -xvf FusionInsight_V100R002C60SPC200_Flume_ClientConfig.tar

f. Change to the "/opt/FusionInsight_V100R002C60SPC200_Flume_ClientConfig/Flume" folder and unpack "FusionInsight-Flume-1.6.0.tar.gz":
  cd /opt/FusionInsight_V100R002C60SPC200_Flume_ClientConfig/Flume
  tar -xvf FusionInsight-Flume-1.6.0.tar.gz
g. Run the installer, importing the modified client configuration:
  cd /opt/FusionInsight_V100R002C60SPC200_Flume_ClientConfig/Flume
  ./install.sh -d /opt/FlumeClient -f ip -c flume/conf/client.properties.properties -l /var/log/Bigdata

  Note: run install.sh -h for command help.

 

Step 4: Create the destination table in HBase.

Run the following commands from the HBase client:

1. hbase shell   ----enter the HBase shell
2. create 'lhq_person','f1'   ----create table lhq_person with column family f1
3. grant 'flume','RWC','lhq_person'   ----grant the flume user read, write, and create permission on lhq_person
4. user_permission 'lhq_person'   ----check that the grant succeeded
5. count 'lhq_person'   ----count rows to see whether data has reached HBase
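The table-creation commands above can also be batched into a script file for `hbase shell` instead of being typed interactively. A sketch (HBase itself is not invoked here; the script file is only assembled):

```shell
# Assemble the table-creation commands into a script; on the HBase client node
# it would then be run as:  hbase shell create_lhq_person.hbase
cat > create_lhq_person.hbase <<'EOF'
create 'lhq_person','f1'
grant 'flume','RWC','lhq_person'
user_permission 'lhq_person'
count 'lhq_person'
EOF
grep -c '' create_lhq_person.hbase   # prints 4: four commands queued
```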

Step 5: On the FI Manager web UI, click the Flume service, open the "Configuration" tab, upload the properties.properties file prepared in Step 1, and save.

Step 6: Start a Kafka producer and produce messages manually, then observe whether the data arrives in HBase.



Kafka client usage:

1. Enter the Kafka client bin directory: cd /<client path>/Kafka/kafka/bin
2. Create the topic.
3. Start a Kafka producer and produce messages.
4. Check whether the data is stored in the HBase table.

Sample Kafka commands:

// Message producer

./kafka-console-producer.sh --topic topic1 --broker-list 99.12.166.9:21005


// Message consumer
./kafka-console-consumer.sh --topic topic1 --zookeeper 99.12.166.9:24002/kafka


// List topics
./kafka-topics.sh --zookeeper 99.12.166.9:24002/kafka --list