Introduction

If you’re building ad or billing pipelines, “close enough” is not good enough.

Every impression, click, and conversion event represents real money. Losing events under‑charges. Double‑counting over‑charges. Both are unacceptable.

At the same time, downstream systems (budget pacing, bidding, dashboards) want near real‑time data, not yesterday’s batch.

This post walks through a concrete architecture and implementation for:

  • ingesting raw ad events from Kafka
  • deduplicating and aggregating them into 1‑minute buckets with Flink
  • attributing conversions back to the ads that drove them
  • loading the results into Pinot (real‑time) and a warehouse

…while maintaining exactly‑once semantics end‑to‑end.

The Problem: Money Events in Motion

Typical event types:

  • Impressions – an ad was rendered to a user
  • Clicks – the user clicked the ad
  • Conversions – the user later placed an order we can attribute to an ad

We want to:

  • drop duplicates and invalid or stale events
  • aggregate impressions and clicks into 1‑minute buckets per ad and region
  • attribute conversions back to the ads that drove them
  • serve the results in near real time (Pinot) and in the warehouse, without losing or double‑counting a single event

Building Blocks

We’ll use:

  • Apache Kafka for transport (raw events, aggregates, attributed conversions)
  • Apache Flink for stateful, exactly‑once stream processing
  • Apache Pinot for real‑time, upsert‑capable serving
  • a data warehouse / lake table (e.g., Iceberg) for batch analytics and reconciliation

You can swap components (e.g., Druid for Pinot, BigQuery/Snowflake for the warehouse); the patterns stay the same.

Event Model and Kafka Topics

We’ll assume Protobuf or Avro for schema‑managed messages. For simplicity, here’s a JSON/Avro‑style schema for the raw ad events topic:

// Topic: ad_events_raw
{
  "type": "record",
  "name": "AdEvent",
  "fields": [
    { "name": "event_id", "type": "string" },          // client-side UUID
    { "name": "user_id",  "type": ["null", "string"], "default": null },
    { "name": "ad_id",    "type": "string" },
    { "name": "campaign_id", "type": "string" },
    { "name": "event_type", "type": { "type": "enum", "name": "EventType",
      "symbols": ["IMPRESSION", "CLICK"] } },
    { "name": "ts",       "type": "long" },            // event time (ms)
    { "name": "region",   "type": "string" },
    { "name": "meta",     "type": ["null", { "type": "map", "values": "string" }],
      "default": null }
  ]
}
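
The Flink code later in this post refers to an AdEvent type with camelCase fields. Assuming a hand-written (or generated) Scala case class mirroring the schema above, it would look roughly like this; treat it as a sketch, not the canonical model:

sealed trait EventType
case object IMPRESSION extends EventType
case object CLICK extends EventType

// Scala-side view of ad_events_raw (assumed, not generated code)
case class AdEvent(
  eventId: String,                       // client-side UUID
  userId: Option[String],
  adId: String,
  campaignId: String,
  eventType: EventType,
  ts: Long,                              // event time (epoch ms)
  region: String,
  meta: Option[Map[String, String]] = None
)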

Kafka configs for transactional producers (Flink’s Kafka sinks will use this):

# Kafka broker config (cluster-wide)
transaction.state.log.min.isr=3
transaction.state.log.replication.factor=3

# Producer config (used by the Flink Kafka sink)
enable.idempotence=true
acks=all
retries=10
max.in.flight.requests.per.connection=1
# Note: don't set transactional.id by hand here; Flink derives one per sink subtask
# from the prefix configured via setTransactionalIdPrefix (see below).

High-Level Job Graph

We will have three Flink jobs:

  1. Aggregation Job
    • Consumes ad_events_raw
    • Cleans & dedupes
    • Aggregates into 1‑minute buckets
    • Writes aggregated metrics to ad_metrics_agg (Kafka) with record UUIDs
  2. Attribution Job
    • Consumes conversion_events (orders)
    • Looks up prior ad events
    • Emits attributed conversions to ad_conversions_attr
  3. Union & Load Job
    • Consumes ad_metrics_agg + ad_conversions_attr
    • Writes to Pinot (real‑time) and the warehouse

All Kafka ↔ Flink paths use exactly‑once semantics via Flink checkpoints + Kafka transactions.

We rely on three mechanisms:

  1. Flink’s exactly‑once state + two‑phase commit sink

In code (Scala/Java):

// Checkpoint every 60 s in EXACTLY_ONCE mode; Kafka transactions commit when a checkpoint completes.
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
env.getCheckpointConfig().setCheckpointTimeout(5 * 60_000L);
// RocksDB keeps the (potentially large) keyed state off-heap; checkpoints go to durable storage.
env.setStateBackend(new RocksDBStateBackend("hdfs://checkpoints/ad-pipelines"));

  2. Kafka transactions + read_committed

Flink’s KafkaSink writes in transactions that are committed when a checkpoint completes:

KafkaSink<AggregatedMetric> aggSink =
    KafkaSink.<AggregatedMetric>builder()
        .setBootstrapServers(kafkaBrokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("ad_metrics_agg")
            .setValueSerializationSchema(new AggregatedMetricSerde())
            .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-ad-agg-")
        .build();

Downstream consumers (Pinot, ETL jobs) must use:

isolation.level=read_committed
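
For a Flink consumer (e.g., the union & load job reading ad_metrics_agg), that translates into a property on the KafkaSource. A minimal sketch, assuming an AdMetricAggDeserializer analogous to the other serdes in this post:

val metricsSource: KafkaSource[AdMetricAgg] =
  KafkaSource.builder[AdMetricAgg]()
    .setBootstrapServers(kafkaBrokers)
    .setTopics("ad_metrics_agg")
    .setGroupId("ad-union-load-job")                        // assumed group id
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new AdMetricAggDeserializer)  // assumed serde
    // Only read records from committed Kafka transactions:
    .setProperty("isolation.level", "read_committed")
    .build()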

  3. Record‑level idempotency

Aggregated/attributed records use stable IDs:

String recordId = UUID.nameUUIDFromBytes(
    (adId + "|" + minuteBucket + "|" + region).getBytes(StandardCharsets.UTF_8)
).toString();

We store this record_id field and use it as:

  • the primary key for Pinot upserts, and
  • the dedupe key in the warehouse ETL.

So if Flink replays, we just overwrite the same logical record.

Job 1: Aggregation – From Noisy Stream to Clean Buckets

Let’s look at the Flink job in more detail (Scala, but Java is similar).

1. Source and basic cleansing

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)

val source: KafkaSource[AdEvent] =
  KafkaSource.builder[AdEvent]()
    .setBootstrapServers(kafkaBrokers)
    .setTopics("ad_events_raw")
    .setGroupId("ad-agg-job")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new AdEventDeserializer)
    .build()

val rawEvents: DataStream[AdEvent] = env.fromSource(
  source,
  WatermarkStrategy
    .forBoundedOutOfOrderness[AdEvent](java.time.Duration.ofSeconds(30))
    .withTimestampAssigner((e: AdEvent, _: Long) => e.ts),
  "ad_events_raw"
)

// Filter invalid / stale events
val validEvents = rawEvents
  .filter(e => e.adId != null && e.eventType != null)
  .filter(e => isRecent(e.ts))  // e.g., within 7 days
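
The truncateToMinute and isRecent helpers used here (and again later) aren't defined in the snippets; a minimal sketch, with the 7‑day cutoff as an example value:

// Floor an epoch-millis timestamp to the start of its minute.
def truncateToMinute(tsMillis: Long): Long =
  tsMillis - (tsMillis % 60000L)

// Accept events no older than maxAgeMillis and not unreasonably far in the future.
def isRecent(tsMillis: Long,
             maxAgeMillis: Long = 7L * 24 * 60 * 60 * 1000L): Boolean = {
  val now = System.currentTimeMillis()
  tsMillis >= now - maxAgeMillis && tsMillis <= now + 60000L  // small clock-skew allowance
}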

2. Deduplication with keyed state

We assume event_id is a client‑side UUID that is reused on retries, so duplicates share the same ID. We’ll keep a small piece of keyed state per event_id (with a TTL in production) to drop them.

val deduped: DataStream[AdEvent] = validEvents
  .keyBy(_.eventId)
  .process(new KeyedProcessFunction[String, AdEvent, AdEvent] {

    // One flag per event_id; in production, attach a StateTtlConfig so old entries expire.
    lazy val seen: ValueState[java.lang.Boolean] = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean])
    )

    override def processElement(
        value: AdEvent,
        ctx: KeyedProcessFunction[String, AdEvent, AdEvent]#Context,
        out: Collector[AdEvent]): Unit = {

      if (seen.value() == null) {   // first time we see this event_id
        seen.update(true)
        out.collect(value)
      }
      // else: drop duplicate / retried event
    }
  })

In production, you’d use a more space‑efficient structure (Bloom filter, TTL, etc.), but this illustrates the idea.
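
The lowest-effort version of that is Flink's built-in state TTL, which expires the per-key dedup flag after a retention window. A sketch, assuming a 24‑hour retention (tune it to your retry horizon):

// Note: this is org.apache.flink.api.common.time.Time, not the windowing Time.
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

// Expire the "seen" flag 24 h after it was written, so dedup state doesn't grow forever.
val ttlConfig = StateTtlConfig
  .newBuilder(Time.hours(24))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val seenDescriptor = new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean])
seenDescriptor.enableTimeToLive(ttlConfig)   // then use seenDescriptor in getState(...)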

3. Aggregation into 1‑minute buckets

Define a case class for the aggregate:

case class AdMetricAgg(
  recordId: String,
  adId: String,
  campaignId: String,
  region: String,
  minuteBucket: Long,
  impressions: Long,
  clicks: Long,
  updatedAt: Long
)

Aggregation:

val keyed = deduped
  .map(e => (e, truncateToMinute(e.ts)))
  .keyBy { case (e, minuteBucket) => (e.adId, minuteBucket, e.region) }

val aggregated: DataStream[AdMetricAgg] =
  keyed
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction[(AdEvent, Long), AdMetricAgg, (String, Long, String), TimeWindow] {
      override def process(
          key: (String, Long, String),
          context: Context,
          elements: Iterable[(AdEvent, Long)],
          out: Collector[AdMetricAgg]): Unit = {

        val (adId, minuteBucket, region) = key
        var imps = 0L
        var clicks = 0L

        elements.foreach { case (e, _) =>
          e.eventType match {
            case IMPRESSION => imps += 1
            case CLICK      => clicks += 1
          }
        }

        val recordId = UUID.nameUUIDFromBytes(
          (adId + "|" + minuteBucket + "|" + region).getBytes(StandardCharsets.UTF_8)
        ).toString

        out.collect(AdMetricAgg(
          recordId = recordId,
          adId = adId,
          campaignId = elements.head._1.campaignId,
          region = region,
          minuteBucket = minuteBucket,
          impressions = imps,
          clicks = clicks,
          updatedAt = System.currentTimeMillis()
        ))
      }
    })

4. Transactional sink to Kafka

val aggSink: KafkaSink[AdMetricAgg] =
  KafkaSink.builder[AdMetricAgg]()
    .setBootstrapServers(kafkaBrokers)
    .setRecordSerializer(
      KafkaRecordSerializationSchema.builder[AdMetricAgg]()
        .setTopic("ad_metrics_agg")
        .setValueSerializationSchema(new AdMetricAggSerializer)
        .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-ad-agg-")
    .build()

aggregated.sinkTo(aggSink)

env.execute("ad-aggregation-job")

At this point, we have exactly‑once, deduped, 1‑minute aggregates in Kafka.

Job 2: Attribution – Connecting Orders to Ads

We’ll assume a separate Kafka topic conversion_events:

// Topic: conversion_events
{
  "conversion_id": "string",
  "user_id": "string",
  "order_id": "string",
  "amount": "double",
  "ts": "long",
  "region": "string"
}
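
As with AdEvent, the snippets below assume a matching Scala case class, roughly:

// Scala-side view of conversion_events (assumed)
case class ConversionEvent(
  conversionId: String,
  userId: String,
  orderId: String,
  amount: Double,
  ts: Long,       // event time (epoch ms)
  region: String
)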

We need to join conversions with prior ad events. The simplest design is:

  • keep a store of recent ad events keyed by user_id (an external KV store, or Flink state fed by the ad event stream), and
  • for each incoming conversion, look up that user's recent ad events and emit one attributed record per matching ad.

Illustrative Flink snippet (pseudocode for KV lookup):

case class AttributedConversion(
  recordId: String,
  adId: String,
  campaignId: String,
  conversionId: String,
  orderId: String,
  amount: Double,
  minuteBucket: Long,
  region: String,
  ts: Long
)

val conversions: DataStream[ConversionEvent] = // from Kafka

val attributed: DataStream[AttributedConversion] =
  conversions
    .process(new ProcessFunction[ConversionEvent, AttributedConversion] {
      override def processElement(
          value: ConversionEvent,
          ctx: Context,
          out: Collector[AttributedConversion]): Unit = {

        // Example: lookup by user_id in external KV
        val adEvents: List[AdEvent] = adEventStore.lookupByUser(value.userId)

        adEvents.foreach { ad =>
          val minuteBucket = truncateToMinute(value.ts)
          val idBytes = (ad.adId + "|" + value.orderId + "|" + minuteBucket).getBytes("UTF-8")
          val recordId = UUID.nameUUIDFromBytes(idBytes).toString

          out.collect(AttributedConversion(
            recordId = recordId,
            adId = ad.adId,
            campaignId = ad.campaignId,
            conversionId = value.conversionId,
            orderId = value.orderId,
            amount = value.amount,
            minuteBucket = minuteBucket,
            region = value.region,
            ts = value.ts
          ))
        }
      }
    })

In production you'd usually do that lookup via Flink's async I/O (AsyncDataStream) instead of blocking per element. Either way, we then sink attributed to a Kafka topic ad_conversions_attr with the same EXACTLY_ONCE sink pattern.
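
For completeness, a sketch of that sink, assuming an AttributedConversionSerializer analogous to the aggregation job's serializer:

val attrSink: KafkaSink[AttributedConversion] =
  KafkaSink.builder[AttributedConversion]()
    .setBootstrapServers(kafkaBrokers)
    .setRecordSerializer(
      KafkaRecordSerializationSchema.builder[AttributedConversion]()
        .setTopic("ad_conversions_attr")
        .setValueSerializationSchema(new AttributedConversionSerializer) // assumed serde
        .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-ad-attr-")   // distinct prefix per job
    .build()

attributed.sinkTo(attrSink)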

Job 3: Union & Load – Pinot + Warehouse

Pinot real-time table config (upsert)

Pinot table (simplified):

{
  "tableName": "ad_metrics_rt",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "minuteBucket",
    "timeType": "MILLISECONDS",
    "replication": "3"
  },
  "fieldConfigList": [],
  "schemaName": "ad_metrics_rt",
  "ingestionConfig": {
    "transformConfigs": [],
    "streamIngestionConfig": {
      "type": "kafka",
      "config": {
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "ad_metrics_agg",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.prop.isolation.level": "read_committed"
      }
    }
  },
  "upsertConfig": {
    "mode": "FULL",
    "primaryKeyColumns": ["recordId"]
  }
}

Now, each aggregate (and later updates) will upsert on recordId, ensuring idempotency.
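
One detail worth calling out: Pinot reads the upsert primary key from the table's schema (primaryKeyColumns), so ad_metrics_rt also needs a schema along these lines (a simplified sketch; column types assumed from AdMetricAgg):

{
  "schemaName": "ad_metrics_rt",
  "dimensionFieldSpecs": [
    { "name": "recordId",   "dataType": "STRING" },
    { "name": "adId",       "dataType": "STRING" },
    { "name": "campaignId", "dataType": "STRING" },
    { "name": "region",     "dataType": "STRING" },
    { "name": "updatedAt",  "dataType": "LONG" }
  ],
  "metricFieldSpecs": [
    { "name": "impressions", "dataType": "LONG" },
    { "name": "clicks",      "dataType": "LONG" }
  ],
  "dateTimeFieldSpecs": [
    { "name": "minuteBucket", "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MINUTES" }
  ],
  "primaryKeyColumns": ["recordId"]
}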

Warehouse ETL (dedupe on recordId)

If you stage Kafka data into a lake table (e.g., Iceberg), you can dedupe like:

CREATE TABLE ad_metrics_cleaned AS
SELECT recordId, adId, campaignId, region, minuteBucket, impressions, clicks, updatedAt
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY recordId ORDER BY updatedAt DESC) AS rn
  FROM ad_metrics_staging
) ranked
WHERE rn = 1;

This preserves exactly one row per logical record, even if Flink had to replay some windows.

Latency and Trade-Offs

Rough latency components:

  • Windowing: up to 1 minute for a tumbling window to close, plus the ~30 s of allowed out‑of‑orderness before the watermark advances
  • Checkpointing: aggregates only become visible to read_committed consumers when the Kafka transaction commits at the next checkpoint (every 60 s here)
  • Pinot ingestion: typically a few seconds once the records are committed

End‑to‑end, that puts fresh aggregates in dashboards within roughly 2–3 minutes.

You can push this down by:

  • shortening the checkpoint interval (transactions commit sooner)
  • reducing the window size and/or the bounded‑out‑of‑orderness delay

Trade-offs:

  • more frequent checkpoints mean more checkpoint overhead and more, smaller Kafka transactions
  • a smaller watermark delay means more late events to drop or correct afterwards
  • smaller windows mean more records (and more upserts) downstream

Reliability comes from:

  • Flink checkpoints and replay on failure
  • Kafka transactions, consumed with read_committed
  • deterministic record IDs with upserts / dedupe at the destinations

Exactly‑once doesn’t mean “no failures”; it means when you recover, your numbers are consistent.

Takeaways

If you’re designing ad, billing, or any money‑sensitive event pipeline:

  • treat duplicates and replays as the normal case, not the exception
  • give every derived record a deterministic ID so retries overwrite rather than add
  • make the destinations idempotent (upserts, dedupe) instead of trusting the transport alone

The key is to design for exactly‑once at every hop:

  • Kafka → Flink: checkpointed offsets and exactly‑once state
  • Flink → Kafka: transactional sinks committed with checkpoints
  • Kafka → Pinot / warehouse: read_committed consumption plus upserts or dedupe on record_id