Building a Retrieval-Augmented Generation Pipeline for Quarterly Earnings Reports
- Stephen Dawkins
- Mar 31
- 5 min read
Overview
In today’s data-driven finance landscape, extracting actionable insights from quarterly earnings reports is a challenging but essential task. Our solution leverages a fully automated pipeline to download SEC filings, clean and normalize HTML content (including complex tables), vectorize the text using a SentenceTransformer model, and then use a vector database (Pinecone) together with an OpenAI language model to answer natural language queries. The pipeline is implemented in Python and structured into modular components for easy maintenance and deployment. You can find the source code here.
1. Data Retrieval and File Extraction
Downloading SEC Filings
The process begins by pulling SEC filings for a given company using its CIK. The function below (from file_extract.py) retrieves the SEC submissions JSON and filters for quarterly filings (10-Q):
def get_sec_filings_by_company(cik):
    """Return the company's recent 10-Q filings, identified by its zero-padded CIK."""
    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print("Failed to fetch data from SEC EDGAR.")
        exit()
    data = response.json()
    recent_filings = data.get("filings", {}).get("recent", {})
    forms = recent_filings.get("form", [])
    filing_dates = recent_filings.get("filingDate", [])
    accession_numbers = recent_filings.get("accessionNumber", [])
    primary_document = recent_filings.get("primaryDocument")  # available in the response, unused below
    earnings_reports = []
    cik_int = str(int(cik))  # drop leading zeros for the archive URL
    for form, filing_date, accession in zip(forms, filing_dates, accession_numbers):
        if form in ["10-Q"]:
            acc_no_no_dashes = accession.replace("-", "")
            filing_url = f"https://www.sec.gov/Archives/edgar/data/{cik_int}/{acc_no_no_dashes}/{accession}.txt"
            earnings_reports.append({
                "form": form,
                "filing_date": filing_date,
                "accession_number": accession,
                "filing_url": filing_url
            })
    return earnings_reports
This function returns all of the company's recent 10-Q filings; the orchestration step in main.py later works with only the most recent one.
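For the excerpt above to run on its own, file_extract.py also needs the requests import and a module-level headers dictionary, neither of which is shown here. A minimal sketch, with a placeholder contact string (SEC EDGAR requires a descriptive User-Agent header):
import requests

# SEC EDGAR rejects requests without a descriptive User-Agent; the value below is a placeholder.
headers = {"User-Agent": "Example Research research@example.com"}

# Usage sketch: list recent 10-Q filings for JPMorgan's CIK and inspect the newest one.
reports = get_sec_filings_by_company("0000019617")
if reports:
    print(reports[0]["filing_url"])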
2. Data Normalization and HTML Cleaning
Cleaning HTML While Preserving Tables
SEC filings come as HTML files full of extraneous markup. Our module normalize_file.py defines a function to clean these files. It uses BeautifulSoup to remove scripts, styles, comments, and unwanted attributes, while converting tables into a readable text format.
import os
import re
from bs4 import BeautifulSoup, Comment

def clean_file_with_soup(input_filepath, output_filepath=None):
    """Strip markup from an SEC filing while flattening tables into pipe-delimited text."""
    if output_filepath is None:
        base, ext = os.path.splitext(input_filepath)
        output_filepath = f"{base}_cleaned.txt"
    with open(input_filepath, 'r', encoding='utf-8') as infile:
        html_content = infile.read()
    soup = BeautifulSoup(html_content, 'lxml')
    # Drop script/style blocks and HTML comments.
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        comment.extract()
    # Remove presentational attributes that carry no semantic value.
    for tag in soup.find_all():
        for attribute in ["style", "class", "id"]:
            if tag.has_attr(attribute):
                del tag[attribute]
    # Replace each table with plain text, one pipe-delimited line per row.
    for table in soup.find_all("table"):
        table_text = []
        for row in table.find_all("tr"):
            row_text = []
            for cell in row.find_all(["th", "td"]):
                cell_text = cell.get_text(separator=" ", strip=True)
                row_text.append(cell_text)
            if row_text:
                table_text.append(" | ".join(row_text))
        table.replace_with("\n".join(table_text))
    # Extract the remaining text and collapse runs of whitespace.
    text = soup.get_text(separator=" ")
    cleaned_text = re.sub(r'\s+', ' ', text).strip()
    with open(output_filepath, 'w', encoding='utf-8') as outfile:
        outfile.write(cleaned_text)
    print(f"Cleaned text written to: {output_filepath}")
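As a quick illustration of the table handling, here is a toy example (the markup and numbers are made up) that flattens a two-row table the same way clean_file_with_soup does:
from bs4 import BeautifulSoup

# Illustrative only: a made-up table flattened into pipe-delimited rows.
html = "<table><tr><th>Metric</th><th>Q1</th></tr><tr><td>Revenue</td><td>1,234</td></tr></table>"
soup = BeautifulSoup(html, "lxml")
rows = []
for row in soup.find_all("tr"):
    cells = [cell.get_text(separator=" ", strip=True) for cell in row.find_all(["th", "td"])]
    rows.append(" | ".join(cells))
print("\n".join(rows))
# Metric | Q1
# Revenue | 1,234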
3. Text Chunking and Vectorization
Converting Cleaned Text into Vectors
After cleaning, the next step is to split the text into manageable chunks and vectorize them. In vectorize.py, the function process_file reads a cleaned text file, chunks it into 300-word segments, vectorizes each chunk using a SentenceTransformer model, and writes the results in JSON Lines format:
import json
from sentence_transformers import SentenceTransformer

def process_file(filepath, output_file, max_words=300):
    """Chunk a cleaned filing into ~300-word segments and append one embedding per line (JSONL)."""
    model = SentenceTransformer('all-MiniLM-L6-v2')
    chunk_index = 0
    current_words = []
    with open(filepath, 'r', encoding='utf-8') as infile, open(output_file, 'a', encoding='utf-8') as outfile:
        for line in infile:
            words = line.strip().split()
            for word in words:
                current_words.append(word)
                if len(current_words) >= max_words:
                    # Embed the full chunk and emit it as one JSON record.
                    chunk_text = " ".join(current_words)
                    embedding = model.encode([chunk_text])[0].tolist()
                    data = {
                        "chunk_index": chunk_index,
                        "chunk_text": chunk_text,
                        "embedding": embedding
                    }
                    outfile.write(json.dumps(data) + "\n")
                    chunk_index += 1
                    current_words = []
        # Flush whatever is left over as a final, shorter chunk.
        if current_words:
            chunk_text = " ".join(current_words)
            embedding = model.encode([chunk_text])[0].tolist()
            data = {
                "chunk_index": chunk_index,
                "chunk_text": chunk_text,
                "embedding": embedding
            }
            outfile.write(json.dumps(data) + "\n")
            chunk_index += 1
    print(f"Processing complete. {chunk_index} chunks written to {output_file}")
This module creates a semantic representation of the filing that is ready for indexing.
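A quick way to sanity-check the output is to read back the first JSONL record; the path below is a placeholder, and the embedding length should be 384 for all-MiniLM-L6-v2:
import json

# Read the first record written by process_file and confirm its shape.
with open("data/processed/example.txt", "r", encoding="utf-8") as f:
    first = json.loads(f.readline())
print(first["chunk_index"], len(first["embedding"]))  # e.g. 0 384
print(first["chunk_text"][:80])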
4. Storing and Querying Vectors in Pinecone
Upserting and Querying the Vector Database
Our pinecone_util.py module handles index creation, upsertion of vectors, and querying. It first checks if an index exists, creates one if necessary, and then upserts vector data from our processed file:
import json
import time
import config
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer

# my_api_key is loaded at module level (e.g. from config or an environment variable).

def create_index(index_name):
    pc = Pinecone(api_key=my_api_key)
    embedding_dimension = 384  # output size of all-MiniLM-L6-v2
    if index_name not in pc.list_indexes().names():
        pc.create_index(index_name,
                        dimension=embedding_dimension,
                        metric="cosine",
                        spec=ServerlessSpec(cloud="aws", region=config.PINECONE_ENVIRONMENT))
        print(f"Index '{index_name}' created.")
    else:
        print(f"Index '{index_name}' already exists.")
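The hard-coded dimension of 384 has to match the embedding model: it is the output size of all-MiniLM-L6-v2. If you swap in another SentenceTransformer model, a safer approach is to derive the dimension from the model itself, as in this small sketch:
from sentence_transformers import SentenceTransformer

# Derive the index dimension from the embedding model instead of hard-coding 384.
model = SentenceTransformer('all-MiniLM-L6-v2')
embedding_dimension = model.get_sentence_embedding_dimension()  # 384 for this model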
For upsertion:
def upsert_to_vector(index_name, filepath):
    pc = Pinecone(api_key=my_api_key)
    index = pc.Index(index_name)
    # Wait until the index reports ready before upserting.
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)
    print(f"Index: {index_name} up and ready")
    vectors = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            vector_id = str(record["chunk_index"])
            vector = record["embedding"]
            metadata = {"chunk_text": record["chunk_text"]}
            vectors.append((vector_id, vector, metadata))
    # Upsert in batches of 100 to keep each request a manageable size.
    batch_size = 100
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch)
        print(f"Upserted batch {i // batch_size + 1} of {((len(vectors) - 1) // batch_size) + 1}")
    print("All embeddings upserted to Pinecone.")
And for querying:
def query_index(index_name, query):
    pc = Pinecone(api_key=my_api_key)
    index = pc.Index(index_name)
    model = SentenceTransformer('all-MiniLM-L6-v2')
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)
    print(f"Index: {index_name} up and ready")
    # Embed the query with the same model used for the document chunks.
    query_embedding = model.encode([query])[0].tolist()
    results = index.query(vector=query_embedding, top_k=3, include_values=False, include_metadata=True)
    return results
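The returned object is Pinecone's standard query response: a list of matches, each with an id, a similarity score, and the metadata attached at upsert time. A minimal sketch of inspecting it (the index name and question are illustrative):
# Inspect the top matches; the index name here is an assumed example.
results = query_index("earnings-reports", "What drove revenue growth this quarter?")
for match in results["matches"]:
    print(match["id"], round(match["score"], 3))
    print(match["metadata"]["chunk_text"][:100])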
5. Generating Answers with OpenAI
Retrieval-Augmented Generation
The openai_util.py module demonstrates how to combine retrieved chunks with a user query to generate a comprehensive answer using OpenAI’s language model. The function constructs a prompt by merging the context extracted from Pinecone with the query:
from openai import OpenAI

def generate_comprehensive_answer(query, pinecone_results, api_key):
    client = OpenAI(api_key=api_key)
    # Concatenate the retrieved chunks into a single context block.
    context_parts = []
    for match in pinecone_results["matches"]:
        chunk_text = match["metadata"].get("chunk_text", "")
        context_parts.append(chunk_text)
    context = "\n".join(context_parts)
    prompt = (
        "You are an expert financial analyst. Based on the following information from earnings reports:\n\n"
        f"{context}\n\n"
        "Please provide a comprehensive answer to the following question:\n"
        f"{query}"
    )
    # Ask the model to answer the question grounded in the retrieved context.
    response = client.responses.create(
        model="gpt-4o-mini",
        input=[
            {"role": "system", "content": "You are a knowledgeable financial analyst."},
            {"role": "user", "content": prompt}
        ]
    )
    return response.output_text
This function encapsulates the retrieval-augmented generation (RAG) approach, ensuring that the final answer is informed by the most relevant sections of the report.
6. Orchestration via Command-Line Interface
Finally, the main.py file ties all components together. It allows the user to either load a report (which downloads, cleans, vectorizes, and upserts data) or to query the system for answers:
import argparse
import config

# `index` is the Pinecone index name, defined at module level;
# download_document and check_index are helpers defined elsewhere in the project.

def load_report(cik):
    earnings_reports = get_sec_filings_by_company(cik)
    print(f"Reports pulled for {cik}:")
    # Work with the most recent 10-Q only.
    report = earnings_reports[0]
    print(f"Form: {report['form']}, Filing Date: {report['filing_date']}, Accession: {report['accession_number']}")
    print(f"URL: {report['filing_url']}\n")
    filename = f"{report['form']}-{report['filing_date']}-{report['accession_number']}.txt"
    download_document(report['filing_url'], f'data/raw/{filename}')
    print('Extracting text from HTML...')
    clean_file_with_soup(f'data/raw/{filename}', f'data/cleaned/{filename}')
    print('Embedding text into vectors...')
    process_file(f'data/cleaned/{filename}', f'data/processed/{filename}')
    create_index(index)
    print('Loading vectors into Pinecone...')
    upsert_to_vector(index, f'data/processed/{filename}')
    check_index(index)

def query_report(query):
    pinecone_retrieval_results = query_index(index, query)
    answer = generate_comprehensive_answer(query, pinecone_retrieval_results, config.OPENAI_API_KEY)
    print("Comprehensive Answer:")
    print(answer)

def main():
    parser = argparse.ArgumentParser(description="Earnings Report Processing Pipeline")
    parser.add_argument("action", choices=["load-report", "query"], help="Action to perform: load-report or query")
    parser.add_argument("value", help="For 'load-report', provide the CIK number; for 'query', provide the query text.")
    args = parser.parse_args()
    if args.action == "load-report":
        load_report(args.value)
    elif args.action == "query":
        query_report(args.value)

if __name__ == "__main__":
    main()
Users can run the pipeline from the command line.
To load a report (here using JPMorgan's CIK, 0000019617):
python main.py load-report 0000019617
To query the system:
python main.py query "What are the financial highlights from Q1 2024?"
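The modules above also rely on a small config module and module-level globals (such as my_api_key in pinecone_util.py and index in main.py) that are not shown in the excerpts. Here is a minimal sketch of what that configuration might look like; aside from PINECONE_ENVIRONMENT and OPENAI_API_KEY, the names are assumptions:
# config.py -- illustrative sketch; the repo's actual names may differ.
import os

PINECONE_API_KEY = os.environ["PINECONE_API_KEY"]  # supplies my_api_key in pinecone_util.py
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT", "us-east-1")
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
INDEX_NAME = "earnings-reports"  # the `index` name used in main.py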
Conclusion
The code in this repository demonstrates an end-to-end retrieval-augmented generation pipeline specifically designed for quarterly earnings reports. By integrating SEC filing extraction, HTML cleaning (with special handling for tables), text chunking, vectorization using SentenceTransformer, vector storage in Pinecone, and natural language answer generation via OpenAI, we have created a robust tool that transforms unstructured financial data into actionable insights. This case study exemplifies our expertise in modern data engineering and NLP, enabling financial professionals to quickly access and understand critical information.