Introduction
If you’re building ad or billing pipelines, “close enough” is not good enough.
Every impression, click, and conversion event represents real money. Losing events under‑charges. Double‑counting over‑charges. Both are unacceptable.
At the same time, downstream systems (budget pacing, bidding, dashboards) want near real‑time data, not yesterday’s batch.
This post walks through a concrete architecture and implementation for:
- Ingesting raw ad events into Kafka
- Cleaning and aggregating them in near real‑time with Flink
- Attributing conversions to ads
- Powering low‑latency analytics in Pinot and long‑term reporting in a warehouse
…while maintaining exactly‑once semantics end‑to‑end.
The Problem: Money Events in Motion
Typical event types:
- Impression – an ad was shown
- Click – a user clicked
- Conversion – an order, signup, or some other billable event
We want to:
- Ingest events from clients/services
- Clean & dedupe noisy streams (retries, client bugs, late arrival)
- Aggregate metrics (impressions, clicks, CTR) in small time buckets
- Attribute conversions back to ads
- Serve:
  - Low‑latency analytics for dashboards and internal services
  - Durable history for offline reporting and ML
- With:
  - Sub‑minute to a few‑minute latency
  - No silent data loss
  - Exactly‑once accounting
Building Blocks
We’ll use:
- Apache Kafka – durable event log, supports transactions + read_committed
- Apache Flink – stream processing with exactly‑once state and sinks
- Apache Pinot – real‑time OLAP store, low‑latency queries, upserts
- Warehouse / lake (Hive / Iceberg / Delta) – long‑term history & SQL
You can swap components (e.g., Druid for Pinot, BigQuery/Snowflake for the warehouse); the patterns stay the same.
Event Model and Kafka Topics
We’ll assume Protobuf or Avro for schema‑managed messages. For simplicity, here’s a JSON/Avro‑style schema for the raw ad events topic:
// Topic: ad_events_raw
{
"type": "record",
"name": "AdEvent",
"fields": [
{ "name": "event_id", "type": "string" }, // client-side UUID
{ "name": "user_id", "type": ["null", "string"], "default": null },
{ "name": "ad_id", "type": "string" },
{ "name": "campaign_id", "type": "string" },
{ "name": "event_type", "type": { "type": "enum", "name": "EventType",
"symbols": ["IMPRESSION", "CLICK"] } },
{ "name": "ts", "type": "long" }, // event time (ms)
{ "name": "region", "type": "string" },
{ "name": "meta", "type": ["null", { "type": "map", "values": "string" }],
"default": null }
]
}
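The Flink snippets later in this post work with a deserialized view of this record. Assuming you generate classes from the schema (or hand‑write them), a Scala sketch might look like the following; the field names mirror the Avro schema, and the sealed trait stands in for the Avro enum:

```scala
// Illustrative Scala-side representation of an ad_events_raw record.
// In practice this would typically be an Avro/Protobuf-generated class.
sealed trait EventType
object EventType {
  case object IMPRESSION extends EventType
  case object CLICK extends EventType
}

case class AdEvent(
  eventId: String,                    // event_id: client-side UUID
  userId: Option[String],             // user_id
  adId: String,                       // ad_id
  campaignId: String,                 // campaign_id
  eventType: EventType,               // event_type
  ts: Long,                           // event time in epoch millis
  region: String,
  meta: Option[Map[String, String]]   // free-form metadata
)
```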
Kafka configs for transactional producers (Flink’s Kafka sinks will use this):
# Kafka broker config (cluster-wide)
transaction.state.log.min.isr=3
transaction.state.log.replication.factor=3
# Producer config (used by Flink Kafka sink)
enable.idempotence=true
acks=all
retries=10
max.in.flight.requests.per.connection=1
transactional.id=flink-ad-agg-job-1
High-Level Job Graph
We will have three Flink jobs:
- Aggregation Job
  - Consumes ad_events_raw
  - Cleans & dedupes
  - Aggregates into 1‑minute buckets
  - Writes aggregated metrics to ad_metrics_agg (Kafka) with record UUIDs
- Attribution Job
  - Consumes conversion_events (orders)
  - Looks up prior ad events
  - Emits attributed conversions to ad_conversions_attr
- Union & Load Job
  - Consumes ad_metrics_agg + ad_conversions_attr
  - Writes to Pinot (real‑time) and the warehouse
All Kafka ↔ Flink paths use exactly‑once semantics via Flink checkpoints + Kafka transactions.
Exactly-Once in Practice (Flink + Kafka + Idempotency)
We rely on three mechanisms:
- Flink’s exactly‑once state + two‑phase commit sink
In code (Scala/Java):
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
env.getCheckpointConfig().setCheckpointTimeout(5 * 60_000L);
env.setStateBackend(new RocksDBStateBackend("hdfs://checkpoints/ad-pipelines"));
- Kafka transactions + read_committed
Flink’s KafkaSink writes in transactions that are committed when a checkpoint completes:
KafkaSink<AggregatedMetric> aggSink =
KafkaSink.<AggregatedMetric>builder()
.setBootstrapServers(kafkaBrokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("ad_metrics_agg")
.setValueSerializationSchema(new AggregatedMetricSerde())
.build())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-ad-agg-")
.build();
Downstream consumers (Pinot, ETL jobs) must use:
isolation.level=read_committed
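With Flink's KafkaSource this is just a consumer property on the builder. A minimal sketch for a job reading the aggregated topic (the deserializer name is a placeholder for whatever matches the sink's serializer):

```scala
// Consumers of transactional topics must skip records from aborted transactions.
val committedSource: KafkaSource[AdMetricAgg] =
  KafkaSource.builder[AdMetricAgg]()
    .setBootstrapServers(kafkaBrokers)
    .setTopics("ad_metrics_agg")
    .setGroupId("union-load-job")
    .setProperty("isolation.level", "read_committed")
    .setValueOnlyDeserializer(new AdMetricAggDeserializer) // placeholder serde
    .build()
```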
- Record‑level idempotency
Aggregated/attributed records use stable IDs:
String recordId = UUID.nameUUIDFromBytes(
(adId + "|" + minuteBucket + "|" + region).getBytes(StandardCharsets.UTF_8)
).toString();
We store this record_id field and use it as:
- Pinot primary key (upsert)
- Dedup key in the warehouse ETL
So if Flink replays, we just overwrite the same logical record.
Job 1: Aggregation – From Noisy Stream to Clean Buckets
Let’s look at the Flink job in more detail (Scala, but Java is similar).
1. Source and basic cleansing
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)
val source: KafkaSource[AdEvent] =
KafkaSource.builder[AdEvent]()
.setBootstrapServers(kafkaBrokers)
.setTopics("ad_events_raw")
.setGroupId("ad-agg-job")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new AdEventDeserializer)
.build()
val rawEvents: DataStream[AdEvent] = env.fromSource(
source,
WatermarkStrategy
.forBoundedOutOfOrderness[AdEvent](java.time.Duration.ofSeconds(30))
.withTimestampAssigner((e: AdEvent, _: Long) => e.ts),
"ad_events_raw"
)
// Filter invalid / stale events
val validEvents = rawEvents
.filter(e => e.adId != null && e.eventType != null)
.filter(e => isRecent(e.ts)) // e.g., within 7 days
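The filters above, and the windowing code below, lean on a couple of small helpers (isRecent, truncateToMinute) that aren't defined in the snippets. A minimal sketch, assuming millisecond epoch timestamps and the 7‑day cutoff from the comment:

```scala
// Helper functions assumed by the snippets in this post (ts is epoch millis).
val MaxEventAgeMs: Long = 7L * 24 * 60 * 60 * 1000 // 7 days

// Drop events that are implausibly old (late replays, clock-skewed clients).
def isRecent(ts: Long): Boolean =
  System.currentTimeMillis() - ts <= MaxEventAgeMs

// Truncate an event-time timestamp to the start of its 1-minute bucket.
def truncateToMinute(ts: Long): Long =
  ts - (ts % 60000L)
```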
2. Deduplication with keyed state
We assume event_id is client‑side unique. We’ll keep a small piece of keyed state per event_id (bounded with a TTL in practice) to drop duplicates.
val deduped: DataStream[AdEvent] = validEvents
  .keyBy(_.eventId)
  .process(new KeyedProcessFunction[String, AdEvent, AdEvent] {

    // Timestamp of the first occurrence of this event_id; 0L means "not seen yet".
    lazy val seen: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("seen-ts", classOf[Long])
    )

    override def processElement(
        value: AdEvent,
        ctx: KeyedProcessFunction[String, AdEvent, AdEvent]#Context,
        out: Collector[AdEvent]): Unit = {
      val lastSeen = seen.value()
      if (lastSeen == 0L) {
        // First sighting of this event_id: remember it and emit downstream.
        seen.update(value.ts)
        out.collect(value)
      }
      // else: drop duplicate / retried delivery of the same event_id
    }
  })
In production, you’d use a more space‑efficient structure (Bloom filter, TTL, etc.), but this illustrates the idea.
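One low‑effort option is Flink's built‑in state TTL, which expires dedup entries after a retention period. A sketch (the 7‑day retention is an assumption and should match how late you realistically accept retries):

```scala
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

// Expire "seen" entries some time after they are written, and never return
// expired entries to the dedup logic.
val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(7))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val seenDescriptor = new ValueStateDescriptor[Long]("seen-ts", classOf[Long])
seenDescriptor.enableTimeToLive(ttlConfig)
// Pass seenDescriptor to getRuntimeContext.getState(...) in the process function above.
```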
3. Aggregation into 1‑minute buckets
Define a case class for the aggregate:
case class AdMetricAgg(
recordId: String,
adId: String,
campaignId: String,
region: String,
minuteBucket: Long,
impressions: Long,
clicks: Long,
updatedAt: Long
)
Aggregation:
val keyed = deduped
.map(e => (e, truncateToMinute(e.ts)))
.keyBy { case (e, minuteBucket) => (e.adId, minuteBucket, e.region) }
val aggregated: DataStream[AdMetricAgg] =
keyed
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new ProcessWindowFunction[(AdEvent, Long), AdMetricAgg, (String, Long, String), TimeWindow] {
override def process(
key: (String, Long, String),
context: Context,
elements: Iterable[(AdEvent, Long)],
out: Collector[AdMetricAgg]): Unit = {
val (adId, minuteBucket, region) = key
var imps = 0L
var clicks = 0L
elements.foreach { case (e, _) =>
e.eventType match {
case IMPRESSION => imps += 1
case CLICK => clicks += 1
}
}
val recordId = UUID.nameUUIDFromBytes(
(adId + "|" + minuteBucket + "|" + region).getBytes(StandardCharsets.UTF_8)
).toString
out.collect(AdMetricAgg(
recordId = recordId,
adId = adId,
campaignId = elements.head._1.campaignId,
region = region,
minuteBucket = minuteBucket,
impressions = imps,
clicks = clicks,
updatedAt = System.currentTimeMillis()
))
}
})
4. Transactional sink to Kafka
val aggSink: KafkaSink[AdMetricAgg] =
KafkaSink.builder[AdMetricAgg]()
.setBootstrapServers(kafkaBrokers)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder[AdMetricAgg]()
.setTopic("ad_metrics_agg")
.setValueSerializationSchema(new AdMetricAggSerializer)
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-ad-agg-")
.build()
aggregated.sinkTo(aggSink)
env.execute("ad-aggregation-job")
At this point, we have exactly‑once, deduped, 1‑minute aggregates in Kafka.
Job 2: Attribution – Connecting Orders to Ads
We’ll assume a separate Kafka topic conversion_events:
// Topic: conversion_events
{
"conversion_id": "string",
"user_id": "string",
"order_id": "string",
"amount": "double",
"ts": "long",
"region": "string"
}
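As with AdEvent, the snippet below assumes a deserialized case class along these lines (field names mirror the JSON above):

```scala
// Illustrative Scala-side representation of a conversion_events record.
case class ConversionEvent(
  conversionId: String,
  userId: String,
  orderId: String,
  amount: Double,
  ts: Long,        // event time in epoch millis
  region: String
)
```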
We need to join conversions with prior ad events. The simplest design is:
- Write clean ad events into a TTL‑backed KV store (ad_event_id → context)
- In the attribution job, for each conversion:
  - Look up matching ad events
  - Emit attributed conversion records with deterministic IDs
Illustrative Flink snippet (pseudocode for KV lookup):
case class AttributedConversion(
recordId: String,
adId: String,
campaignId: String,
conversionId: String,
orderId: String,
amount: Double,
minuteBucket: Long,
region: String,
ts: Long
)
val conversions: DataStream[ConversionEvent] = ??? // from Kafka, built with the same KafkaSource pattern as Job 1
val attributed: DataStream[AttributedConversion] =
conversions
.process(new ProcessFunction[ConversionEvent, AttributedConversion] {
override def processElement(
value: ConversionEvent,
ctx: ProcessFunction[ConversionEvent, AttributedConversion]#Context,
out: Collector[AttributedConversion]): Unit = {
// Example: lookup by user_id in external KV
val adEvents: List[AdEvent] = adEventStore.lookupByUser(value.userId)
adEvents.foreach { ad =>
val minuteBucket = truncateToMinute(value.ts)
val idBytes = (ad.adId + "|" + value.orderId + "|" + minuteBucket).getBytes("UTF-8")
val recordId = UUID.nameUUIDFromBytes(idBytes).toString
out.collect(AttributedConversion(
recordId = recordId,
adId = ad.adId,
campaignId = ad.campaignId,
conversionId = value.conversionId,
orderId = value.orderId,
amount = value.amount,
minuteBucket = minuteBucket,
region = value.region,
ts = value.ts
))
}
}
})
Then we sink attributed to a Kafka topic ad_conversions_attr with the same EXACTLY_ONCE sink pattern.
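Concretely, that sink mirrors the Job 1 sink; only the topic, serializer, and transactional‑id prefix change (the serializer name is a placeholder):

```scala
val attrSink: KafkaSink[AttributedConversion] =
  KafkaSink.builder[AttributedConversion]()
    .setBootstrapServers(kafkaBrokers)
    .setRecordSerializer(
      KafkaRecordSerializationSchema.builder[AttributedConversion]()
        .setTopic("ad_conversions_attr")
        .setValueSerializationSchema(new AttributedConversionSerializer) // placeholder serde
        .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // Use a prefix distinct from other jobs so transactional IDs never collide.
    .setTransactionalIdPrefix("flink-ad-attr-")
    .build()

attributed.sinkTo(attrSink)
```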
Job 3: Union & Load – Pinot + Warehouse
Pinot real-time table config (upsert)
Pinot table (simplified):
{
"tableName": "ad_metrics_rt",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "minuteBucket",
"timeType": "MILLISECONDS",
"replication": "3"
},
"fieldConfigList": [],
"schemaName": "ad_metrics_rt",
"ingestionConfig": {
"transformConfigs": [],
"streamIngestionConfig": {
"type": "kafka",
"config": {
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "ad_metrics_agg",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.prop.isolation.level": "read_committed"
}
}
},
"upsertConfig": {
"mode": "FULL",
"primaryKeyColumns": ["recordId"]
}
}
Now, each aggregate (and later updates) will upsert on recordId, ensuring idempotency. Note that in Pinot the primary‑key columns are declared in the table’s schema; the upsertConfig above is simplified to show the intent.
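Dashboards and internal services can then query recent buckets directly. An illustrative query against the columns above (the time‑range literal would be computed by the caller; CTR can be derived from the two sums):

```sql
-- Impressions and clicks per campaign over a recent window of 1-minute buckets.
SELECT
  campaignId,
  SUM(impressions) AS impressions,
  SUM(clicks)      AS clicks
FROM ad_metrics_rt
WHERE minuteBucket >= 1700000000000  -- lower bound in epoch millis, supplied by the caller
GROUP BY campaignId
ORDER BY impressions DESC
LIMIT 20
```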
Warehouse ETL (dedupe on recordId)
If you stage Kafka data into a lake table (e.g., Iceberg), you can dedupe like:
CREATE TABLE ad_metrics_cleaned AS
SELECT *
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY recordId ORDER BY updatedAt DESC) AS rn
FROM ad_metrics_staging
) ranked
WHERE rn = 1;
This preserves exactly one row per logical record, even if Flink had to replay some windows.
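If your engine supports MERGE (for example Spark SQL on Iceberg or Delta), an equivalent idempotent load keyed on recordId looks like this sketch; the table names follow the staging/cleaned convention above:

```sql
-- Upsert staged aggregates into the cleaned table, keeping the newest version per recordId.
MERGE INTO ad_metrics_cleaned AS t
USING ad_metrics_staging AS s
  ON t.recordId = s.recordId
WHEN MATCHED AND s.updatedAt > t.updatedAt THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```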
Latency and Trade-Offs
Rough latency components:
- Kafka ingest: ms–seconds
- Flink window + checkpoint: e.g., 1 min window + 60s checkpoint interval
- Pinot ingestion: seconds
You can push this down by:
- Shorter windows
- More frequent checkpoints
- Tuning Kafka/Pinot flush intervals
Trade-offs:
- Lower latency = more overhead, more small files/segments
- Larger windows = less overhead, but slower feedback
Reliability comes from:
- Kafka retention for replay
- Flink checkpoints for state + offsets
- Idempotent downstream ingestion via primary keys and
read_committed
Exactly‑once doesn’t mean “no failures”; it means when you recover, your numbers are consistent.
Takeaways
If you’re designing ad, billing, or any money‑sensitive event pipeline:
- Use Flink + Kafka transactions with EXACTLY_ONCE and read_committed
- Assign stable record IDs and treat downstream streams as upsert logs
- Keep aggregation / attribution / serving as separate, testable jobs
- Use an OLAP store (Pinot/Druid) with upsert on primary key for real‑time analytics
- Deduplicate on those same IDs in your warehouse/lake
The key is to design for exactly‑once at every hop:
- Source → Flink: Kafka offsets + checkpoints
- Flink → Kafka: transactional sinks
- Kafka → OLAP/warehouse: primary keys + idempotent ingestion