使用UPSERT语句,将Apache™Kafka数据导入Trafodion表 - 易鲸捷

Kafka是一个流处理服务平台。其中,生产者(Producers)向主题(Topic)中发布消息,消费者(Consumers)读取并处理发布来主题中的消息。Kafka的主题是已发布消息的日志副本集合,这些日志都具有时间戳。可以对主题进行分区,以增加储备容量并提高并行度。

如图1所示,向同一个主题发布消息的不同生产者进程可以:
a) 将消息写入特定的分区(蓝色箭头);或
b) 答应Kafka在可用的分区间均衡地分发/加载消息(橙色箭头)。

此外,Kafka的各个副本作为一个已发布的消息,仅出现在一个分区中。

江苏快3从消费者的角度看,消费者可以属于一个消费者组(consumer group),也可以不属于任何消费者组。如果不属于任何消费者组,则所有订阅了主题的消费者都能够读取发布在主题中的某一条消息(如图1中的黑色箭头c所示)。

如果属于消费者组,则该组中的各消费者将被分配来一个或多个分区中进行读取。如果组中的消费者数量超过了主题的分区数量,则一个消费者最多只能被分配来一个分区(如图1中红色箭头d所示)。当所有分区都分配来了消费者时,部分消费者将闲置,直来需要时才会被使用。如果组中的消费者数量小于主题的分区数量,则一个消费者可以被分配来多个分区。在任何情况下,每组最多只能有一个消费者被分配来某个特定分区。Kafka晓道,各分区的哪一条消息是最后发出的。新的订阅者将从这条消息开始读取,除非其定位来消息日志中靠前的位置。

Trafodion的Kafka消费者

本博客旨在介绍可以将Kafka数据导入Trafodion数据库的技术,同时说明为什么常用的方法不一定是最佳的方法。

Kafka消费者是一个Java程序,可以“读取”发布至Kafka的消息并根据需要对数据进行格式化,以便通过JDBC将数据“写入”Trafodion的数据库表。程序中的Kafka处理可以简要地概括如下:

  • 实例化一个Kafka类(例如,KafkaConsumer),向程序提供输入数据
  • 为该类设置属性值(例如,主题名称topic name、起始位置starting position、服务器位置broker location、超时限制timeout limits、zookeeper位置zookeeper location等)
  • 从主题中读取(poll)消息。

通常,根据具有一个或多个储备分区的主题来组织Kafka的消息发布,以便增加容量。Kafka会跟踪已成功提交至各个分区的消息的最高偏移量(offset)。当在分区上复启一个新的消费者进程时,消息提交将复原来记录的偏移量位置。关于更多信息,请参阅。因此,主题、分区数量和偏移量共同组成了分区的“复启点”。KafkaConsumer类属性设置enable。auto。commit或对KafkaConsumer。commitSync()的显式调用都会影响保存的实际偏移量值。

下面,我们来看看如何将Kafka消息导入Trafodion表。

Trafodion表

Trafodion表是一个SQL表,它是关系型SQL表来HBase表的映射。此处,我们使用了由以下DDL定义的表。

如果要对表进行分区,则需要使用PRIMARY KEY或STORE BY子句(请参阅。

此处,该行中的五列对应了发布的Kafka消息的五个字段。

JDBC技术

江苏快3在如今的IT环境中,Kafka、Trafodion和Kafka消费者可能分别在不同的集群上运行。本次测试使用了一个四节点的集群,CDH 5.9和EsgynDB v2.3在集群的所有节点上运行,Kafka 2.11安装在一个节点上,而Java消费者程序在另一个节点上运行。

Trafodion SQL支持INSERT和UPSERT DML语法。INSERT用于添加新行,UPSERT用于更新现有的匹配行(如果不存在匹配行,则添加新行)。我们比较了INSERT和UPSERT语法,还探索了INSERT-SELECT技术(使用用户定义函数,可能适用于某些情况)。

江苏快3诸如INSERT和UPSERT的DML语法会对数据库进行更改。在JDBC中,数据库更改受来事务(transaction)的保护。在默认情况下,各DML语句都作为独立的事务运行,除非连接被设置为仅在显式请求commit()上终止事务。例如,在默认情况下,10个独立的INSERT是10个事务,各INSERT执行之后,添加的各个新行立刻可见。然而,如果设置了autoCommit=off,这10个INSERT还是会对表进行更新,但是只有当执行来第10个INSERT之后调用了commit()时,才能访问新添加的10行。换句话说,如果在执行第6条INSERT之前,程序发生故障:在第一种情况下,之前新添加的5行依然可见;在第二种情况下,之前添加的行均不可访问。在JDBC中,一次性提交一组记录称为批处理(batching)。

本次测试都是在这些技术的基础上进行的,以下的代码片段是这些技术的使用示例。

测试结果

在实际的操作中,Kafka的消息来达率覆盖了流式应用来涓流馈送。如果要比较数据摄入技术的差异,使用较大的数据量可以产生较好的对比效果。因此,我们向主题的单个分区发布了300,000条144字节的消息,每次测试都使用相同的Kafka消息。

关于目标Trafodion表,请见上文。

哪条语句的性能更好?是INSERT还是UPSERT?表1显示了测试的属性设置和相应的结果。

作为基准参考,每次都要对行的添加进行计时。因此,相比逐条发送记录,批量发送记录块的好处是显而易见的。

很明显,无论块因子是多少,批量UPSERT的性能总是优于INSERT。插入300,000行的最快运行时间仅为7。8秒。根据消息大小、表的列数量、记录数量等其他因素的不同,统计数据会有所不同,但是UPSERT的性能始终优于INSERT。在我们的测试中,当批大小为4000时,UPSERT的性能最佳。您可能需要反复测试,才能找来应用程序的“性能较优”设置。

添加所有消息所需的运行时间是通过编程的方式捕捉的,该总运行时间包括了Kafka poll的1秒超时值(消息流耗尽时)。如果设置较低的超时值,UPSERT的性能数据将进一步提高。

为何会有这样的性能区别?因为INSERT语句的SQL语义要求在插入新行之前检查是否已存在该行。由于Trafodion表是HBase表,因此该操作的HBase接口一次仅答应一行。相反,由于UPSERT语句覆写所有现有的行,因此不需要进行检查。另外,UPSERT的HBase接口可以接受一组行。

关于数据加载的Trafodion技术,请参阅或(Trafodion加载和转换指南)。

关于JDBC批处理的评判:在这些测试中,由于已经创建了Kafka数据,因此在用数据填充批次时没有延迟(延迟不利于计时数据)。在操作中,Kafka数据流的来达率可能是不规则的。无论将批大小设置为多少,在达来批处理限制之前总是会发生事务延迟。您可以通过类似于以下的方法减轻事务延迟:降低批大小,进行更频繁的提交;或在发生一定次数的Kafka poll超时(即,无数据)之后,提交当前批的行——实际上,就是根据Kafka的输入负载,动态地调整批大小。

另一种方法?

UPSERT语法有另一种变体——UPSERT USING LOAD。

根据Trafodion SQL参考手册,该操作在没有事务的情况下发生,用于将数据导入空表。该语句的典型用法为:

UPSERT USING LOAD INTO target_table SELECT * FROM source_table

如何将其应用于Kafka场景?源表是什么?

通过Trafodion的用户定义函数(UDF)架构,即可实现应用。用户编码的Java过程(TMUDF)遵循指定的编码协议,以生成一个虚拟的行列组(即,一个表,该表可以在SQL查询的任何位置使用)。在这种情况下,我们需要一个TMUDF将具有消息字段的Kafka消息转换为多行的数据列。关于Trafodion UDF架构的更多信息,请参阅。。

编写了一个运行正常的Kafka消费者程序,大部分的代码就已经完成。UDF中不需要进行数据库处理(只有部分Kafka需要)。为了符合UDF的编程模型,要进行一些修改。因此,创建一个TMUDF用于读取Kafka主题的一个分区。

我们并没有深究具体的UDF逻辑,但是已经证明了Kafka日志中的300,000条消息被读取并生成了300,000行(每行包含5列)。消费者程序使用的UPSERT语句如下:

UPSERT USING LOAD INTO blogdata SELECT * FROM udf(kafread( …))

其中,UDF代码使用值列表,用于Kafka连接。

当在消费者程序中执行了该语句时,结果如表2所示:

在本例中,使用UDF是一项原子操作(所有或全部没有)。与可以被打断/复原并进行不断控制的行/批插入逻辑不同,UDF Insert-Select是不能被打断的。

因此,虽然可以接受加快加载速度,但是进行复原和复启是不利的(Kafka消息队列的位置信息无法通过UDF传回至调用者)。

结论

江苏快3对于消息速率较低的Kafka应用程序,每次通过INSERT或UPSERT向Trafodion数据库插入或更新消息可能是差不多的。但是对于消息速率较高的Kafka应用程序,UPSERT的性能比INSERT更佳。即使JDBC批处理可以提高INSERT的频次,但是UPSERT真正的优势是其面向集合的处理。以往的习惯可能很难改变,但是在考虑通过INSERT插入数据时,也不要忘记还可以挑选UPSERT。

关于消费者程序的源代码和Kafka UDF,请参阅。