This version is still in development and is not considered stable yet. For the latest stable version, please use StreamX Guides 1.1.0!

StreamX Messaging

StreamX is based on messaging: services consume and produce messages, which are sent to and read from topics in the messaging system. Services connected by topics form data pipelines, which are configurable flows of messages within the StreamX mesh. The messaging system offers many configuration options and features. StreamX hides the messaging system implementation so that you can focus on business logic while still benefiting from a distributed messaging system at runtime. StreamX uses Apache Pulsar as its messaging and streaming platform.

Services operate on a messaging abstraction over the actual messaging platform. This abstraction is provided by the SmallRye Reactive Messaging and Quasar frameworks (Quasar is a library developed as part of StreamX). These frameworks and the abstraction they provide are beyond the scope of this document.

This document summarizes the messaging system (Apache Pulsar) setup that supports the intended StreamX capabilities and characteristics. It also briefly explains the most important Pulsar concepts used in StreamX that you need to understand the described setup. For more details, see the Apache Pulsar documentation.

Messaging terms in StreamX

This section provides a brief explanation of the most important messaging-related terms necessary to understand the StreamX setup. Each item is also described in the context of StreamX.

Messages

Messages are the basic "unit" of Pulsar and StreamX, encapsulating events and enabling their exchange between parties.

Internally, the StreamX service mesh operates on Pulsar messages and relies in particular on the following message parts:

  • payload - the data carried by the message

  • key - a string tag of the message; in StreamX, it identifies the subject of the event; it is used by features such as topic compaction

  • event time - a timestamp attached to the message by the application; in StreamX, the time at which the event occurred

  • properties - a map of key/value properties

Through the SmallRye Reactive Messaging and Quasar framework APIs, StreamX hides these Pulsar implementation details from service code.

A message is acknowledged (ack) when it has been processed successfully. A message is negatively acknowledged (nack) when it was not processed correctly, which makes Pulsar redeliver it later.

StreamX services ack and nack messages automatically in most cases. StreamX provides default settings for handling negative acknowledgements in the default Processing Service and Delivery Service consumer configurations.
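The automatic ack/nack behavior can be pictured as a wrapper around the business logic. The following Java snippet is a minimal sketch under stated assumptions: IncomingMessage, process, and the outcome field are hypothetical stand-ins, not the SmallRye Reactive Messaging or Quasar API, which perform this step for you.

```java
import java.util.function.Consumer;

// Hypothetical types for illustration only; this is not the SmallRye or Quasar API.
final class AutoAckSketch {

    /** Hypothetical incoming message that records whether it was acked or nacked. */
    static final class IncomingMessage {
        final String key;
        final String payload;
        String outcome = "pending"; // becomes "ack" or "nack"

        IncomingMessage(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }

        void ack() {
            outcome = "ack";
        }

        void nack(Throwable cause) {
            outcome = "nack";
        }
    }

    /** Runs the business logic, then acks on success or nacks if the logic throws. */
    static void process(IncomingMessage message, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(message.payload);
            message.ack(); // success: the message is not redelivered to this subscription
        } catch (RuntimeException e) {
            message.nack(e); // failure: the broker is expected to redeliver the message later
        }
    }
}
```

Keeping ack/nack outside the business logic means service code only has to succeed or throw; the decision about redelivery stays in one place.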

Tenant

A Pulsar tenant is an administrative unit for allocating capacity and enforcing authentication or authorization.

A single StreamX mesh instance operates on a single Pulsar tenant. The tenant name used for StreamX mesh instance is configurable.

Namespace

A Pulsar namespace is a logical grouping of topics and an administrative unit within a tenant. Configuration policies set on a namespace apply to all topics created in that namespace.

StreamX operates on predefined namespaces, each configured according to its specific requirements.

Topic

A Pulsar topic is a unit of storage that organizes messages into a stream.

StreamX operates on topics assigned to predefined namespaces. The names and number of topics are not predefined and depend on the given StreamX mesh and the services used within it.

StreamX operates only on persistent Pulsar topics, so all messages are durably persisted on disk. This guarantees message delivery to applications reading from the topics, even if Pulsar components restart or a topic consumer disconnects, for example during a StreamX service restart.
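In Pulsar, persistence is part of a topic's fully qualified name, which has the form persistent://tenant/namespace/topic. The Java sketch below assembles such a name; the helper and the example values (my-streamx, pages) are hypothetical and only illustrate where the tenant and namespace appear.

```java
// Hypothetical helper for illustration only.
final class TopicNames {

    /** Builds a fully qualified persistent topic name: persistent://tenant/namespace/topic. */
    static String persistentTopic(String tenant, String namespace, String topic) {
        for (String part : new String[] {tenant, namespace, topic}) {
            // Each part is a single path segment, so it must be non-empty and contain no '/'.
            if (part == null || part.isBlank() || part.contains("/")) {
                throw new IllegalArgumentException("Invalid topic name part: " + part);
            }
        }
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }
}
```

For example, persistentTopic("my-streamx", "inboxes", "pages") yields persistent://my-streamx/inboxes/pages.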

Retention

By default, Pulsar deletes messages once all subscriptions have acknowledged them. Retention changes this behavior: on a topic with configured retention, acknowledged messages are kept, whereas on a persistent topic without retention they are deleted. StreamX configures retention policies on namespaces whose data is still needed after all currently existing services have processed it.

Topic compaction

Topic compaction is a mechanism for removing messages that are obscured by later messages. Topic compaction goes through the topic on a per-key basis and leaves only the most recent message associated with that key.

StreamX uses compaction with the event time-based topic compactor described in the Topic compaction strategy section. Compaction reduces the size of the data storage and speeds up reading topics, because fewer messages need to be read.

Compaction also removes messages with null payloads: if the most recent message for a key has a null payload, the key is removed from the compacted topic entirely.
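Standard key-based compaction can be modeled as a single pass over the topic that remembers the last message per key. The Java sketch below is a simplified in-memory illustration (the Msg record and compact method are hypothetical); the real Pulsar compactor works on stored ledgers, and the remove-and-reinsert step only approximates how retained messages keep their original topic order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of key-based compaction; not the Pulsar compactor itself.
final class KeyCompactionSketch {

    /** Hypothetical message: a key and a payload (null payload = tombstone). */
    record Msg(String key, String payload) {}

    /** Keeps only the last message per key in topic order; a null payload removes the key. */
    static List<Msg> compact(List<Msg> topic) {
        Map<String, Msg> latest = new LinkedHashMap<>();
        for (Msg m : topic) {
            latest.remove(m.key()); // re-insert below so order follows the last write per key
            if (m.payload() != null) {
                latest.put(m.key(), m);
            }
        }
        return new ArrayList<>(latest.values());
    }
}
```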

Partitions

Topics can be partitioned. A partitioned topic allows higher throughput because it is implemented by N internal topics, where N is the number of partitions, and the internal topics can be served by different brokers.

StreamX lets you configure the partition count of topics per namespace to optimize throughput.
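The relationship between a partitioned topic and its internal topics, and why messages with the same key stay on the same partition, can be sketched in Java. This is a simplification under stated assumptions: Pulsar names the internal topics <topic>-partition-<i>, but the hashing in partitionForKey only resembles the Java client's default key hashing and is not guaranteed to produce the same partition numbers.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration; partitionForKey is NOT guaranteed to match the Pulsar client's router.
final class PartitionSketch {

    /** Names of the internal topics backing a partitioned topic with the given partition count. */
    static List<String> internalTopics(String topic, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    /** Deterministic key-based routing: the same key always maps to the same partition. */
    static int partitionForKey(String key, int partitions) {
        // Mask the sign bit so the modulo result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % partitions;
    }
}
```

Deterministic routing is what keeps all messages for one key in a single partition, which matters for key-based features such as compaction.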

Schema

Pulsar topics use schemas to:

  • create a protocol between the applications that generate messages and the applications that consume them

  • enforce data consistency

  • reduce the serialization and deserialization code needed in client applications (Pulsar itself stores message data as raw bytes)

StreamX uses Pulsar schemas created from model classes representing the message payloads used in services.

Subscriptions

When a consumer subscribes to a topic, it must specify a subscription name, which identifies the subscription. A Pulsar subscription determines how messages are delivered to one or many consumers, depending on the subscription type.

StreamX uses two types of subscriptions:

  • exclusive – allows only a single consumer to attach to the subscription, so all messages are read by that consumer; every exclusive subscription has a unique name

  • shared – allows multiple consumers to attach to the same subscription; all consumers use the same subscription name, messages are split between them, and any given message is delivered to only one consumer. Shared subscriptions do not guarantee message ordering. StreamX uses shared subscriptions to scale processing services, and the Quasar framework filters out outdated messages.

A consumer can connect to an existing shared subscription (for example, a new StreamX processing service replica). If a consumer specifies a subscription name that does not exist yet, the subscription is created automatically.
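The difference between the two subscription types can be illustrated with a simplified dispatcher. In the Java sketch below, the consumer and message names are hypothetical and the shared case uses strict round-robin; the real Pulsar dispatcher distributes messages according to consumer permits, so the actual split varies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of message dispatch per subscription type; not the Pulsar dispatcher.
final class SubscriptionDispatchSketch {

    /** Exclusive: the single attached consumer receives every message. */
    static Map<String, List<String>> exclusive(String consumer, List<String> messages) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        delivered.put(consumer, new ArrayList<>(messages));
        return delivered;
    }

    /** Shared: each message goes to exactly one consumer (strict round-robin in this model). */
    static Map<String, List<String>> shared(List<String> consumers, List<String> messages) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        for (String c : consumers) {
            delivered.put(c, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            delivered.get(consumer).add(messages.get(i));
        }
        return delivered;
    }
}
```

With two replicas on a shared subscription, each sees only part of the stream, which is why a replica cannot rely on seeing messages for a key in order.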

Subscription mode

When a subscription is created, an associated cursor is created to record the last consumed position. The subscription mode determines whether this cursor is durable or non-durable. When a consumer of the subscription restarts, it can continue consuming from the last message it consumed.

There are two important differences between durable and non-durable modes:

  • Keeping state after a restart of Pulsar components – durable subscriptions let consumers continue reading after a Pulsar component (broker) restart, because the cursor state is saved in persistent storage. Non-durable subscriptions lose this state.

  • Message retention – Messages in a topic without any durable subscriptions are marked for deletion. To prevent messages from being marked for deletion, you can create a durable subscription for the topic. In this scenario, only acknowledged messages are marked for deletion. This behavior can be modified by configuring topic retention settings.
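The deletion rules above can be condensed into a single predicate. The Java sketch assumes a hypothetical Subscription record with a per-message acked flag and reduces retention to a boolean withinRetention; actual Pulsar retention is bounded by configured size and time limits, and storage is reclaimed at ledger granularity.

```java
import java.util.List;

// Simplified model of when a single message is marked for deletion; names are hypothetical.
final class DeletionSketch {

    /** Hypothetical view of one subscription and whether it acknowledged the message. */
    record Subscription(String name, boolean durable, boolean acked) {}

    /**
     * Without durable subscriptions every message is marked for deletion; otherwise only
     * messages acknowledged by every durable subscription are. Retention keeps them anyway.
     */
    static boolean markedForDeletion(List<Subscription> subscriptions, boolean withinRetention) {
        if (withinRetention) {
            return false;
        }
        // allMatch on an empty stream is true: no durable subscription protects the message.
        return subscriptions.stream()
                .filter(Subscription::durable)
                .allMatch(Subscription::acked);
    }
}
```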

Dead letter queue (DLQ)

DLQ is a mechanism for storing messages that the messaging system cannot or should not deliver. In Pulsar, a dead letter topic lets consumers continue consuming even when some messages cannot be processed successfully: after a configured maximum number of redeliveries, such messages are moved to the dead letter topic.
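The interplay between redelivery and the dead letter topic can be modeled as a retry loop. The Java sketch below is a simplified model that assumes a message is attempted once and then redelivered up to maxRedeliverCount times before it is dead-lettered; the handler, message names, and Outcome record are hypothetical, and the exact counting in the Pulsar client may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of redelivery with a dead letter topic; not the Pulsar client logic.
final class DeadLetterSketch {

    /** Which messages were processed and which ended up in the dead letter topic. */
    record Outcome(List<String> processed, List<String> deadLettered) {}

    /** handler returns true on success; a failed attempt triggers a redelivery. */
    static Outcome consume(List<String> messages, Predicate<String> handler, int maxRedeliverCount) {
        List<String> processed = new ArrayList<>();
        List<String> deadLettered = new ArrayList<>();
        for (String message : messages) {
            boolean done = false;
            // Attempt 0 is the first delivery; attempts 1..maxRedeliverCount are redeliveries.
            for (int attempt = 0; attempt <= maxRedeliverCount && !done; attempt++) {
                done = handler.test(message);
            }
            (done ? processed : deadLettered).add(message);
        }
        return new Outcome(processed, deadLettered);
    }
}
```

The point of the DLQ is visible in the model: a single failing ("poison") message stops being retried and no longer blocks the messages behind it.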

StreamX configuration for messaging

This section describes messaging configuration in StreamX.

The configuration is provided by:

  • service extensions – StreamX dependencies used when implementing services; they provide the default Pulsar consumer and producer configurations for all channels, see Processing Service reference and Delivery Service reference

  • pulsar-init container – a container started by StreamX at mesh startup, performing Pulsar initialization including tenant and namespaces creation and configuration

Tenant

StreamX services and containers use the streamx.tenant property, which should be configured with the same value for all services and containers started in a service mesh instance.

The value must be set by the system managing the StreamX services mesh. The tenant is initialized in Pulsar by the pulsar-init container.

Service and service instance IDs

Services in a mesh can be scaled to any number of replicas.

  • streamx.service.id – represents the identifier of the service in a mesh instance, which must be the same for all replicas of the given service.

  • quasar.application.instance-id – represents the identifier of the service replica, which must be unique for each replica of the service.

Both values must be set by the system managing the StreamX services mesh on each service replica. It is recommended to start the value of quasar.application.instance-id with the value set in streamx.service.id, for example:

streamx.service.id=some-processing-service
quasar.application.instance-id=some-processing-service-bhfg-gfds-rwea
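Following the recommended convention, the replica ID can be derived from the service ID. The Java sketch below is hypothetical: appending a random suffix is only one possible way for a managing system to create unique replica IDs. Uniqueness matters because the default configurations described later use the instance ID as the producer name and as the exclusive subscription name.

```java
import java.util.UUID;

// Hypothetical helper; the managing system may generate replica IDs differently.
final class InstanceIds {

    /** Builds a replica ID that starts with the service ID and ends with a random suffix. */
    static String newInstanceId(String serviceId) {
        String suffix = UUID.randomUUID().toString().substring(0, 8);
        return serviceId + "-" + suffix;
    }

    /** Checks the recommended convention: the instance ID is prefixed with the service ID. */
    static boolean followsConvention(String serviceId, String instanceId) {
        return instanceId.startsWith(serviceId + "-")
                && instanceId.length() > serviceId.length() + 1;
    }
}
```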

Namespaces and topics

StreamX services use topics to communicate by exchanging messages and storing data. Topics are created in predefined and configured namespaces: inboxes, relays, outboxes, and stores. The namespaces are created by the pulsar-init container under the initialized tenant. All topics are auto-created when services start to use them.

All namespaces except stores let you set the number of topic partitions to optimize throughput. To use partitioned topics, the number of Pulsar brokers must also be adjusted; see the Pulsar documentation on partitioned topics. The stores namespace does not support partitioned topics because of the way stores are synchronized.

Namespace  Description                                     Compaction       Retention                       Partitions support
inboxes    Contains ingested data from external sources    yes, by default  yes, if compaction is disabled  yes
relays     Contains data exchanged by processing services  no               no                              yes
outboxes   Contains data consumed by delivery services     yes              no, compaction is used          yes
stores     Contains service state                          yes              no, compaction is used          no

pulsar-init container configuration options and default values:

streamx.inboxes.partitions=0 (1)
streamx.relays.partitions=0 (2)
streamx.outboxes.partitions=0 (3)
streamx.inboxes.compactionThreshold=1073741824 (4)
streamx.outboxes.compactionThreshold=1073741824 (5)
streamx.stores.compactionThreshold=1073741824 (6)
streamx.tenant.subscriptionExpirationMinutes=10080 (7)
1 Number of topic partitions in the inboxes namespace; 0 means non-partitioned topics
2 Number of topic partitions in the relays namespace
3 Number of topic partitions in the outboxes namespace
4 Compaction threshold for topics in the inboxes namespace, that is, the backlog size that triggers automatic compaction (1 GiB); if set to 0, compaction is disabled and infinite retention is used instead
5 Compaction threshold for topics in the outboxes namespace (1 GiB); if set to 0, compaction is disabled
6 Compaction threshold for topics in the stores namespace (1 GiB); if set to 0, compaction is disabled
7 Subscription expiration time applied to all namespaces (10080 minutes, that is, 1 week). Keep it enabled so that old subscriptions are cleaned up, but make the value large enough that subscriptions of temporarily unavailable services are not removed, which could result in messages never being processed.
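The way these options combine with the namespace table can be summarized in code. The Java sketch below is a hypothetical interpreter of the options, not part of pulsar-init; it also confirms the arithmetic behind the defaults (1073741824 bytes is 2^30, that is, 1 GiB, and 10080 minutes is 7 * 24 * 60, that is, one week).

```java
import java.util.Map;

// Hypothetical interpretation of the pulsar-init options; not pulsar-init code.
final class NamespacePolicySketch {

    /** Effective policy of a namespace derived from the options. */
    record Policy(boolean partitioned, boolean compaction, boolean infiniteRetention) {}

    static final long ONE_GIB = 1L << 30;              // 1073741824 bytes, the default threshold
    static final long ONE_WEEK_MINUTES = 7L * 24 * 60; // 10080 minutes, the default expiration

    /** Missing options fall back to the documented defaults. */
    static Policy policyFor(String namespace, Map<String, Long> options) {
        long partitions = options.getOrDefault("streamx." + namespace + ".partitions", 0L);
        long threshold = options.getOrDefault("streamx." + namespace + ".compactionThreshold", ONE_GIB);
        // relays has no compaction; for the other namespaces a threshold of 0 disables it.
        boolean compaction = !namespace.equals("relays") && threshold > 0;
        // Only inboxes switches to infinite retention when its compaction is disabled.
        boolean infiniteRetention = namespace.equals("inboxes") && threshold == 0;
        return new Policy(partitions > 0, compaction, infiniteRetention);
    }
}
```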

Topic compaction strategy

Using a Quasar store for a channel ensures that outdated messages are rejected: a message whose event time is older than the last known event time for its key is skipped (acknowledged without processing). Standard Pulsar key-based topic compaction is not enough to guarantee a valid store state after the store is loaded from a compacted store topic. Standard compaction keeps the last message written for each key, but because multiple replicas write to the same store topic, a message with an older event time can be written after a newer one. The compactor must therefore use the event time to retain the latest version of each key instead of the last message in the topic. See more in the related Pulsar Improvement Proposal and its implementation.
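The rejection of outdated messages can be sketched as a per-key map of the last known event time. The Java class below is a hypothetical illustration, not the Quasar implementation: a message older than the stored event time for its key is reported as outdated, and a store would acknowledge it without processing. Accepting a message with an equal event time is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of outdated-message filtering; not the Quasar implementation.
final class OutdatedFilterSketch {

    /** Last known event time per key. */
    private final Map<String, Long> lastEventTime = new HashMap<>();

    /** Returns true if the message should be processed, false if it is outdated. */
    boolean accept(String key, long eventTime) {
        Long known = lastEventTime.get(key);
        if (known != null && eventTime < known) {
            return false; // older than what is already known for this key: skip it
        }
        lastEventTime.put(key, eventTime); // equal or newer: remember it and process
        return true;
    }
}
```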

The event time-based compaction strategy is set in the Pulsar broker configuration by the system managing the StreamX services mesh.

compactionServiceFactoryClassName=org.apache.pulsar.compaction.EventTimeCompactionServiceFactory
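The difference between the two compaction strategies shows up when messages for a key are written out of event-time order, for example by two replicas writing to the same store topic. The Java sketch below contrasts them on in-memory data; it is a simplified model, not EventTimeCompactionServiceFactory itself. It ignores null-payload tombstones for brevity, and letting the later-written message win on equal event times is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified comparison of compaction strategies; not the Pulsar compactor implementations.
final class EventTimeCompactionSketch {

    /** Hypothetical message with a key, a payload, and an event time. */
    record Msg(String key, String payload, long eventTime) {}

    /** Standard compaction: the last message written for each key wins. */
    static Map<String, Msg> byTopicOrder(List<Msg> topic) {
        Map<String, Msg> result = new LinkedHashMap<>();
        for (Msg m : topic) {
            result.put(m.key(), m);
        }
        return result;
    }

    /** Event time compaction: the message with the greatest event time for each key wins. */
    static Map<String, Msg> byEventTime(List<Msg> topic) {
        Map<String, Msg> result = new LinkedHashMap<>();
        for (Msg m : topic) {
            Msg current = result.get(m.key());
            // Assumption: on equal event times, the later-written message wins.
            if (current == null || m.eventTime() >= current.eventTime()) {
                result.put(m.key(), m);
            }
        }
        return result;
    }
}
```

When a newer event (v2) is written before an older one (v1), standard compaction keeps v1, which the store would then treat as the current state; event time compaction keeps v2.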

Topics and channels

The incoming and outgoing channels of services that operate on Pulsar must be connected to a topic: an outgoing channel writes messages to the topic, and an incoming channel reads messages from it. The namespace and topic name are configured at the mesh configuration level when services are connected. The system managing the StreamX services mesh must set the topic for each channel.

Topics schema

Topic schemas are set automatically and resolved from the models used by StreamX services. A Pulsar schema defines how structured data is serialized to raw message bytes. StreamX uses Avro schemas, which provide fast serialization and deserialization together with an explicit schema definition. See the Pulsar documentation on schemas, the Processing Service reference, and the Delivery Service reference.

Default configuration for StreamX Processing Service consumer

The default configuration:

mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest (1)
mp.messaging.incoming.[channel-name].subscriptionType=Shared (2)
mp.messaging.incoming.[channel-name].subscriptionName=${streamx.service.id} (3)
mp.messaging.incoming.[channel-name].subscriptionMode=Durable (4)
mp.messaging.incoming.[channel-name].deadLetterPolicy.initialSubscriptionName=${streamx.service.id} (5)
mp.messaging.incoming.[channel-name].deadLetterPolicy.maxRedeliverCount=3 (6)
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000 (7)
1 When a new subscription is created, it reads all messages available on the topic. Note that readCompacted is not set for the processing service consumer: if compaction is enabled on the inboxes namespace and a new service starts reading from such a topic, it reads the full, non-compacted topic rather than the compacted messages.
2 The processing service consumer configuration must support scaling by adding more replicas of the service. This is enabled by using a shared subscription, where each replica processes a portion of messages. Shared subscriptions do not guarantee message ordering, but StreamX ensures processing of only the latest version of key-based messages.
3 The subscription name is the same for all replicas because it is set to the service identifier.
4 Durable mode keeps the cursor state after a Pulsar broker restart and ensures that only acknowledged messages are deleted from the topic.
5 Sets the DLQ topic subscription name so that an initial subscription is created automatically for the DLQ topic; without a durable subscription, messages moved to the DLQ topic could be deleted before anyone reads them.
6 When maxRedeliverCount is reached, messages that still fail are moved to the dead letter topic.
7 After the acknowledgment timeout period, the client requests the broker to redeliver unacknowledged messages.

Default configuration for StreamX Processing Service producer

The default configuration:

mp.messaging.outgoing.[channel-name].accessMode=Shared (1)
mp.messaging.outgoing.[channel-name].producerName=${quasar.application.instance-id} (2)
1 The Processing Service producer access mode must be set to Shared to allow multiple producers to write to the same topic.
2 The producer name must be unique; if another producer with the same name is already connected to the topic, the new producer cannot be created.

Default configuration for StreamX Delivery Service consumer

The default configuration:

mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest (1)
mp.messaging.incoming.[channel-name].subscriptionMode=Durable (2)
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive (3)
mp.messaging.incoming.[channel-name].subscriptionName=${quasar.application.instance-id} (4)
mp.messaging.incoming.[channel-name].readCompacted=true (5)
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000 (6)
mp.messaging.incoming.[channel-name].poolMessages=true (7)
1 When a new subscription is created, it will read all available messages from the topic.
2 Durable mode keeps the cursor state after a Pulsar broker restart and ensures that only acknowledged messages are deleted from the topic.
3 Each Delivery Service replica must read all the outboxes topic messages to generate complete experiences from the full data set. The subscription must be exclusive because compacted topics can only be read by subscriptions with a single active consumer.
4 Each exclusive subscription name must be unique.
5 Each Delivery Service replica must read all messages from the outboxes topic. With readCompacted enabled, a consumer reads from a compacted topic rather than the full message backlog. It sees only the latest value for each key up to the compaction point; beyond that point all messages are received.
6 After the acknowledgment timeout, the client requests redelivery of unacknowledged messages from the broker.
7 Enables a memory-efficient mechanism to reuse message objects (see technical details).
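What a readCompacted consumer starting from Earliest receives can be modeled as the compacted part of the topic followed by the uncompacted tail. In the Java sketch below, the compaction horizon is a hypothetical index into an in-memory list; in Pulsar it is the topic position up to which the last compaction run has processed messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a readCompacted view; not the Pulsar broker logic.
final class ReadCompactedSketch {

    /** Hypothetical message: a key and a payload (null payload = tombstone). */
    record Msg(String key, String payload) {}

    /** Latest message per key before the horizon, followed by every message after it. */
    static List<Msg> view(List<Msg> topic, int compactionHorizon) {
        Map<String, Msg> compacted = new LinkedHashMap<>();
        for (Msg m : topic.subList(0, compactionHorizon)) {
            compacted.remove(m.key()); // keep order by the last write per key
            if (m.payload() != null) {
                compacted.put(m.key(), m);
            }
        }
        List<Msg> result = new ArrayList<>(compacted.values());
        result.addAll(topic.subList(compactionHorizon, topic.size()));
        return result;
    }
}
```

The same model applies to the Quasar store consumer, which also reads with readCompacted enabled.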

Default configuration for Quasar store consumer

The default configuration:

mp.messaging.incoming.[channel-name].subscriptionInitialPosition=Earliest (1)
mp.messaging.incoming.[channel-name].subscriptionMode=Durable (2)
mp.messaging.incoming.[channel-name].subscriptionType=Exclusive (3)
mp.messaging.incoming.[channel-name].subscriptionName=${quasar.application.instance-id} (4)
mp.messaging.incoming.[channel-name].readCompacted=true (5)
mp.messaging.incoming.[channel-name].ackTimeoutMillis=60000 (6)
mp.messaging.incoming.[channel-name].poolMessages=true (7)
1 When a new subscription is created, it will read all available messages from the topic.
2 Durable mode keeps the cursor state after a Pulsar broker restart and ensures that only acknowledged messages are deleted from the topic.
3 Each service replica that uses the store must read all messages from the stores topic to reconstruct the complete service state. Because the topics are compacted, the subscriptions must be exclusive, which ensures a single active consumer per subscription.
4 Each exclusive subscription name must be unique.
5 Each service replica must read all messages from the stores topic. With readCompacted enabled, a consumer reads from a compacted topic instead of the full message backlog. It sees the latest value for each key up to the compaction point; beyond that point all messages are received.
6 After the acknowledgment timeout, the client requests the broker to redeliver unacknowledged messages.
7 Enables memory-efficient reuse of message objects (see technical details).

Default configuration for Quasar store producer

The default configuration:

mp.messaging.outgoing.[channel-name].accessMode=Shared (1)
mp.messaging.outgoing.[channel-name].producerName=${quasar.application.instance-id} (2)
1 The store producer access mode must be set to Shared to allow multiple producers to write to the same topic - multiple replicas of a service write to the same store topic.
2 The producer name must be unique; if another producer with the same name is already connected to the topic, the new producer cannot be created.