At its core, this system is an always-on data infrastructure layer that turns a firehose of raw events into queryable, actionable metrics. Think of it as the plumbing between "something happened" (a user clicked a button, a sensor fired, a service threw an error) and "someone can see it on a dashboard 30 seconds later."
The hard part isn't ingesting events. Kafka handles that. The hard part is doing it correctly at scale: without dropping events, without double-counting, and without breaking every downstream consumer the moment a product team adds a new field to their event payload.
Before you sketch a single box on the whiteboard, you need to pin down what this pipeline actually does. The scope creep trap in this interview is treating "real-time analytics" as one thing when it's really four different systems bolted together.
Core Requirements
Below the line (out of scope)
"Real-time" is doing a lot of work in this problem statement. Nail down what it actually means before you commit to an architecture.
Start with the numbers you've established, then derive what the system actually needs to handle.
Assumptions
| Dimension | Calculation | Result |
|---|---|---|
| Peak ingestion throughput | 500K events/sec × 500 bytes | ~250 MB/sec (~2 Gbps) |
| Daily raw event volume | 250 MB/sec × 86,400 sec | ~21 TB/day |
| 30-day raw retention (Kafka + S3) | 21 TB/day × 30 days | ~630 TB |
| Aggregated metric writes/sec | 500K events × 10 aggregations / 60-sec window | ~83K metric writes/sec |
| 1-year aggregated metrics storage | 83K writes/sec × 60 bytes × 86,400 × 365 | ~160 TB |
| Kafka partition count (at 10 MB/sec/partition) | 250 MB/sec / 10 MB/sec | ~25 partitions minimum (round to 50 with headroom) |
A few things to flag here. The 630 TB raw retention number is why you don't store raw events in Kafka alone. Kafka is expensive at that scale; you tier to S3-backed Iceberg after the retention window. And the 83K metric writes/sec to your serving store is high enough that naive Redis key-per-metric designs will hit memory limits fast, which is a design constraint you'll want to address in the serving layer.
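The table's arithmetic is easy to script, which makes it simple to rerun when an interviewer changes an assumption. This is a minimal sketch; the constants are the assumptions from the table (peak load, 500-byte events, 10 aggregations per event, 60-byte metric rows, 10 MB/sec per partition). Without intermediate rounding, the 30-day figure comes out near 648 TB rather than 630 TB, because the table rounds daily volume down to 21 TB first. The conclusion is the same either way.

```python
import math

# Inputs mirror the assumptions in the table (peak load, not average)
PEAK_EVENTS_PER_SEC = 500_000
EVENT_BYTES = 500
AGGREGATIONS_PER_EVENT = 10
WINDOW_SECONDS = 60
METRIC_ROW_BYTES = 60
PARTITION_MB_PER_SEC = 10
SECONDS_PER_DAY = 86_400


def capacity_estimate():
    ingest_mb_per_sec = PEAK_EVENTS_PER_SEC * EVENT_BYTES / 1e6
    daily_tb = ingest_mb_per_sec * SECONDS_PER_DAY / 1e6
    metric_writes_per_sec = PEAK_EVENTS_PER_SEC * AGGREGATIONS_PER_EVENT / WINDOW_SECONDS
    yearly_metric_tb = (metric_writes_per_sec * METRIC_ROW_BYTES
                        * SECONDS_PER_DAY * 365 / 1e12)
    min_partitions = math.ceil(ingest_mb_per_sec / PARTITION_MB_PER_SEC)
    return {
        "ingest_mb_per_sec": ingest_mb_per_sec,
        "ingest_gbps": ingest_mb_per_sec * 8 / 1000,
        "daily_tb": daily_tb,
        "retention_30d_tb": daily_tb * 30,
        "metric_writes_per_sec": metric_writes_per_sec,
        "yearly_metric_tb": yearly_metric_tb,
        "min_partitions": min_partitions,
        # Double the minimum for headroom, as in the table
        "partitions_with_headroom": 2 * min_partitions,
    }


for name, value in capacity_estimate().items():
    print(f"{name:>26}: {value:,.1f}")
```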
Four entities drive this system. Two are data entities that live on the hot path (raw events and aggregated metrics), one is an operational entity (pipeline checkpoints), and one is a configuration entity (alert rules). Keeping that distinction clear in your interview will signal that you understand the difference between data flow and control flow.
RawEvent is the immutable log entry. Every event that enters the system gets written here exactly once. Think of it as your source of truth: if a downstream job produces wrong aggregates, you replay from this table. The properties column is intentionally schemaless (JSONB) to accommodate different event types without schema migrations on the hot path.
```sql
CREATE TABLE raw_events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source VARCHAR(100) NOT NULL,               -- e.g. 'web', 'ios', 'backend-payments'
    event_type VARCHAR(100) NOT NULL,           -- e.g. 'page_view', 'purchase', 'error'
    user_id UUID,                               -- nullable for anonymous events
    properties JSONB NOT NULL DEFAULT '{}',     -- arbitrary event payload
    server_timestamp TIMESTAMP NOT NULL,        -- assigned at ingestion, not by client
    ingested_at TIMESTAMP NOT NULL DEFAULT now()
);

CREATE INDEX idx_raw_events_source_ts ON raw_events(source, server_timestamp DESC);
CREATE INDEX idx_raw_events_type_ts ON raw_events(event_type, server_timestamp DESC);
```

server_timestamp is set by the ingestion layer, not the producer. Client clocks drift. If you let producers set the timestamp, your windowed aggregations will silently produce wrong results. This is the kind of detail interviewers love to probe.

AggregatedMetric is what dashboards actually read. Your Flink job writes pre-computed rollups here after each window closes. The dimensions column stores the grouping keys (country, device type, etc.) as JSONB so you can support arbitrary dimension combinations without altering the schema every time a product team wants a new breakdown.
```sql
CREATE TABLE aggregated_metrics (
    metric_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    metric_name VARCHAR(200) NOT NULL,          -- e.g. 'purchase_count', 'p99_latency'
    window_start TIMESTAMP NOT NULL,
    window_end TIMESTAMP NOT NULL,
    dimensions JSONB NOT NULL DEFAULT '{}',     -- e.g. {"country": "US", "device": "ios"}
    value DOUBLE PRECISION NOT NULL,
    updated_at TIMESTAMP NOT NULL DEFAULT now()
);

CREATE UNIQUE INDEX idx_agg_metrics_window
    ON aggregated_metrics(metric_name, window_start, window_end, dimensions);
CREATE INDEX idx_agg_metrics_name_ts
    ON aggregated_metrics(metric_name, window_start DESC);
```

The UNIQUE INDEX on (metric_name, window_start, window_end, dimensions) is doing real work here. It makes writes idempotent: if a Flink job restarts and replays a window, the upsert lands in the same row instead of creating a duplicate. Mention this explicitly in your interview.
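To see the idempotent upsert in action, here is a small sketch that uses SQLite as a stand-in for Postgres; both accept the same `INSERT ... ON CONFLICT (...) DO UPDATE` shape. Because SQLite has no JSONB, dimensions are stored as canonical JSON text (sorted keys), an assumption of this sketch only; Postgres jsonb already compares objects regardless of key order. The helper names are illustrative.

```python
import json
import sqlite3

# SQLite stands in for Postgres; dimensions are stored as canonical JSON text
SCHEMA = """
CREATE TABLE aggregated_metrics (
    metric_name  TEXT NOT NULL,
    window_start TEXT NOT NULL,
    window_end   TEXT NOT NULL,
    dimensions   TEXT NOT NULL,
    value        REAL NOT NULL,
    UNIQUE (metric_name, window_start, window_end, dimensions)
)
"""

UPSERT = """
INSERT INTO aggregated_metrics (metric_name, window_start, window_end, dimensions, value)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (metric_name, window_start, window_end, dimensions)
DO UPDATE SET value = excluded.value
"""


def canonical_dimensions(dimensions):
    # Sorted keys and fixed separators: equal dicts always serialize identically
    return json.dumps(dimensions, sort_keys=True, separators=(",", ":"))


def upsert_metric(conn, metric_name, window_start, window_end, dimensions, value):
    conn.execute(UPSERT, (metric_name, window_start, window_end,
                          canonical_dimensions(dimensions), value))


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
# A restarted job replays the same window: the second write updates the row
for dims in ({"country": "US", "device": "ios"}, {"device": "ios", "country": "US"}):
    upsert_metric(conn, "purchase_count", "2024-01-15T10:00:00Z",
                  "2024-01-15T10:01:00Z", dims, 1842)
print(conn.execute("SELECT COUNT(*), MAX(value) FROM aggregated_metrics").fetchone())
```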
PipelineCheckpoint is how you track exactly-once processing state. Each Flink consumer group commits its Kafka offset and the last watermark it processed per partition. If the job crashes, it recovers from the last committed checkpoint rather than re-reading from the beginning of the topic.
```sql
CREATE TABLE pipeline_checkpoints (
    checkpoint_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    consumer_group VARCHAR(200) NOT NULL,
    topic VARCHAR(200) NOT NULL,
    partition INT NOT NULL,
    committed_offset BIGINT NOT NULL,          -- last committed Kafka offset (OFFSET is a reserved word in Postgres)
    watermark TIMESTAMP NOT NULL,              -- last processed event-time watermark
    updated_at TIMESTAMP NOT NULL DEFAULT now(),
    UNIQUE (consumer_group, topic, partition)  -- one row per partition per consumer group
);
```

In practice, Flink manages checkpoint state internally via RocksDB and S3 snapshots. This table represents the logical concept. You don't necessarily build it yourself, but you should be able to explain what it's tracking and why.
AlertRule is a configuration entity. It defines thresholds against metric streams: "if error_rate exceeds 5% over a 60-second window, fire a PagerDuty alert." The critical design point is that this table lives outside the hot path. Your Flink alerting job reads these rules at startup (or polls for changes), not on every event. Coupling rule evaluation to per-event processing would make your alerting latency dependent on rule store throughput.
```sql
CREATE TABLE alert_rules (
    rule_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    metric_name VARCHAR(200) NOT NULL,
    condition VARCHAR(50) NOT NULL,              -- e.g. 'GREATER_THAN', 'LESS_THAN'
    threshold DOUBLE PRECISION NOT NULL,
    window_seconds INT NOT NULL,                 -- evaluation window duration
    notification_channel VARCHAR(200) NOT NULL,  -- e.g. 'pagerduty://team-data-eng'
    enabled BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMP NOT NULL DEFAULT now()
);
```
The API surface here is smaller than you might expect. Most of the system is internal pipeline infrastructure. The external-facing API covers three concerns: event ingestion, metric reads for dashboards, and alert rule management.
```
// Ingest a single event or a batch of events from a producer
POST /v1/events
{
  "events": [
    {
      "source": "web",
      "event_type": "page_view",
      "user_id": "uuid-optional",
      "properties": { "page": "/checkout", "referrer": "google" },
      "client_timestamp": "2024-01-15T10:30:00Z"  // advisory only; server sets server_timestamp
    }
  ]
}
-> {
  "accepted": 42,
  "rejected": 1,
  "errors": [{ "index": 3, "reason": "schema_validation_failed" }]
}
```

POST is the right verb here because ingestion is not idempotent at the HTTP layer. The pipeline handles idempotency downstream via event_id. Batching is important: at 500K events/sec, per-event HTTP calls would saturate your ingestion service with connection overhead.
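The ingestion endpoint's contract (validate each event, stamp server-side fields, report per-index failures) can be sketched in a few lines. This is a hypothetical handler body, not a real framework API: the required fields are an assumption, and so is keeping a producer-supplied event_id when present, which is one way to let client retries deduplicate downstream.

```python
import uuid
from datetime import datetime, timezone

# Assumption: the minimum fields an event must carry to be accepted
REQUIRED_FIELDS = ("source", "event_type")


def ingest_batch(events, now=None):
    """Validate one POST /v1/events batch and stamp accepted events server-side."""
    now = now or datetime.now(timezone.utc)
    accepted, errors = [], []
    for index, event in enumerate(events):
        missing = [f for f in REQUIRED_FIELDS if not event.get(f)]
        if missing or not isinstance(event.get("properties", {}), dict):
            errors.append({"index": index, "reason": "schema_validation_failed"})
            continue
        record = dict(event)
        # Assumption: keep a producer-supplied event_id so retries dedupe downstream
        record.setdefault("event_id", str(uuid.uuid4()))
        # client_timestamp stays as advisory metadata; server_timestamp is authoritative
        record["server_timestamp"] = now.isoformat()
        accepted.append(record)
    response = {"accepted": len(accepted), "rejected": len(errors), "errors": errors}
    return accepted, response
```

A real handler would then publish `accepted` to Kafka in one batched produce call rather than one request per event.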
```
// Read pre-aggregated metrics for a dashboard panel
GET /v1/metrics/{metric_name}?window_start=...&window_end=...&dimensions=country:US,device:ios
-> {
  "metric_name": "purchase_count",
  "windows": [
    {
      "window_start": "2024-01-15T10:00:00Z",
      "window_end": "2024-01-15T10:01:00Z",
      "value": 1842,
      "dimensions": { "country": "US", "device": "ios" }
    }
  ]
}
```

This is a GET because it's a pure read with no side effects. The query hits Redis for recent windows and falls back to Iceberg via Trino for historical ranges. The dashboard service handles that routing transparently; callers don't need to know which store answered.
```
// Create a new alert rule
POST /v1/alert-rules
{
  "metric_name": "error_rate",
  "condition": "GREATER_THAN",
  "threshold": 0.05,
  "window_seconds": 60,
  "notification_channel": "pagerduty://team-data-eng"
}
-> {
  "rule_id": "uuid",
  "created_at": "2024-01-15T10:30:00Z"
}

// Replace an existing alert rule (PUT sends the full rule, e.g. with a new threshold)
PUT /v1/alert-rules/{rule_id}
{
  "metric_name": "error_rate",
  "condition": "GREATER_THAN",
  "threshold": 0.03,
  "window_seconds": 60,
  "notification_channel": "pagerduty://team-data-eng"
}
-> { "rule_id": "uuid", "updated_at": "..." }

// Disable a rule without deleting it
PATCH /v1/alert-rules/{rule_id}
{ "enabled": false }
-> { "rule_id": "uuid", "enabled": false }
```

Use PUT for full replacement of a rule and PATCH for partial updates like toggling enabled. Soft-disabling rules with enabled: false rather than DELETE preserves audit history, which matters when you're debugging why an alert stopped firing at 2am.
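The PUT/PATCH distinction is easiest to see as two merge functions. A minimal sketch, assuming rules are plain dicts with the alert_rules columns and that `enabled` is the only optional field (defaulting to true): PUT treats the body as the whole new rule, so an omitted optional field resets to its default, while PATCH changes only what it is given.

```python
REQUIRED_FIELDS = ("metric_name", "condition", "threshold",
                   "window_seconds", "notification_channel")
DEFAULTS = {"enabled": True}


def put_rule(existing, body):
    """PUT: the body is the whole new rule; omitted optional fields reset to defaults."""
    missing = [f for f in REQUIRED_FIELDS if f not in body]
    if missing:
        raise ValueError(f"PUT needs the full rule, missing: {missing}")
    rule = {"rule_id": existing["rule_id"], **DEFAULTS}
    rule.update({f: body[f] for f in (*REQUIRED_FIELDS, *DEFAULTS) if f in body})
    return rule


def patch_rule(existing, body):
    """PATCH: only the fields in the body change; everything else is kept."""
    unknown = set(body) - set(REQUIRED_FIELDS) - set(DEFAULTS)
    if unknown:
        raise ValueError(f"unknown fields: {sorted(unknown)}")
    return {**existing, **body}
```

Note the consequence: a PUT that changes only the threshold of a disabled rule re-enables it unless the client sends `"enabled": false` too, which is exactly why toggling belongs in PATCH.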
Five moving parts make this pipeline work: ingestion, stream processing, serving, alerting, and error handling. We'll walk through each one in order, because the design decisions in each layer constrain what's possible in the next.
Core components: Event producers (web, mobile, backend services), Schema Registry, Kafka cluster, Kafka Connect
Events arrive from many sources simultaneously. Your job is to get them into a durable, ordered log without letting bad data in.
Producers serialize events as Avro and register their schema with the Schema Registry before publishing. The registry acts as a gatekeeper: if the event doesn't match a registered schema, it's rejected at the producer before it ever touches Kafka. This is much cheaper than catching bad data downstream after it's already been aggregated into metrics.
Data flow:
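Validated events are published to Kafka topics partitioned by source or event_type. Logically, each event carries these fields (shown as JSON for readability; on the wire it's Avro):

```json
{
    "event_id": "uuid-v4",
    "source": "mobile-ios",
    "event_type": "page_view",
    "user_id": "uuid-v4",
    "properties": {
        "page": "/checkout",
        "session_id": "abc123"
    },
    "server_timestamp": "2024-01-15T10:23:45.123Z"
}
```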
Partitioning strategy matters here. Partitioning by event_type keeps related events on the same partition, which makes windowed aggregations cheaper because Flink tasks don't need to shuffle data across the network. Partitioning by source is better if you have wildly uneven event volumes per type and want to avoid hot partitions. Ask your interviewer what the event distribution looks like before committing.
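The hot-partition risk follows from how keyed partitioning works: every record with the same key hashes to the same partition. The sketch below makes that concrete with skewed traffic (90% page_view). It uses CRC32 as a stand-in hash, an assumption for illustration; Kafka's default partitioner uses murmur2, but any deterministic hash shows the same effect. The event mix and source names are hypothetical.

```python
import zlib
from collections import Counter


def partition_for(key, num_partitions):
    # CRC32 stands in for Kafka's murmur2 partitioner; any deterministic hash
    # shows the same effect: every event with a given key lands on one partition
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_load(events, key_field, num_partitions=50):
    """Count events per partition when keying by key_field."""
    return Counter(partition_for(e[key_field], num_partitions) for e in events)


# Skewed traffic: 90% page_view, 10% purchase, spread over 40 sources
events = (
    [{"event_type": "page_view", "source": f"svc-{i % 40}"} for i in range(900)]
    + [{"event_type": "purchase", "source": f"svc-{i % 40}"} for i in range(100)]
)
by_type = partition_load(events, "event_type")
by_source = partition_load(events, "source")
print(len(by_type), max(by_type.values()))   # at most 2 partitions, one with >= 900 events
print(len(by_source), max(by_source.values()))
```

Keying by event_type here leaves 48 of 50 partitions idle and funnels 90% of traffic through one, which is the failure mode to ask your interviewer about.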
One more thing: set Kafka's retention to at least 7 days for raw events. This gives you a reprocessing window if a downstream bug corrupts aggregates. You'll want this when the deep dive on backfills comes up.
Core components: Flink aggregation job, RocksDB state backend, Redis, Iceberg on S3
Flink reads from Kafka and does the heavy lifting: grouping events into time windows, computing aggregates, and writing results to two different stores.
Data flow:
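Flink assigns event time from server_timestamp (not processing time).
Events are keyed by (metric_name, dimension_values) and routed to the appropriate window operator.

```python
# Flink windowed aggregation (PyFlink sketch). kafka_source, CountAggregator,
# RedisSink and IcebergSink are placeholders for components defined elsewhere.
from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream.window import TumblingEventTimeWindows


class ServerTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        # Event time comes from server_timestamp, never from processing time.
        # Assumes the ingestion layer also stores it as epoch ms (server_timestamp_ms).
        return value["server_timestamp_ms"]


events = kafka_source.assign_timestamps_and_watermarks(
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(30))
    .with_timestamp_assigner(ServerTimestampAssigner())
)

aggregated = (
    events
    .key_by(lambda e: (e["event_type"], e["properties"].get("page")))
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(CountAggregator())
)

# Fan the same window results out to both stores
aggregated.add_sink(RedisSink())
aggregated.add_sink(IcebergSink(table="aggregated_metrics"))
```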
The RocksDB state backend is the right choice here. In-memory state backends can't handle the volume at 500K events/sec across many concurrent windows. RocksDB spills to local disk and checkpoints to S3, so Flink can recover without reprocessing from the beginning of the Kafka topic.
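The dual-write adds complexity, but it's the right trade-off. The two sinks reach exactly-once results in different ways: the Iceberg sink only commits its files when a Flink checkpoint completes, while Redis writes are idempotent overwrites keyed by window, so a replay after a failure rewrites the same keys instead of double-counting.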
Core components: Redis, Iceberg on S3, Trino/Athena, Dashboard service
Two query patterns dominate: "show me the last 5 minutes of checkout conversion rate" and "show me checkout conversion rate for the last 90 days broken down by country." These have completely different latency and throughput characteristics, and one store can't serve both well.
Data flow for real-time dashboard queries:
Redis keys follow the pattern metric:{metric_name}:{dimension_hash}:{window_start}.
Data flow for historical/ad-hoc queries:
Trino/Athena queries Iceberg and prunes files using the window_start partition column.
Redis keys should have a TTL matching your retention policy for hot data, typically 24 to 48 hours. Anything older goes to Iceberg. The dashboard service needs to know which store to hit based on the query's time range, which is a simple routing decision: if window_start is within the last 48 hours, hit Redis; otherwise, hit Trino.
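The routing rule fits in one function. A minimal sketch, assuming a 48-hour cutoff that matches the Redis TTL; the function and constant names are illustrative, not part of any existing service. Keep the cutoff at or below the TTL so Redis is never asked for keys that have already expired.

```python
from datetime import datetime, timedelta, timezone

# Assumption: routing cutoff equals the Redis TTL for hot data (48 hours here)
HOT_WINDOW = timedelta(hours=48)


def choose_store(window_start, now=None):
    """Route a dashboard query by how far back its time range starts."""
    now = now or datetime.now(timezone.utc)
    return "redis" if now - window_start <= HOT_WINDOW else "trino"
```

Under this rule a query whose range starts 49 hours ago goes entirely to Trino, even though part of it is still in Redis; that's acceptable because Iceberg holds the same aggregates.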
Partition the aggregated_metrics table by DATE(window_start) (in Iceberg, the days(window_start) partition transform) so Trino skips irrelevant files entirely. Without this, a 90-day query scans everything.

Don't put OLAP queries on Redis. It's not designed for range scans across thousands of keys, and you'll saturate the connection pool under any real query load.
Core components: Flink alerting job, AlertRule store (Postgres), Redis, notification service
Alerting can't depend on the aggregation job. If the aggregation job is backpressured or restarting, you still need alerts to fire. So the alerting job reads directly from the same Kafka topic, independently.
Data flow:
```sql
-- Alert rules are configuration, not hot-path data
CREATE TABLE alert_rules (
    rule_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    metric_name VARCHAR(200) NOT NULL,
    condition VARCHAR(50) NOT NULL,              -- 'GREATER_THAN', 'LESS_THAN'
    threshold DOUBLE PRECISION NOT NULL,
    window_seconds INT NOT NULL,
    notification_channel VARCHAR(200) NOT NULL,
    enabled BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMP NOT NULL DEFAULT now(),
    updated_at TIMESTAMP NOT NULL DEFAULT now()  -- bump on every change so the job can poll for updates
);
```

Keeping alert rules in Postgres (not in the Flink job's state) means you can update thresholds without restarting the job. The job polls for changes. This is a small operational detail that interviewers love to probe: "what happens if someone changes an alert threshold at 2am during an incident?"
Alerting latency should be in the 5 to 15 second range for most use cases. If you need sub-second alerting, you'd skip windowing entirely and evaluate rules on individual events as they arrive, at the cost of higher false-positive rates.
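Once rules are cached in memory, evaluation per closed window is a cheap filter. This sketch assumes the alerting job has already loaded rules from Postgres (by polling) into `AlertRule` objects; the dataclass and function names are hypothetical, and the condition names match the alert_rules table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AlertRule:
    rule_id: str
    metric_name: str
    condition: str              # 'GREATER_THAN' or 'LESS_THAN'
    threshold: float
    window_seconds: int
    notification_channel: str
    enabled: bool = True


COMPARATORS = {
    "GREATER_THAN": lambda value, threshold: value > threshold,
    "LESS_THAN": lambda value, threshold: value < threshold,
}


def rules_to_fire(rules, metric_name, window_seconds, value):
    """Return enabled rules whose condition is met by one closed window's value."""
    return [
        r for r in rules
        if r.enabled
        and r.metric_name == metric_name
        and r.window_seconds == window_seconds
        and COMPARATORS[r.condition](value, r.threshold)
    ]
```

Because rules are read from the cache, not from Postgres, per window, a slow rule store only delays threshold changes, never alert evaluation.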
Core components: DLQ Kafka topic, DLQ monitor, reprocessing job
Every pipeline has bad data. The question is whether you lose it or handle it gracefully.
Data flow:
Events that fail validation or processing are written to the events.dlq topic with error metadata attached:

```json
{
    "original_payload": "<raw bytes>",
    "error_type": "SchemaValidationError",
    "error_message": "Field 'server_timestamp' is required but missing",
    "source_topic": "events.clickstream",
    "failed_at": "2024-01-15T10:23:45.456Z",
    "producer_host": "web-prod-07"
}
```

The DLQ is your audit trail. Without it, you have no way to know how many events you lost or what was in them. With it, you can answer "did we miss any checkout events during the schema migration?" with a concrete number.
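Building the DLQ envelope is mostly bookkeeping. A minimal sketch: `SchemaValidationError` is a hypothetical exception type, and base64-encoding the raw payload is an assumption made so that arbitrary bytes survive JSON and can be replayed verbatim by the reprocessing job.

```python
import base64
import json
import socket
from datetime import datetime, timezone


class SchemaValidationError(Exception):
    """Hypothetical error type raised by the validation step."""


def to_dlq_record(raw_payload, error, source_topic, producer_host=None, failed_at=None):
    """Wrap a failed event's raw bytes with the metadata needed to triage and replay it."""
    failed_at = failed_at or datetime.now(timezone.utc)
    envelope = {
        # Base64 so arbitrary bytes survive JSON encoding and can be replayed verbatim
        "original_payload": base64.b64encode(raw_payload).decode("ascii"),
        "error_type": type(error).__name__,
        "error_message": str(error),
        "source_topic": source_topic,
        "failed_at": failed_at.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        "producer_host": producer_host or socket.gethostname(),
    }
    return json.dumps(envelope).encode("utf-8")
```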
The full pipeline is three parallel paths sharing a single Kafka backbone.
The ingestion layer enforces data contracts at the source via the Schema Registry, so the stream processing layer can trust what it receives. Flink runs two independent jobs from the same topics: one for windowed aggregations, one for alerting. Aggregation results land in both Redis (for fresh, low-latency reads) and Iceberg (for durable, queryable history). The dashboard service routes queries to the right store based on time range. Failed events go to the DLQ instead of being silently dropped, giving the team a recovery path.
The key architectural principle throughout: each layer has one job, and failures in one layer don't cascade into others. The alerting job doesn't wait for aggregations. The DLQ doesn't block the main pipeline. The serving layer doesn't force OLAP queries onto a hot cache.
At 500K events/sec, you're looking at roughly 50 Kafka partitions (the 25-partition minimum from the estimate, doubled for headroom; that works out to 10K events/sec, or about 5 MB/sec, per partition), a Flink cluster with 20 to 40 task slots, and a Redis cluster sized for the hot window of pre-aggregated keys. Iceberg on S3 handles the rest at storage costs orders of magnitude cheaper than keeping everything in Redis.
These are the questions that separate candidates who've read about streaming systems from those who've actually run them in production. Expect the interviewer to push hard on at least two of these.
This one comes up constantly. A Flink task manager crashes halfway through a 5-minute tumbling window. When the job recovers, does it reprocess events it already counted? If your answer is "we just restart and hope for the best," you're going to lose points fast.
The naive approach is to let Flink run with at-least-once guarantees and then deduplicate on the read side. You store a processed_event_ids set somewhere (Redis, maybe a Bloom filter) and check it before updating your aggregates.
This breaks down at scale. At 500K events/sec, that deduplication store becomes a bottleneck and a single point of failure. Worse, your aggregates are mutable state spread across two systems with no transactional boundary between them. A partial write leaves you with corrupted counts and no clean way to detect it.
Flink's checkpointing mechanism gives you exactly-once semantics within the job itself. Every N seconds (configurable, typically 30-60s for low-latency pipelines), Flink snapshots all operator state to durable storage using asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm. If the job crashes, it recovers from the last successful checkpoint and replays only the Kafka messages after the offsets stored in that checkpoint.
RocksDB as the state backend is the right call here over the default heap-based backend. Window state for a high-volume pipeline can easily reach tens of gigabytes. Keeping that on the JVM heap causes GC pressure and checkpoint timeouts. RocksDB spills to local disk and snapshots incrementally to S3.
```python
# Flink job configuration (PyFlink)
from pyflink.datastream import (
    CheckpointingMode,
    EmbeddedRocksDBStateBackend,
    StreamExecutionEnvironment,
)

env = StreamExecutionEnvironment.get_execution_environment()

# Enable checkpointing every 30 seconds
env.enable_checkpointing(30_000)  # ms

checkpoint_config = env.get_checkpoint_config()

# Exactly-once mode
checkpoint_config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# RocksDB state backend; incremental checkpoints upload only changed files
env.set_state_backend(EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True))

# Durable checkpoint storage on S3 (requires Flink's S3 filesystem plugin)
checkpoint_config.set_checkpoint_storage_dir("s3://your-bucket/flink-checkpoints")

# Minimum pause between checkpoints to avoid overlap
checkpoint_config.set_min_pause_between_checkpoints(10_000)  # ms
```

The catch: checkpointing only covers state inside Flink. Your writes to Redis or Iceberg are still outside the checkpoint boundary. If the job crashes after writing a window result but before the next checkpoint completes, it recovers from the earlier checkpoint, recomputes that window, and writes it again.
The full solution closes the gap between Flink's internal guarantees and the external sinks. For Redis, you key every aggregate write by (metric_name, window_start, window_end, dimensions). A recomputed window produces the same key and simply overwrites the previous value. The write is naturally idempotent.
For Iceberg, you use Flink's two-phase commit sink. The sink writes data files as records arrive but only commits a new snapshot to the Iceberg catalog once the checkpoint succeeds. If the job crashes before the checkpoint completes, the uncommitted files are abandoned and the catalog never sees them.
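The commit protocol can be modeled as a toy class to make the phases concrete. This is a deliberately simplified sketch, not the Iceberg connector's real committer: "files" are strings, state lives in memory rather than in restored checkpoint state, and the method names only loosely echo Flink's checkpoint hooks.

```python
class TwoPhaseCommitSink:
    """Toy model of a checkpoint-aligned committing sink (not Iceberg's real committer)."""

    def __init__(self):
        self.current = []     # files written since the last checkpoint barrier
        self.pending = {}     # checkpoint_id -> files staged for that checkpoint
        self.committed = []   # what readers of the catalog can see

    def write(self, data_file):
        # Phase 1: data files are written but stay invisible to readers
        self.current.append(data_file)

    def snapshot_state(self, checkpoint_id):
        # On the checkpoint barrier, stage everything written so far
        self.pending[checkpoint_id] = self.current
        self.current = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Phase 2: commit every staged checkpoint up to and including this one
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.committed.extend(self.pending.pop(cid))

    def recover(self, last_completed_checkpoint):
        # Files staged by a completed checkpoint are committed on restore;
        # anything newer is abandoned and Flink replays it from Kafka
        self.notify_checkpoint_complete(last_completed_checkpoint)
        self.pending.clear()
        self.current = []
```

A crash between `snapshot_state(2)` and checkpoint 2 completing leaves file f2 staged but never committed, so readers never see a partial window.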
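```python
import hashlib
import json
from datetime import datetime, timezone


def dimension_hash(dimensions):
    # Canonical JSON (sorted keys) so equal dimension dicts always hash the same
    canonical = json.dumps(dimensions, sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()[:16]


# Idempotent Redis write keyed by window identity
def write_aggregate_to_redis(redis_client, metric_name, window_start,
                             window_end, dimensions, value, ttl_seconds=86400):
    # Key is deterministic: the same (metric, dimensions, window) always maps to the
    # same key, and different dimension combinations never overwrite each other
    key = (f"metric:{metric_name}:{dimension_hash(dimensions)}:"
           f"{window_start.isoformat()}:{window_end.isoformat()}")

    pipe = redis_client.pipeline(transaction=True)
    # HSET overwrites fields in place: rewriting the same window is safe
    pipe.hset(key, mapping={
        "value": value,
        "dimensions": json.dumps(dimensions, sort_keys=True),
        "updated_at": datetime.now(timezone.utc).isoformat(),
    })
    # TTL keeps the hot store from growing unbounded (24 hours by default)
    pipe.expire(key, ttl_seconds)
    pipe.execute()
```

On the Kafka side, configure your consumers to commit offsets only after a successful checkpoint, not on a timer. This ties offset commits to Flink's recovery point, so replays always start from a consistent position.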

Mobile apps buffer events locally and flush them when connectivity returns. A user opens your app on a plane, generates 40 events, and lands two hours later. Those events hit Kafka with timestamps from two hours ago. Your 5-minute windows closed long ago. What do you do?
Processing time is the timestamp when Flink receives the event, not when it actually occurred. It's simple to implement and there's no late-data problem because events are always "on time" relative to wall clock.
The problem is that your aggregates become meaningless for any business question that cares about when things actually happened. "How many users clicked checkout between 2pm and 3pm?" becomes unanswerable because your 2pm-3pm window contains events that occurred at 11am and events that occurred at 4pm, depending on network conditions.
Flink's event-time processing assigns each event to a window based on its event_timestamp field, not when it arrived. A watermark is a signal that says "I'm confident all events with timestamps before T have now arrived." When the watermark advances past a window's end time, Flink fires the window result.
You configure a watermark strategy with a bounded out-of-orderness. For mobile clients, something like 5 minutes is reasonable for most events. You also set allowed_lateness on the window, which keeps the window state alive for a grace period after the watermark fires, updating the result if late events trickle in.
```python
from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream.window import TumblingEventTimeWindows


class EventTimeAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        # Extract event_timestamp (epoch ms) from the event payload;
        # the field name event_timestamp_ms is an assumption
        return value["event_timestamp_ms"]


# Watermark strategy: assume events arrive within 5 minutes of event time
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_minutes(5))
    .with_timestamp_assigner(EventTimeAssigner())
)

stream = kafka_source_stream.assign_timestamps_and_watermarks(
    watermark_strategy
)

# Window with 2-minute allowed lateness after watermark fires
windowed_stream = (
    stream
    .key_by(lambda e: (e["metric_name"], e["dimension_value"]))
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowed_lateness(2 * 60 * 1000)  # PyFlink takes milliseconds here
    .aggregate(MetricAggregateFunction())
)
```

The trade-off: a 5-minute out-of-orderness bound means your dashboard results are at least 5 minutes stale. Tighten the bound and you get fresher results but more events classified as late.
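To build intuition for how the bound and the allowed lateness interact, here is a small pure-Python simulation modeled on Flink's rules: a window fires once the watermark reaches its last millisecond, and its state is dropped once the watermark passes that point plus the allowed lateness. It is idealized (the watermark is recomputed before every event, whereas Flink emits watermarks periodically), and the constants match the 5-minute windows, 5-minute bound, and 2-minute lateness above.

```python
MINUTE_MS = 60_000
WINDOW_MS = 5 * MINUTE_MS            # tumbling window size
OUT_OF_ORDERNESS_MS = 5 * MINUTE_MS  # watermark bound
ALLOWED_LATENESS_MS = 2 * MINUTE_MS  # grace period after the window fires


def classify_arrivals(event_times_ms):
    """Label each event (in arrival order) the way an event-time window would treat it."""
    max_seen = None
    labels = []
    for ts in event_times_ms:
        # Bounded-out-of-orderness watermark: max timestamp seen so far, minus the bound
        watermark = (float("-inf") if max_seen is None
                     else max_seen - OUT_OF_ORDERNESS_MS - 1)
        window_max = ts - ts % WINDOW_MS + WINDOW_MS - 1  # last ms of the event's window
        if window_max + ALLOWED_LATENESS_MS <= watermark:
            labels.append("too_late")     # window state is gone: drop or side output
        elif window_max <= watermark:
            labels.append("late_update")  # window already fired; its result is updated
        else:
            labels.append("on_time")      # window has not fired yet
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return labels


# Event times in minutes, listed in arrival order
arrivals = [m * MINUTE_MS for m in (1, 11, 2, 12, 30, 3)]
print(classify_arrivals(arrivals))
# ['on_time', 'on_time', 'late_update', 'on_time', 'on_time', 'too_late']
```

The event at minute 2 arrives after the [0, 5) window has fired but within the grace period, so it updates the result; the event at minute 3 arrives after the watermark has jumped to about minute 25, so only a side output can save it.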
Events that arrive after the allowed lateness window closes aren't just dropped. You route them to a Flink side output stream, which feeds a separate correction job. That job merges the late events into the already-written Iceberg partitions using Iceberg's MERGE INTO semantics, updating the historical aggregate without touching the hot Redis store.
```python
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import OutputTag
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows

# Define a side output tag for events that arrive after allowed lateness expires
late_output_tag = OutputTag("late-events", Types.MAP(Types.STRING(), Types.STRING()))


class MetricWindowFunction(ProcessWindowFunction):
    def process(self, key, context, elements):
        # PyFlink window functions yield results rather than using a collector
        yield compute_aggregate(elements)


# Flink itself routes too-late events to the side output via side_output_late_data;
# there is no per-element "late arrival" callback on the window function
main_stream = (
    stream
    .key_by(lambda e: (e["metric_name"], e["dimension_value"]))
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowed_lateness(2 * 60 * 1000)  # ms
    .side_output_late_data(late_output_tag)
    .process(MetricWindowFunction())
)

# Side output gets late arrivals for async correction
late_stream = main_stream.get_side_output(late_output_tag)
late_stream.add_sink(IcebergCorrectionSink())
```

The correction job runs on a slower cadence (every 15-30 minutes) and issues targeted partition rewrites. Dashboards showing historical data automatically pick up the corrections on their next query. Real-time dashboards are unaffected because they read from Redis, which only holds recent windows.
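The correction itself is a keyed merge: bucket late events into their windows, then increment matched windows and insert unmatched ones, which is what a MERGE INTO with WHEN MATCHED / WHEN NOT MATCHED clauses does. This is an in-memory sketch of that logic for a simple count metric, assuming window starts in epoch milliseconds and 5-minute windows; it is not Iceberg code.

```python
from collections import Counter


def apply_late_corrections(historical_counts, late_event_times_ms, window_ms=5 * 60_000):
    """Fold late events into already-published window counts (a stand-in for MERGE INTO)."""
    updated = dict(historical_counts)
    deltas = Counter(ts - ts % window_ms for ts in late_event_times_ms)
    for window_start, delta in deltas.items():
        # Matched windows are incremented; unmatched windows are inserted
        updated[window_start] = updated.get(window_start, 0) + delta
    return updated
```

Counts merge cleanly this way; metrics like p99 latency don't, which is one reason to keep mergeable partial state (or the raw events) around for correction jobs.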

A product team wants to add a session_id field to the click event schema. Sounds simple. But you have three Flink jobs reading that topic, a Spark batch job doing daily rollups, and an Iceberg table with six months of historical data. A naive schema change can silently corrupt all of them.
Some teams skip schema enforcement entirely and use raw JSON. Producers add fields whenever they want, consumers ignore fields they don't recognize. Flexible, right?
In practice, this means a typo in a field name (user_Id instead of user_id) silently produces null values in your aggregates for weeks before anyone notices. You have no contract between producers and consumers, no version history, and no way to detect breaking changes before they hit production. The DLQ stays empty because nothing is technically invalid; it's just wrong.
Avro with a schema registry gives you a versioned contract. Every schema change is registered before any producer starts writing the new format. The registry enforces compatibility rules: in BACKWARD mode, a consumer using the new schema must be able to read data written with the previous schema. That means you can add a field only if it has a default, and you can delete fields, but you can't add a required field or change a field's type incompatibly.
```
// v1 schema
{
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp_ms", "type": "long"}
  ]
}

// v2 schema: adding session_id as optional field with default
{
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp_ms", "type": "long"},
    {"name": "session_id", "type": ["null", "string"], "default": null}
  ]
}
```

During the rollout window, the Kafka topic contains a mix of v1 and v2 messages. Avro schema resolution handles this: the reader (your Flink job) uses its own schema to decode messages, filling in the default null for session_id when reading v1 events. No job restart required.
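The field-level part of schema resolution is simple enough to sketch. This simplified version works on already-decoded records and ignores type promotion, unions, and aliases, which a real Avro library handles during decoding; the function name is illustrative. Reader fields present in the data are kept, missing ones take their default, and writer-only fields are dropped.

```python
def resolve_record(writer_record, reader_schema):
    """Project a decoded record onto the reader's schema (simplified Avro resolution)."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in writer_record:
            resolved[name] = writer_record[name]
        elif "default" in field:
            resolved[name] = field["default"]  # v1 events get session_id = None
        else:
            raise ValueError(f"{name!r} is missing and has no default")
    return resolved  # writer-only fields are dropped, as Avro readers ignore them


CLICK_EVENT_V2 = {
    "type": "record",
    "name": "ClickEvent",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp_ms", "type": "long"},
        {"name": "session_id", "type": ["null", "string"], "default": None},
    ],
}

v1_event = {"event_id": "e1", "user_id": "u1", "event_type": "click",
            "timestamp_ms": 1705314225123}
print(resolve_record(v1_event, CLICK_EVENT_V2)["session_id"])  # None
```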
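BACKWARD compatibility protects consumers that have upgraded to the new schema when they read older data, but it doesn't protect consumers still on the old schema from data written with the new one; that's FORWARD compatibility. With several Flink jobs and a Spark job upgrading on their own schedules, and a pipeline that needs to backfill or reprocess historical events, you want FULL compatibility: new schemas must be both backward and forward compatible. That limits you to adding or removing optional fields that have defaults, which is the right constraint for a system where old events live in Iceberg for months.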
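The compatibility modes reduce to which side can read which. Using the Confluent Schema Registry definitions (BACKWARD: the new schema can read old data; FORWARD: the old schema can read new data; FULL: both), a simplified checker looks like this. It only checks field presence and defaults, not type changes, so treat it as an illustration of the rules rather than a substitute for the registry's own check.

```python
def can_read(reader_schema, writer_schema):
    """Can data written with writer_schema be decoded with reader_schema?

    Simplified: checks field presence and defaults only, not type changes.
    """
    written = {f["name"] for f in writer_schema["fields"]}
    return all(f["name"] in written or "default" in f for f in reader_schema["fields"])


def is_backward(new, old):
    return can_read(new, old)  # consumers on the new schema read old data


def is_forward(new, old):
    return can_read(old, new)  # consumers on the old schema read new data


def is_full(new, old):
    return is_backward(new, old) and is_forward(new, old)


def schema(*fields):
    return {"type": "record", "name": "ClickEvent", "fields": list(fields)}


V1 = schema({"name": "event_id", "type": "string"}, {"name": "user_id", "type": "string"})
ADD_OPTIONAL = schema(*V1["fields"],
                      {"name": "session_id", "type": ["null", "string"], "default": None})
ADD_REQUIRED = schema(*V1["fields"], {"name": "region", "type": "string"})
DROP_REQUIRED = schema({"name": "event_id", "type": "string"})

for label, new in [("add optional", ADD_OPTIONAL), ("add required", ADD_REQUIRED),
                   ("drop required", DROP_REQUIRED)]:
    print(label, is_backward(new, V1), is_forward(new, V1), is_full(new, V1))
```

Only the optional-with-default change passes all three. Also note that plain BACKWARD/FORWARD/FULL compare against the latest registered version; when months of older versions sit in Iceberg, the registry's transitive variants (such as FULL_TRANSITIVE) check against every earlier version.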
On the Iceberg side, you issue an ALTER TABLE to add the new column. Iceberg's schema evolution is metadata-only: existing Parquet files don't get rewritten. Old partitions simply return null for the new column, which is exactly what you want.
```sql
-- Add session_id to Iceberg table without rewriting historical data
ALTER TABLE analytics.click_events
ADD COLUMN session_id VARCHAR;

-- Existing partitions return NULL for session_id
-- New partitions written by v2 Flink job will populate it
-- Query works across both old and new partitions transparently
SELECT
    date_trunc('hour', window_start) AS hour,
    COUNT(*) AS total_clicks,
    COUNT(session_id) AS clicks_with_session  -- NULLs excluded naturally
FROM analytics.click_events
WHERE window_start >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY 1;
```

The schema registry acts as the gatekeeper. Before any producer deploys v2, they register the new schema. The registry validates compatibility and rejects it if the change would break existing consumers. Only after registration succeeds does the producer deploy. This makes schema changes a deliberate, auditable operation rather than an accident waiting to happen.

Silent data quality failures are the hardest problems in data engineering. Your pipeline is running, dashboards are updating, no errors in the logs. But your aggregates are 15% lower than reality because a Kafka consumer group fell behind three hours ago and nobody noticed.
Relying on the absence of errors as a signal of correctness. If Flink isn't throwing exceptions and Redis is getting writes, everything must be fine.
This fails because most data quality issues aren't errors. They're silent: a misconfigured watermark that drops late events, a partition imbalance that causes one shard to lag, a schema change that causes a field to deserialize as null. The pipeline hums along producing wrong numbers with no complaint.
Two independent checks cover most failure modes. First, monitor Kafka consumer group lag per partition. If any partition's lag exceeds a threshold (say, 5 minutes of data at normal throughput), page the on-call engineer. Lag is a leading indicator: it tells you the pipeline is falling behind before aggregates become stale.
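Converting a message backlog into "minutes behind" is what makes the threshold meaningful. A minimal sketch, assuming end and committed offsets per partition have already been fetched (for example with Kafka's admin tooling) and that normal throughput is about 10K events/sec per partition (500K/sec over 50 partitions); the function name is hypothetical.

```python
def lagging_partitions(end_offsets, committed_offsets,
                       events_per_sec_per_partition, max_lag_seconds=300):
    """Return {partition: lag_in_seconds_of_data} for partitions over the threshold."""
    alerts = {}
    for partition, end in end_offsets.items():
        lag_messages = end - committed_offsets[partition]
        # Express the backlog as time at normal throughput (300 s = 5 minutes)
        lag_seconds = lag_messages / events_per_sec_per_partition
        if lag_seconds > max_lag_seconds:
            alerts[partition] = lag_seconds
    return alerts


# 500K events/sec over 50 partitions ~ 10K events/sec per partition
print(lagging_partitions({0: 5_000_000, 1: 9_000_000},
                         {0: 4_000_000, 1: 5_000_000},
                         events_per_sec_per_partition=10_000))
# {1: 400.0}
```

Checking per partition matters: averaged across 50 partitions, one badly lagging shard can hide under the threshold.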
Second, run a periodic reconciliation job that compares the number of events Kafka says were produced (end offset minus start offset per partition) against the number of rows written to Iceberg for the same time window. A significant discrepancy means events were dropped somewhere in the pipeline.
```python
from kafka import TopicPartition  # kafka-python


def count_kafka_messages(consumer, topic, window_start, window_end):
    """Approximate per-window message count from timestamp-based offset lookups.

    consumer is a kafka-python KafkaConsumer (offsets_for_times lives there,
    not on the admin client). Assumes Kafka record timestamps track
    server_timestamp closely.
    """
    partitions = [TopicPartition(topic, p)
                  for p in consumer.partitions_for_topic(topic)]
    log_end = consumer.end_offsets(partitions)

    def offsets_at(ts):
        found = consumer.offsets_for_times(
            {tp: int(ts.timestamp() * 1000) for tp in partitions})
        # None means no message at/after ts: the log-end offset is the boundary
        return {tp: found[tp].offset if found[tp] is not None else log_end[tp]
                for tp in partitions}

    start, end = offsets_at(window_start), offsets_at(window_end)
    return sum(end[tp] - start[tp] for tp in partitions)


def reconcile_window(consumer, spark, iceberg_table,
                     window_start, window_end, topic):
    """
    Compare Kafka message count to Iceberg row count for a time window.
    Returns the discrepancy ratio; alert if > 0.01 (1%).
    Assumes UTC datetimes and a UTC Spark session time zone.
    """
    kafka_count = count_kafka_messages(consumer, topic, window_start, window_end)

    # Count rows in Iceberg for the same window
    iceberg_count = spark.sql(f"""
        SELECT COUNT(*) FROM {iceberg_table}
        WHERE server_timestamp >= TIMESTAMP '{window_start:%Y-%m-%d %H:%M:%S}'
          AND server_timestamp <  TIMESTAMP '{window_end:%Y-%m-%d %H:%M:%S}'
    """).collect()[0][0]

    discrepancy = abs(kafka_count - iceberg_count) / max(kafka_count, 1)
    return discrepancy
```

The complete observability stack has four layers working together. Kafka lag monitoring catches pipeline slowdowns. Row-count reconciliation catches drops. Flink's built-in metrics (throughput, checkpoint duration, backpressure), exported through a metrics reporter, catch processing bottlenecks before they become lag. And DLQ growth rate catches upstream schema issues at the source.
The DLQ rate is particularly valuable. A sudden spike in DLQ messages almost always means a producer deployed a schema change without registering it. You want to alert on DLQ growth rate (messages per minute), not just total DLQ size, because a slow leak is just as dangerous as a sudden flood.
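Alerting on rate rather than size can be done with two trailing windows: a short one to catch floods and a long one to catch slow leaks. A minimal sketch, assuming periodic samples of the DLQ's total message count as `(unix_seconds, total)` pairs; the thresholds (100/min spike, 1/min leak) and window lengths are hypothetical and should be tuned to your baseline.

```python
def dlq_rate_per_minute(samples, window_seconds):
    """Growth rate over the trailing window; samples are (unix_seconds, total), oldest first."""
    latest_t, latest_count = samples[-1]
    in_window = [s for s in samples if s[0] >= latest_t - window_seconds]
    start_t, start_count = in_window[0]
    if latest_t == start_t:
        return 0.0
    return (latest_count - start_count) * 60 / (latest_t - start_t)


def dlq_alerts(samples, spike_per_min=100, leak_per_min=1):
    alerts = []
    if dlq_rate_per_minute(samples, 5 * 60) > spike_per_min:   # sudden flood
        alerts.append("spike")
    if dlq_rate_per_minute(samples, 60 * 60) > leak_per_min:   # slow, steady leak
        alerts.append("leak")
    return alerts


# A steady trickle of 2 bad events per minute for an hour
trickle = [(60 * i, 2 * i) for i in range(61)]
print(dlq_alerts(trickle))  # ['leak']
```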
On top of these, add anomaly detection on the aggregate values themselves. If your "active users per minute" metric drops 40% in two minutes with no corresponding drop in traffic, that's a data quality signal, not a business event. A simple z-score check against a rolling baseline catches this class of failure.
```python
import statistics


def check_metric_anomaly(metric_name, current_value, historical_values,
                         emit_alert, z_threshold=3.0):
    """
    Flag aggregate values that deviate significantly from recent history.
    Run this after each window closes. emit_alert is the callable that
    forwards alerts to your notification service.
    """
    if len(historical_values) < 2:
        return False  # stdev needs at least two data points

    mean = statistics.mean(historical_values)
    stdev = statistics.stdev(historical_values)

    if stdev == 0:
        return False  # No variance in history, skip check

    z_score = abs(current_value - mean) / stdev

    if z_score > z_threshold:
        emit_alert(
            metric=metric_name,
            value=current_value,
            expected_range=(mean - z_threshold * stdev,
                            mean + z_threshold * stdev),
            z_score=z_score,
        )
        return True
    return False
```

All four signals feed into a single Prometheus/Grafana dashboard. The on-call engineer sees one view: lag, reconciliation diff, Flink health, and DLQ rate. When something goes wrong, the combination of signals usually points directly at the layer that failed.

Every interviewer on this problem is testing the same underlying question: do you understand the difference between event time and processing time, and can you explain why that distinction breaks windowed aggregations when you get it wrong? That question applies whether you're mid-level or staff. Everything else below is layered on top of it.
Partitioning Kafka by event_type or source is a deliberate choice, not a default.
Idempotent sink writes (keyed by window_start + metric_name) are necessary even after you've solved the Flink side.
If a product team wants a new breakdown (say, checkout_events by device_type), does that require a Flink job restart? A schema migration? A new Redis key pattern? Staff candidates design for this kind of change being cheap.