
Building a Retrieval-Augmented Generation Pipeline for Quarterly Earnings Reports

  • Writer: Stephen Dawkins
  • Mar 31
  • 5 min read

Overview

Extracting actionable insights from quarterly earnings reports is a challenging but essential task in today’s data-driven finance landscape. Our solution is a fully automated pipeline that downloads SEC filings, cleans and normalizes the HTML content (including complex tables), vectorizes the text with a SentenceTransformer model, and then uses a vector database (Pinecone) together with an OpenAI language model to answer natural language queries. The pipeline is implemented in Python and structured into modular components for easy maintenance and deployment. You can find the source code here.
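The snippets below reference a few shared settings that live outside the code shown here: a headers dict for SEC requests, a Pinecone API key (my_api_key), config.PINECONE_ENVIRONMENT, config.OPENAI_API_KEY, and a Pinecone index name. The actual config module is not reproduced in this post, so the following is only a minimal sketch of the kind of values it would hold (names other than the two config attributes are assumptions):

# config.py -- illustrative sketch only
PINECONE_API_KEY = "your-pinecone-api-key"   # read as my_api_key in pinecone_util.py
PINECONE_ENVIRONMENT = "us-east-1"           # region passed to ServerlessSpec
OPENAI_API_KEY = "your-openai-api-key"

# SEC EDGAR requires a descriptive User-Agent on every request
SEC_HEADERS = {"User-Agent": "Your Name your.email@example.com"}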



1. Data Retrieval and File Extraction


Downloading SEC Filings

The process begins by pulling SEC filings for a given company using its CIK. The function below (from file_extract.py) retrieves the SEC submissions JSON and filters for quarterly filings (10-Q):

import requests

# 'headers' is a module-level dict that must include a descriptive User-Agent,
# which SEC EDGAR requires on all requests.

def get_sec_filings_by_company(cik):
    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print("Failed to fetch data from SEC EDGAR.")
        exit()
    data = response.json()
    recent_filings = data.get("filings", {}).get("recent", {})
    forms = recent_filings.get("form", [])
    filing_dates = recent_filings.get("filingDate", [])
    accession_numbers = recent_filings.get("accessionNumber", [])
    primary_document = recent_filings.get("primaryDocument")
    earnings_reports = []
    cik_int = str(int(cik))  # drop leading zeros for the archive URL
    for form, filing_date, accession in zip(forms, filing_dates, accession_numbers):
        if form in ["10-Q"]:
            acc_no_no_dashes = accession.replace("-", "")
            filing_url = f"https://www.sec.gov/Archives/edgar/data/{cik_int}/{acc_no_no_dashes}/{accession}.txt"
            earnings_reports.append({
                "form": form,
                "filing_date": filing_date,
                "accession_number": accession,
                "filing_url": filing_url
            })
    return earnings_reports

    url = f"https://data.sec.gov/submissions/CIK{cik}.json"

    response = requests.get(url, headers=headers)

    if response.status_code != 200:

        print("Failed to fetch data from SEC EDGAR.")

        exit()

    data = response.json()

    recent_filings = data.get("filings", {}).get("recent", {})

    forms = recent_filings.get("form", [])

    filing_dates = recent_filings.get("filingDate", [])

    accession_numbers = recent_filings.get("accessionNumber", [])

    primary_document = recent_filings.get("primaryDocument")

    earnings_reports = []

    cik_int = str(int(cik))

    for form, filing_date, accession in zip(forms, filing_dates, accession_numbers):

        if form in ["10-Q"]:

            acc_no_no_dashes = accession.replace("-", "")

            filing_url = f"https://www.sec.gov/Archives/edgar/data/{cik_int}/{acc_no_no_dashes}/{accession}.txt"

            earnings_reports.append({

                "form": form,

                "filing_date": filing_date,

                "accession_number": accession,

                "filing_url": filing_url

            })

    return earnings_reports

This function returns a list of the company’s recent 10-Q filings; the orchestration layer in main.py then processes the most recent one.
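main.py (shown in section 6) also calls a download_document helper, presumably from file_extract.py, that is not reproduced above. It simply fetches the filing URL and writes it to disk; a minimal sketch, assuming the same module-level headers dict:

import os
import requests

def download_document(url, output_path):
    # Sketch of the helper called from main.py: fetch the filing and save it locally.
    # Assumes the module-level 'headers' dict with a User-Agent for SEC EDGAR.
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(response.text)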


2. Data Normalization and HTML Cleaning


Cleaning HTML While Preserving Tables

SEC filings come as HTML files full of extraneous markup. Our module normalize_file.py defines a function to clean these files. It uses BeautifulSoup to remove scripts, styles, comments, and unwanted attributes, while converting tables into a readable text format.

import os
import re
from bs4 import BeautifulSoup, Comment

def clean_file_with_soup(input_filepath, output_filepath=None):
    if output_filepath is None:
        base, ext = os.path.splitext(input_filepath)
        output_filepath = f"{base}_cleaned.txt"
    with open(input_filepath, 'r', encoding='utf-8') as infile:
        html_content = infile.read()
    soup = BeautifulSoup(html_content, 'lxml')
    # Drop non-content tags and HTML comments entirely
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        comment.extract()
    # Strip presentational attributes that add noise without meaning
    for tag in soup.find_all():
        for attribute in ["style", "class", "id"]:
            if tag.has_attr(attribute):
                del tag[attribute]
    # Flatten each table into pipe-delimited rows so the figures stay readable
    for table in soup.find_all("table"):
        table_text = []
        for row in table.find_all("tr"):
            row_text = []
            for cell in row.find_all(["th", "td"]):
                cell_text = cell.get_text(separator=" ", strip=True)
                row_text.append(cell_text)
            if row_text:
                table_text.append(" | ".join(row_text))
        table.replace_with("\n".join(table_text))
    text = soup.get_text(separator=" ")
    cleaned_text = re.sub(r'\s+', ' ', text).strip()
    with open(output_filepath, 'w', encoding='utf-8') as outfile:
        outfile.write(cleaned_text)
    print(f"Cleaned text written to: {output_filepath}")


3. Text Chunking and Vectorization


Converting Cleaned Text into Vectors

After cleaning, the next step is to split the text into manageable chunks and vectorize them. In vectorize.py, the function process_file reads a cleaned text file, chunks it into 300-word segments, vectorizes each chunk using a SentenceTransformer model, and writes the results in JSON Lines format:

import json
from sentence_transformers import SentenceTransformer

def process_file(filepath, output_file, max_words=300):
    model = SentenceTransformer('all-MiniLM-L6-v2')
    chunk_index = 0
    current_words = []
    with open(filepath, 'r', encoding='utf-8') as infile, open(output_file, 'a', encoding='utf-8') as outfile:
        for line in infile:
            words = line.strip().split()
            for word in words:
                current_words.append(word)
                if len(current_words) >= max_words:
                    # Emit a full chunk as one JSON Lines record
                    chunk_text = " ".join(current_words)
                    embedding = model.encode([chunk_text])[0].tolist()
                    data = {
                        "chunk_index": chunk_index,
                        "chunk_text": chunk_text,
                        "embedding": embedding
                    }
                    outfile.write(json.dumps(data) + "\n")
                    chunk_index += 1
                    current_words = []
        # Flush any remaining words as a final, shorter chunk
        if current_words:
            chunk_text = " ".join(current_words)
            embedding = model.encode([chunk_text])[0].tolist()
            data = {
                "chunk_index": chunk_index,
                "chunk_text": chunk_text,
                "embedding": embedding
            }
            outfile.write(json.dumps(data) + "\n")
            chunk_index += 1
    print(f"Processing complete. {chunk_index} chunks written to {output_file}")

This module creates a semantic representation of the filing that is ready for indexing. Each chunk is encoded by all-MiniLM-L6-v2 into a 384-dimensional vector, which must match the dimension configured for the Pinecone index in the next step.
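A quick way to sanity-check the output (the file path is illustrative) is to read back the first record and confirm the embedding dimension:

import json

# Illustrative check: every record should carry a 384-dimensional embedding
with open("data/processed/example_10q.txt", encoding="utf-8") as f:
    first = json.loads(f.readline())
print(first["chunk_index"], len(first["embedding"]))  # expect: 0 384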


4. Storing and Querying Vectors in Pinecone


Upserting and Querying the Vector Database

Our pinecone_util.py module handles index creation, upsertion of vectors, and querying. It first checks if an index exists, creates one if necessary, and then upserts vector data from our processed file:

# imports used across pinecone_util.py; 'my_api_key' and 'config' are module-level settings
import json
import time
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer

def create_index(index_name):
    pc = Pinecone(api_key=my_api_key)
    embedding_dimension = 384  # output dimension of all-MiniLM-L6-v2
    # Note: with newer Pinecone SDKs this membership check is typically written as
    # `if index_name not in pc.list_indexes().names():`
    if index_name not in pc.list_indexes():
        pc.create_index(
            index_name,
            dimension=embedding_dimension,
            metric="cosine",
            spec=ServerlessSpec(
                cloud="aws",
                region=config.PINECONE_ENVIRONMENT
            )
        )
        print(f"Index '{index_name}' created.")
    else:
        print(f"Index '{index_name}' already exists.")

For upsertion:

def upsert_to_vector(index_name, filepath):
    pc = Pinecone(api_key=my_api_key)
    index = pc.Index(index_name)
    # Wait until the index reports ready before writing to it
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)
    print(f"Index: {index_name} up and ready")
    vectors = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            vector_id = str(record["chunk_index"])
            vector = record["embedding"]
            metadata = {"chunk_text": record["chunk_text"]}
            vectors.append((vector_id, vector, metadata))
    # Upsert in batches of 100 to stay within request size limits
    batch_size = 100
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch)
        print(f"Upserted batch {i // batch_size + 1} of {((len(vectors) - 1) // batch_size) + 1}")
    print("All embeddings upserted to Pinecone.")

And for querying:

def query_index(index_name, query):
    pc = Pinecone(api_key=my_api_key)
    index = pc.Index(index_name)
    model = SentenceTransformer('all-MiniLM-L6-v2')
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)
    print(f"Index: {index_name} up and ready")
    query_embedding = model.encode([query])[0].tolist()
    results = index.query(vector=query_embedding, top_k=3, include_values=False, include_metadata=True)
    return results
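main.py (shown in section 6) also calls a check_index helper, presumably defined alongside the other Pinecone utilities, that is not reproduced in this post. The details may differ, but a minimal sketch would simply report the index statistics after the upsert:

def check_index(index_name):
    # Hypothetical sketch: report vector counts so the upsert can be verified
    pc = Pinecone(api_key=my_api_key)
    index = pc.Index(index_name)
    print(index.describe_index_stats())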

5. Generating Answers with OpenAI

Retrieval-Augmented Generation

The openai_util.py module demonstrates how to combine retrieved chunks with a user query to generate a comprehensive answer using OpenAI’s language model. The function constructs a prompt by merging the context extracted from Pinecone with the query:

from openai import OpenAI

def generate_comprehensive_answer(query, pinecone_results, api_key):
    client = OpenAI(api_key=api_key)
    # Concatenate the retrieved chunks into a single context block
    context_parts = []
    for match in pinecone_results["matches"]:
        chunk_text = match["metadata"].get("chunk_text", "")
        context_parts.append(chunk_text)
    context = "\n".join(context_parts)
    prompt = (
        "You are an expert financial analyst. Based on the following information from earnings reports:\n\n"
        f"{context}\n\n"
        "Please provide a comprehensive answer to the following question:\n"
        f"{query}"
    )
    response = client.responses.create(
        model="gpt-4o-mini",
        input=[
            {"role": "system", "content": "You are a knowledgeable financial analyst."},
            {"role": "user", "content": prompt}
        ]
    )
    return response.output_text

This function encapsulates the retrieval-augmented generation (RAG) approach, ensuring that the final answer is informed by the most relevant sections of the report.

6. Orchestration via Command-Line Interface

Finally, the main.py file ties all components together. It allows the user to either load a report (which downloads, cleans, vectorizes, and upserts data) or to query the system for answers:

import argparse

import config
# module imports as described in the sections above
# (exact locations of download_document and check_index are inferred)
from file_extract import get_sec_filings_by_company, download_document
from normalize_file import clean_file_with_soup
from vectorize import process_file
from pinecone_util import create_index, upsert_to_vector, check_index, query_index
from openai_util import generate_comprehensive_answer

# 'index' is the module-level Pinecone index name; the value below is an illustrative placeholder
index = "earnings-reports"


def load_report(cik):
    earnings_reports = get_sec_filings_by_company(cik)
    print(f"Reports pulled for {cik}:")
    report = earnings_reports[0]
    print(f"Form: {report['form']}, Filing Date: {report['filing_date']}, Accession: {report['accession_number']}")
    print(f"URL: {report['filing_url']}\n")
    filename = f"{report['form']}-{report['filing_date']}-{report['accession_number']}.txt"
    download_document(report['filing_url'], f'data/raw/{filename}')
    print(f'Extract text from HTML...')
    clean_file_with_soup(f'data/raw/{filename}', f'data/cleaned/{filename}')
    print(f'Embedding text into vectors...')
    process_file(f'data/cleaned/{filename}', f'data/processed/{filename}')
    create_index(index)
    print(f'Loading vectors into Pinecone')
    upsert_to_vector(index, f'data/processed/{filename}')
    check_index(index)


def query_report(query):
    pinecone_retrieval_results = query_index(index, query)
    answer = generate_comprehensive_answer(query, pinecone_retrieval_results, config.OPENAI_API_KEY)
    print("Comprehensive Answer:")
    print(answer)


def main():
    parser = argparse.ArgumentParser(description="Earnings Report Processing Pipeline")
    parser.add_argument("action", choices=["load-report", "query"], help="Action to perform: load-report or query")
    parser.add_argument("value", help="For 'load-report', provide the CIK number; for 'query', provide the query text.")
    args = parser.parse_args()
    if args.action == "load-report":
        load_report(args.value)
    elif args.action == "query":
        query_report(args.value)


if __name__ == "__main__":
    main()

Users can run the pipeline from the command line:

  • To load a report (for example, JPMorgan, whose CIK is 0000019617):

python main.py load-report 0000019617

  • To query the system:

python main.py query "What are the financial highlights from Q1 2024?"

Conclusion

The code in this repository demonstrates an end-to-end retrieval-augmented generation pipeline specifically designed for quarterly earnings reports. By integrating SEC filing extraction, HTML cleaning (with special handling for tables), text chunking, vectorization using SentenceTransformer, vector storage in Pinecone, and natural language answer generation via OpenAI, we have created a robust tool that transforms unstructured financial data into actionable insights. This case study exemplifies our expertise in modern data engineering and NLP, enabling financial professionals to quickly access and understand critical information.
