Back to Releases
v2.228.0 2025年11月24日

AWS CDK v2.228.0 リリース解説

Lambda Kafka イベントソースにプロビジョニングポーラー設定、ポーラーグループ名、失敗時の送信先(KafkaDlq、SNS、SQS、S3)のサポートが追加されました。

lambdamskkafkacloudformation

概要

AWS CDK v2.228.0では、Lambda Kafkaイベントソース(MSKおよびセルフマネージドKafka)に重要な新機能が追加されました。プロビジョニングポーラーの設定、ポーラーグループによるコスト最適化、そして失敗時の送信先として複数のオプション(Kafkaトピック、SNS、SQS、S3)がサポートされました。また、CloudFormation Includeモジュールのバグ修正も含まれています。

新機能

Lambda Kafka イベントソースの新機能

このリリースでは、Lambda Kafka イベントソース(MSK、セルフマネージドKafka)に複数の新機能が追加されました。

1. プロビジョニングポーラー設定(Provisioned Poller Config)

MSKおよびセルフマネージドKafkaイベントソースに、プロビジョニングポーラーの設定が追加されました。これにより、イベントソースマッピングのポーリング処理を最適化し、コストとパフォーマンスのバランスを調整できます。

設定可能なプロパティ:

import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as lambda from 'aws-cdk-lib/aws-lambda';

declare const clusterArn: string;
declare const myFunction: lambda.Function;

myFunction.addEventSource(new ManagedKafkaEventSource({
  clusterArn,
  topic: 'my-topic',
  startingPosition: lambda.StartingPosition.LATEST,
  // プロビジョニングポーラーの設定
  provisionedPollerConfig: {
    minimumPollers: 2,              // 最小ポーラー数(デフォルト: 1、範囲: 1-200)
    maximumPollers: 10,             // 最大ポーラー数(デフォルト: 200、範囲: 1-2000)
  },
}));

セルフマネージドKafkaでも同様に使用できます:

import { SelfManagedKafkaEventSource, AuthenticationMethod } from 'aws-cdk-lib/aws-lambda-event-sources';
import { ISecret } from 'aws-cdk-lib/aws-secretsmanager';

declare const myFunction: lambda.Function;
declare const kafkaCredentials: ISecret;

myFunction.addEventSource(new SelfManagedKafkaEventSource({
  bootstrapServers: ['kafka-broker1.example.com:9092', 'kafka-broker2.example.com:9092'],
  topic: 'events-topic',
  secret: kafkaCredentials,
  startingPosition: lambda.StartingPosition.LATEST,
  authenticationMethod: AuthenticationMethod.SASL_SCRAM_512_AUTH,
  // プロビジョニングポーラーの設定
  provisionedPollerConfig: {
    minimumPollers: 1,
    maximumPollers: 8,
  },
}));

設定の制約:

  • minimumPollers: 1〜200の範囲で設定可能
  • maximumPollers: 1〜2000の範囲で設定可能
  • minimumPollersmaximumPollers 以下である必要があります

2. ポーラーグループ名(Poller Group Name)

複数のKafkaイベントソース間でプロビジョニングポーラーを共有することで、コストを削減できる新機能です。同じ pollerGroupName を設定した複数のイベントソースマッピングは、同じポーラー容量を共有します。

使用例:

import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

declare const clusterArn: string;
declare const ordersFunction: lambda.Function;
declare const inventoryFunction: lambda.Function;

// 注文処理関数
ordersFunction.addEventSource(new ManagedKafkaEventSource({
  clusterArn,
  topic: 'orders-topic',
  startingPosition: lambda.StartingPosition.LATEST,
  provisionedPollerConfig: {
    minimumPollers: 2,
    maximumPollers: 10,
    pollerGroupName: 'shared-kafka-pollers',  // ポーラーグループ名を指定
  },
}));

// 在庫管理関数(同じポーラーグループを使用してコスト削減)
inventoryFunction.addEventSource(new ManagedKafkaEventSource({
  clusterArn,
  topic: 'inventory-topic',
  startingPosition: lambda.StartingPosition.LATEST,
  provisionedPollerConfig: {
    minimumPollers: 2,
    maximumPollers: 10,
    pollerGroupName: 'shared-kafka-pollers',  // 同じグループ名でポーラーを共有
  },
}));

この機能は、専用のポーリング容量を必要としない複数のKafkaトピックがある場合に特に有効です。

3. 失敗時の送信先(On Failure Destination)

Kafkaイベントソースで処理に失敗したレコードを送信する失敗時の送信先として、以下の4つのオプションがサポートされました:

  1. Kafka DLQ - Kafkaトピックへの送信
  2. SNS トピック - SNSトピックへの送信
  3. SQS キュー - SQSキューへの送信
  4. S3 バケット - S3バケットへの送信
Kafka DLQ の使用

新しい KafkaDlq クラスを使用して、失敗したレコードを別のKafkaトピックに送信できます:

import { SelfManagedKafkaEventSource, KafkaDlq } from 'aws-cdk-lib/aws-lambda-event-sources';

const bootstrapServers = ['kafka-broker:9092'];
const topic = 'some-cool-topic';

declare const myFunction: lambda.Function;

myFunction.addEventSource(new SelfManagedKafkaEventSource({
  bootstrapServers,
  topic,
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  // Kafkaトピックへの失敗時の送信先を設定
  onFailure: new KafkaDlq('error-topic'),  // トピック名を指定
  provisionedPollerConfig: {
    minimumPollers: 1,
    maximumPollers: 1,
  },
}));

KafkaDlq のトピック名要件:

  • 英数字、ドット(.)、アンダースコア(_)、ハイフン(-)のみ使用可能
  • 空文字列は不可
  • kafka:// プレフィックスは自動的に追加されます(手動で追加することも可能)
S3 バケットの使用

S3バケットを失敗時の送信先として使用することも可能です:

import { ManagedKafkaEventSource, S3OnFailureDestination } from 'aws-cdk-lib/aws-lambda-event-sources';
import { IBucket } from 'aws-cdk-lib/aws-s3';

const clusterArn = 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4';
const topic = 'some-cool-topic';

declare const bucket: IBucket;
declare const myFunction: lambda.Function;

const s3OnFailureDestination = new S3OnFailureDestination(bucket);

myFunction.addEventSource(new ManagedKafkaEventSource({
  clusterArn,
  topic,
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  onFailure: s3OnFailureDestination,  // S3バケットを失敗時の送信先に設定
  provisionedPollerConfig: {
    minimumPollers: 1,
    maximumPollers: 3,
  },
}));
SNS と SQS の使用

既存のSNSトピックやSQSキューも失敗時の送信先として使用できます(詳細はLambdaモジュールのドキュメントを参照)。

統合例:すべての新機能を使用

すべての新機能を組み合わせた例:

import { ManagedKafkaEventSource, KafkaDlq } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as lambda from 'aws-cdk-lib/aws-lambda';

declare const clusterArn: string;
declare const myFunction: lambda.Function;

myFunction.addEventSource(new ManagedKafkaEventSource({
  clusterArn,
  topic: 'orders-topic',
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  // イベントフィルタリング
  filters: [
    lambda.FilterCriteria.filter({
      stringEquals: lambda.FilterRule.isEqual('order'),
    }),
  ],
  // プロビジョニングポーラー設定
  provisionedPollerConfig: {
    minimumPollers: 2,
    maximumPollers: 10,
    pollerGroupName: 'shared-kafka-pollers',  // コスト最適化のためのグループ化
  },
  // 失敗時の送信先
  onFailure: new KafkaDlq('orders-dlq-topic'),
}));

バグ修正

CloudFormation Include: 組み込み関数使用時の TypeError を修正

CloudFormation Includeモジュールで、組み込み関数(Intrinsic Functions)を含むテンプレートをインポートする際に TypeError が発生する問題が修正されました。

問題の詳細:

  • CDK v2では、L1コンストラクトのプロパティに型付き関係が導入されました
  • 配列プロパティの処理で、値がトークン(未解決の値)である可能性が考慮されていませんでした
  • CloudFormation Includeモジュールでは、組み込み関数がトークンとして表現されるため、この問題が顕在化していました

修正内容:

  • 配列を .forEach() で処理する前に、値が解決可能(Resolvable)かどうかをチェックするようになりました
  • これにより、CloudFormationテンプレートに組み込み関数が含まれている場合でも、正しくインポートできるようになりました

この修正により、以下のようなCloudFormationテンプレートを安全にインポートできます:

import * as cfninc from 'aws-cdk-lib/cloudformation-include';

new cfninc.CfnInclude(this, 'Template', {
  templateFile: 'template-with-intrinsic-functions.json',
});

組み込み関数(Fn::GetAttFn::JoinRef など)を含むテンプレートでも、エラーなく処理されるようになりました。

関連PR: #36157 関連Issue: #36140, #35838

まとめ

AWS CDK v2.228.0は、Lambda Kafkaイベントソースの機能強化に重点を置いたリリースです。プロビジョニングポーラー設定により、イベント処理のパフォーマンスとコストを細かく調整できるようになり、ポーラーグループ機能により複数のイベントソース間でリソースを効率的に共有できるようになりました。また、失敗時の送信先として4つのオプション(Kafka、SNS、SQS、S3)がサポートされたことで、エラーハンドリングの柔軟性が大幅に向上しました。CloudFormation Includeモジュールのバグ修正により、より複雑なテンプレートの移行も安定して実行できるようになっています。

Kafkaベースのイベント駆動アーキテクチャを構築している場合、これらの新機能を活用することで、より効率的でコスト最適化されたシステムを実現できるでしょう。