Trino: Run Federated SQL Across S3, Postgres, Kafka, and Iceberg Without Moving a Single Byte

Your data is everywhere. Operational records live in Postgres. Raw logs landed in S3. Your event stream is in Kafka. Someone else decided Iceberg was the future and started writing tables there. Now your analysts are writing five different data pipelines, four different connection libraries, and maintaining a Frankenstein warehouse just to answer: how many users who clicked this button yesterday also placed an order last month?

Trino solves this problem at the query layer, not the storage layer. You write standard SQL. Trino figures out where each piece of data lives, pushes predicates down to each source, and returns a unified result set. No data movement, no nightly ETL job, no waiting for the sync to finish.

This guide will get you from zero to a working multi-source Trino cluster using Docker Compose, with real connector configurations for S3/MinIO, PostgreSQL, Kafka, and Apache Iceberg.


What Trino Actually Is (and Isn’t)

Trino is a distributed SQL query engine. It does not store data. It has no proprietary storage format. It is a compute layer that speaks to your existing data stores through connectors.

The project started as Facebook’s internal "Presto" in 2012, then the original authors forked it in 2018 under the name PrestoSQL (to escape Facebook’s stewardship), and eventually rebranded to Trino in 2020. The other thing still called "Presto" today is PrestoDB — Meta’s maintained fork. They diverge more every year. Unless you’re running on AWS EMR or inside Meta’s infra, Trino is the one you want.

Official GitHub: https://github.com/trinodb/trino

The architecture is a coordinator/worker model. One coordinator node receives queries, parses them, builds a distributed execution plan, and dispatches work to worker nodes. Workers read data from the source systems and shuffle results between themselves. For development, coordinator and worker can run on the same machine.


The Stack We’re Building

  • Trino 446 (coordinator + worker, single node)
  • MinIO — S3-compatible object storage for local testing
  • PostgreSQL 16 — operational database
  • Kafka 3.7 + Schema Registry — event streaming
  • Apache Iceberg via Trino’s built-in Iceberg connector (tables on MinIO)

The goal: write a single SQL query that joins all four sources.


Docker Compose Setup

Create a project directory and start with docker-compose.yml. Read through the comments — they explain non-obvious choices.

# docker-compose.yml
version: "3.9"

services:

  # ── Object Storage ───────────────────────────────────────────────────────────
  minio:
    image: minio/minio:RELEASE.2024-04-06T05-26-02Z
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - "9000:9000"   # S3 API
      - "9001:9001"   # Web console
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Creates buckets on first start so Trino has somewhere to write
  minio-init:
    image: minio/mc:latest
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      /bin/sh -c "
        mc alias set local http://minio:9000 minioadmin minioadmin &&
        mc mb --ignore-existing local/warehouse &&
        mc mb --ignore-existing local/raw-logs
      "

  # ── Relational DB ─────────────────────────────────────────────────────────────
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: trino
      POSTGRES_PASSWORD: trino
      POSTGRES_DB: shop
    ports:
      - "5432:5432"
    volumes:
      - pg_data:/var/lib/postgresql/data
      - ./postgres-init:/docker-entrypoint-initdb.d

  # ── Kafka ─────────────────────────────────────────────────────────────────────
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    volumes:
      - zk_data:/var/lib/zookeeper/data

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # PLAINTEXT_HOST lets external clients (your laptop) connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - kafka_data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    depends_on: [kafka]
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092

  # ── Trino ─────────────────────────────────────────────────────────────────────
  trino:
    image: trinodb/trino:446
    ports:
      - "8080:8080"
    volumes:
      - ./trino/etc:/etc/trino
    depends_on:
      minio-init:
        condition: service_completed_successfully
      postgres:
        condition: service_started
      kafka:
        condition: service_started

volumes:
  minio_data:
  pg_data:
  zk_data:
  kafka_data:

Trino Configuration Files

Trino reads its entire configuration from a directory tree. Create trino/etc/ and populate it:

trino/
└── etc/
    ├── config.properties
    ├── jvm.config
    ├── node.properties
    └── catalog/
        ├── hive.properties      ← S3 (raw files)
        ├── iceberg.properties   ← Iceberg tables
        ├── postgresql.properties
        └── kafka.properties

trino/etc/config.properties

# Single-node: coordinator and worker in the same process
coordinator=true
node-scheduler.include-coordinator=true

http-server.http.port=8080
discovery.uri=https://cd-linux.club

# Tune based on your heap (set in jvm.config)
query.max-memory=4GB
query.max-memory-per-node=2GB
query.max-total-memory-per-node=2GB

trino/etc/jvm.config

-server
-Xmx6G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
-Djdk.attach.allowAttachSelf=true
-XX:ReservedCodeCacheSize=512M

trino/etc/node.properties

node.environment=local
node.id=trino-node-1
node.data-dir=/data/trino

Connector: S3 via Hive Metastore (Hive connector)

The Hive connector lets Trino read raw files on S3 — CSV, Parquet, ORC, JSON — as external tables. It needs a Hive Metastore to store table metadata. For local use, Trino ships with an embedded HMS variant, but the easier path is to use the file-based metastore.

trino/etc/catalog/hive.properties

connector.name=hive

# MinIO as S3-compatible storage
hive.metastore=file
hive.metastore.catalog.dir=s3a://warehouse/hive-catalog

fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minioadmin
s3.aws-secret-key=minioadmin
s3.path-style-access=true      # Required for MinIO — virtual-hosted-style won't work

After Trino starts, create a schema and a table pointing at your raw CSV files:

-- Connect with: trino --server localhost:8080 --catalog hive
CREATE SCHEMA hive.logs WITH (location = 's3a://raw-logs/');

CREATE TABLE hive.logs.nginx_access (
    ts          VARCHAR,
    ip          VARCHAR,
    method      VARCHAR,
    path        VARCHAR,
    status      INTEGER,
    bytes       BIGINT
)
WITH (
    external_location = 's3a://raw-logs/nginx/',
    format = 'CSV',
    skip_header_line_count = 1
);

Connector: Apache Iceberg

Iceberg tables need a catalog to track table versions, schema history, and snapshots. Trino supports several catalog backends for Iceberg; the simplest self-hosted option without spinning up an extra service is the REST catalog or the Hive catalog (backed by the same file metastore you already have).

trino/etc/catalog/iceberg.properties

connector.name=iceberg
iceberg.catalog.type=hive_metastore

# Reuse the same file-based metastore
hive.metastore=file
hive.metastore.catalog.dir=s3a://warehouse/iceberg-catalog

fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minioadmin
s3.aws-secret-key=minioadmin
s3.path-style-access=true

# Optional: write Parquet by default
iceberg.file-format=PARQUET

Create your first Iceberg table:

CREATE SCHEMA iceberg.analytics WITH (location = 's3a://warehouse/analytics/');

CREATE TABLE iceberg.analytics.events (
    event_id    BIGINT,
    user_id     BIGINT,
    event_type  VARCHAR,
    occurred_at TIMESTAMP(6) WITH TIME ZONE,
    properties  MAP(VARCHAR, VARCHAR)
)
WITH (
    partitioning = ARRAY['day(occurred_at)'],
    sorted_by    = ARRAY['user_id']
);

Iceberg gives you ACID writes, schema evolution, and time travel — all from Trino SQL. No Spark required.


Connector: PostgreSQL

Dead simple. One file, done.

trino/etc/catalog/postgresql.properties

connector.name=postgresql
connection-url=jdbc:postgresql://postgres:5432/shop
connection-user=trino
connection-password=trino

# Push aggregations and filters to Postgres where possible
postgresql.aggregation-pushdown.enabled=true

Seed Postgres with some data via postgres-init/01-seed.sql:

CREATE TABLE customers (
    id          SERIAL PRIMARY KEY,
    email       TEXT UNIQUE NOT NULL,
    created_at  TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE orders (
    id          SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    total_cents INTEGER,
    placed_at   TIMESTAMPTZ DEFAULT now()
);

INSERT INTO customers (email) VALUES
    ('[email protected]'),
    ('[email protected]'),
    ('[email protected]');

INSERT INTO orders (customer_id, total_cents) VALUES
    (1, 4999), (1, 12000), (2, 799), (3, 23450);

Connector: Kafka

The Kafka connector exposes each topic as a table. Without a Schema Registry, you get raw byte columns. With one, Trino can deserialize Avro or JSON schema messages into proper typed columns automatically.

trino/etc/catalog/kafka.properties

connector.name=kafka
kafka.nodes=kafka:29092

# Topics to expose (comma-separated)
kafka.table-names=clickstream,page_views

# Pull schema definitions from Confluent Schema Registry
kafka.confluent-schema-registry-url=http://schema-registry:8081

# Default deserializer for key/value
kafka.default-schema=default

For topics without a registry schema, Trino falls back to a _message column containing the raw bytes as a VARCHAR. You can always define a custom table description JSON file in etc/kafka/ to map fields manually — but Schema Registry is cleaner.

Produce a few test events and query them immediately:

SELECT _timestamp, _partition_id, user_id, event_type
FROM kafka.default.clickstream
WHERE _timestamp > NOW() - INTERVAL '10' MINUTE
LIMIT 100;

The Kafka connector is read-only by design. Trino can read offsets but cannot commit them — it always reads from the latest or beginning depending on your config. Don’t use it as a consumer group replacement.


The Federated Query

Here’s the payoff. One query, four data sources:

-- Who are the customers from Postgres that:
-- 1. Have at least one order over $100
-- 2. Generated a clickstream event in Kafka in the last hour
-- 3. Have a corresponding analytics event in Iceberg
-- 4. And their IP shows up in the nginx logs on S3

WITH high_value_customers AS (
    SELECT c.id, c.email
    FROM postgresql.public.customers c
    JOIN postgresql.public.orders o ON o.customer_id = c.id
    WHERE o.total_cents > 10000
),
recent_clicks AS (
    SELECT DISTINCT user_id
    FROM kafka.default.clickstream
    WHERE _timestamp > NOW() - INTERVAL '1' HOUR
),
iceberg_signups AS (
    SELECT DISTINCT user_id
    FROM iceberg.analytics.events
    WHERE event_type = 'signup'
      AND occurred_at > TIMESTAMP '2024-01-01 00:00:00 UTC'
),
suspicious_ips AS (
    SELECT DISTINCT ip
    FROM hive.logs.nginx_access
    WHERE status = 403
      AND CAST(ts AS TIMESTAMP) > NOW() - INTERVAL '24' HOUR
)
SELECT
    hvc.email,
    hvc.id AS customer_id
FROM high_value_customers hvc
JOIN recent_clicks rc      ON rc.user_id     = hvc.id
JOIN iceberg_signups ie    ON ie.user_id     = hvc.id
-- Exclude customers whose IPs are in the blocklist
WHERE hvc.id NOT IN (
    SELECT CAST(ip AS BIGINT) FROM suspicious_ips
);

Trino parses this, identifies that postgresql.* queries should be pushed down to Postgres (including the orders.total_cents > 10000 predicate), sends the Kafka read as a streaming scan, reads the Iceberg snapshot from MinIO, and does the final join in its own distributed execution layer.


Gotchas

Time zones in Kafka. The _timestamp virtual column in Kafka is always UTC milliseconds. If you’re comparing against a TIMESTAMP column from Postgres that’s stored as timestamptz, Trino handles the conversion — but if your Postgres timestamps are stored as naive timestamp (no timezone), you’ll get silent wrong results. Audit your timestamp types before writing cross-source time comparisons.

Predicate pushdown isn’t free. Trino is smart, but it’s not magic. If you write a Kafka scan with no predicate, it will read every message in the topic from offset 0. For high-volume topics, always add a _timestamp or _partition_offset filter. Use EXPLAIN to see what actually gets pushed down.

Iceberg snapshots and the metastore. When using file-based metastore, the catalog directory needs to be writable by Trino. If MinIO permissions are wrong, table creation silently fails or leaves orphaned metadata files. Test with SHOW SCHEMAS FROM iceberg right after startup.

Kafka connector memory. By default, Kafka splits are read into Trino worker memory. A compacted topic with 50M messages will blow your heap. Use kafka.max-offsets-per-split to cap how many records a single split handles:

kafka.max-offsets-per-split=100000

JVM heap vs query memory. query.max-memory-per-node cannot exceed ~85% of -Xmx. If you set -Xmx6G and query.max-memory-per-node=6GB, queries will fail with memory exceeded errors before they even run. Leave headroom for JVM internals: set query memory to ~70% of heap.

Schema Registry auth. If your production Schema Registry is behind basic auth, the catalog property is kafka.confluent-schema-registry-url=http://user:pass@registry:8081. Don’t log that URL.


Production Considerations

Separate coordinator from workers. In production, the coordinator should never run data-intensive work. Set node-scheduler.include-coordinator=false on the coordinator. Workers scale horizontally — add nodes by pointing them at the coordinator’s discovery URI.

Authentication. Trino ships with password file auth, LDAP, Kerberos, JWT, and OAuth2. For internal use, password file (PASSWORD auth with http-server.authentication.type=PASSWORD) is fine. For a team deployment, LDAP or OAuth2 against your identity provider is the right call.

The Hive Metastore problem at scale. File-based metastore works for demos and small setups. If you have hundreds of tables across multiple teams, run a real Hive Metastore service (or Nessie, or AWS Glue). The file-based catalog has no locking — concurrent DDL operations can corrupt it.

Iceberg REST catalog. For production Iceberg, swap the hive_metastore catalog type for a REST catalog backed by Project Nessie or Apache Polaris. You get transactional multi-table operations, catalog-level access control, and cross-engine compatibility (Spark, Flink, and Trino all read the same tables).

Spill to disk. Enable spill for large sorts and aggregations that exceed memory:

# config.properties
spill-enabled=true
spiller-spill-path=/tmp/trino-spill
max-spill-per-node=100GB

Mount a fast NVMe volume at that path in production. Spilling to a slow EBS volume turns a 30-second query into a 10-minute one.


Running It

# Bring everything up
docker compose up -d

# Wait for Trino to be ready (~30 seconds)
docker compose logs -f trino | grep "SERVER STARTED"

# Connect with the CLI
docker run -it --network host trinodb/trino trino \
  --server https://cd-linux.club

# Or install the CLI locally (it's just a jar)
curl -Lo trino https://repo1.maven.org/maven2/io/trino/trino-cli/446/trino-cli-446-executable.jar
chmod +x trino
./trino --server localhost:8080

The web UI lives at https://cd-linux.club. It shows active queries, worker node status, and query execution plans. The plan view is invaluable for debugging — it tells you exactly which predicates got pushed to which connector and where data shuffling happens.


Where to Go From Here

Once you’re comfortable with the basics, the next interesting moves are:

Delta Lake connector — if part of your team writes Spark-based Delta tables, Trino can read them natively without converting to Iceberg.

Ranger or OPA for access control — Trino’s system access control interface supports Apache Ranger (for enterprises already running it) and Open Policy Agent for policy-as-code approaches. Both let you control column-level access, row filtering, and catalog-level permissions without changing application code.

Resource groups — if multiple teams share a Trino cluster, resource groups let you cap how much memory and CPU any one team or query type can consume. Without them, one runaway analyst query will evict everyone else.

Materialized views — Trino supports materialized views on top of connector tables, stored as Iceberg snapshots. For frequently-run federated queries, materializing the intermediate join result can cut query time from minutes to seconds.

Trino won’t replace your data warehouse for heavy analytical workloads where query latency at petabyte scale matters. But as a query federation layer that lets you explore across sources before you’ve decided what to ingest where, it’s one of the most practical tools in the self-hosted data stack.

Leave a comment

👁 Views: 2,290 · Unique visitors: 1,647