
Streaming

PGStyx supports Structured Streaming as a microbatch sink for PostgreSQL. Use this page to choose a delivery mode, pick the right write pattern, and size the stream so PostgreSQL stays healthy.

| Mode | Default | When to use |
|---|---|---|
| At-least-once | Yes (streamingExactlyOnce=false) | When the target can absorb replays cleanly |
| Exactly-once | No (streamingExactlyOnce=true) | When replayed microbatches are not acceptable |

At-least-once means a restart can replay a microbatch that was only partly committed. Two patterns make that safe in practice:

  1. Upsert on a stable natural key. Set writeMode=upsert with mergeKeys pointing at a key that does not change across retries, so replays converge on the same final state.
  2. Append with warnAndFilter. For insert-only workloads, pair validationMode=warnAndFilter with a target-side uniqueness rule so duplicates can be skipped on replay instead of failing the stream.

```scala
import org.apache.spark.sql.streaming.Trigger

// Assumes `spark` is an active SparkSession (for example spark-shell in :paste
// mode) and that the spark-sql-kafka-0-10 package is on the classpath.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

// Upsert keyed on user_id: replaying a microbatch rewrites the same rows.
val usersQuery = stream
  .selectExpr("CAST(key AS STRING) AS user_id", "CAST(value AS STRING) AS payload")
  .writeStream
  .format("pgstyx")
  .option("url", "jdbc:postgresql://host:5432/warehouse")
  .option("dbtable", "users")
  .option("user", "postgres")
  .option("password", "secret")
  .option("licenseKey", sys.env("PGSTYX_LICENSE_KEY"))
  .option("writeMode", "upsert")
  .option("mergeKeys", "user_id")
  .option("checkpointLocation", "s3://bucket/pgstyx-users-stream/")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```
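
The following small in-memory model shows why the two patterns above tolerate replays. It is plain Scala with no Spark or PGStyx involved. It simulates a microbatch whose first attempt commits only one row before a failure and is then replayed in full. ReplayModel, Row, append, appendSkippingDuplicates, and upsert are hypothetical names. appendSkippingDuplicates stands in for an append guarded by a uniqueness rule, roughly like PostgreSQL's INSERT ... ON CONFLICT DO NOTHING; it is not a description of how warnAndFilter is implemented.

```scala
// Hypothetical in-memory model of a target table; no Spark or PGStyx involved.
object ReplayModel {
  final case class Row(userId: String, payload: String)

  // Plain append: a replayed batch adds its rows again.
  def append(table: Vector[Row], batch: Seq[Row]): Vector[Row] =
    table ++ batch

  // Append guarded by uniqueness on userId: rows whose key already exists are
  // skipped instead of failing (similar in spirit to ON CONFLICT DO NOTHING).
  def appendSkippingDuplicates(table: Vector[Row], batch: Seq[Row]): Vector[Row] =
    batch.foldLeft(table) { (acc, row) =>
      if (acc.exists(_.userId == row.userId)) acc else acc :+ row
    }

  // Upsert keyed on userId: the last write for each key wins, so replays converge.
  def upsert(table: Map[String, String], batch: Seq[Row]): Map[String, String] =
    batch.foldLeft(table)((acc, row) => acc.updated(row.userId, row.payload))

  // One microbatch. The first attempt commits only its first row before failing;
  // the restart then replays the whole batch.
  val batch: Seq[Row] = Seq(Row("u1", "a"), Row("u2", "b"))
  val firstAttempt: Seq[Row] = batch.take(1)
}
```

After the replay, plain append holds three rows for two records. The guarded append and the keyed upsert each hold exactly one row per user_id.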

Use exactly-once when at-least-once replay is not acceptable. PGStyx records committed write progress in a bookkeeping table (the commitLogTable option), so a microbatch that has already committed is not re-applied after a restart.

| Option | Default | Notes |
|---|---|---|
| streamingExactlyOnce | false | Enables exactly-once mode |
| commitLogTable | auto-derived | Overrides the bookkeeping table name |
| commitLogRetentionEpochs | none | Prunes older bookkeeping rows after commit; keep this larger than your replay window |
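
Batch-level exactly-once sinks generally work by recording each committed batch id next to the data and skipping any batch whose id is already recorded. The sketch below models that idea in plain Scala to show why commitLogRetentionEpochs must exceed the replay window. CommitLog and applyOnce are hypothetical names. The model assumes one bookkeeping entry per batch id and that retention is counted in batch ids, which may differ from PGStyx's actual commit log.

```scala
// Hypothetical model of batch-id bookkeeping for an idempotent sink.
// CommitLog and applyOnce are illustrative names, not PGStyx internals.
object CommitLogModel {
  final case class CommitLog(committed: Set[Long]) {
    def contains(batchId: Long): Boolean = committed.contains(batchId)
    def record(batchId: Long): CommitLog = CommitLog(committed + batchId)
    // Keep only the newest `retention` batch ids, modeling a retention setting.
    def prune(retention: Int): CommitLog =
      CommitLog(committed.toSeq.sorted.takeRight(retention).toSet)
  }

  // Applies a batch only if its id is not recorded yet and returns the updated
  // log plus whether the write happened. In a real sink, the data write and the
  // bookkeeping row must commit in one database transaction, or a crash between
  // them breaks the guarantee.
  def applyOnce(log: CommitLog, batchId: Long, retention: Option[Int]): (CommitLog, Boolean) =
    if (log.contains(batchId)) (log, false)
    else {
      val updated = log.record(batchId)
      (retention.fold(updated)(n => updated.prune(n)), true)
    }
}
```

With a retention of 2, batch 0 is pruned once batches 1 and 2 commit. A late replay of batch 0 would then be applied a second time.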

Exactly-once depends on two things staying stable across restarts:

  • checkpointLocation must point to durable storage and must not be deleted or swapped.
  • The upstream plan should keep the same partitioning shape between restarts.

```scala
import org.apache.spark.sql.streaming.Trigger

// Reuses the Kafka `stream` DataFrame from the previous example. Its raw Kafka
// columns (key, value, topic, ...) are written as-is; project and cast columns
// first if the events table expects a different shape.
val eventsQuery = stream
  .writeStream
  .format("pgstyx")
  .option("url", "jdbc:postgresql://host:5432/warehouse")
  .option("dbtable", "events")
  .option("user", "postgres")
  .option("password", "secret")
  .option("licenseKey", sys.env("PGSTYX_LICENSE_KEY"))
  .option("writeMode", "append")
  .option("streamingExactlyOnce", "true")
  .option("commitLogRetentionEpochs", "100")
  .option("checkpointLocation", "s3://bucket/pgstyx-events-stream/")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```

| Spark output mode | PGStyx behavior |
|---|---|
| Append | Every emitted row is written once per successful microbatch. Works with writeMode=append or writeMode=upsert. |
| Update | Spark emits only changed aggregation rows. Use writeMode=upsert. |
| Complete | Spark emits the full aggregation table on every trigger. Keep the target replay-safe with upsert or a stable append pattern. writeMode=overwrite is rejected for streaming. |

| Trigger | Supported |
|---|---|
| Trigger.ProcessingTime(interval) | Yes |
| Trigger.Once / Trigger.AvailableNow | Yes |
| Trigger.Continuous | No (PGStyx is microbatch only) |
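
The two tables above can be encoded as a preflight check that fails before a stream starts. This is a hypothetical helper, not a PGStyx API. It takes the modes as plain strings and treats only the documented rejections (writeMode=overwrite and Trigger.Continuous) as hard errors. Combinations the tables discourage get advice instead.

```scala
// Hypothetical preflight check encoding the output-mode and trigger tables.
// Not a PGStyx API; PGStyx performs its own validation at stream start.
object StreamingCompat {
  sealed trait Verdict
  case object Ok extends Verdict
  final case class Advice(message: String) extends Verdict
  final case class Rejected(message: String) extends Verdict

  def check(outputMode: String, writeMode: String, trigger: String): Verdict =
    (outputMode.toLowerCase, writeMode.toLowerCase, trigger.toLowerCase) match {
      case (_, _, "continuous") =>
        Rejected("Trigger.Continuous is not supported; PGStyx is microbatch only")
      case (_, "overwrite", _) =>
        Rejected("writeMode=overwrite is rejected for streaming")
      case ("update", "append", _) =>
        Advice("Update mode emits only changed aggregation rows; use writeMode=upsert")
      case ("complete", "append", _) =>
        Advice("Complete mode re-emits the full aggregation every trigger; keep the target replay-safe, e.g. with writeMode=upsert")
      case ("append" | "update" | "complete", "append" | "upsert", _) =>
        Ok
      case other =>
        Rejected(s"Unrecognized combination: $other")
    }
}
```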

Point checkpointLocation at durable storage such as S3, ADLS, GCS, or HDFS. Keep it stable across restarts. If you enable exactly-once, deleting or replacing the checkpoint breaks replay protection.
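
A cheap guard against a non-durable checkpoint is to check the URI scheme before calling start(). The helper below is a hypothetical sketch. Its scheme list (s3, s3a, abfs, abfss, gs, hdfs) is an assumption meant to cover the storage systems named above. A scheme check cannot confirm that the location stays stable across restarts.

```scala
import java.net.URI
import scala.util.Try

// Hypothetical preflight check (not part of PGStyx): accept only URI schemes
// that normally point at durable, shared storage.
object CheckpointCheck {
  // s3/s3a for S3, abfs/abfss for ADLS Gen2, gs for GCS, hdfs for HDFS.
  private val durableSchemes: Set[String] = Set("s3", "s3a", "abfs", "abfss", "gs", "hdfs")

  // Unparsable locations, paths without a scheme (e.g. /tmp/ckpt), and file:
  // URIs are all treated as not durable.
  def isDurable(location: String): Boolean =
    Try(new URI(location)).toOption
      .flatMap(uri => Option(uri.getScheme))
      .map(_.toLowerCase)
      .exists(durableSchemes.contains)
}
```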

Streaming jobs can hold database connections for the full duration of each microbatch, so the connection-budget math from Tuning and Metrics still applies.
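
The exact formula lives in Tuning and Metrics. As a rough sketch under stated assumptions, peak streaming connections can be estimated as executors * min(concurrent tasks per executor, maxPoolSize). That estimate can then be compared with what PostgreSQL has left after superuser_reserved_connections and other clients. Both modeling choices here are assumptions made for this example: that each running write task holds one connection, and that maxPoolSize caps connections per executor.

```scala
// Rough connection-budget check. The formula is an assumption for illustration,
// not PGStyx's documented math: one connection per running write task, and
// maxPoolSize treated as a per-executor cap.
object ConnectionBudget {
  def peakStreamConnections(executors: Int, coresPerExecutor: Int, maxPoolSize: Int): Int =
    executors * math.min(coresPerExecutor, maxPoolSize)

  // Connections PostgreSQL can still hand out after reserved slots and other clients.
  def available(maxConnections: Int, superuserReserved: Int, otherClients: Int): Int =
    maxConnections - superuserReserved - otherClients

  def fits(executors: Int, coresPerExecutor: Int, maxPoolSize: Int,
           maxConnections: Int, superuserReserved: Int, otherClients: Int): Boolean =
    peakStreamConnections(executors, coresPerExecutor, maxPoolSize) <=
      available(maxConnections, superuserReserved, otherClients)
}
```

For example, 10 executors with 8 cores and maxPoolSize=4 need about 40 connections. With PostgreSQL's defaults of max_connections=100 and superuser_reserved_connections=3, that fits only if other clients use at most 57.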

If PostgreSQL shows lock or connection pressure, reduce write parallelism before trying to raise throughput elsewhere:

  • Lower spark.sql.shuffle.partitions.
  • Set maxOffsetsPerTrigger on Kafka sources.
  • Use a longer Trigger.ProcessingTime interval.
  • Lower maxPoolSize when the database is connection-bound.
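
One back-of-envelope way to choose maxOffsetsPerTrigger is to cap each microbatch at what PostgreSQL absorbs comfortably within one trigger interval. The sketch assumes you have measured a sustainable write rate for the target table. The function name and the headroom factor are hypothetical. In Spark's Kafka source, maxOffsetsPerTrigger is a total that Spark splits across topic partitions.

```scala
// Back-of-envelope sizing for the Kafka source's maxOffsetsPerTrigger option.
// sustainedRowsPerSec is an assumed measurement of what the target table absorbs
// without lock or connection pressure; headroom in (0, 1] leaves slack for spikes.
object TriggerSizing {
  def maxOffsetsPerTrigger(sustainedRowsPerSec: Long, triggerSeconds: Long, headroom: Double): Long = {
    require(sustainedRowsPerSec > 0 && triggerSeconds > 0, "rate and interval must be positive")
    require(headroom > 0.0 && headroom <= 1.0, "headroom must be in (0, 1]")
    // Never return less than one offset per trigger.
    math.max(1L, (sustainedRowsPerSec * triggerSeconds * headroom).toLong)
  }
}
```

For instance, 2,000 rows per second with a 30-second trigger and a headroom of 0.5 gives maxOffsetsPerTrigger=30000.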

Structured streaming requires an Enterprise license. Calling .writeStream.format("pgstyx") without a matching licenseKey throws LicenseException at stream start, and the error names structured streaming as the gated feature.
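
The examples above read the key with sys.env("PGSTYX_LICENSE_KEY"), so an unset variable surfaces as a NoSuchElementException rather than a clear error. A hypothetical fail-fast check like the one below reports a missing or empty key before the stream is built. It only checks that a key is present and cannot tell whether the key actually enables Enterprise features.

```scala
// Hypothetical fail-fast check, run before building the stream. It only checks
// that PGSTYX_LICENSE_KEY is present and non-empty, not that the key is valid.
object LicensePreflight {
  val EnvVar = "PGSTYX_LICENSE_KEY"

  def licenseKey(env: Map[String, String]): Either[String, String] =
    env.get(EnvVar)
      .map(_.trim)
      .filter(_.nonEmpty)
      .toRight(s"$EnvVar is unset or empty; structured streaming to PGStyx needs an Enterprise licenseKey")
}
```

In a job, call it with sys.env and stop with the Left message before building the stream. Otherwise, pass the Right value to .option("licenseKey", ...).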