1+ #!/usr/bin/env python3
2+ """
3+ Export signals and trends data for LLM processing.
4+ Exports all public signals and trends to CSV format, excluding certain fields.
5+ """
6+
7+ import os
8+ import sys
9+ import asyncio
10+ import csv
11+ from datetime import datetime
12+ import psycopg
13+ from psycopg .rows import dict_row
14+
# Make the parent of this script's directory importable so that `src`
# modules can be resolved when the script is run directly.
sys.path.insert(
    0,
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
)

# The connection string is read directly from the DB_CONNECTION environment
# variable, so get_connection_string does not need to be imported.

# Columns that are dropped from every exported CSV. Both spellings of
# "favourite"/"favorite" are listed.
EXCLUDE_FIELDS = [
    'is_draft',
    'private',
    'favourite',
    'can_edit',
    'modified_at',
    'url',
    'favorite',
]

# Directory that receives the generated CSV files.
file_path = ".exports"
26+
def _format_csv_value(value):
    """Flatten a list into a comma-separated string; return other values unchanged."""
    if isinstance(value, list):
        # An empty list joins to '', so no special case is needed.
        return ', '.join(str(item) for item in value)
    return value


def _build_app_link(table_name, record):
    """Return the web-app URL for a signals/trends record, or '' for any other table."""
    if table_name in ('signals', 'trends'):
        # record["id"] is only read for tables that have an app page, so
        # tables without an "id" column can still be exported.
        return f'https://signals.data.undp.org/{table_name}/{record["id"]}'
    return ''


async def export_table_to_csv(conn, table_name, query, filename_prefix):
    """Run ``query`` and write its rows to ``<file_path>/<table_name>.csv``.

    Columns listed in ``EXCLUDE_FIELDS`` are dropped, list values are
    flattened to comma-separated strings, and an ``app_link`` column is
    appended as the last column.

    Args:
        conn: Connection whose ``cursor(row_factory=...)`` is usable as an
            async context manager (the caller in this file passes a
            ``psycopg.AsyncConnection``).
        table_name: Name used in log messages, in the output file name and
            to choose the ``app_link`` URL pattern.
        query: SQL text executed as-is; it must come from trusted code.
        filename_prefix: Currently unused. The output file name is derived
            from ``table_name``; the parameter is kept so existing callers
            keep working.

    Returns:
        The path of the written CSV file, or ``None`` when the query
        returned no rows (in which case no file is written).
    """
    print(f"Exporting {table_name} ...")

    async with conn.cursor(row_factory=dict_row) as cursor:
        await cursor.execute(query)
        records = await cursor.fetchall()

    if not records:
        print(f"No records found in {table_name} .")
        return None

    # Column order follows the first record. 'app_link' is skipped here so a
    # source column with that name cannot produce a duplicate CSV header.
    skipped = set(EXCLUDE_FIELDS) | {'app_link'}
    source_fields = [field for field in records[0] if field not in skipped]
    export_fields = source_fields + ['app_link']

    # The directory must exist before the file is opened, and the file path
    # must not contain stray spaces or it would point at a different
    # (non-existent) directory.
    os.makedirs(file_path, exist_ok=True)
    filename = f'{file_path}/{table_name}.csv'

    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=export_fields)
        writer.writeheader()
        for record in records:
            row = {
                field: _format_csv_value(record[field])
                for field in source_fields
            }
            row['app_link'] = _build_app_link(table_name, record)
            writer.writerow(row)

    print(f"Exported {len(records)} {table_name} to {filename} ")
    return filename
71+
async def main():
    """Export signals and trends to CSV files.

    Reads the database connection string from the ``DB_CONNECTION``
    environment variable, opens an async psycopg connection and exports the
    ``signals`` and ``trends`` tables via ``export_table_to_csv``.

    Exits the process with status 1 when ``DB_CONNECTION`` is unset or when
    any exception is raised during the export. Error messages are written
    to stderr so they are not mixed with normal progress output.
    """
    connection_string = os.environ.get("DB_CONNECTION")
    if not connection_string:
        print("Error: DB_CONNECTION environment variable not set", file=sys.stderr)
        sys.exit(1)

    # Only rows that are not flagged private are exported for signals.
    signals_query = (
        "\n"
        "SELECT * FROM signals\n"
        "WHERE private = FALSE OR private IS NULL\n"
        "ORDER BY id\n"
    )
    # NOTE(review): trends are exported without a `private` filter —
    # confirm that the trends table has no such column.
    trends_query = (
        "\n"
        "SELECT * FROM trends\n"
        "ORDER BY id\n"
    )

    try:
        async with await psycopg.AsyncConnection.connect(
            connection_string,
            row_factory=dict_row,
        ) as conn:
            print("Connected to database successfully")

            signals_file = await export_table_to_csv(
                conn, "signals", signals_query, "signals"
            )
            trends_file = await export_table_to_csv(
                conn, "trends", trends_query, "trends"
            )

            print("\n Export completed successfully!")
            # A table with no rows yields None and is left out of the summary.
            if signals_file:
                print(f"Signals: {signals_file} ")
            if trends_file:
                print(f"Trends: {trends_file} ")
    except Exception as e:
        # Top-level boundary of the script: report the failure and signal
        # it through the exit status. SystemExit is not caught here.
        print(f"Error during export: {e} ", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    # Run the async entry point on a new event loop.
    asyncio.run(main())
0 commit comments