发送消息到Kafka
最简单的发送消息方式如下:
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
这里做了如下几件事:
- 我们创建了一个 ProducerRecord,并且指定了主题以及消息的 key/value。主题总是字符串类型的,但key/value则可以是任意类型,在本例中也是字符串。需要注意的是,这里的 key/value 的类型需要与 serializer和生产者的类型匹配。
- 使用 send() 方法来发送消息,该方法会返回一个 RecordMetadata的 Future 对象,但由于我们没有跟踪Future 对象,因此并不知道发送结果。如前所述,这种方式可能会丢失消息。
- 虽然我们忽略了发送消息到 broker 的异常,但是我们调用 send() 方法时仍然可能会遇到一些异常,例如序列化异常、发送缓冲区溢出异常等等。
同步发送消息
同步发送方式可以简单修改如下:
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
注意到,这里使用了 Future.get() 来获取发送结果,如果发送消息失败则会抛出异常,否则返回一个RecordMetadata 对象。发送失败异常包含:1)broker 返回不可恢复异常,生产者直接抛出该异常;2)对于broke r其他异常,生产者会进行重试,如果重试超过一定次数仍不成功则抛出异常。
可恢复异常指的是,如果生产者进行重试可能会成功,例如连接异常;不可恢复异常则是进行重试也不会成功的异常,例如消息内容过大。
异步发送消息
首先了解下什么场景下需要异步发送消息。假如生产者与 broker 之间的网络延时为 10ms,我们发送 100 条消息,发送每条消息都等待结果,那么需要1秒的时间。而如果我们采用异步的方式,几乎没有任何耗时,而且我们还可以通过回调知道消息的发送结果。
异步发送消息的样例如下:
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());
异步回调的类需要实现 org.apache.kafka.clients.producer.Callback 接口,这个接口只有一个 onCompletion 方法。当 Kafka 返回异常时,异常值不为null,代码中只是简单的打印,但我们可以采取其他处理方式。
生产者的配置
上面我们只配置了 bootstrap.servers 和序列化类,其实生产者还有很多配置,上面只是使用了默认值。下面来看下这些配置参数。
acks
acks控制多少个副本必须写入消息后生产者才能认为写入成功,这个参数对消息丢失可能性有很大影响。这个参数有三种取值:
- acks=0:生产者把消息发送到 broker 即认为成功,不等待broker的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。
- acks=1:生产者会在该分区的群首(leader)写入消息并返回成功后,认为消息发送成功。如果群首写入消息失败,生产者会收到错误响应并进行重试。这种方式能够一定程度避免消息丢失,但如果群首宕机时该消息没有复制到其他副本,那么该消息还是会丢失。另外,如果我们使用同步方式来发送,延迟会比前一种方式大大增加(至少增加一个网络往返时间);如果使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。
- acks=all:生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的。
buffer.memory
这个参数设置生产者缓冲发送的消息的内存大小,如果应用调用send方法的速度大于生产者发送的速度,那么调用会阻塞或者抛出异常,具体行为取决于 block.on.buffer.full(这个参数在0.9.0.0版本被max.block.ms代替,允许抛出异常前等待一定时间)参数。
compresstion.type
默认情况下消息是不压缩的,这个参数可以指定使用消息压缩,参数可以取值为 snappy、gzip 或者 lz4。snappy压缩算法由 Google 研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip 消耗更多的 CPU 资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和 Kafka 存储成本。
retries
当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际情况中,这个参数需要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举群首的时间长,这样可以避免生产者过早结束重试导致失败。
batch.size
当多条消息发送到一个分区时,生产者会进行批量发送,这个参数指定了批量消息的大小上限(以字节为单位)。当批量消息达到这个大小时,生产者会一起发送到 broker;但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。
linger.ms
这个参数指定生产者在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小,到达时间后生产者也会发送批量消息到 broker。默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。
client.id
这个参数可以是任意字符串,它是broker用来识别消息是来自哪个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。
max.in.flight.requests.per.connection
这个参数指定生产者可以发送多少消息到 broker 并且等待响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。设置为 1,可以保证发送到broker 的顺序和调用 send 方法顺序一致,即便出现失败重试的情况也是如此。
timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms
这些参数控制生产者等待 broker 的响应时间。request.timeout.ms 指定发送数据的等待响应时间,metadata.fetch.timeout.ms 指定获取元数据(例如获取分区的群首信息)的等待响应时间。timeout.ms 则指定broker 在返回结果前等待其他副本(与 acks 参数相关)响应的时间,如果时间到了但其他副本没有响应结果,则返回消息写入失败。
max.block.ms
这个参数指定应用调用 send 方法或者获取元数据方法(例如 partitionFor)时的阻塞时间,超过此时间则抛出timeout 异常。
max.request.size
这个参数限制生产者发送数据包的大小,数据包的大小与消息的大小、消息数相关。如果我们指定了最大数据包大小为1M,那么最大的消息大小为1M,或者能够最多批量发送 1000 条消息大小为 1K 的消息。另外,broker 也有message.max.bytes 参数来控制接收的数据包大小。在实际中,建议这些参数值是匹配的,避免生产者发送了超过 broker 限定的数据大小。
receive.buffer.bytes, send.buffer.bytes
这两个参数设置用来发送/接收数据的 TCP 连接的缓冲区,如果设置为 -1 则使用操作系统自身的默认值。如果生产者与broker在不同的数据中心,建议提高这个值,因为不同数据中心往往延迟比较大。
最后讨论下顺序保证。Kafka 保证分区的顺序,也就是说,如果生产者以一定的顺序发送消息到 Kafka 的某个分区,那么 Kafka 在分区内部保持此顺序,而且消费者也按照同样的顺序消费。但是,应用调用send方法的顺序和实际发送消息的顺序不一定是一致的。举个例子,如果 retries 参数不为 0,而 max.in.flights.requests.per.session参数大于 1,那么有可能第一个批量消息写入失败,但是第二个批量消息写入成功,然后第一个批量消息重试写入成功,那么这个顺序乱序的。因此,如果需要保证消息顺序,建议设置 max.in.flights.requests.per.session 为 1,这样可以在第一个批量消息发送失败重试时,第二个批量消息需要等待。