Ingest data from Solace
You can ingest data from Solace’s PubSub+ Platform, a powerful event-driven streaming solution designed for real-time enterprises.
It facilitates the design, deployment, integration, and management of event-driven architectures (EDAs) across hybrid, multi-cloud, and IoT environments. It enables seamless data exchange across legacy systems, SaaS applications, messaging services, databases, and AI agents, connecting them to a real-time event-driven layer.
Set up Solace
To set up Solace PubSub+ event broker, you can either choose the free Software version using Docker or Solace PubSub+ Cloud.
Scenario
Consider this scenario: automating the process of notifying passengers that “online check-in is open” exactly 48 hours before their flight departure. Airlines need to handle continuous streams of flight and passenger data to send timely “Check-in Open” alerts to passengers who have opted in. The process begins 72 hours before departure, as flight and passenger data enter the system. Then, at 48 hours before departure, a notification is triggered for eligible passengers.
The solution involves two key steps:
- Event Stream Processing: Continuous streams of flight and passenger data are received from the Departure Control System (DCS) via Solace. Each flight is tracked by a unique identifier, and each passenger by a unique Passenger Reference Number (PRN), enabling real-time processing in RisingWave.
- Notification Logic: Notifications are sent only to passengers who have opted in.
Below is the sample data of flight and passenger details. Solace topic: passenger_full_details
Ingest data from Solace into RisingWave
Create a RisingWave cluster in RisingWave Cloud using the free plan. See the documentation of RisingWave Cloud for instructions.
Solace PubSub+ supports popular open protocols like AMQP, JMS, MQTT, REST, and WebSocket, and open APIs such as Paho and Qpid to enable interaction with the event broker. We will use the RisingWave MQTT connector to read and write data from Solace.
Once the RisingWave cluster is set up, navigate to the Workspace and connect to data streams by creating tables, materialized views, and sinks using SQL statements.
Step 1: Create source table
This query creates a table named combined_passenger_flight_data
to store detailed passenger and flight information. The data is sourced from the Solace topic passenger_full_details
, connected through the Solace broker, with the Quality of Service (QoS) level set to at least once and formatted as plain JSON.
Step 2: Use materialized view to filter
This query creates a materialized view named checkin_open_notification
that selects flight and passenger information for those who opted in and have flights departing within 48 to 72 hours from the current time.
Step 3: Query the materialized view
The materialized view can be queried to retrieve the latest data from the source:
The table chart lists passengers who opted in for notifications and have flights departing soon, showing flight_id
, passenger_ref_number
, flight_number
, carrier_code
, departure_time
, and contact_info
. It highlights passengers with flights departing before 48 from now, indicating that check-in
is open.
Step 4: Create a sink to send notifications
This query creates a sink named checkin_notifications_sink
, which streams data from the checkin_open_notification
view to the Solace topic checkin_open_notification
. The connection to the Solace server is established with at-least-once Quality of Service (QoS), and the data is formatted as plain JSON. The online check-in notification system then retrieves this information from the Solace topic to send notifications to the passengers.
We have successfully created a source table to read data from the Solace with an MQTT source connector, built a materialized view (MV) for querying and performing real-time analytics on the data, and set up a sink to send processed data to a Solace topic using the MQTT sink connector for downstream systems to utilize.