#!/usr/bin/env python3
"""
Continuous execution script with 15-second intervals.
This script runs continuously with a 15-second pause between each execution cycle.
It includes proper exception handling, timestamped logging, and graceful shutdown
on keyboard interruption.
"""
import time
import datetime
import signal
import sys
import os
import requests
import json
from openai import OpenAI
class ContinuousScript:
    """Poll an Airtable table and sync contacts extracted with OpenAI.

    Each cycle fetches the records whose Status is "Todo", asks OpenAI to
    extract contact details from every record's "Email Body" text and image
    attachments, upserts those contacts into the "contacts" table (matched on
    "Full Name") and finally sets the processed input records to "Done".
    """

    # Seconds allowed for a single Airtable HTTP call. Without a timeout a
    # stalled connection would block the loop (and shutdown) indefinitely.
    REQUEST_TIMEOUT = 30

    # File-name suffixes treated as business-card images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')

    # Wrapper keys under which the model usually nests the contact list.
    CONTACT_LIST_KEYS = ('contacts', 'contact_information', 'results', 'data')

    # Keys whose presence marks a bare JSON object as a single contact.
    CONTACT_MARKER_FIELDS = ('Full Name', 'Email', 'Phone', 'Company')

    def __init__(self,
                 interval=15,
                 base_id="appnoVp4rupoSPJFE",
                 table_name="inputs"):
        """
        Initialize the continuous script.

        Args:
            interval (int): Time in seconds between execution cycles
            base_id (str): Airtable base ID
            table_name (str): Airtable table name

        Raises:
            ValueError: If AIRTABLE_API_KEY or OPENAI_API_KEY is not set.
        """
        self.interval = interval
        self.running = True
        self.cycle_count = 0
        self.base_id = base_id
        self.table_name = table_name
        self.records = []

        # Airtable API configuration
        self.airtable_api_key = os.getenv('AIRTABLE_API_KEY')
        if not self.airtable_api_key:
            raise ValueError(
                "AIRTABLE_API_KEY environment variable is required")
        self.airtable_url = f"https://api.airtable.com/v0/{self.base_id}/{self.table_name}"
        self.contacts_url = f"https://api.airtable.com/v0/{self.base_id}/contacts"
        self.headers = {
            'Authorization': f'Bearer {self.airtable_api_key}',
            'Content-Type': 'application/json'
        }

        # OpenAI API configuration
        self.openai_model = "gpt-4o"
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        if not self.openai_api_key:
            raise ValueError("OPENAI_API_KEY environment variable is required")
        self.openai_client = OpenAI(api_key=self.openai_api_key)

        # NOTE: starts as an empty list; _execute_task replaces it with a
        # JSON string of the contacts extracted in the latest cycle.
        self.contact_details = []

        # Set up signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    # ------------------------------------------------------------------
    # Lifecycle helpers
    # ------------------------------------------------------------------
    def _signal_handler(self, signum, frame):
        """Handle shutdown signals by asking the main loop to stop."""
        signal_name = signal.Signals(signum).name
        self._log(
            f"Received {signal_name} signal. Initiating graceful shutdown...")
        self.running = False

    def _log(self, message):
        """
        Log a message with timestamp.

        Args:
            message (str): The message to log
        """
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] {message}", flush=True)

    # ------------------------------------------------------------------
    # Airtable reads
    # ------------------------------------------------------------------
    def _list_all_records(self, url, params=None):
        """
        Return every record of an Airtable list endpoint, following pages.

        Airtable returns results one page at a time and includes an "offset"
        token while more pages remain; the token is sent back until it is
        no longer present.

        Args:
            url (str): Table URL to list
            params (dict | None): Extra query parameters (e.g. a filter)

        Returns:
            list: All records from all pages

        Raises:
            requests.exceptions.RequestException: On any HTTP/network failure
        """
        query = dict(params or {})
        records = []
        while True:
            response = requests.get(url,
                                    headers=self.headers,
                                    params=query,
                                    timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
            records.extend(data.get('records', []))
            offset = data.get('offset')
            if not offset:
                return records
            query['offset'] = offset

    def _fetch_airtable_records(self):
        """
        Fetch records from Airtable with Status = "Todo".

        Returns:
            list: List of records with Status = "Todo" (empty on any error)
        """
        try:
            records = self._list_all_records(
                self.airtable_url, {'filterByFormula': '{Status} = "Todo"'})
        except requests.exceptions.RequestException as e:
            self._log(f"Error fetching Airtable records: {str(e)}")
            return []
        except Exception as e:
            self._log(f"Unexpected error fetching Airtable records: {str(e)}")
            return []
        self._log(f"Found {len(records)} records with Status = 'Todo'")
        return records

    def _get_existing_contacts(self):
        """
        Fetch all existing contacts from the contacts table.

        Returns:
            dict | None: Mapping of Full Name to ``{'id', 'fields'}``, or
            None when the table could not be read. None (rather than an empty
            dict) lets the caller tell "no contacts yet" apart from "lookup
            failed", so a failed lookup never causes duplicate creation.
        """
        try:
            records = self._list_all_records(self.contacts_url)
        except requests.exceptions.RequestException as e:
            self._log(f"Error fetching existing contacts: {str(e)}")
            return None
        except Exception as e:
            self._log(f"Unexpected error fetching existing contacts: {str(e)}")
            return None

        contacts_dict = {}
        for record in records:
            fields = record.get('fields', {})
            raw_name = fields.get('Full Name', '')
            full_name = raw_name.strip() if isinstance(raw_name, str) else ''
            if full_name:
                contacts_dict[full_name] = {
                    'id': record.get('id'),
                    'fields': fields
                }
        self._log(
            f"Found {len(contacts_dict)} existing contacts in contacts table"
        )
        return contacts_dict

    # ------------------------------------------------------------------
    # OpenAI extraction
    # ------------------------------------------------------------------
    @classmethod
    def _contacts_from_object(cls, parsed):
        """Return the contact list carried by a JSON object, or []."""
        for key in cls.CONTACT_LIST_KEYS:
            if isinstance(parsed.get(key), list):
                return parsed[key]
        # A bare object that itself looks like one contact.
        if any(field in parsed for field in cls.CONTACT_MARKER_FIELDS):
            return [parsed]
        # JSON-object mode lets the model pick the wrapper key, so accept any
        # key whose value is a non-empty list of objects.
        for value in parsed.values():
            if (isinstance(value, list) and value
                    and all(isinstance(item, dict) for item in value)):
                return value
        return []

    @classmethod
    def _normalize_contacts_json(cls, raw):
        """
        Normalize a model reply into a JSON array of contact objects.

        Args:
            raw (str | None): Raw reply text

        Returns:
            str: JSON array string; entries that are not objects are dropped
            so callers can safely use ``.get`` on every element.

        Raises:
            json.JSONDecodeError: If ``raw`` is not valid JSON
        """
        if not raw:
            return "[]"
        parsed = json.loads(str(raw))
        if isinstance(parsed, list):
            candidates = parsed
        elif isinstance(parsed, dict):
            candidates = cls._contacts_from_object(parsed)
        else:
            candidates = []
        return json.dumps(
            [entry for entry in candidates if isinstance(entry, dict)])

    def _request_contacts(self, messages, reply_label, invalid_json_label,
                          error_label):
        """
        Send a chat completion request and return normalized contact JSON.

        Args:
            messages (list): Chat messages to send
            reply_label (str): Log prefix for the raw reply
            invalid_json_label (str): Log prefix when the reply is not JSON
            error_label (str): Log prefix when the request itself fails

        Returns:
            str: JSON array string ("[]" on any failure)
        """
        try:
            response = self.openai_client.chat.completions.create(
                model=self.openai_model,
                messages=messages,
                response_format={"type": "json_object"},
                max_tokens=1000)
            result = response.choices[0].message.content
            self._log(f"{reply_label} {result}")
            try:
                return self._normalize_contacts_json(result)
            except json.JSONDecodeError:
                self._log(f"{invalid_json_label}: {result}")
                return "[]"
        except Exception as e:
            # Best-effort boundary: one failed extraction must not abort the
            # whole cycle.
            self._log(f"{error_label}: {str(e)}")
            return "[]"

    def _extract_contact_info(self, email_body):
        """
        Extract contact information from email body using OpenAI.

        Args:
            email_body (str): The email body text to analyze

        Returns:
            str: JSON string containing extracted contact information
        """
        if not email_body or email_body.strip() == "":
            return "[]"
        # The request uses JSON-object mode, so ask for an object wrapping the
        # array instead of a top-level array.
        prompt = (
            'Extract all contact information from the following email body '
            'text. Return the result as a JSON object with a single key '
            '"contacts" whose value is a JSON array containing objects with '
            'these exact keys: "Full Name", "Job Title", "Company", "Email", '
            '"Phone", "Address". If any information is not found, use an '
            'empty string for that field. Only include contacts that have at '
            'least one piece of information available.\n'
            'Email body:\n') + email_body
        messages = [{
            "role": "system",
            "content":
            "You are an expert at extracting contact information from text. Always respond with valid JSON format."
        }, {
            "role": "user",
            "content": prompt
        }]
        return self._request_contacts(
            messages, "Email body parsed by AI:",
            "Invalid JSON returned from OpenAI",
            "Error extracting contact info with OpenAI")

    def _analyze_business_card(self, attachment_url):
        """
        Analyze a business card image using OpenAI vision API.

        Args:
            attachment_url (str): URL of the business card image

        Returns:
            str: JSON string containing extracted contact information
        """
        if not attachment_url or attachment_url.strip() == "":
            return "[]"
        prompt = (
            'Analyze this business card image and extract all contact '
            'information. Return the result as a JSON object with a single '
            'key "contacts" whose value is a JSON array containing objects '
            'with these exact keys: "Full Name", "Job Title", "Company", '
            '"Email", "Phone", "Address". If any information is not found, '
            'use an empty string for that field. Only include contacts that '
            'have at least one piece of information available.')
        messages = [{
            "role": "system",
            "content":
            "You are an expert at extracting contact information from business card images. Always respond with valid JSON format."
        }, {
            "role": "user",
            "content": [{
                "type": "text",
                "text": prompt
            }, {
                "type": "image_url",
                "image_url": {
                    "url": attachment_url
                }
            }]
        }]
        return self._request_contacts(
            messages, "Business card analyzed by AI:",
            "Invalid JSON returned from OpenAI for business card",
            "Error analyzing business card with OpenAI")

    def _process_attachments(self, attachments):
        """
        Process email attachments to find and analyze business cards.

        Args:
            attachments (list): List of attachment objects from Airtable

        Returns:
            list: List of contact dictionaries extracted from business cards
        """
        if not attachments:
            return []
        all_attachment_contacts = []
        for attachment in attachments:
            filename = (attachment.get('filename') or '').lower()
            attachment_url = attachment.get('url', '')
            # Match on the real suffix: a substring test would treat
            # "scan.png.pdf" as an image.
            if not filename.endswith(self.IMAGE_EXTENSIONS):
                self._log(f" Skipping non-image attachment: {filename}")
                continue
            self._log(
                f" Analyzing business card attachment: {filename}")
            contact_json = self._analyze_business_card(attachment_url)
            try:
                contacts = json.loads(
                    str(contact_json)) if contact_json else []
            except json.JSONDecodeError:
                self._log(
                    f" Error parsing contact details from business card: {filename}"
                )
                continue
            if not contacts:
                self._log(
                    f" No contacts found in business card: {filename}"
                )
                continue
            self._log(
                f" Found {len(contacts)} contact(s) in business card: {filename}"
            )
            for j, contact in enumerate(contacts, 1):
                self._log(
                    f" Contact {j}: {contact.get('Full Name', 'No Name')} - {contact.get('Company', 'No Company')}"
                )
            all_attachment_contacts.extend(contacts)
        return all_attachment_contacts

    # ------------------------------------------------------------------
    # Airtable writes
    # ------------------------------------------------------------------
    @staticmethod
    def _non_empty_fields(contact_data):
        """
        Return only the usable field values of a contact.

        Blank strings and None are dropped. Numbers (the model sometimes
        returns a phone number as a number) are converted to text. Any other
        value type is ignored rather than sent to Airtable.
        """
        fields = {}
        for key, value in contact_data.items():
            if isinstance(value, str):
                if value.strip():
                    fields[key] = value
            elif isinstance(value, (int, float)) and not isinstance(
                    value, bool):
                fields[key] = str(value)
        return fields

    def _update_contact_record(self, record_id, contact_data):
        """
        Update an existing contact record with new data.

        Args:
            record_id (str): Airtable record ID
            contact_data (dict): Contact information to update

        Returns:
            bool: True when the record was updated, False otherwise
        """
        try:
            fields = self._non_empty_fields(contact_data)
            if not fields:
                return False
            response = requests.patch(f"{self.contacts_url}/{record_id}",
                                      headers=self.headers,
                                      json={"fields": fields},
                                      timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            self._log(
                f" Updated contact record {record_id} for {contact_data.get('Full Name', 'Unknown')}"
            )
            return True
        except requests.exceptions.RequestException as e:
            self._log(
                f" Error updating contact record {record_id}: {str(e)}")
            return False
        except Exception as e:
            self._log(
                f" Unexpected error updating contact record {record_id}: {str(e)}"
            )
            return False

    def _create_contact_record(self, contact_data):
        """
        Create a new contact record.

        Args:
            contact_data (dict): Contact information to create

        Returns:
            bool: True when the record was created, False otherwise
            (including when there is no non-empty "Full Name")
        """
        try:
            fields = self._non_empty_fields(contact_data)
            if not fields or not fields.get('Full Name'):
                return False
            response = requests.post(self.contacts_url,
                                     headers=self.headers,
                                     json={"fields": fields},
                                     timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            new_record_id = response.json().get('id', 'Unknown')
            self._log(
                f" Created new contact record {new_record_id} for {contact_data.get('Full Name', 'Unknown')}"
            )
            return True
        except requests.exceptions.RequestException as e:
            self._log(
                f" Error creating contact record for {contact_data.get('Full Name', 'Unknown')}: {str(e)}"
            )
            return False
        except Exception as e:
            self._log(
                f" Unexpected error creating contact record for {contact_data.get('Full Name', 'Unknown')}: {str(e)}"
            )
            return False

    def _sync_contacts_to_airtable(self, contacts):
        """
        Sync extracted contacts to the contacts table.

        Contacts are merged by "Full Name" first, so the same person found in
        both an email body and a business card yields a single record. Only
        the newly extracted non-empty values are sent on update; fields that
        are not sent are left for Airtable to keep as they are.

        Args:
            contacts (list): List of contact dictionaries

        Returns:
            bool: False when the existing contacts could not be loaded (no
            write is attempted in that case), True otherwise.
        """
        if not contacts:
            self._log("No contacts to sync")
            return True
        self._log(f"Syncing {len(contacts)} contacts to contacts table...")

        existing_contacts = self._get_existing_contacts()
        if existing_contacts is None:
            # Creating records blindly here would duplicate every contact.
            self._log(
                "Skipping contact sync: existing contacts could not be loaded")
            return False

        merged = {}
        for contact in contacts:
            raw_name = contact.get('Full Name') if isinstance(
                contact, dict) else None
            full_name = raw_name.strip() if isinstance(raw_name, str) else ''
            if not full_name:
                self._log(" Skipping contact with no Full Name")
                continue
            entry = merged.setdefault(full_name, {})
            entry.update(self._non_empty_fields(contact))
            entry['Full Name'] = full_name

        created_count = 0
        updated_count = 0
        for full_name, fields in merged.items():
            existing_record = existing_contacts.get(full_name)
            if existing_record is None:
                self._log(f"Creating contact: {full_name}")
                if self._create_contact_record(fields):
                    created_count += 1
                continue
            current = existing_record.get('fields', {})
            if all(current.get(key) == value
                   for key, value in fields.items()):
                self._log(f" Contact already up to date: {full_name}")
                continue
            self._log(f"Updating contact: {full_name}")
            if self._update_contact_record(existing_record['id'], fields):
                updated_count += 1
        self._log(
            f"Contact sync completed: {created_count} created, {updated_count} updated"
        )
        return True

    def _update_record_status(self, record_id, status="Done"):
        """
        Update the status of a record in the inputs table.

        Args:
            record_id (str): Airtable record ID
            status (str): New status value

        Returns:
            bool: True when the status was updated, False otherwise
        """
        try:
            response = requests.patch(f"{self.airtable_url}/{record_id}",
                                      headers=self.headers,
                                      json={"fields": {"Status": status}},
                                      timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            self._log(f" Updated record {record_id} status to '{status}'")
            return True
        except requests.exceptions.RequestException as e:
            self._log(
                f" Error updating record {record_id} status: {str(e)}")
            return False
        except Exception as e:
            self._log(
                f" Unexpected error updating record {record_id} status: {str(e)}"
            )
            return False

    # ------------------------------------------------------------------
    # Cycle logic
    # ------------------------------------------------------------------
    def _process_record(self, index, record):
        """
        Extract contacts from one input record (email body + attachments).

        Args:
            index (int): 1-based position of the record, used for logging
            record (dict): Airtable record

        Returns:
            list: Contact dictionaries found in the record
        """
        record_id = record.get('id', 'Unknown ID')
        fields = record.get('fields', {})
        name = fields.get('Name', 'No Name')
        status = fields.get('Status', 'No Status')
        email_body = fields.get('Email Body', '')
        email_attachments = fields.get('Email Attachment', [])
        self._log(
            f" {index}. ID: {record_id}, Name: {name}, Status: {status}")

        found = []
        if email_body and email_body.strip():
            self._log(
                f" Processing Email Body for record {record_id}...")
            contact_json = self._extract_contact_info(email_body)
            try:
                contacts = json.loads(
                    str(contact_json)) if contact_json else []
            except json.JSONDecodeError:
                contacts = None
                self._log(
                    " Error parsing contact details from email body"
                )
            if contacts:
                self._log(
                    f" Found {len(contacts)} contact(s) in email body"
                )
                for j, contact in enumerate(contacts, 1):
                    self._log(
                        f" Contact {j}: {contact.get('Full Name', 'No Name')} - {contact.get('Company', 'No Company')}"
                    )
                found.extend(contacts)
            elif contacts is not None:
                self._log(" No contacts found in email body")
        else:
            self._log(
                f" No Email Body found for record {record_id}")

        if email_attachments:
            self._log(
                f" Processing {len(email_attachments)} attachment(s) for record {record_id}..."
            )
            attachment_contacts = self._process_attachments(
                email_attachments)
            if attachment_contacts:
                self._log(
                    f" Found {len(attachment_contacts)} contact(s) from attachments"
                )
                found.extend(attachment_contacts)
            else:
                self._log(" No contacts found in attachments")
        else:
            self._log(
                f" No Email Attachments found for record {record_id}"
            )
        return found

    def _execute_task(self):
        """
        Execute one cycle: fetch "Todo" records, extract and sync contacts.

        Records are set to "Done" whether or not contacts were found in them.
        The one exception is when the contacts table could not be read: the
        records are then left as "Todo" so the next cycle retries them instead
        of losing or duplicating the extracted contacts.
        """
        cycle_number = self.cycle_count + 1
        self._log(f"Executing cycle #{cycle_number}")
        current_time = datetime.datetime.now()
        self._log(
            f"Current time: {current_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}"
        )

        self._log("Fetching records from Airtable...")
        self.records = self._fetch_airtable_records()
        if not self.records:
            self._log("No Todo records found in Airtable")
            self.contact_details = "[]"
            self._log(f"Cycle #{cycle_number} completed successfully")
            return

        self._log("Todo records found:")
        all_contact_details = []
        processed_record_ids = []
        for i, record in enumerate(self.records, 1):
            all_contact_details.extend(self._process_record(i, record))
            processed_record_ids.append(record.get('id', 'Unknown ID'))

        self.contact_details = json.dumps(all_contact_details, indent=2)
        self._log(f"Total contacts extracted: {len(all_contact_details)}")

        synced = True
        if all_contact_details:
            synced = self._sync_contacts_to_airtable(all_contact_details)
        if not synced:
            self._log(
                "Contact sync was skipped; leaving records as 'Todo' so they are retried next cycle"
            )
            self._log(f"Cycle #{cycle_number} completed with errors")
            return

        self._log(
            f"Updating status of {len(processed_record_ids)} processed records to 'Done'..."
        )
        for record_id in processed_record_ids:
            self._update_record_status(record_id, "Done")
        self._log(f"Cycle #{cycle_number} completed successfully")

    def _sleep_interruptibly(self, seconds):
        """Sleep up to ``seconds`` in slices of at most 1s, stopping early
        once ``self.running`` is cleared so shutdown stays responsive."""
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(1, remaining))

    def run(self, max_cycles=None):
        """
        Main execution loop.

        Runs cycles with ``self.interval`` seconds of pause between them until
        a shutdown signal clears ``self.running``. A failing cycle is logged
        and the loop carries on with the next one.

        Args:
            max_cycles (int | None): Stop after this many attempted cycles
                (successful or not). None, the default, runs until shutdown.
        """
        self._log("Starting continuous script execution...")
        self._log(f"Execution interval: {self.interval} seconds")
        self._log("Press Ctrl+C to stop the script gracefully")

        attempts = 0

        def cycles_remaining():
            return max_cycles is None or attempts < max_cycles

        try:
            while self.running and cycles_remaining():
                # monotonic() is immune to wall-clock adjustments.
                cycle_start_time = time.monotonic()
                try:
                    self._execute_task()
                    self.cycle_count += 1
                except Exception as e:
                    # Keep the loop alive; the next cycle retries.
                    self._log(f"Error during task execution: {str(e)}")
                    self._log("Continuing with next cycle...")
                attempts += 1
                execution_time = time.monotonic() - cycle_start_time
                self._log(
                    f"Cycle execution time: {execution_time:.2f} seconds")
                if self.running and cycles_remaining():
                    self._log(
                        f"Waiting {self.interval} seconds until next cycle...")
                    self._sleep_interruptibly(self.interval)
        except KeyboardInterrupt:
            # Normally pre-empted by the SIGINT handler; kept as a fallback.
            self._log("Keyboard interrupt received...")
        except Exception as e:
            self._log(f"Unexpected error in main loop: {str(e)}")
        finally:
            self.running = False
            self._cleanup()

    def _cleanup(self):
        """Perform cleanup operations before shutdown."""
        self._log("Performing cleanup operations...")
        self._log(f"Total cycles completed: {self.cycle_count}")
        self._log("Script execution stopped.")
def main():
    """Create the polling script with a 15-second interval and run it."""
    # The interval is fixed here; it could instead be read from command line
    # arguments if it needs to be configurable.
    runner = ContinuousScript(interval=15)
    runner.run()


# Only start the loop when executed as a script, not when imported.
if __name__ == "__main__":
    main()