Streaming
PGStyx supports Structured Streaming as a microbatch sink for PostgreSQL. Use this page to choose a delivery mode, pick the right write pattern, and size the stream so PostgreSQL stays healthy.
Delivery guarantees
| Mode | Default | When to use |
|---|---|---|
| At-least-once | Yes (streamingExactlyOnce=false) | When the target can absorb replays cleanly |
| Exactly-once | No (streamingExactlyOnce=true) | When replayed microbatches are not acceptable |
At-least-once means a restart can replay a microbatch that was only partly committed. Two patterns make that safe in practice:
- Upsert on a stable natural key. Set `writeMode=upsert` with `mergeKeys` set to columns whose values do not change across retries. Replays then converge on the same final state.
- Append with `warnAndFilter`. For insert-only workloads, pair `validationMode=warnAndFilter` with a target-side uniqueness rule so duplicates are skipped on replay instead of failing the stream.
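A toy model shows why these two patterns tolerate a replay while a plain append does not. This is a sketch, not PGStyx code, and it rests on three stand-ins:

- A Python dict keyed on the merge key represents an upsert target.
- A plain list represents an append-only table with no uniqueness rule.
- A hypothetical `apply_append_skip_duplicates` helper represents append plus a uniqueness rule. It does not describe how `warnAndFilter` is implemented.

The model assumes the first attempt committed only part of the microbatch before the restart.

```python
# Toy model, not PGStyx code: a dict keyed on the merge key stands in for an
# upsert target, and lists stand in for append-only tables.

def apply_upsert(table: dict, batch: list, merge_key: str) -> None:
    """Insert or overwrite each row by its merge key (last write wins)."""
    for row in batch:
        table[row[merge_key]] = row

def apply_append(table: list, batch: list) -> None:
    """Insert every row unconditionally, as a table with no uniqueness rule would."""
    table.extend(batch)

def apply_append_skip_duplicates(table: list, seen: set, batch: list, key: str) -> None:
    """Hypothetical stand-in for append plus a uniqueness rule: skip known keys."""
    for row in batch:
        if row[key] in seen:
            continue  # duplicate from the replay: skipped instead of failing
        seen.add(row[key])
        table.append(row)

batch = [{"user_id": "u1", "payload": "a"}, {"user_id": "u2", "payload": "b"}]

# The first attempt commits only the first row; after a restart the whole
# microbatch is replayed.
upsert_table: dict = {}
apply_upsert(upsert_table, batch[:1], "user_id")
apply_upsert(upsert_table, batch, "user_id")

append_table: list = []
apply_append(append_table, batch[:1])
apply_append(append_table, batch)

filtered_table: list = []
seen_keys: set = set()
apply_append_skip_duplicates(filtered_table, seen_keys, batch[:1], "user_id")
apply_append_skip_duplicates(filtered_table, seen_keys, batch, "user_id")

print(len(upsert_table), len(append_table), len(filtered_table))  # 2 3 2
```

The upsert and duplicate-skipping targets end with the same two rows a clean run would produce. The plain append target holds `u1` twice.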
Minimal streaming upsert
```scala
import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

stream
  .selectExpr("CAST(key AS STRING) AS user_id", "CAST(value AS STRING) AS payload")
  .writeStream
  .format("pgstyx")
  .option("url", "jdbc:postgresql://host:5432/warehouse")
  .option("dbtable", "users")
  .option("user", "postgres")
  .option("password", "secret")
  .option("licenseKey", sys.env("PGSTYX_LICENSE_KEY"))
  .option("writeMode", "upsert")
  .option("mergeKeys", "user_id")
  .option("checkpointLocation", "s3://bucket/pgstyx-users-stream/")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```

```python
import os

stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()

stream \
    .selectExpr("CAST(key AS STRING) AS user_id", "CAST(value AS STRING) AS payload") \
    .writeStream \
    .format("pgstyx") \
    .option("url", "jdbc:postgresql://host:5432/warehouse") \
    .option("dbtable", "users") \
    .option("user", "postgres") \
    .option("password", "secret") \
    .option("licenseKey", os.environ["PGSTYX_LICENSE_KEY"]) \
    .option("writeMode", "upsert") \
    .option("mergeKeys", "user_id") \
    .option("checkpointLocation", "s3://bucket/pgstyx-users-stream/") \
    .trigger(processingTime="30 seconds") \
    .start()
```
Exactly-once options
Use exactly-once when at-least-once replay is not acceptable. PGStyx records the progress of committed microbatches in a bookkeeping table. After a restart, a microbatch that has already committed is therefore not applied a second time.
| Option | Default | Notes |
|---|---|---|
| streamingExactlyOnce | false | Enables exactly-once mode |
| commitLogTable | auto-derived | Overrides the name of the bookkeeping table |
| commitLogRetentionEpochs | none (no pruning) | Prunes older bookkeeping rows after each commit; keep this larger than your replay window |
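The retention rule is easier to see in a toy model of an idempotent sink with a bookkeeping table. This is a sketch under stated assumptions, not PGStyx internals:

- The hypothetical `ToyExactlyOnceSink` tracks committed epoch IDs in a set. The real bookkeeping may track progress at a different granularity.
- It assumes the rows and their commit record land atomically.
- It prunes entries more than `retention_epochs` behind the newest epoch.

```python
from typing import Optional

class ToyExactlyOnceSink:
    """Hypothetical model of a sink that skips already-committed epochs.

    Not PGStyx internals: committed epoch IDs live in a set that stands in
    for the bookkeeping table, and a list stands in for the target table.
    """

    def __init__(self, retention_epochs: Optional[int] = None) -> None:
        self.retention_epochs = retention_epochs  # None: never prune
        self.committed: set = set()               # stands in for the commit log
        self.rows: list = []                      # stands in for the target table

    def write(self, epoch: int, batch: list) -> bool:
        """Apply a microbatch once; return False if it was skipped as a replay."""
        if epoch in self.committed:
            return False
        # This sketch assumes the rows and the commit record are written
        # atomically (for example, in one database transaction).
        self.rows.extend(batch)
        self.committed.add(epoch)
        self._prune()
        return True

    def _prune(self) -> None:
        """Keep only the newest `retention_epochs` epochs in the log."""
        if self.retention_epochs is None:
            return
        cutoff = max(self.committed) - self.retention_epochs
        self.committed = {e for e in self.committed if e > cutoff}

sink = ToyExactlyOnceSink(retention_epochs=2)
for epoch in range(5):
    sink.write(epoch, [f"row-{epoch}"])

print(sink.write(4, ["row-4"]))  # False: epoch 4 is still logged, replay skipped
print(sink.write(1, ["row-1"]))  # True: epoch 1 was pruned, so it is applied again
print(len(sink.rows))            # 6
```

The second replay is the failure the table warns about. Once an epoch falls outside the retention window, the model can no longer recognize it as committed.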
Exactly-once depends on two things staying stable across restarts:
- `checkpointLocation` must point to durable storage and must not be deleted or swapped.
- The upstream plan should keep the same partitioning shape between restarts.
Minimal exactly-once append
```scala
import org.apache.spark.sql.streaming.Trigger

stream
  .writeStream
  .format("pgstyx")
  .option("url", "jdbc:postgresql://host:5432/warehouse")
  .option("dbtable", "events")
  .option("user", "postgres")
  .option("password", "secret")
  .option("licenseKey", sys.env("PGSTYX_LICENSE_KEY"))
  .option("writeMode", "append")
  .option("streamingExactlyOnce", "true")
  .option("commitLogRetentionEpochs", "100")
  .option("checkpointLocation", "s3://bucket/pgstyx-events-stream/")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```

```python
import os

stream \
    .writeStream \
    .format("pgstyx") \
    .option("url", "jdbc:postgresql://host:5432/warehouse") \
    .option("dbtable", "events") \
    .option("user", "postgres") \
    .option("password", "secret") \
    .option("licenseKey", os.environ["PGSTYX_LICENSE_KEY"]) \
    .option("writeMode", "append") \
    .option("streamingExactlyOnce", "true") \
    .option("commitLogRetentionEpochs", "100") \
    .option("checkpointLocation", "s3://bucket/pgstyx-events-stream/") \
    .trigger(processingTime="30 seconds") \
    .start()
```
Output modes
| Spark output mode | PGStyx behavior |
|---|---|
| Append | Every emitted row is written once per successful microbatch. Works with writeMode=append or writeMode=upsert. |
| Update | Spark emits only the aggregation rows that changed in the latest trigger. Use writeMode=upsert. |
| Complete | Spark emits the full aggregation table on every trigger. Keep the target replay-safe with upsert or a stable append pattern. writeMode=overwrite is rejected for streaming. |
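The difference between Update and Complete shows up in a toy running count per key. This sketch models Spark's documented output-mode semantics in plain Python and does not call Spark or PGStyx. `TRIGGERS`, `emitted_rows`, and `upsert_final_state` are illustrative names.

```python
from collections import Counter

# Keys arriving in each of three microbatches (toy data).
TRIGGERS = [["a", "b"], ["a"], ["c"]]

def emitted_rows(mode: str) -> list:
    """Rows Spark would emit for a running count per key under `mode`."""
    counts: Counter = Counter()
    emitted = []
    for keys in TRIGGERS:
        counts.update(keys)
        if mode == "update":
            changed = set(keys)    # only keys whose count changed this trigger
        elif mode == "complete":
            changed = set(counts)  # the whole result table, every trigger
        else:
            raise ValueError(f"unsupported mode: {mode}")
        emitted.extend({"key": k, "count": counts[k]} for k in sorted(changed))
    return emitted

def upsert_final_state(rows: list) -> dict:
    """Target contents after upserting rows in order on `key` (last write wins)."""
    return {row["key"]: row["count"] for row in rows}

for mode in ("update", "complete"):
    rows = emitted_rows(mode)
    print(mode, len(rows), upsert_final_state(rows))
# update 4 {'a': 2, 'b': 1, 'c': 1}
# complete 7 {'a': 2, 'b': 1, 'c': 1}
```

Both modes re-emit key `a` with a new count. An upsert target therefore ends with one current row per key. A plain append target would accumulate 4 or 7 rows, several of them stale.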
Triggers
| Trigger | Supported |
|---|---|
| Trigger.ProcessingTime(interval) | Yes |
| Trigger.Once / Trigger.AvailableNow | Yes |
| Trigger.Continuous | No (PGStyx is microbatch only) |
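A pipeline can encode the output-mode and trigger rules from the tables above as a fail-fast check before it calls `start()`. The `check_stream_config` helper is hypothetical, and PGStyx applies its own checks at stream start. Its trigger names mirror the keyword arguments of PySpark's `DataStreamWriter.trigger` (`processingTime`, `once`, `availableNow`, `continuous`).

```python
# Hypothetical pre-flight check; PGStyx enforces its own rules at stream start.
SUPPORTED_TRIGGERS = {"processingTime", "once", "availableNow"}  # microbatch only

def check_stream_config(output_mode: str, write_mode: str, trigger: str) -> list:
    """Return a list of problems; an empty list means the combination looks fine."""
    problems = []
    if trigger not in SUPPORTED_TRIGGERS:
        problems.append(f"trigger {trigger!r} is not supported; PGStyx is microbatch only")
    if write_mode == "overwrite":
        problems.append("writeMode=overwrite is rejected for streaming")
    if output_mode == "update" and write_mode != "upsert":
        problems.append("update output mode should use writeMode=upsert")
    return problems

print(check_stream_config("update", "upsert", "processingTime"))   # []
print(len(check_stream_config("complete", "overwrite", "continuous")))  # 2
```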
Checkpointing
Point checkpointLocation at durable storage such as S3, ADLS, GCS, or HDFS. Keep it stable across restarts. If you enable exactly-once, deleting or replacing the checkpoint breaks replay protection.
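A job can reject obviously non-durable checkpoint paths before it starts. The `is_durable_checkpoint` helper below is a hypothetical sketch, not a PGStyx API. It assumes the usual Hadoop filesystem URI schemes for the stores named above:

- `s3` or `s3a` for S3
- `abfs` or `abfss` for ADLS Gen2
- `gs` for GCS
- `hdfs` for HDFS

Local paths and `file://` URIs are treated as not durable.

```python
from urllib.parse import urlparse

# Assumed URI schemes for the durable stores named above (adjust per cluster).
DURABLE_SCHEMES = {"s3", "s3a", "abfs", "abfss", "gs", "hdfs"}

def is_durable_checkpoint(location: str) -> bool:
    """True if the checkpoint URI uses one of the assumed durable schemes."""
    # A bare path like /tmp/ckpt has an empty scheme and is rejected.
    return urlparse(location).scheme.lower() in DURABLE_SCHEMES

print(is_durable_checkpoint("s3://bucket/pgstyx-users-stream/"))  # True
print(is_durable_checkpoint("/tmp/pgstyx-checkpoint"))            # False
```

The check covers durability only. Keeping the location stable across restarts still depends on configuration discipline.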
Backpressure and pool sizing
Streaming jobs can hold database connections for the duration of a microbatch, so the connection-budget math from Tuning and Metrics still applies.
If lock pressure or connection pressure appears, reduce write parallelism before raising throughput elsewhere:
- Lower `spark.sql.shuffle.partitions`.
- Set `maxOffsetsPerTrigger` on Kafka sources.
- Use a longer `Trigger.ProcessingTime` interval.
- Lower `maxPoolSize` when the database is connection-bound.
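Capping `maxOffsetsPerTrigger` and lengthening the trigger interval together set a ceiling on ingest rate. A quick calculation shows whether that ceiling still lets the stream catch up. The sketch relies on Spark's documented processing-time behavior: a microbatch starts at most once per interval, and if a batch overruns the interval, the next one starts as soon as it finishes. The helper names are hypothetical. The calculation assumes every trigger has a full batch available and that PostgreSQL keeps up at that rate.

```python
import math

def throughput_ceiling(max_offsets_per_trigger: int, trigger_interval_s: float,
                       batch_duration_s: float) -> float:
    """Upper bound on records per second under a processing-time trigger."""
    # A cycle lasts the trigger interval, or the batch duration if it overruns.
    cycle_s = max(trigger_interval_s, batch_duration_s)
    return max_offsets_per_trigger / cycle_s

def backlog_drain_minutes(backlog: int, incoming_per_s: float,
                          ceiling_per_s: float) -> float:
    """Minutes to clear a backlog at the ceiling; inf if it never shrinks."""
    net_per_s = ceiling_per_s - incoming_per_s
    if net_per_s <= 0:
        return math.inf
    return backlog / net_per_s / 60

ceiling = throughput_ceiling(300_000, trigger_interval_s=30, batch_duration_s=12)
print(ceiling)                                           # 10000.0
print(backlog_drain_minutes(1_200_000, 6_000, ceiling))  # 5.0
```

If the drain time comes out as `inf`, the throttle is tighter than the incoming rate. In that case, relieve pressure on the database by another route rather than capping offsets further.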
Plan note
Structured Streaming requires the Enterprise plan. Starting a `.writeStream.format("pgstyx")` query without a matching `licenseKey` throws `LicenseException` at stream start, and the error names structured streaming as the gated feature.