mq-bridge¶
mq-bridge is a generic, transport-agnostic message broker binding. omniload uses it to consume messages from Kafka, NATS, AMQP (RabbitMQ), MQTT, ZeroMQ, AWS SQS, or an in-memory transport, and load them into any omniload destination.
omniload supports mq-bridge as a source. This page is the full reference for every transport; several brokers also have a short landing page: NATS, MQTT, ZeroMQ, Amazon SQS, and IBM MQ.
AMQP/RabbitMQ is also available as a native source (amqp://, see RabbitMQ) with its own output columns (data/metadata/msg_id). Use this mq-bridge variant (amqp+mqb://) for commit-after-load delivery and a single engine shared across all brokers.
Kafka is also available as a native source (kafka://, see Apache Kafka) with richer event decoding. Use this mq-bridge variant (kafka+mqb://) for durable commit-after-load delivery and a single engine shared across all brokers.
Installation¶
mq-bridge ships as a native wheel and is bundled with omniload, so no extra install step is needed:
pip install omniload
IBM MQ is the one exception: it additionally requires the IBM MQ redistributable client to be present at runtime. See IBM MQ → Installation.
URI format¶
Each broker is addressed via a compound <transport>+mqb:// scheme. The broker URL and the
--source-table (topic/subject/queue) make up the connection; everything else is passed as
query parameters.
<transport>+mqb://<broker>?<param>=<value>&...
| Transport | URI example | Topic-like field (--source-table) |
|---|---|---|
| Kafka | | |
| NATS | | |
| AMQP | | |
| MQTT | | |
| ZeroMQ | | |
| AWS SQS | | |
| IBM MQ | | |
| memory | | |
The --source-table value supplies the topic-like field for the transport. An explicit ?topic=, ?subject=, ?queue=, or ?queue_url= query parameter overrides it.
The authority (the part before ?) becomes the broker URL; the query string becomes the endpoint’s config fields. Per-transport notes:
- Kafka and NATS accept a comma-separated host list in the authority for clusters/replicas (e.g. kafka+mqb://b1:9092,b2:9092?group_id=g).
- MQTT and AMQP are single-host; front multiple brokers with a load balancer or DNS.
- AWS SQS has no separate connection URL: the queue is named by its full queue_url, supplied via --source-table (or a percent-encoded ?queue_url=...). The region is discovered from the URL or set with ?region=.
- IBM MQ addresses queue managers as host(port), but you write the familiar host:port authority (comma-separated for failover, e.g. ibmmq+mqb://h1:1414,h2:1414) and omniload translates it. queue_manager and channel are required query parameters; --source-table names the target queue, or pass ?topic= to consume in pub/sub subscriber mode.
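The URI rules above can be sketched with Python’s standard urllib.parse. This is a minimal illustration, not omniload’s implementation: parse_source_uri, resolve_topic, and ibmmq_conname are hypothetical helper names, and because the per-transport topic field is not listed here, the caller passes the field name (for example "topic") explicitly.

```python
from urllib.parse import parse_qsl, urlsplit

# Hypothetical helpers illustrating the documented URI rules; not omniload's own code.


def parse_source_uri(uri: str) -> tuple[str, str, dict[str, str]]:
    """Split '<transport>+mqb://<broker>?<params>' into transport, broker, and params."""
    parts = urlsplit(uri)
    transport, sep, suffix = parts.scheme.partition("+")
    if not sep or suffix != "mqb":
        raise ValueError(f"not an mq-bridge URI: {uri!r}")
    # The authority (before '?') is the broker URL; Kafka/NATS may list several hosts.
    broker = parts.netloc
    # parse_qsl percent-decodes values, so an encoded ?queue_url= arrives intact.
    params = dict(parse_qsl(parts.query, keep_blank_values=True))
    return transport, broker, params


def resolve_topic(params: dict[str, str], source_table: str, field: str) -> dict[str, str]:
    """Fill the transport's topic-like field from --source-table unless the query set it."""
    resolved = dict(params)
    if field not in resolved and source_table:
        resolved[field] = source_table
    return resolved


def ibmmq_conname(authority: str) -> str:
    """Rewrite 'h1:1414,h2:1414' into IBM MQ's 'h1(1414),h2(1414)' connection-name list."""
    entries = []
    for hostport in authority.split(","):
        host, sep, port = hostport.strip().rpartition(":")
        # Entries without ':port' are passed through unchanged in this sketch.
        entries.append(f"{host}({port})" if sep else hostport.strip())
    return ",".join(entries)
```

For example, parse_source_uri("kafka+mqb://b1:9092,b2:9092?group_id=g") yields the transport "kafka", the broker list "b1:9092,b2:9092", and {"group_id": "g"}, and resolve_topic then adds the --source-table value only when no explicit override is present.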
memory: in-process & IPC channels¶
The memory transport’s url/topic are aliases for a single channel identifier, which may be:
- a bare name: memory+mqb://orders (mq-bridge treats it as memory://orders);
- a memory:// URL: memory+mqb://orders;
- an ipc://, unix://, or pipe:// channel for cross-process delivery. Because these carry their own scheme, pass them through --source-table (or ?url=), not the authority: --source-table 'ipc:///tmp/mq.sock' or memory+mqb://?url=ipc://my-queue.
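The identifier rules can be illustrated with a small classifier. This is a hypothetical sketch, not mq-bridge’s code: normalize_memory_channel, is_cross_process, and authority_of are invented names, and rejecting any other scheme is an assumption made for the example. authority_of shows why a nested ipc:// scheme does not belong in the authority: a standard URL parser cuts the host at the first '/'.

```python
from urllib.parse import urlsplit

CROSS_PROCESS_SCHEMES = ("ipc://", "unix://", "pipe://")


def normalize_memory_channel(identifier: str) -> str:
    """Map a memory+mqb url/topic value to the channel URL it names (hypothetical)."""
    if identifier.startswith(CROSS_PROCESS_SCHEMES + ("memory://",)):
        return identifier  # already a full channel URL
    if "://" in identifier:
        # Assumption for this sketch: any other scheme is rejected.
        raise ValueError(f"unsupported channel scheme: {identifier!r}")
    return f"memory://{identifier}"  # bare name: an in-process channel


def is_cross_process(identifier: str) -> bool:
    """True for ipc://, unix://, and pipe:// channels."""
    return identifier.startswith(CROSS_PROCESS_SCHEMES)


def authority_of(uri: str) -> str:
    """The broker part a URL parser sees; a nested ipc:// scheme gets cut at its first '/'."""
    return urlsplit(uri).netloc
```

With Python’s urlsplit, authority_of("memory+mqb://ipc:///tmp/mq.sock") returns just "ipc:", which is why these channels go through --source-table or ?url= instead.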
Connectivity parameters¶
Any field accepted by the transport’s mq-bridge endpoint config can be passed as a query
parameter; it is forwarded verbatim to mq_bridge.Consumer, with numeric/boolean fields
coerced from their string form. The consumer-relevant fields per transport:
| Transport | Useful query parameters |
|---|---|
| Kafka | |
| NATS | |
| AMQP | |
| MQTT | |
| ZeroMQ | |
| AWS SQS | |
| IBM MQ | |
| memory | |
For the authoritative field list per transport, see mq-bridge’s
configuration guide and
mq-bridge.schema.json.
Fields exposed only to publishers (e.g. request_reply, stream_max_messages) are accepted but have no effect on a source. Note that mq-bridge’s AWS max_messages (the SQS receive batch size, at most 10) is shadowed by omniload’s own max_messages transfer parameter (see Transfer parameters), so ?max_messages= is read by omniload and the SQS receive batch size stays at the mq-bridge default.
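The numeric/boolean coercion described at the start of this section can be sketched as a type-driven conversion. This assumes the field types come from the endpoint schema (mq-bridge.schema.json); the field_types mapping passed in below is hand-written and hypothetical, and coerce_params is an invented name, not omniload’s function. Converting only typed fields keeps string fields such as a numeric-looking group_id verbatim.

```python
def _to_bool(text: str) -> bool:
    """Accept only 'true'/'false' (case-insensitive) in this sketch."""
    lowered = text.strip().lower()
    if lowered not in ("true", "false"):
        raise ValueError(f"expected true/false, got {text!r}")
    return lowered == "true"


_CONVERTERS = {int: int, float: float, bool: _to_bool}


def coerce_params(params: dict[str, str], field_types: dict[str, type]) -> dict[str, object]:
    """Convert schema-typed fields from their query-string form; forward the rest verbatim."""
    coerced: dict[str, object] = {}
    for key, raw in params.items():
        convert = _CONVERTERS.get(field_types.get(key))
        coerced[key] = convert(raw) if convert else raw
    return coerced
```

For example, coerce_params({"topic": "orders", "capacity": "4096"}, {"capacity": int}) turns the memory transport’s capacity into the integer 4096 and leaves topic as a string.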
Authentication & TLS¶
Simple credential fields are flat query parameters: ?username=u&password=p (Kafka SASL /
AMQP / MQTT), ?token=... (NATS).
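Because credentials travel in the query string, a password containing &, =, @, or + must be percent-encoded or it will split or alter the query. A minimal sketch, assuming omniload decodes query values with standard URL rules (as the percent-encoded ?queue_url= form suggests); build_source_uri is a hypothetical helper, and the host and password are placeholders.

```python
from urllib.parse import urlencode


def build_source_uri(transport: str, broker: str, params: dict[str, str]) -> str:
    """Assemble '<transport>+mqb://<broker>?...' with every query value percent-encoded."""
    # urlencode escapes '&', '=', '@', and '+' inside values as %26, %3D, %40, %2B.
    query = urlencode(params)
    return f"{transport}+mqb://{broker}" + (f"?{query}" if query else "")
```

For instance, build_source_uri("amqp", "rabbit:5672", {"username": "u", "password": "p&ss=w@rd+1"}) produces amqp+mqb://rabbit:5672?username=u&password=p%26ss%3Dw%40rd%2B1, which decodes back to the original password.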
TLS lives in a nested tls block in mq-bridge’s config. Express it with dotted query keys,
which expand into the nested block:
kafka+mqb://broker:9093?group_id=g&tls.required=true&tls.ca_file=/etc/ssl/ca.pem
becomes {"kafka": {..., "tls": {"required": true, "ca_file": "/etc/ssl/ca.pem"}}}. The
supported tls.* keys are required, ca_file, cert_file / key_file (mTLS),
cert_password, and accept_invalid_certs. This dotted-key expansion works for any nested
config block, not just tls.
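The dotted-key expansion can be sketched in a few lines. expand_dotted and endpoint_config are hypothetical names, and raising on a key that is used both as a scalar and as a nested block is a choice made for this example; the point is only to show how tls.required and tls.ca_file fold into one tls block, at any nesting depth.

```python
def expand_dotted(flat: dict[str, object]) -> dict[str, object]:
    """Turn {'tls.required': True} into {'tls': {'required': True}}, at any depth."""
    nested: dict[str, object] = {}
    for key, value in flat.items():
        *parents, leaf = key.split(".")
        node = nested
        for part in parents:
            child = node.setdefault(part, {})
            if not isinstance(child, dict):
                raise ValueError(f"{key!r} conflicts with the scalar field {part!r}")
            node = child
        if isinstance(node.get(leaf), dict):
            raise ValueError(f"{key!r} conflicts with the nested block {leaf!r}")
        node[leaf] = value
    return nested


def endpoint_config(transport: str, params: dict[str, object]) -> dict[str, object]:
    """Wrap the expanded fields under the transport key, as in {"kafka": {...}}."""
    return {transport: expand_dotted(params)}
```

Applied to the (already coerced) Kafka example, endpoint_config("kafka", {"group_id": "g", "tls.required": True, "tls.ca_file": "/etc/ssl/ca.pem"}) gives {"kafka": {"group_id": "g", "tls": {"required": True, "ca_file": "/etc/ssl/ca.pem"}}}.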
Transfer parameters¶
These drive the consume loop and are not forwarded to the broker config:
- max_messages: upper bound on messages drained per run; defaults to 100000.
- idle_timeout_ms: how long to wait for new messages before stopping; defaults to 2000.
- batch_size: messages fetched per poll; defaults to 500.
- format: json (default) decodes the payload as JSON; text stores the raw text under a value column. Any other value is rejected.
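A sketch of how these settings might be separated from the endpoint config, assuming (as the max_messages note under Connectivity parameters implies) that they arrive in the same query string. split_transfer_params and TRANSFER_DEFAULTS are hypothetical names; the defaults and the json/text restriction are the ones listed above.

```python
TRANSFER_DEFAULTS: dict[str, object] = {
    "max_messages": 100_000,
    "idle_timeout_ms": 2_000,
    "batch_size": 500,
    "format": "json",
}


def split_transfer_params(params: dict[str, str]) -> tuple[dict[str, object], dict[str, str]]:
    """Pull transfer settings out of the query; everything else goes to the endpoint."""
    endpoint = dict(params)
    transfer: dict[str, object] = {}
    for key, default in TRANSFER_DEFAULTS.items():
        raw = endpoint.pop(key, None)  # popped keys never reach the broker config
        if raw is None:
            transfer[key] = default
        else:
            transfer[key] = raw if key == "format" else int(raw)
    if transfer["format"] not in ("json", "text"):
        raise ValueError(f"format must be 'json' or 'text', got {transfer['format']!r}")
    return transfer, endpoint
```

Popping the keys is what makes max_messages shadow the SQS field of the same name: once omniload consumes it, nothing is left to forward.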
Output format¶
Each message is stored as a row. The decoded payload’s top-level keys become columns (with format=text, a single value column), plus:
| Column | Type | Description |
|---|---|---|
| _mqb_id | VARCHAR | The message’s stable source position (Kafka …) |
| _mqb_metadata | JSON | The message metadata as reported by mq-bridge. |
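The row shape can be sketched as a pure function. message_to_row is a hypothetical name, and the sketch assumes UTF-8 payloads and, for format=json, a payload that decodes to a JSON object (it raises otherwise; how omniload treats other JSON values is not covered here). The message IDs and metadata in the usage note are placeholders, not real mq-bridge values.

```python
import json


def message_to_row(payload: bytes, msg_id: str, metadata: dict, fmt: str = "json") -> dict:
    """Build one destination row: payload columns plus _mqb_id and _mqb_metadata."""
    text = payload.decode("utf-8")  # assumption: UTF-8 payloads
    if fmt == "json":
        decoded = json.loads(text)
        if not isinstance(decoded, dict):
            raise TypeError("this sketch expects a JSON object payload")
        row = dict(decoded)  # top-level keys become columns
    elif fmt == "text":
        row = {"value": text}  # raw text under a single value column
    else:
        raise ValueError(f"unsupported format: {fmt!r}")
    row["_mqb_id"] = msg_id
    row["_mqb_metadata"] = metadata
    return row
```

For example, message_to_row(b'{"id": 1, "total": 9.5}', "msg-1", {}) returns {"id": 1, "total": 9.5, "_mqb_id": "msg-1", "_mqb_metadata": {}}.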
Delivery semantics¶
Delivery is at-least-once: each batch’s offset is acked only after the dlt load has
durably committed. If the load fails, nothing is acked and the broker redelivers the batch on
the next run. Because the resource merges on _mqb_id, redelivered messages are deduplicated,
so the end result is effectively exactly-once.
Acks are per-batch (via mq-bridge’s poll_batch/ack tokens), so only batches that were fully
handed to the load package are acked. --yield-limit is therefore safe: a limit that stops
mid-batch leaves that batch un-acked, so it is redelivered on the next run and deduplicated on
_mqb_id rather than being silently dropped.
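Both guarantees above can be simulated in memory. The names poll_batch and ack echo the mq-bridge calls mentioned above, but FakeBroker, its integer tokens, and the run loop are hypothetical: they model the protocol, not omniload’s or mq-bridge’s code. Acking the highest fully handed position assumes Kafka-style cumulative offsets, and writing into a dict keyed by _mqb_id stands in for the merge load.

```python
class FakeBroker:
    """Stand-in broker: a new run redelivers everything after the last acked position."""

    def __init__(self, messages: list[tuple[str, dict]]) -> None:
        self.messages = messages
        self.acked = 0  # committed position, survives across runs
        self.read = 0   # delivery cursor for the current run

    def poll_batch(self, size: int) -> tuple[list[tuple[str, dict]], int]:
        batch = self.messages[self.read : self.read + size]
        self.read += len(batch)
        return batch, self.read  # token: position to ack once the batch is loaded

    def ack(self, token: int) -> None:
        self.acked = max(self.acked, token)


def run(broker, table, batch_size, yield_limit=None, crash_before_ack=False):
    broker.read = broker.acked  # a new run starts at the last acked position
    handed, tokens = 0, []
    while yield_limit is None or handed < yield_limit:
        batch, token = broker.poll_batch(batch_size)
        if not batch:
            break
        room = len(batch) if yield_limit is None else yield_limit - handed
        taken = batch[:room]
        for msg_id, payload in taken:
            table[msg_id] = {**payload, "_mqb_id": msg_id}  # merge (upsert) on _mqb_id
        handed += len(taken)
        if len(taken) == len(batch):
            tokens.append(token)  # only fully handed batches are ackable
    # The table writes above model the committed load; acks happen only after it.
    if crash_before_ack:
        raise RuntimeError("crashed after the load committed, before acking")
    for token in tokens:
        broker.ack(token)
```

With five messages, batch_size=2, and yield_limit=3, the first run acks only the first batch (position 2) even though three rows were loaded; the next run redelivers from position 2, and the repeated third message overwrites its own row instead of duplicating it. The trade-off is deliberate: some redelivery in exchange for never losing a message.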
mq-bridge owns both keys behind this guarantee, so two flags are rejected rather than silently
honored: --incremental-key (mq-bridge manages incrementality itself) and --primary-key
(which would override the _mqb_id merge key and break deduplication).
Sample command¶
memory transport to DuckDB¶
A brokerless smoke test using the in-memory transport:
omniload ingest \
--source-uri 'memory+mqb://?topic=orders&capacity=4096' \
--source-table 'orders' \
--dest-uri 'duckdb:///mqbridge.duckdb' \
--dest-table 'dest.orders'
Kafka to PostgreSQL¶
omniload ingest \
--source-uri 'kafka+mqb://localhost:9092?group_id=omniload' \
--source-table 'orders' \
--dest-uri 'postgres://postgres:postgres@localhost:5432/?sslmode=disable' \
--dest-table 'public.orders'
The result is a public.orders table with the message payload’s top-level JSON keys as
columns, plus _mqb_id and _mqb_metadata.