I Built an AI Agent that Monitors Targeted Government Bids While I Sleep

Meet Scout The Bid/Tender Hunter with Langgraph That Operates While I Sleep.

Ever since I bootstrapped my start-up, one of my secret weapons for funding has been finding small-scale software-related government bids.

imagine closing a 300k USD project within only 3 months with less than 5% cost and 95% profit.

We do this by monitoring and finding bids that meet our following requirements:

1- Software implementation-related bids only (We have an ERP open source we can easily customize to almost most management system-related contracts).

2- Should be in our preferred list of countries,

3- Have bidder requirements that we can easily meet.

4- Bid posted now, hours ago, or a day ago. (to make sure we find bids faster than our competitors)

To do this I assigned one of my colleagues to go to bid listings from different websites every 2 – 3 days and find the ones that met our specific requirements.

 

Problem?
Lack of Consistency: We quickly realized that keeping up a strict two-day schedule was way harder than expected.

We often ended up either finding bids just a few days before the deadline or too late, missing out on opportunities.

Booooooooooring: The process of going through dozens of bids to find the ones you are interested in is the most boring thing a human can ever do.

So I thought, why not switch up to my developer side and build an AI agent?

 

After 5 days, I built the THIS:

 

Meet Scout The Bid/Tender Hunter

Scout wakes up once every day and takes less than a minute to go through hundreds of bids by scraping specific websites of our choice, analyzing them all, deciding which ones qualify to match our company’s interests, and then notifying our team on Telegram along with key information for each bid—such as the bid title, a summary of the requirements, the posted date, the deadline, and the bid link.

As I’m writing this, I received a notification from Scout about a new bid.

You can’t believe the euphoric joy and the goosebumps I’m feeling right now.

Curser and Langgraph helped me focus more on the system’s vision and less on the architectural design and coding.

In this article, I want to walk you through an overview of the main components of this agentic system, including its architecture, tools, LLM, prompts, and memory, and then go more technical by showing the step-by-step code I used for this and even share the code at the end of this article.

 

Roles, Responsibilities, and Communication Flow

  1. The Scraper: Scrapes bids from different sources, combines them into one list, and sends it to the Bid Analyst
  2. Bid Analyst: Analyzes the list received from The scraper, extracts the ones that match the user’s(us) requirements, and then sends the qualified list of bids to the notifier.
  3. Telegram notifier: Sends the qualified bids to the user via The Telegram Channel.

The architecture is based on sequential processing, meaning the output of the Scraper is the input of the Bid Analyst, and the output of the Bid Analyst is the input of the Telegram Notifier.

 

Key Components of the System

  1. The Tools: The system utilizes scraping tools to gather bid listings from different websites and a Telegram tool to notify the user about new bid opportunities.
  2. The LLM: The system uses an LLM (GPT-4o) to analyze batches of the scraped bids, decide which ones match the user’s responses, and output the list of bids that meet the user’s requirements.
  3. Decision Point: The system utilizes a decision point (conditional edge) to determine whether the Bid Analyst needs to continue analyzing or move to the Telegram Notifier.
  4. Context Management (Memory): The system uses SQLite to facilitate communication between the Scraper, Bid Analyst, and Telegram Notifier.

 

The following figure showcases how SQLlite facilitates Context Management.

Now let’s take a look at each role’s responsibilities in detail.

The Scraper (Tool)

The node’s responsibilities are:

  • Running multiple scraping tools.
  • Store the results of each scraping tool in the SQLite DB.
  • Before storing, make sure each bid has not been stored before.

Note:

The reason for running multiple scraping tools is mainly because some bid websites offer APIs while others do not.

For those that don’t offer APIs, JavaScript is used to load the list of bids, which is difficult for general scraping frameworks like Firecrawl or Jina to handle.

So, we ended up developing a different scraping tool for each website.

Bid Analyst (LLM)

This node’s responsibilities are:

  • Retrieve 10 bids from the SQL DB that were stored by the Scraper.
  • Invoke GPT-4o and feed the list of 10 bids along with instructions to analyze and extract the bids that match our interests.
  • Once the LLM returns the list of qualified bids, update the DB to mark the bids as qualified.

 

Note:

The reason I made the Analyst retrieve only 10 bids at a time is that there are instances when the list of bids might reach hundreds or even thousands of JSON objects.

Passing all of that to an LLM along with a prompt will not overflow the context window but cause performance and accuracy issues.

So instead of passing it all at once, I thought, why not pass it in batches (in my case, 10 bids at a time) and keep processing each batch until all bids are processed?

This approach not only improves the accuracy of the LLM but also ensures our system remains scalable and efficient, even when dealing with hundreds of thousands of JSON objects.

Now, to implement this idea, I needed a mechanism to manage the flow of data between batches—a mechanism that directs the flow back to the Bid Analyst node if there are remaining batches to process and directs the flow to the next node in the pipeline if all batches have been processed.

This is where the Should_continue_filtering edge (function) comes in.

Should_continue_filtering:

After processing each batch of bids by the Bid Analyst, this function is triggered.

It essentially acts as a decision point in the workflow. To do this, it performs the following tasks:

  • Counts how many bids have not been processed.
  • If the result is more than 0, meaning there are still bids to be processed, it directs the flow back to the Analyst.
  • If the result is 0, it directs the flow to the Telegram Notifier.

Telegram Notifier

This node is responsible for the following tasks:

  • Retrieve a list of bids that have been marked as qualified.
  • Format each bid by including key information such as the bid title, organization, bid requirement summary, posted date, and closing date.
  • Send the bid to our Telegram channel.

Let me show you how it is all done through code.

Let's Get into The code

Step 1: Define a scraping tool for globaltenders.com

If you are not familiar with Langgraph, here are the key concepts you need to understand:

  • Nodes: Represent the roles we have just discussed. Meaning the Scraper, the Bid Analyst, and the Telegram Notifier are known in Langgraph as nodes.
  • Edges: Edges are responsible for the direct flow from one node to another. For example, I defined an edge to go from the Scraper node to the Bid Analyst node.
  • Conditional Edge: This is a function that acts as a decision point. It determines which node the flow should go to next, based on conditions we define inside it.

Note that this is a scraping tool for a random bid website, not the actual government websites we scrape from, as scraping those could get us into trouble.

For the sake of this example, let’s define a scraping tool to extract bids from this link: https://www.globaltenders.com/free-global-tenders/

Create a file named globalTendersScrapper.py, inside yourcurrentproject/tools and paste the following inside:

Note: I will be refrering to the bid as a tender in my code so bare with me.

				
					from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from typing import Dict, Type, Optional
from pydantic import BaseModel, Field
from langchain.tools import BaseTool
from langchain.callbacks.manager import CallbackManagerForToolRun
import logging
import time
from fake_useragent import UserAgent
from datetime import datetime


class GlobalTendersInput(BaseModel):
   """Input for GlobalTenders scraper."""
   url: str = Field(
       default="https://www.globaltenders.com/free-global-tenders/",
       description="URL to scrape. Defaults to GlobalTenders free tenders page"
   )
   wait_time: int = Field(
       default=10,
       description="Maximum time to wait for elements to load (seconds)"
   )


class GlobalTendersScraper(BaseTool):
   """Tool for scraping tender information from GlobalTenders."""
   name: str = "globaltenders_scraper"
   description: str = """Useful for scraping tender information from GlobalTenders website.
   Returns a list of tenders with their titles, organizations, dates, and URLs."""
   args_schema: Type[BaseModel] = GlobalTendersInput
  
   # Add chrome_options as a class field
   chrome_options: Options = Field(default_factory=Options)
  
   def __init__(self):
       # Initialize parent class first
       super().__init__()
      
       # Generate random user agent
       ua = UserAgent()
       user_agent = ua.random
      
       # Configure Chrome options
       self.chrome_options.add_argument('--headless')
       self.chrome_options.add_argument('--no-sandbox')
       self.chrome_options.add_argument('--disable-dev-shm-usage')
       self.chrome_options.add_argument('--disable-gpu')
       self.chrome_options.add_argument(f'user-agent={user_agent}')
       self.chrome_options.add_argument('--disable-blink-features=AutomationControlled')
       self.chrome_options.add_argument('--incognito')


   def _convert_date(self, date_str: str) -> str:
       """Convert date string to YYYY-MM-DD format."""
       try:
           date_obj = datetime.strptime(date_str.strip(), "%d %b %Y")
           return date_obj.strftime("%Y-%m-%d")
       except Exception as e:
           logging.error(f"Error converting date {date_str}: {str(e)}")
           return date_str


   def _parse_tender_row(self, row) -> Dict:
       """Parse a single tender row and return structured data."""
       tender = {
           "title": "",
           "organization": "",
           "posted_date": "",
           "closing_date": "",
           "location": "",
           "url": "",
           "source": "globaltenders.com",
           "tender_content": ""
       }
      
       try:
           # Find all key-value pairs in the row
           divs = row.find_all('div', class_='row')[0].find_all('div')
           data = {}
          
           # Extract key-value pairs
           for i in range(0, len(divs)-2, 2):
               key = divs[i].text.strip().rstrip(':').lower()
               value = divs[i+1].text.strip().lower()
               data[key] = value
          
           # Map the data to our tender format
           tender["title"] = data.get("description", "")
           tender["organization"] = data.get("authority", "")
           tender["location"] = data.get("country", "")
          
           # Handle the closing date
           if "action deadline" in data:
               tender["closing_date"] = self._convert_date(data["action deadline"])
          
           # Get the URL if available
           url_element = row.find('a', class_='btn-sdetail')
           if url_element:
               tender["url"] = url_element.get('href', '')
          
           # Combine relevant information for tender_content
           tender["tender_content"] = f"{data.get('description', '')} - {data.get('notice type', '')}"
          
       except Exception as e:
           logging.error(f"Error parsing tender row: {str(e)}")
      
       return tender


   def _run(
       self,
       url: str = "https://www.globaltenders.com/free-global-tenders/",
       wait_time: int = 10,
       run_manager: Optional[CallbackManagerForToolRun] = None
   ) -> Dict:
       """Run the tool."""
       driver = None
       try:
           driver = webdriver.Chrome(options=self.chrome_options)
           driver.get(url)
          
           # Wait for the table to be present
           wait = WebDriverWait(driver, wait_time)
           table = wait.until(EC.presence_of_element_located((By.CLASS_NAME, "gt-table")))
          
           # Get the page source after JavaScript has loaded
           html_content = driver.page_source
           soup = BeautifulSoup(html_content, 'html.parser')
          
           # Find all tender rows
           tender_rows = soup.find_all('tr', id=lambda x: x and x.startswith('tender_GT'))
          
           tenders = []
           for row in tender_rows:
               tender = self._parse_tender_row(row)
               if tender["title"]:  # Only add if we have at least a title
                   tenders.append(tender)
          
           return {
               'success': True,
               'items_found': len(tenders),
               'tenders': tenders
           }
          
       except Exception as e:
           logging.error(f"Error scraping URL: {str(e)}")
           return {
               'success': False,
               'error': str(e)
           }
       finally:
           if driver:
               driver.quit()


   async def _arun(self, url: str, wait_time: int = 10) -> Dict:
       """Run the tool asynchronously."""
       raise NotImplementedError("GlobalTendersScraper does not support async")


# Example usage
if __name__ == "__main__":
   scraper = GlobalTendersScraper()
   try:
       tool_input = {
           "url": "https://www.globaltenders.com/free-global-tenders/",
           "wait_time": 10
       }
       result = scraper.run(tool_input)
       if result['success']:
           print(f"\nSuccessfully scraped {result['items_found']} tenders!")
           print("\nScraped Data:")
           import json
           print(json.dumps(result['tenders'], indent=2, ensure_ascii=False))
       else:
           print("Error:", result.get('error', 'Unknown error'))
   except Exception as e:
       print(f"Error: {str(e)}")
				
			

To test this tool run: 
Python3 globalTendersScrapper.py 

You should get results like this:

Step 2: Create main.py and Import Necessary Libraries

Secondly, create main.py which will contain mostly all of our tender finder code.

Lets first, import all the essential Python libraries and modules required for the project.

				
					from typing import  Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from db.db import DB
from datetime import datetime
from tools.globalTendersScrapper import GlobalTendersScraper
#from tools.website2Scrapper import Website2Scraper
from tools.TelegramTool import TelegramTool
from tools.telegramTemplates.tender1Template import format_tender_message
from os import getenv
load_dotenv()
				
			

Step 3: Let's define the data models and the state

The Tender data model represents a structured format for storing and managing tender/bid information in the system.

				
					# Data Models
class Tender(BaseModel):
   """Represents a tender with its attributes."""
   id: int = Field(description="Unique identifier for the tender")
   title: str = Field(description="Title of the tender")
   organization: str = Field(description="Organization issuing the tender")
   posted_date: str = Field(description="Date when tender was posted")
   closing_date: str = Field(description="Deadline for tender submission")
   location: str = Field(description="Geographic location of the tender")
   url: str = Field(description="URL path to tender details")
   source: str = Field(description="Website source of the tender")
   tender_content: str = Field(description="Full content/details of the tender")
   state: str = Field(description="State of the tender")
   is_sent: bool = Field(description="Whether the tender has been sent to the client")


class TenderListing(BaseModel):
   """Collection of tenders."""
   tenders: list[Tender]


class State(TypedDict):
   """Application state definition."""
   human_message: str
				
			

Notice that the state will only include the human message which we will be passing when invoking the graph.

Step 4: Initialize LLMs and loan prompts

				
					# ------------------------------------------------------------
# Language Models Configuration
def initialize_language_models():
   """Initialize and configure language models."""
   return {
       'primary': ChatOpenAI(model="gpt-4o", temperature=0),
   }


# ------------------------------------------------------------
# Prompts
def load_system_prompt() -> str:
   """Load the system prompt from file."""
   with open("prompts/tender_analyst_prompt.md", "r") as file:
       return file.read()
				
			

Step 5: Create the prompt file

In the current directory create a folder called prompts, inside create a file called 
tender_analyst_prompt.md

Paste the following prompt

				
					# Procurement Analyst Prompt: Scout


## Role / Persona


- **Name:** Scout
- **Role:** Procurement Analyst
- **Expertise**:
 - 8+ years in tender analysis & procurement workflows.
 - Proficient in cross-referencing technical, geographic, and compliance requirements.
 - Rigorous attention to detail with zero tolerance for partial matches.


---


## Goal


Identify exact matches only from tender listings that satisfy all user-specified criteria (e.g., location, budget, deadlines, scope). Discard partial/incomplete matches.


---


## Instructions


- Parse Requirements: Extract user’s key filters (e.g., location: "Houston, TX"; sector: "Renewable Energy").


- Field-by-Field Validation:


 - Compare every tender field against all user requirements.


---


## Output Formatting Rules


Structure your response **exactly** in the following format:


```json
[
 {
   "id": "Unique identifier for the tender",
   "title": "Exact title from source (e.g., 'Construction of Solar Farm in Texas')",
   "organization": "Issuing body (e.g., 'Texas Energy Authority')",
   "posted_date": "20 Jan 2025",
   "closing_date": "30 Jan 2025 (or blank)",
   "location": "Specific city/region (e.g., 'Houston, TX')",
   "url": "Direct path (e.g., '/tenders/1234')",
   "source": "Website name (e.g., 'government_tenders.gov')",
   "tender_content": "Full details, unmodified"
 }
]
```


## Validation Constraints


- 🚫 No JSON deviations: Ensure commas, quotes, and brackets are syntactically correct.


- 🚫 No assumptions: If user requires "budget > $1M" and tender lacks budget data, reject it.


- 🔄 Empty array mandate: Return [] if no 100% matches.
				
			

Step 6: Tender Repository class

The TenderRepository class serves as a data access layer that handles all database operations related to tenders. 

It follows the Repository pattern, abstracting away the database implementation details (handled by the DB class) from the rest of the application logic.

The class acts as a bridge between the application’s business logic and the database, providing a clean API for tender-related data operations.

Append the following code into the main.py

				
					# ------------------------------------------------------------
# Database Operations
class TenderRepository:
   """Handles database operations for tenders."""
   def __init__(self):
       self.db = DB()


   def fetch_waiting_tenders(self, limit: int = 8) -> list[Tender]:
       """Fetch tenders waiting for filtering."""
       try:
           raw_tenders = self.db.get_tenders_by_state("waiting_for_filtering", limit=limit)
           return [self._convert_to_tender(tender) for tender in raw_tenders]
       except Exception as e:
           print(f"[REPOSITORY] Error fetching waiting tenders: {e}")
           return []


   def count_waiting_tenders(self) -> int:
       """Count tenders waiting for filtering.
      
       Returns:
           int: Number of tenders in waiting_for_filtering state
       """
       try:
           return self.db.count_tenders_by_state("waiting_for_filtering")
       except Exception as e:
           print(f"[REPOSITORY] Error counting waiting tenders: {e}")
           return 0


   def update_tender_state(self, tender_id: int, new_state: str) -> None:
       """Update tender state in database."""
       try:
           if not self.db.update_tender_field(tender_id, "state", new_state):
               print(f"[REPOSITORY] Failed to update tender {tender_id} to state {new_state}")
       except Exception as e:
           print(f"[REPOSITORY] Error updating tender state: {e}")


   def insert_new_tender(self, tender_json: dict) -> None:
       """Insert a new tender into the database from a JSON object.
      
       Args:
           tender_json (dict): JSON object containing tender data with keys:
               - title
               - organization
               - posted_date
               - closing_date
               - location
               - url
               - source
               - tender_content
       """
       tender_tuple = (
           tender_json.get('title'),
           tender_json.get('organization'),
           tender_json.get('posted_date'),
           tender_json.get('closing_date'),
           tender_json.get('location'),
           tender_json.get('url'),
           tender_json.get('source'),
           tender_json.get('tender_content'),
       )
       self.db.insert_tender(tender_tuple)


   def tender_exists(self, title: str, posted_date: str) -> bool:
       """Check if a tender with given title and posted date exists.
      
       Args:
           title: The tender title
           posted_date: The tender posted date
          
       Returns:
           bool: True if tender exists, False otherwise
       """
       return self.db.tender_exists(title, posted_date)


   def fetch_qualified_unsent_tenders(self) -> list[Tender]:
       """Fetch qualified tenders that haven't been sent yet."""
       try:
           raw_tenders = self.db.get_tenders_by_state_and_sent(
               state="qualified",
               is_sent=False
           )
           return [self._convert_to_tender(tender) for tender in raw_tenders]
       except Exception as e:
           print(f"[REPOSITORY] Error fetching qualified unsent tenders: {e}")
           return []


   def mark_tender_as_sent(self, tender_id: int) -> None:
       """Mark a tender as sent."""
       try:
           if not self.db.update_tender_field(tender_id, "is_sent", True):
               print(f"[REPOSITORY] Failed to mark tender {tender_id} as sent")
       except Exception as e:
           print(f"[REPOSITORY] Error marking tender as sent: {e}")


   @staticmethod
   def _convert_to_tender(raw_tender: dict) -> Tender:
       """Convert raw database tender to Tender model."""
       return Tender(
           **{
               **raw_tender,
               'posted_date': str(raw_tender.get('posted_date', "")),
               'closing_date': str(raw_tender.get('closing_date', "")),
               'tender_content': str(raw_tender.get('tender_content', ""))
           }
       )
  

				
			

Step 7: Create the DB class

We are going to be to the main.py, lets first create dbfolder inside your current project and inside create file db.py

The DB class is a SQLite database wrapper that handles the low-level database operations for the tender management system.

Paste the following code into the db/db.py 

				
					import sqlite3
import os


class DB:
   def __init__(self):
       # Get the directory where db.py is located
       db_dir = os.path.dirname(os.path.abspath(__file__))
       # Set the database path to be in the same directory as db.py
       self.db_name = os.path.join(db_dir, "tenders.db")
       self._ensure_db_exists()
       self.create_table()


   def _ensure_db_exists(self):
       """Create the database file if it doesn't exist"""
       if not os.path.exists(self.db_name):
           conn = sqlite3.connect(self.db_name)
           conn.close()


   def create_table(self):
       """Create the tenders table if it doesn't exist"""
       conn = sqlite3.connect(self.db_name)
       cursor = conn.cursor()


       create_table_query = """
       CREATE TABLE IF NOT EXISTS tenders (
           id INTEGER PRIMARY KEY AUTOINCREMENT,
           title TEXT NOT NULL,
           organization TEXT,
           posted_date DATE,
           closing_date DATE,
           location TEXT,
           url TEXT,
           source TEXT,
           tender_content TEXT(10000),
           created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
           state TEXT DEFAULT 'waiting_for_filtering' CHECK(state IN ('waiting_for_filtering', 'qualified', 'unqualified', 'notified')),
           is_sent BOOLEAN DEFAULT 0
       )
       """
      
       cursor.execute(create_table_query)
       conn.commit()
       conn.close()


   def insert_tender(self, tender):
       """
       Args:
           tender (tuple): A tuple containing tender data in the following order:
               (title: str, organization: str, posted_date: str, closing_date: str,
               location: str, url: str, source: str, tender_content: str)
       """
       conn = sqlite3.connect(self.db_name)
       cursor = conn.cursor()
       insert_query = """
       INSERT INTO tenders (title, organization, posted_date, closing_date, location, url, source, tender_content)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?)
       """
       cursor.execute(insert_query, tender)
       conn.commit()
       conn.close()


   def _convert_to_dictionary(self, cursor, row) -> dict:
       """
       Convert a database row tuple to a dictionary using column names.
      
       Args:
           cursor: The database cursor
           row: A tuple containing row data
          
       Returns:
           dict: Dictionary with column names as keys and row data as values
       """
       if not row:
           return None
          
       columns = [description[0] for description in cursor.description]
       return dict(zip(columns, row))


   def count_tenders_by_state(self, state: str) -> int:
       """
       Count the number of tenders in a given state.
      
       Args:
           state (str): State to count ('waiting_for_filtering', 'qualified', 'unqualified', 'notified')
          
       Returns:
           int: Number of tenders in the specified state
       """
       try:
           conn = sqlite3.connect(self.db_name)
           cursor = conn.cursor()
          
           query = "SELECT COUNT(*) FROM tenders WHERE state = ?"
           cursor.execute(query, (state,))
           count = cursor.fetchone()[0]
          
           return count
       except sqlite3.Error as e:
           print(f"[DB] Error counting tenders by state: {e}")
           return 0
       finally:
           if conn:
               conn.close()


   def get_tenders_by_state(self, state: str, limit: int = None) -> list[dict]:
       """
       Get tenders by state from the database.
      
       Args:
           state (str): State to filter by ('waiting_for_filtering', 'qualified', 'unqualified', 'notified')
           limit (int, optional): Maximum number of records to return
          
       Returns:
           list[dict]: List of tender dictionaries
       """
       try:
           conn = sqlite3.connect(self.db_name)
           cursor = conn.cursor()
          
           query = "SELECT * FROM tenders WHERE state = ?"
           params = [state]
           if limit:
               query += " LIMIT ?"
               params.append(limit)
          
           cursor.execute(query, params)
           results = [self._convert_to_dictionary(cursor, row) for row in cursor.fetchall()]
          
           return results
       except sqlite3.Error as e:
           print(f"[DB] Error fetching tenders by state: {e}")
           return []
       finally:
           if conn:
               conn.close()


   def update_tender_field(self, tender_id: int, field: str, value: any) -> bool:
       """Update a specific field of a tender in the database by its ID."""
       if field not in ['title', 'organization', 'posted_date', 'closing_date',
                        'location', 'url', 'source', 'tender_content', 'state', 'is_sent']:  # Added is_sent
           raise ValueError("Invalid field name")
          
       conn = sqlite3.connect(self.db_name)
       cursor = conn.cursor()
      
       update_query = f"""
       UPDATE tenders
       SET {field} = ?
       WHERE id = ?
       """
      
       cursor.execute(update_query, (value, tender_id))
       conn.commit()
      
       rows_affected = cursor.rowcount
       conn.close()
      
       return rows_affected > 0


   def tender_exists(self, title: str, posted_date: str) -> bool:
       """Check if a tender with given title and posted date exists.
      
       Args:
           title: The tender title
           posted_date: The tender posted date
          
       Returns:
           bool: True if tender exists, False otherwise
       """
       conn = sqlite3.connect(self.db_name)
       cursor = conn.cursor()
      
       query = """
       SELECT COUNT(*) FROM tenders
       WHERE title = ? AND posted_date = ?
       """
      
       cursor.execute(query, (title, posted_date))
       count = cursor.fetchone()[0]
      
       conn.close()
       return count > 0


   def get_tenders_by_state_and_sent(self, state: str, is_sent: bool) -> list[dict]:
       """Get tenders by state and sent status."""
       try:
           conn = sqlite3.connect(self.db_name)
           cursor = conn.cursor()
          
           query = """
               SELECT * FROM tenders
               WHERE state = ? AND is_sent = ?
               ORDER BY posted_date DESC
           """
           cursor.execute(query, (state, is_sent))
           results = [self._convert_to_dictionary(cursor, row) for row in cursor.fetchall()]
          
           return results
       except Exception as e:
           print(f"[DB] Error fetching tenders by state and sent status: {e}")
           return []
       finally:
           if conn:
               conn.close()


if __name__ == "__main__":
   db = DB()
   print(db.update_tender_field(61, "is_sent", False))
   # Verify the data was inserted
  

				
			

Step 8: Create the Nodes

Now let’s get back to the main.py and create the main nodes

				
					# ------------------------------------------------------------
# Workflow Nodes


def create_tender_filter_node(repo: TenderRepository, llm: ChatOpenAI, system_prompt: str):
   """Create node for filtering tenders."""
   def tender_filter_node(state: State) -> dict:
       # Fetch batch of tenders
       waiting_tenders = repo.fetch_waiting_tenders(limit=8)
       print(f"\n[TENDER FILTER] Processing batch of {len(waiting_tenders)} tenders...")
      
       if not waiting_tenders:
           print("[TENDER FILTER] No tenders to process")
           return state


       messages = [
           SystemMessage(content=system_prompt),
           HumanMessage(content=state["human_message"] + str(waiting_tenders))
       ]


       print("[TENDER FILTER] Invoking LLM for classification...")
       llm_with_structure = llm.with_structured_output(
           TenderListing,
           method="json_schema",
           strict=True
       )


       qualified_tenders = llm_with_structure.invoke(messages)
       print(f"[TENDER FILTER] Found {len(qualified_tenders.tenders)} qualified tenders in this batch")
      
       for tender in waiting_tenders:
           is_qualified = any(qt.id == tender.id for qt in qualified_tenders.tenders)
           new_state = "qualified" if is_qualified else "unqualified"
           repo.update_tender_state(tender.id, new_state)
           print(f"[TENDER FILTER] Tender {tender.id} marked as {new_state}")


       return state
   return tender_filter_node


def should_continue_filtering(state: State) -> Literal["tender_filter", "END"]:
   """Determine if filtering should continue."""
   repo = TenderRepository()
   remaining_tenders = repo.count_waiting_tenders()
   print(f"\n[WORKFLOW] Remaining tenders to process: {remaining_tenders}")
   return "tender_filter" if remaining_tenders > 0 else END


def create_scraper_node(repo: TenderRepository):
   """Create node for scraping tenders from various sources."""
   def scraper_node(state: State) -> dict:
       print("\n[SCRAPER] Starting tender scraping process...")
      
       # Global Tenders Scraping
       try:
           print("[SCRAPER] Initiating scraping from Global Tenders...")
           print("[SCRAPER] This may take several minutes due to wait_time=15 seconds...")
           global_response = GlobalTendersScraper().run({})
          
           if global_response.get('success'):
               scraped_tenders = global_response.get('tenders', [])
               print(f"[SCRAPER] Successfully scraped {len(scraped_tenders)} tenders from Global Tenders")
               process_scraped_tenders(repo, scraped_tenders)
           else:
               print("[SCRAPER] Global Tenders scraping failed")
              
       except Exception as e:
           print(f"[SCRAPER] Error during Global Tenders scraping: {str(e)}")
      
       # Website 2 Scraping
       '''
       try:
           print("\n[SCRAPER] Initiating scraping from Website 2...")
           website2_response = Website2Scraper().run({
               "limit": 20,
               "offset": 0
           })
          
           if website2_response.get('success'):
               website2_tenders = website2_response.get('tenders', [])
               print(f"[SCRAPER] Successfully scraped {len(website2_tenders)} tenders from Website 2")
               process_scraped_tenders(repo, website2_tenders)
           else:
               print("[SCRAPER] Website 2 scraping failed")
              
       except Exception as e:
           print(f"[SCRAPER] Error during Website 2 scraping: {str(e)}")


       '''


          
       return state


   def process_scraped_tenders(repo: TenderRepository, tenders: list) -> None:
       """Process and store scraped tenders."""
       new_tenders = 0
       for tender in tenders:
           if not is_valid_date(tender.get('posted_date')):
               print(f"[SCRAPER] Invalid posted_date found: {tender.get('posted_date')}")
               tender['posted_date'] = ''
           if not is_valid_date(tender.get('closing_date')):
               print(f"[SCRAPER] Invalid closing_date found: {tender.get('closing_date')}")
               tender['closing_date'] = ''
          
           if not repo.tender_exists(tender['title'], tender['posted_date']):
               repo.insert_new_tender(tender)
               new_tenders += 1
      
       print(f"[SCRAPER] Added {new_tenders} new tenders to database")
      
   return scraper_node
				
			

Lets not forget the is_valid_date functions, and the telegram notifier ode which in the code i named it notification_node

				
					def is_valid_date(date_str: str) -> bool:
   """Check if a string represents a valid date."""
   if not date_str:
       return False
   try:
      
       datetime.strptime(date_str, '%Y-%m-%d')
       return True
   except ValueError:
       return False


def create_notification_node(repo: TenderRepository, telegram_tool: TelegramTool):
   """Create node for sending notifications about qualified tenders."""
   def notification_node(state: State) -> dict:
       print("\n[NOTIFIER] Starting tender notification process...")
      
       # Step 1: Fetch qualified, unsent tenders
       try:
           qualified_tenders = repo.fetch_qualified_unsent_tenders()
           print(f"[NOTIFIER] Found {len(qualified_tenders)} qualified tenders to notify")
          
           if not qualified_tenders:
               print("[NOTIFIER] No new qualified tenders to notify")
               return state
          
           # Step 2: Format and send notifications
           _send_tender_notifications(qualified_tenders, repo, telegram_tool)
          
       except Exception as e:
           print(f"[NOTIFIER] Error in notification process: {str(e)}")
      
       return state
  
   return notification_node


def _send_tender_notifications(tenders: list, repo: TenderRepository, telegram_tool: TelegramTool) -> None:
   """Format and send notifications for each tender."""
   CHAT_ID = "<YOUR_CHAT_ID>"  # Consider moving to environment variables
  
   for tender in tenders:
       try:
           # Format tender message
           formatted_message = format_tender_message(tender)
          
           # Send message through Telegram
           print(f"[NOTIFIER] Sending notification for tender: {tender.id}")
           result = telegram_tool.run({
               "message": formatted_message,
               "chat_id": CHAT_ID
           })
          
           if result['success']:
               print(f"[NOTIFIER] Successfully sent notification for tender {tender.id}")
               # Update is_sent status in database using mark_tender_as_sent
               repo.mark_tender_as_sent(tender.id)  # Use the correct method
               print(f"[NOTIFIER] Marked tender {tender.id} as sent")
           else:
               print(f"[NOTIFIER] Failed to send notification for tender {tender.id}: {result.get('error')}")
              
       except Exception as e:
           print(f"[NOTIFIER] Error processing tender {tender.id}: {str(e)}")
				
			

Step 9: Create the Workflow

This is where we define the flow of our system

				
					# ------------------------------------------------------------
# Workflow Configuration
def create_workflow():
   """Create and configure the workflow."""
   models = initialize_language_models()
   system_prompt = load_system_prompt()
   repo = TenderRepository()
  
   # Get bot token from environment variable
   telegram_bot_token = getenv('TELEGRAM_BOT_TOKEN')
   if not telegram_bot_token:
       raise ValueError("TELEGRAM_BOT_TOKEN environment variable is not set")
      
   telegram_tool = TelegramTool(bot_token=telegram_bot_token)


   workflow = StateGraph(State)
  
   workflow.add_node("scraper", create_scraper_node(repo))
   workflow.add_node("tender_filter", create_tender_filter_node(repo, models['primary'], system_prompt))
   workflow.add_node("notifier", create_notification_node(repo, telegram_tool))
  
   # Update workflow edges
   workflow.add_edge("scraper", "tender_filter")
   workflow.add_conditional_edges(
       "tender_filter",
       should_continue_filtering,
       {
           "tender_filter": "tender_filter",
           END: "notifier"
       }
   )
   workflow.add_edge("notifier", END)
  
   workflow.set_entry_point("scraper")
   return workflow.compile()


# ------------------------------------------------------------
# Main Execution
if __name__ == "__main__":
   print("\n[MAIN] Starting tender processing workflow...")
   app = create_workflow()
   initial_state = {
       "human_message": """
      
       find tenders related to Technology services.
      
       TECH FOCUS:
       - Software development/implementation
       - Enterprise systems (ERP, CRM)
       - AI solutions
       - Digital transformation services
       - Management system
      
       Exclusion Criteria:
       - Non-tech related services
       - Hardware-only procurement
       - Basic IT support
       - Below minimum budget threshold
       """
      
   }
   results = app.invoke(initial_state)


   print("\n[MAIN] Workflow completed successfully")
   print(results)
				
			

This is where we pass our query prompt, this is where you could choose which type of tenders, in which sectors, keywords and requirements you are interested in.

Step 10: Create the tool

Create a file name tools/TelegramTool.py

Inside paste the following code:

				
					from langchain.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Optional, Type
from langchain.callbacks.manager import CallbackManagerForToolRun
from telegram import Bot
import asyncio
import logging
from datetime import datetime
"""
Step-by-Step Guide to Set Up and Configure Your Telegram Bot:


1. **Create a New Bot:**
  - Open Telegram and search for "BotFather".
  - Start a chat with BotFather and send the command `/newbot`.
  - Follow the prompts to set your bot's name and username.
  - After creation, BotFather will provide a unique token. Save this token securely.


2. **Configure Bot to Allow Group Messages:**
  - In the chat with BotFather, send the command `/setprivacy`.
  - Select your bot from the list of bots you own.
  - Choose the option to disable privacy mode. This setting allows your bot to receive all messages in group chats.


3. **Access Bot Updates:**
  - Use the following URL to access updates, replacing `<bot_token>` with your bot's token:
    `https://api.telegram.org/bot<bot_token>/getUpdates`
  - This URL will return the latest updates sent to your bot, including messages and commands.
"""




class TelegramInput(BaseModel):
   """Input for Telegram message sender."""
   message: str = Field(
       description="Message to be sent through Telegram"
   )
   chat_id: str = Field(
       description="Telegram chat ID where the message will be sent"
   )


class TelegramTool(BaseTool):
   """Tool for sending messages through Telegram."""
   name: str = "telegram_sender"
   description: str = """Useful for sending messages through Telegram.
   Requires a message and a chat ID. Returns success status and message details."""
   args_schema: Type[BaseModel] = TelegramInput
   bot: Bot = None
  
   def __init__(self, bot_token: str):
       """Initialize the Telegram bot with the provided token."""
       super().__init__()
       self.bot = Bot(token=bot_token)
      
   def _run(
       self,
       message: str,
       chat_id: str,
       run_manager: Optional[CallbackManagerForToolRun] = None
   ) -> dict:
       """Run the tool synchronously."""
       try:
           # Get the current event loop or create a new one
           try:
               loop = asyncio.get_event_loop()
           except RuntimeError:
               loop = asyncio.new_event_loop()
               asyncio.set_event_loop(loop)
          
           result = loop.run_until_complete(self._send_message(message, chat_id))
           return result
          
       except Exception as e:
           logging.error(f"Error sending Telegram message: {str(e)}")
           return {
               'success': False,
               'error': str(e),
               'timestamp': datetime.now().isoformat()
           }
      def format_tender_message(tender) -> str:
        """
         Format a tender object into an HTML-formatted message for Telegram.
  
       Args:
       tender: Tender object containing tender details
      
        Returns:
       str: HTML-formatted message ready for Telegram
       """
   # Extract hashtags from title and organization
   words = set((tender.title + " " + tender.organization).split())
   hashtags = " ".join([f"#{word}" for word in words if len(word) > 3])[:50]  # Limit hashtags length
  
   html_message = f"""
<b>🔔 New Tender Alert!</b>


<b>Title:</b> {tender.title}


<i>📋 Key Details</i>
• <b>Organization:</b> {tender.organization}
• <b>Location:</b> {tender.location}
• <b>Posted Date:</b> {tender.posted_date}
• <b>Closing Date:</b> {tender.closing_date}


<i>📝 Description:</i>
{tender.tender_content}


<a href="{tender.url}">View Full Details</a>


<code>Source: {tender.source}</code>
<code>{hashtags}</code>
"""
   return html_message


   async def _send_message(self, message: str, chat_id: str) -> dict:
       """Send message through Telegram bot."""
       try:
           # Send the message
           async with self.bot:
               sent_message = await self.bot.send_message(
                   chat_id=chat_id,
                   text=message,
                   parse_mode='HTML'  # Supports HTML formatting
               )
          
           return {
               'success': True,
               'message_id': sent_message.message_id,
               'chat_id': chat_id,
               'timestamp': datetime.now().isoformat()
           }
          
       except Exception as e:
           logging.error(f"Error sending Telegram message: {str(e)}")
           return {
               'success': False,
               'error': str(e),
               'timestamp': datetime.now().isoformat()
           }


   async def _arun(
       self,
       message: str,
       chat_id: str,
       run_manager: Optional[CallbackManagerForToolRun] = None
   ) -> dict:
       """Run the tool asynchronously."""
       return await self._send_message(message, chat_id)
				
			

Find the fill code in my github repo:

https://github.com/moe1047/scout-the-tender-finder/

Conclusion

To wrap things up, building Scout—the 24/7 AI Bid Hunter—really shows how agentic systems can take over some of the most repetitive tasks in tender finding. 

By combining a few smart tools—a scraper to gather the data, an LLM to sift through tenders, and a Telegram notifier to keep you in the loop—this system not only makes the process consistent and timely but also scales up to handle a huge volume of information without breaking a sweat.

The whole project is a great example of how breaking a problem down into simple, manageable parts can lead to a powerful, automated solution. Hopefully, this overview gives you some fresh ideas on how you might apply similar strategies to your own projects. 

Share the Post:

Related Posts

Get Exclusive Insights and Automation Tips

Stay up-to-date with the latest trends and best practices in automation and AI.

Follow us