跳到主要内容

沙巴体育下载博客

欢迎改变的地方

向Amazon S3发送Kafka消息

By 张贴在 工程 2020年12月9日

在这篇文章中, 沙巴体育手机版将看看集成StreamSets数据收集器引擎(SDC)的最佳实践。, 一个快速的数据摄取引擎,并深入到细节中,展示如何将Kafka消息发送到Amazon S3.

Apache Kafka是一个分布式事件流媒体平台,被数千家公司用于流媒体分析和其他关键任务应用. 这已经成为组织内大数据生态系统最常见的组成部分之一.

发送Kafka消息给Amazon S3 | 卡夫卡的消费者

在某些情况下,一个组织可能已经有一个将消息带到Kafka的现有数据管道. 在这种情况下, SDC可以扮演消费者的角色,处理所有将数据从Kafka带到需要去的地方的逻辑. 例如, 你可以将数据从Kafka消息传递到Amazon S3/HDFS/Elasticsearch或任何你选择的目的地,而不需要写任何代码. 使用streamset 卡夫卡的起源 你可以使用Kafka消息,并将它们批处理成适当大小的消息,并将它们推送到想要的目的地.

向S3发送Kafka消息Kafka消息消费者起源:

  1. 卡夫卡的消费者
  2. 卡夫卡MultiTopic消费者

为什么有两个卡夫卡起源?

假设你有一个应用程序需要从Kafka主题读取消息, 对它们进行验证, 并将结果写入另一个数据存储. 在这种情况下,您将创建订阅适当主题的消费者管道, 开始接收信息, 验证它们并编写结果.

向S3发送Kafka消息

这种做法可能会在一段时间内奏效, 但是,如果生产者向主题写入消息的速度超过了应用程序验证消息的速度,该怎么办呢?  如果您仅限于一个消费者读取和处理数据, 你的申请可能会越来越落后, 无法跟上传入消息的速度. 显然,需要从主题来衡量消费. 就像多个生产者可以写同一个主题一样, 沙巴体育手机版需要允许多个消费者从同一个主题中阅读,也需要允许多个消费者从多个主题中阅读, 在它们之间分割数据. 这就是卡夫卡多主题消费者起源派上用场的地方. 该源使用多个线程来支持并行处理数据.

卡夫卡多话题消费者起源 然后可以利用额外的处理器和内存并行运行几个消费者线程吗. Kafka将跨分区分发消息, 负载将在消费者线程之间共享.

向S3发送Kafka消息

哪一个对我来说是正确的选择?

如果您的SDC引擎运行在具有高资源可用性的更强大的机器上, 然后使用Kafka多主题原点垂直缩放.

向S3发送Kafka消息然而, 如果您计划水平扩展,这是一个更划算的选择, 然后使用Kafka消费者再次分区Kafka主题, 但这次在多个数据收集器实例上运行管道. 如果您在多个数据收集器实例上手动运行此管道的多个实例,则会产生一些额外的维护开销, 但这很容易实现 StreamSets控制中心 通过同一个作业启动多个管道实例.

卡夫卡对S3先进的功能

让沙巴体育手机版看看一些高级特性,你可以利用在两个Kafka起源.

卡夫卡的安全 

你可以配置Kafka的两个阶段——卡夫卡的消费者和Kafka Multitopic Consumer——通过以下方法安全地连接:

  • Kerberos
  • SSL / TLS
  • SSL / TLS和Kerberos

除了完成必要的任务外,启用安全性还需要在该阶段配置其他Kafka配置属性.

记录头属性 

两个Kafka起源都创建了记录头属性,其中包括记录的原始文件的信息. 特别是当原始数据处理Avro数据时,它将Avro模式包含在一个 avroSchema 记录头属性.

  • avroSchema:处理Avro数据时,提供Avro模式
  • offset:记录开始的偏移量
  • partition:记录产生的分区
  • topic:记录产生的主题

向Amazon S3发送Kafka消息所需的Key

您可以配置两个源来捕获每个Kafka消息中包含的消息键,并将它们存储在生成的记录中. Kafka消息键可以是字符串值或Avro消息, 取决于你的Kafka系统是如何配置的. 您可以将消息键值存储在记录头/字段中或两者都存储,并可以在管道处理逻辑中使用这些值或将它们写入目标系统. 如果您不需要消息键,可以丢弃它们. 卡夫卡的消费者和Kafka Multi-topic Consumer源默认丢弃消息键.

数据管道概述和实现

这里的目标是从文件系统目录读取Avro文件,并使用StreamSets Kafka Producer将它们写入Kafka主题. 然后,沙巴体育手机版将使用第二个配置了卡夫卡的消费者的管道来读取来自该主题的所有消息, 执行一组转换以屏蔽数据并确定信用卡的类型. 最后通过对信用卡类型的数据进行分区发送Kafka消息到Amazon S3,并确保存储在S3上的数据是加密的. 在本博客的第二部分,沙巴体育手机版将重新设计沙巴体育手机版的 数据管道 在Kafka消息发送到Amazon期间,用于扩展和处理大量的数据.

以下是JSON格式的数据:

{
  “transaction_date”:“dd / mm / YYYY”,
  “card_number”:"0000-0000-0000-0000",
  “card_expiry_date”:“mm / YYYY”,
  “card_security_code”:"0000",
  “purchase_amount”:"$00.00",
  “描述”:“购买的交易说明”
}

先决条件 

  • StreamSets数据收集器引擎的一个工作实例.18.1+)
  • 一个工作的Kafka实例(参见 快速入门 便于本地设置. 最后在版本1上测试.1.0,但旧的和新的版本也应该工作.)

使用Kafka producer发布数据到Kafka topic

让沙巴体育手机版创建Kafka topic——”demo-topic——通过运行此命令

bin / Kafka-topics.——create——topic demo-topic——bootstrap-server localhost:9092 ——分区 3

现在让沙巴体育手机版使用一个简单的数据管道将一些示例数据推送到这个Kafka主题. 沙巴体育手机版将从文件系统目录读取Avro文件,并使用StreamSets Kafka Producer将它们写入到Kafka主题中 SDC记录数据格式. 然后使用另一个数据管道从Kafka读取署记录数据,并将其写入Elasticsearch,并将数据转换为S3的Avro.

目录到Kafka生产者

使用Kafka消息并将它们存储在Amazon S3中

Kafka消费者到Amazon S3

卡夫卡的消费者

  • 让沙巴体育手机版配置卡夫卡的起源来使用来自本地Kafka setup和o的消息n 数据格式 选项卡选择 署记录.

场转换器

  • 在Avro中,卡号字段被定义为一个整数. 沙巴体育手机版将把它转换成一个字符串值. 所以类型的/card_number'在' Fields to Convert '文本框中设置为' Convert to type '中的String类型. 其余保持默认值.

Jython评估者

  • 在这个阶段,沙巴体育手机版将使用一小段python代码来查看卡号的前几位数,并找出它是什么类型的卡. 沙巴体育手机版将把那个卡片类型添加到一个名为'credit_card_type‘.

场戴面具的人

  • 该过程的最后一步是屏蔽卡号,以便将卡的最后4位数字作为数据存储的全部内容.

向Amazon S3写入Kafka消息

  • 沙巴体育手机版将数据转换回Avro格式,并将其存储在S3桶中.
  • On ‘数据格式“选择”Avro”和“在管道配置“对”Avro模式位置”. 然后为Avro schema指定以下模式:
{“名称” : “有条件现金转移支付.avro”,
 “类型”: “记录”,
 “名称”: “有条件现金转移支付”,
 “医生”: “测试信用卡交易”,
 “字段”: [
            {“名称”: “transaction_date”, “类型”: “字符串”},
            {“名称”: “card_number”, “类型”: “字符串”},
            {“名称”: “card_expiry_date”, “类型”: “字符串”},
            {“名称”: “card_security_code”, “类型”: “字符串”},
            {“名称”: “purchase_amount”, “类型”: “字符串”},
            {“名称”: “描述”, “类型”: “字符串”}
           ]
}
  • 为了节省S3 bucket上的存储空间,让沙巴体育手机版在写入数据时压缩数据. Select BZip2 随着 Avro压缩编解码器.
  • 的数据写入分区。credit_card_type’场,沙巴体育手机版将使用 ${记录:价值(/ credit_card_type)} 表达式为 分区的前缀. 使用此表达式,目标将基于credit_card_type” 记录中的值.
  • 保护敏感数据:您可以使用以下任何选项对Amazon S3上的服务器端加密来保护敏感数据. 例如,在沙巴体育手机版的案例中,信用卡号码.
    • Amazon s3托管加密密钥(SSE-S3)
    • 托管加密密钥(SSE-KMS)
    • 客户提供的加密密钥(SSE-C)

 

StreamSets使数据工程师能够构建端到端智能数据管道. 把你的时间花在构建、实现和创新上,而不是维护、重写和修复.

Amazon S3上的输出

注意,S3上的输出将被'credit_card_type”

Amazon S3输出

针对大工作负载的数据管道重新设计

现在让沙巴体育手机版假设你必须这样做 扩展上述解决方案 假设你正在处理大量的数据,有多个上游应用程序正在写入多个Kafka主题. 因此,生产者向主题写入消息的速度已经超过了该管道消耗消息的速度. 

也假设有一个更多的上游管道/应用程序生成类似的信用卡数据和存储在Kafka主题的信息'demo-topic-2`.

bin / Kafka-topics.Sh——create——topic demo-topic-2 ——bootstrap-server localhost:9092 ——分区 2

因此,与其从头开始重新创建整个数据管道,沙巴体育手机版可以很容易地重新设计现有的管道,方法是用Kafka Multitopic Consumer origin替换源. 沙巴体育手机版可以通过增加线程数量来垂直扩展管道. 如果需要的话,沙巴体育手机版可以同时阅读多个主题.

Kafka Multitopic到Amazon S3

请注意,线程的数量应该始终小于或等于沙巴体育手机版正在读取的分区的数量,以实现更好的并行性. 这里沙巴体育手机版有一个 demo-topic3个分区 而且 demo-topic-22分区. 因此,沙巴体育手机版可以将线程数设置为5,以实现更高的并行度.

请注意:样品 data 而且 管道 可以在GitHub上找到. 

结论

在这个博客中,沙巴体育手机版学习了如何使用 StreamSets 作为一个Kafka consumer,以及何时选择Kafka consumer origin vs Kafka Multitopic consumer origin来处理大量的Kafka消息并利用并行性. 沙巴体育手机版还探索了AWS S3的各种目标功能,如分区和服务器端加密. 

以下是一些资源,可以帮助您开始设计和部署数据管道的旅程:

回到顶部

沙巴体育手机版使用cookie来改善您对沙巴体育手机版网站的体验. 单击“允许所有人同意”并继续访问沙巴体育手机版的网站. 隐私政策

(类^ =“wpforms——”)
(类^ =“wpforms——”)