DataX integration with HDFS

Overview

DataX is an offline data synchronization tool for heterogeneous data sources. It provides stable and efficient data synchronization between a wide range of sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.

Details

Requirements: JDK 1.8 or later; Python 2.7.x or later (DataX has not been updated for Python 3).
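A quick way to verify the environment on the node that will run DataX (output varies by distribution; these are standard JDK/Python commands):

# java -version
# python -V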

Download and install DataX

For the download link and installation instructions, refer to the DataX open-source manual. Download: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
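A minimal install sketch, assuming the tarball URL above and /mnt/disk2 as the install directory (any writable path works); the bundled self-test job job/job.json comes from the upstream DataX quick-start guide:

# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
# tar -zxvf datax.tar.gz -C /mnt/disk2
# cd /mnt/disk2/datax/bin
# python2 datax.py ../job/job.json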

Edit the job JSON

Edit the JSON job file according to your data sources. For the reader/writer reference guides, see: https://github.com/alibaba/DataX
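According to the DataX documentation, datax.py can also print an empty job template for a given reader/writer pair, which is a convenient starting point for the configurations shown below:

# python2 datax.py -r oraclereader -w hdfswriter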

Run the data migration job

Go to the bin directory of the DataX installation, which contains the datax.py script:

# python2 datax.py ../conf/oracle_inceptor_kerberos_test.json

JSON configuration for writing Oracle data to an HDFS cluster without authentication

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "username": "sys",
                        "password": "123456",
                        "connection": [
                            {
                                "querySql": ["select empno,ename from emp"],
                                "jdbcUrl": ["jdbc:oracle:thin:@172.22.44.1:1521/orcl"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [{"name":"empno","type":"int"},{"name":"ename","type":"string"}],
                        "compress": "",
                        "//": "Use the IP address of the active namenode here",
                        "defaultFS": "hdfs://172.22.23.2:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "emp",
                        "fileType": "text",
                        "path": "/tmp/",
                        "writeMode": "append"
                    }
                }
            }
        ]
    }
}
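Once the job completes, you can verify the result from any node with an HDFS client; hdfswriter writes the file under the configured path using the configured fileName plus a random suffix (as can be seen in the error message in FAQ 1):

# hdfs dfs -ls /tmp/ | grep emp
# hdfs dfs -cat /tmp/emp__*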

JSON configuration for writing Oracle data to a Kerberos-secured HDFS cluster

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "username": "lkw",
                        "password": "q6408912.",
                        "connection": [
                            {
                                "querySql": ["select empno,ename from emp"],
                                "jdbcUrl": ["jdbc:oracle:thin:@172.22.44.1:1521/orcl"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [{"name":"empno","type":"int"},{"name":"ename","type":"string"}],
                        "compress": "",
                        "//": "Use the IP address of the active namenode here",
                        "defaultFS": "hdfs://172.22.23.2:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "emp",
                        "fileType": "text",
                        "path": "/tmp/",
                        "writeMode": "append",
                        "//": "With Kerberos enabled, the following four entries must be added",
                        "haveKerberos": true,
                        "//": "The keytab file can be obtained from the TDH cluster",
                        "kerberosKeytabFilePath": "/etc/hdfs1/conf/hdfs.keytab",
                        "//": "Run klist -ket /etc/hdfs1/conf/hdfs.keytab to look up the corresponding principal",
                        "kerberosPrincipal": "hdfs/tdh60202@TDH",
                        "//": "For this parameter, use the value actually configured for the HDFS component on the TDH management page",
                        "hadoopConfig": { "dfs.data.transfer.protection": "integrity" }
                    }
                }
            }
        ]
    }
}
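Before running the Kerberos job, it is worth confirming on the DataX node that the keytab is readable and contains the expected principal, and that a ticket can actually be obtained with it (paths and principal as in the example above):

# klist -ket /etc/hdfs1/conf/hdfs.keytab
# kinit -kt /etc/hdfs1/conf/hdfs.keytab hdfs/tdh60202@TDH
# klist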

FAQ

1. With Kerberos enabled, the job fails with "Connection reset by peer"

The client-side error typically looks like this:

org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/emp__1b5429c4_671d_4fbc_b57a_3af843c1333d could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and 3 node(s) are excluded in this operation.

Check the datanode log:

2019-09-27 10:00:29,684 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Failed to read expected SASL data transfer protection handshake from client at /172.22.23.1:42541. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection

Starting with Hadoop 2.6.0, the HDFS client can authenticate to datanodes using several data transfer protection modes, and the client-side setting must match the datanode configuration. You can check the mode configured on the datanode side with:

# grep -C 2 'dfs.data.transfer.protection' /etc/hdfs1/conf/hdfs-site.xml
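For reference, the datanode-side setting usually looks like the snippet below in hdfs-site.xml; the value integrity here is only an example, use whatever your cluster actually reports:

<property>
    <name>dfs.data.transfer.protection</name>
    <value>integrity</value>
</property>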

Then add the corresponding properties to the writer's parameter section of the job JSON, as shown below:

"hadoopConfig":{
    "dfs.data.transfer.protection":"integrity",
    "hadoop.rpc.protection":"authentication"
}

2. Error: No common protection layer between client and server

  • dfs.data.transfer.protection: use the value actually configured for the HDFS component of the TDH cluster (check it on the parameter page, or run hdfs getconf -confKey dfs.data.transfer.protection).
  • hadoop.rpc.protection: check with hdfs getconf -confKey "hadoop.rpc.protection" whether the value is authentication (if the parameter does not exist, add it as a custom parameter on TDH, apply the configuration, and restart HDFS). Then add the corresponding properties to the writer's parameter section of the job JSON, as shown below:
"hadoopConfig":{
    "dfs.data.transfer.protection":"integrity",
    "hadoop.rpc.protection":"authentication"
}

3. Error: Operation category READ is not supported in state standby

The "defaultFS" value in the job JSON must point to the active namenode. If you want to configure namenode HA instead, see FAQ 4 below, "How to configure namenode HA".

2019-09-27 16:28:48.367 [job-0] INFO  JobContainer - DataX Reader.Job [oraclereader] do prepare work .
2019-09-27 16:28:48.368 [job-0] INFO  JobContainer - DataX Writer.Job [hdfswriter] do prepare work .
2019-09-27 16:28:48.476 [job-0] ERROR HdfsWriter$Job - 判断文件路径[message:filePath =/tmp/]是否存在时发生网络IO异常,请检查您的网络是否正常!
2019-09-27 16:28:48.482 [job-0] ERROR JobContainer - Exception when job run
com.alibaba.datax.common.exception.DataXException: Code:[HdfsWriter-06], Description:[与HDFS建立连接时出现IO异常.]. - org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
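To check which namenode is currently active, you can ask HDFS directly on the cluster; the command prints active or standby for each namenode ID (nn1/nn2 here follow the HA example in FAQ 4, substitute your own IDs):

# hdfs haadmin -getServiceState nn1
# hdfs haadmin -getServiceState nn2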

4. How to configure namenode HA

The active namenode may fail over from time to time. You can configure HA in the writer as follows:

"writer": {
	"name": "hdfswriter",
	"parameter": {
		"column": [{"name":"empno","type":"int"},{"name":"ename","type":"string"}],
		"compress": "",
		"//": "参考TDH集群/etc/hdfsX/conf/hdfs-site.xml中的dfs.nameservices",
		"defaultFS": "hdfs://nameservice1",
		"fieldDelimiter": "t",
		"fileName": "emp",
		"fileType": "text",
		"path": "/tmp/",
		"writeMode": "append",
		"haveKerberos": true,
		"kerberosKeytabFilePath": "/mnt/disk2/datax/bin/hdfs.keytab",
		"kerberosPrincipal": "hdfs/tdh083@TDH",
		"hadoopConfig":{
			"//": "参考TDH集群/etc/hdfsX/conf/hdfs-site.xml中的dfs.nameservices",
			"dfs.nameservices": "nameservice1",
			"//": "参考TDH集群/etc/hdfsX/conf/hdfs-site.xml中的dfs.ha.namenodes.nameservice1",
			"dfs.ha.namenodes.nameservice1": "nn1,nn2",
			"//": "参考TDH集群/etc/hdfsX/conf/hdfs-site.xml中的dfs.namenode.rpc-address.nameservice1.nn1",
			"dfs.namenode.rpc-address.nameservice1.nn1": "tdh082:8020",
			"//": "参考TDH集群/etc/hdfsX/conf/hdfs-site.xml中的dfs.namenode.rpc-address.nameservice1.nn2",
			"dfs.namenode.rpc-address.nameservice1.nn2": "tdh083:8020",
			"//": "参考TDH集群/etc/hdfsX/conf/hdfs-site.xml中的dfs.client.failover.proxy.provider.nameservice1",
			"dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
			}
		}
	}
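All of these values can be read straight out of the cluster's hdfs-site.xml, for example (config path as used earlier in this article):

# grep -A 1 -E 'dfs.nameservices|dfs.ha.namenodes|dfs.namenode.rpc-address|dfs.client.failover.proxy.provider' /etc/hdfs1/conf/hdfs-site.xml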

5. Error: javax.security.auth.login.LoginException: Receive timed out

Edit /etc/krb5.conf on the DataX node (outside the TDH cluster) and set udp_preference_limit to 1 so that TCP is preferred for Kerberos traffic.
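The setting belongs in the [libdefaults] section; a minimal sketch of the relevant part of /etc/krb5.conf:

[libdefaults]
    udp_preference_limit = 1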

6. Error: javax.security.auth.login.LoginException: 没有到主机的路由 (No route to host)

a) Check whether /etc/hosts on the DataX node (outside the TDH cluster) contains the hostname mappings of the TDH cluster (they can be copied from /etc/hosts on the TDH cluster).
b) Check whether the kdc hostname in /etc/krb5.conf on the DataX node (outside the TDH cluster) is correct (compare it with /etc/krb5.conf on the TDH cluster); quick checks for both are shown after this list.
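Both checks can be run quickly from the DataX node, for example (tdh60202 is just the example hostname used elsewhere in this article):

# getent hosts tdh60202
# grep -A 3 '\[realms\]' /etc/krb5.conf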

7. Error: Missing parentheses in call to 'print'. Did you mean print(readerRef)?

[root@single01/mnt/disk2/datax/bin]$ python3 datax.py ./oracle_ha.json 
  File "datax.py", line 114
    print readerRef
                  ^
SyntaxError: Missing parentheses in call to 'print'. Did you mean print(readerRef)?

DataX does not currently support Python 3; switch to Python 2 to run it.
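On nodes where python resolves to Python 3, invoke the Python 2 interpreter explicitly, for example:

# python2 datax.py ./oracle_ha.json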

8. Dual-NIC environment: DataX writing to HDFS connects to the cluster's internal IP addresses

In addition to adjusting the cluster as described in the reference "TDH集群配置内外网双网卡下外网服务器如何访问内网的HDFS" (how external servers access the internal HDFS when the TDH cluster has internal/external dual NICs), the job JSON must also include "hadoopConfig": { "dfs.client.use.datanode.hostname": true } so that this setting is applied on the DFS client side.
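For example, on a Kerberos + HA job the hadoopConfig block from FAQ 4 would gain one extra entry (a sketch; all other values stay exactly as in FAQ 4):

"hadoopConfig": {
    "dfs.client.use.datanode.hostname": true,
    "dfs.nameservices": "nameservice1",
    "dfs.ha.namenodes.nameservice1": "nn1,nn2",
    "dfs.namenode.rpc-address.nameservice1.nn1": "tdh082:8020",
    "dfs.namenode.rpc-address.nameservice1.nn2": "tdh083:8020",
    "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}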

References:
记一次datax hdfswriter的踩坑记(上传文件到hdfs的坑)
客户端访问双网卡hadoop集群的HDFS
