Building a Flight Price Monitoring and Analysis System Using NiFi, Kafka, Flink, and PostgreSQL
- Stephen Dawkins
- Jan 15
Updated: Mar 31
Introduction
Demand for real-time flight price monitoring and analysis keeps rising, and this case study shows how we met it with a scalable data pipeline built on Apache NiFi, Apache Kafka, Apache Flink, and PostgreSQL. The goal was to pull flight prices from the Amadeus API for flights from Amsterdam (AMS) to every European capital, transform the data, and store it in a PostgreSQL database for further analysis.
Tools and Technologies Used
Apache NiFi: For orchestrating API calls and handling data ingestion.
Apache Kafka: As a messaging broker to decouple data ingestion and processing.
Apache Flink: For real-time data processing and transformation.
PostgreSQL: To store the transformed data for querying and analysis.
Amadeus API: As the data source for flight offers.
System Architecture
NiFi orchestrates the entire data flow:
Pulls flight offers from the Amadeus API for specific routes.
Splits the JSON response into individual flight offers.
Sends each flight offer as a message to a Kafka topic.
Kafka acts as a buffer and message broker.
Flink consumes messages from Kafka, processes the flight offers, and transforms them by extracting key fields.
PostgreSQL stores the transformed data for further analysis and reporting.
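For reference, the Kafka topic that carries the offers can be created up front. A minimal Java sketch using Kafka's AdminClient; the broker address and the topic name flight-offers are assumptions for this sketch, not values from the original setup:

// Create the topic that NiFi publishes to and Flink consumes from.
// Broker address and topic name are assumptions; adjust for your cluster.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;

public class CreateFlightOffersTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1 (fine for a single-broker dev setup)
            admin.createTopics(List.of(new NewTopic("flight-offers", 3, (short) 1))).all().get();
        }
    }
}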
Step 1: Data Ingestion with NiFi
We configured NiFi to make periodic API calls to the Amadeus API and send the flight offers to Kafka.
NiFi Processors
InvokeHTTP:
Makes a POST request to the Amadeus API to fetch flight offers.
Sample cURL for testing the API:
curl -X POST "https://api.amadeus.com/v2/shopping/flight-offers" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "originDestinations": [
      {
        "id": "1",
        "originLocationCode": "AMS",
        "destinationLocationCode": "CDG",
        "departureDateTimeRange": { "date": "2025-01-30" }
      }
    ],
    "travelers": [
      { "id": "1", "travelerType": "ADULT" }
    ],
    "sources": ["GDS"],
    "searchCriteria": { "maxFlightOffers": 10 }
  }'
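The YOUR_ACCESS_TOKEN placeholder above comes from Amadeus's OAuth2 client-credentials flow. A minimal Java sketch of the token request; the environment variable names are our own convention:

// Fetch an Amadeus access token via the OAuth2 client-credentials flow.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AmadeusTokenFetch {
    public static void main(String[] args) throws Exception {
        // Credentials are read from the environment; never hardcode them.
        String apiKey = System.getenv("AMADEUS_API_KEY");
        String apiSecret = System.getenv("AMADEUS_API_SECRET");

        HttpRequest tokenRequest = HttpRequest.newBuilder()
            .uri(URI.create("https://api.amadeus.com/v1/security/oauth2/token"))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .POST(HttpRequest.BodyPublishers.ofString(
                "grant_type=client_credentials&client_id=" + apiKey + "&client_secret=" + apiSecret))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(tokenRequest, HttpResponse.BodyHandlers.ofString());
        // The JSON body carries an "access_token" field, the Bearer token InvokeHTTP sends.
        System.out.println(response.body());
    }
}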
SplitJson:
Splits the API response into individual flight offers; the offers arrive in the response's data array, so a JsonPath expression such as $.data[*] performs the split.
ReplaceText:
Formats the split JSON fragments by adding missing fields or adjusting the structure.
PublishKafka:
Publishes each flight offer as a message to a Kafka topic.
Step 2: Real-Time Processing with Flink
Apache Flink consumes the messages from Kafka, processes the flight offers, and transforms them by extracting key fields such as price, currency, ticketing date, seat availability, and the departure and arrival IATA codes.
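The snippet below starts from a stream variable of type DataStream<String>. For context, here is a minimal sketch of how that stream would be wired to Kafka with Flink's KafkaSource; the broker address, topic name, and group id are assumptions for this sketch:

// Wire the job to Kafka (requires the flink-connector-kafka dependency).
// Broker address, topic name, and group id below are assumptions.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("flight-offers")
    .setGroupId("flight-offer-processor")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Amadeus Flight Offers");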
Flink Code Snippet
// objectMapper is a shared com.fasterxml.jackson.databind.ObjectMapper instance.
DataStream<FlightOfferRecord> flattenedStream = stream
    .flatMap((String json, Collector<FlightOfferRecord> out) -> {
        try {
            // Each Kafka message holds one flight offer, as split out by NiFi.
            JsonNode offer = objectMapper.readTree(json);
            String id = offer.get("id").asText();
            String lastTicketingDate = offer.get("lastTicketingDate").asText();
            int numberOfBookableSeats = offer.get("numberOfBookableSeats").asInt();
            String totalPrice = offer.get("price").get("total").asText();
            String currency = offer.get("price").get("currency").asText();
            // Route endpoints come from the first segment of the first itinerary; note that
            // for connecting itineraries this "arrival" is the first stop, not the final destination.
            JsonNode firstSegment = offer.get("itineraries").get(0).get("segments").get(0);
            String departureIataCode = firstSegment.get("departure").get("iataCode").asText();
            String arrivalIataCode = firstSegment.get("arrival").get("iataCode").asText();
            FlightOfferRecord record = new FlightOfferRecord(id, lastTicketingDate, numberOfBookableSeats,
                totalPrice, currency, departureIataCode, arrivalIataCode, LocalDateTime.now());
            out.collect(record);
        } catch (Exception e) {
            // Log and skip malformed offers instead of failing the whole job.
            System.err.println("Failed to process message: " + json + ", error: " + e.getMessage());
        }
    })
    // Lambdas lose generic type information to erasure, so declare the output type explicitly.
    .returns(org.apache.flink.api.common.typeinfo.TypeInformation.of(FlightOfferRecord.class));
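The post does not show the FlightOfferRecord class itself. A minimal sketch consistent with the constructor above and the getters used in the sink below:

// Minimal POJO matching the fields extracted above; a sketch, not the original class.
import java.time.LocalDateTime;

public class FlightOfferRecord {
    private final String id;
    private final String lastTicketingDate;
    private final int numberOfBookableSeats;
    private final String totalPrice;
    private final String currency;
    private final String departureIataCode;
    private final String arrivalIataCode;
    private final LocalDateTime insertTime;

    public FlightOfferRecord(String id, String lastTicketingDate, int numberOfBookableSeats,
                             String totalPrice, String currency, String departureIataCode,
                             String arrivalIataCode, LocalDateTime insertTime) {
        this.id = id;
        this.lastTicketingDate = lastTicketingDate;
        this.numberOfBookableSeats = numberOfBookableSeats;
        this.totalPrice = totalPrice;
        this.currency = currency;
        this.departureIataCode = departureIataCode;
        this.arrivalIataCode = arrivalIataCode;
        this.insertTime = insertTime;
    }

    public String getId() { return id; }
    public String getLastTicketingDate() { return lastTicketingDate; }
    public int getNumberOfBookableSeats() { return numberOfBookableSeats; }
    public String getTotalPrice() { return totalPrice; }
    public String getCurrency() { return currency; }
    public String getDepartureIataCode() { return departureIataCode; }
    public String getArrivalIataCode() { return arrivalIataCode; }
    public LocalDateTime getInsertTime() { return insertTime; }
}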
Step 3: Storing Data in PostgreSQL
We used Flink’s JdbcSink to insert the processed flight offers into a PostgreSQL database.
PostgreSQL Table Schema
CREATE TABLE flight_offers (
id TEXT,
last_ticketing_date DATE,
number_of_bookable_seats INT,
total_price NUMERIC,
currency TEXT,
departure_iata_code TEXT,
arrival_iata_code TEXT,
insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Flink Sink Configuration
flattenedStream.addSink(JdbcSink.sink(
    "INSERT INTO flight_offers (id, last_ticketing_date, number_of_bookable_seats, total_price, currency, departure_iata_code, arrival_iata_code, insert_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    (ps, record) -> {
        // Map each FlightOfferRecord field onto the prepared statement.
        ps.setString(1, record.getId());
        // lastTicketingDate arrives as an ISO yyyy-MM-dd string, which java.sql.Date.valueOf parses.
        ps.setDate(2, java.sql.Date.valueOf(record.getLastTicketingDate()));
        ps.setInt(3, record.getNumberOfBookableSeats());
        ps.setBigDecimal(4, new BigDecimal(record.getTotalPrice()));
        ps.setString(5, record.getCurrency());
        ps.setString(6, record.getDepartureIataCode());
        ps.setString(7, record.getArrivalIataCode());
        ps.setTimestamp(8, Timestamp.valueOf(record.getInsertTime()));
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        // Flush partial batches periodically so low-volume periods still reach the database.
        .withBatchIntervalMs(200)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/flight_data")
        .withDriverName("org.postgresql.Driver")
        .withUsername("postgres")
        .withPassword("your_password") // use a secrets mechanism in production
        .build()
));
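With source, transformation, and sink in place, the job is submitted to the Flink runtime in the usual way, using the env created in the source sketch above (the job name is arbitrary):

// Launch the assembled pipeline.
env.execute("Flight Price Monitoring Pipeline");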
Step 4: Querying the Data
Once the data was stored in PostgreSQL, we could run SQL queries against it, for example, to compare average offered prices per route; grouping on insert_time extends the same query to price trends over time.
Sample Query
SELECT departure_iata_code, arrival_iata_code, AVG(total_price) AS avg_price
FROM flight_offers
WHERE departure_iata_code = 'AMS'
AND arrival_iata_code = 'CDG'
GROUP BY departure_iata_code, arrival_iata_code;
Challenges and Solutions
1. Handling Missing IATA Codes
Challenge: Some airlines do not have IATA codes, only ICAO codes.
Solution: We added logic in the Flink job to fall back to the ICAO code if the IATA code was missing.
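In the Flink job this amounts to a null check before reading the code. A minimal sketch of the fallback, assuming the alternate code arrives in a field we call icaoCode here (the actual field name in the payload may differ):

// Fall back to an ICAO code when the IATA code is missing.
// The "icaoCode" field name is an assumption for this sketch.
JsonNode departure = offer.get("itineraries").get(0).get("segments").get(0).get("departure");
String departureCode = departure.hasNonNull("iataCode")
    ? departure.get("iataCode").asText()
    : departure.path("icaoCode").asText();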
2. Discrepancy in Prices
Challenge: We noticed that the total price in the Amadeus API response didn’t always match prices on Skyscanner.
Solution: We computed the difference between the base price and total price and labeled it as unitemizedTaxesAndFees.
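Since the Amadeus price object carries both a base and a total amount, the unitemized portion falls out of a subtraction in the Flink job:

// Label the gap between total and base price as unitemizedTaxesAndFees.
BigDecimal base = new BigDecimal(offer.get("price").get("base").asText());
BigDecimal total = new BigDecimal(offer.get("price").get("total").asText());
BigDecimal unitemizedTaxesAndFees = total.subtract(base);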
Conclusion
This case study demonstrates how to build a robust, real-time flight price monitoring system using modern data engineering tools. By leveraging NiFi for orchestration, Kafka for decoupling, Flink for real-time processing, and PostgreSQL for storage, we created a scalable pipeline capable of handling dynamic pricing data efficiently.