概要
AWS CDK v2.228.0では、Lambda Kafkaイベントソース(MSKおよびセルフマネージドKafka)に重要な新機能が追加されました。プロビジョニングポーラーの設定、ポーラーグループによるコスト最適化、そして失敗時の送信先として複数のオプション(Kafkaトピック、SNS、SQS、S3)がサポートされました。また、CloudFormation Includeモジュールのバグ修正も含まれています。
新機能
Lambda Kafka イベントソースの新機能
このリリースでは、Lambda Kafka イベントソース(MSK、セルフマネージドKafka)に複数の新機能が追加されました。
1. プロビジョニングポーラー設定(Provisioned Poller Config)
MSKおよびセルフマネージドKafkaイベントソースに、プロビジョニングポーラーの設定が追加されました。これにより、イベントソースマッピングのポーリング処理を最適化し、コストとパフォーマンスのバランスを調整できます。
設定可能なプロパティ:
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as lambda from 'aws-cdk-lib/aws-lambda';
declare const clusterArn: string;
declare const myFunction: lambda.Function;
myFunction.addEventSource(new ManagedKafkaEventSource({
clusterArn,
topic: 'my-topic',
startingPosition: lambda.StartingPosition.LATEST,
// プロビジョニングポーラーの設定
provisionedPollerConfig: {
minimumPollers: 2, // 最小ポーラー数(デフォルト: 1、範囲: 1-200)
maximumPollers: 10, // 最大ポーラー数(デフォルト: 200、範囲: 1-2000)
},
}));
セルフマネージドKafkaでも同様に使用できます:
import { SelfManagedKafkaEventSource, AuthenticationMethod } from 'aws-cdk-lib/aws-lambda-event-sources';
import { ISecret } from 'aws-cdk-lib/aws-secretsmanager';
declare const myFunction: lambda.Function;
declare const kafkaCredentials: ISecret;
myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers: ['kafka-broker1.example.com:9092', 'kafka-broker2.example.com:9092'],
topic: 'events-topic',
secret: kafkaCredentials,
startingPosition: lambda.StartingPosition.LATEST,
authenticationMethod: AuthenticationMethod.SASL_SCRAM_512_AUTH,
// プロビジョニングポーラーの設定
provisionedPollerConfig: {
minimumPollers: 1,
maximumPollers: 8,
},
}));
設定の制約:
minimumPollers: 1〜200の範囲で設定可能maximumPollers: 1〜2000の範囲で設定可能minimumPollersはmaximumPollers以下である必要があります
2. ポーラーグループ名(Poller Group Name)
複数のKafkaイベントソース間でプロビジョニングポーラーを共有することで、コストを削減できる新機能です。同じ pollerGroupName を設定した複数のイベントソースマッピングは、同じポーラー容量を共有します。
使用例:
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
declare const clusterArn: string;
declare const ordersFunction: lambda.Function;
declare const inventoryFunction: lambda.Function;
// 注文処理関数
ordersFunction.addEventSource(new ManagedKafkaEventSource({
clusterArn,
topic: 'orders-topic',
startingPosition: lambda.StartingPosition.LATEST,
provisionedPollerConfig: {
minimumPollers: 2,
maximumPollers: 10,
pollerGroupName: 'shared-kafka-pollers', // ポーラーグループ名を指定
},
}));
// 在庫管理関数(同じポーラーグループを使用してコスト削減)
inventoryFunction.addEventSource(new ManagedKafkaEventSource({
clusterArn,
topic: 'inventory-topic',
startingPosition: lambda.StartingPosition.LATEST,
provisionedPollerConfig: {
minimumPollers: 2,
maximumPollers: 10,
pollerGroupName: 'shared-kafka-pollers', // 同じグループ名でポーラーを共有
},
}));
この機能は、専用のポーリング容量を必要としない複数のKafkaトピックがある場合に特に有効です。
3. 失敗時の送信先(On Failure Destination)
Kafkaイベントソースで処理に失敗したレコードを送信する失敗時の送信先として、以下の4つのオプションがサポートされました:
- Kafka DLQ - Kafkaトピックへの送信
- SNS トピック - SNSトピックへの送信
- SQS キュー - SQSキューへの送信
- S3 バケット - S3バケットへの送信
Kafka DLQ の使用
新しい KafkaDlq クラスを使用して、失敗したレコードを別のKafkaトピックに送信できます:
import { SelfManagedKafkaEventSource, KafkaDlq } from 'aws-cdk-lib/aws-lambda-event-sources';
const bootstrapServers = ['kafka-broker:9092'];
const topic = 'some-cool-topic';
declare const myFunction: lambda.Function;
myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers,
topic,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
// Kafkaトピックへの失敗時の送信先を設定
onFailure: new KafkaDlq('error-topic'), // トピック名を指定
provisionedPollerConfig: {
minimumPollers: 1,
maximumPollers: 1,
},
}));
KafkaDlq のトピック名要件:
- 英数字、ドット(.)、アンダースコア(_)、ハイフン(-)のみ使用可能
- 空文字列は不可
kafka://プレフィックスは自動的に追加されます(手動で追加することも可能)
S3 バケットの使用
S3バケットを失敗時の送信先として使用することも可能です:
import { ManagedKafkaEventSource, S3OnFailureDestination } from 'aws-cdk-lib/aws-lambda-event-sources';
import { IBucket } from 'aws-cdk-lib/aws-s3';
const clusterArn = 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4';
const topic = 'some-cool-topic';
declare const bucket: IBucket;
declare const myFunction: lambda.Function;
const s3OnFailureDestination = new S3OnFailureDestination(bucket);
myFunction.addEventSource(new ManagedKafkaEventSource({
clusterArn,
topic,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
onFailure: s3OnFailureDestination, // S3バケットを失敗時の送信先に設定
provisionedPollerConfig: {
minimumPollers: 1,
maximumPollers: 3,
},
}));
SNS と SQS の使用
既存のSNSトピックやSQSキューも失敗時の送信先として使用できます(詳細はLambdaモジュールのドキュメントを参照)。
統合例:すべての新機能を使用
すべての新機能を組み合わせた例:
import { ManagedKafkaEventSource, KafkaDlq } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as lambda from 'aws-cdk-lib/aws-lambda';
declare const clusterArn: string;
declare const myFunction: lambda.Function;
myFunction.addEventSource(new ManagedKafkaEventSource({
clusterArn,
topic: 'orders-topic',
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
// イベントフィルタリング
filters: [
lambda.FilterCriteria.filter({
stringEquals: lambda.FilterRule.isEqual('order'),
}),
],
// プロビジョニングポーラー設定
provisionedPollerConfig: {
minimumPollers: 2,
maximumPollers: 10,
pollerGroupName: 'shared-kafka-pollers', // コスト最適化のためのグループ化
},
// 失敗時の送信先
onFailure: new KafkaDlq('orders-dlq-topic'),
}));
バグ修正
CloudFormation Include: 組み込み関数使用時の TypeError を修正
CloudFormation Includeモジュールで、組み込み関数(Intrinsic Functions)を含むテンプレートをインポートする際に TypeError が発生する問題が修正されました。
問題の詳細:
- CDK v2では、L1コンストラクトのプロパティに型付き関係が導入されました
- 配列プロパティの処理で、値がトークン(未解決の値)である可能性が考慮されていませんでした
- CloudFormation Includeモジュールでは、組み込み関数がトークンとして表現されるため、この問題が顕在化していました
修正内容:
- 配列を
.forEach()で処理する前に、値が解決可能(Resolvable)かどうかをチェックするようになりました - これにより、CloudFormationテンプレートに組み込み関数が含まれている場合でも、正しくインポートできるようになりました
この修正により、以下のようなCloudFormationテンプレートを安全にインポートできます:
import * as cfninc from 'aws-cdk-lib/cloudformation-include';
new cfninc.CfnInclude(this, 'Template', {
templateFile: 'template-with-intrinsic-functions.json',
});
組み込み関数(Fn::GetAtt、Fn::Join、Ref など)を含むテンプレートでも、エラーなく処理されるようになりました。
関連PR: #36157 関連Issue: #36140, #35838
まとめ
AWS CDK v2.228.0は、Lambda Kafkaイベントソースの機能強化に重点を置いたリリースです。プロビジョニングポーラー設定により、イベント処理のパフォーマンスとコストを細かく調整できるようになり、ポーラーグループ機能により複数のイベントソース間でリソースを効率的に共有できるようになりました。また、失敗時の送信先として4つのオプション(Kafka、SNS、SQS、S3)がサポートされたことで、エラーハンドリングの柔軟性が大幅に向上しました。CloudFormation Includeモジュールのバグ修正により、より複雑なテンプレートの移行も安定して実行できるようになっています。
Kafkaベースのイベント駆動アーキテクチャを構築している場合、これらの新機能を活用することで、より効率的でコスト最適化されたシステムを実現できるでしょう。