| |
|
|
import uuid
from datetime import datetime, timezone

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
|
|
| |
# Google Cloud project that owns the BigQuery resources used by this module.
PROJECT_ID = "gem-creation"
# Dataset inside PROJECT_ID that holds the analysis table.
DATASET_ID = "aura_mind_glow_data"
# Table inside DATASET_ID that receives the farm analysis rows.
TABLE_ID = "farm_analysis"
|
|
def get_bigquery_client():
    """Return a BigQuery client bound to ``PROJECT_ID``, or ``None`` on failure.

    Any exception raised while constructing the client is reported on stdout
    and swallowed, so callers must check the result for ``None`` before use.

    Returns:
        A ``bigquery.Client`` instance, or ``None`` if it could not be created.
    """
    try:
        client = bigquery.Client(project=PROJECT_ID)
    except Exception as e:
        # Deliberately broad: this is a best-effort boundary and callers
        # rely on getting ``None`` rather than an exception.
        print(f"❌ Error authenticating with BigQuery: {e}")
        return None
    print("✅ Successfully authenticated with BigQuery.")
    return client
|
|
def create_dataset_if_not_exists(client):
    """Create the dataset ``PROJECT_ID.DATASET_ID`` if it does not exist.

    Args:
        client: BigQuery client used for the lookup and, if needed, the
            creation of the dataset.
    """
    dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
    try:
        client.get_dataset(dataset_id)
    except NotFound:
        print(f"🟡 Dataset {dataset_id} not found. Creating dataset...")
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"
        # exists_ok=True: another process may create the dataset between the
        # lookup above and this call; treat that as success, not an error.
        dataset = client.create_dataset(dataset, exists_ok=True, timeout=30)
        print(f"✅ Created dataset {client.project}.{dataset.dataset_id}")
    else:
        print(f"ℹ️ Dataset {dataset_id} already exists.")
|
|
|
|
def create_table_if_not_exists(client):
    """Create the table ``PROJECT_ID.DATASET_ID.TABLE_ID`` if it does not exist.

    The table is created with the farm-analysis schema defined below; only
    ``analysis_id`` and ``timestamp`` are required columns.

    Args:
        client: BigQuery client used for the lookup and, if needed, the
            creation of the table.
    """
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    try:
        client.get_table(table_id)
    except NotFound:
        print(f"🟡 Table {table_id} not found. Creating table...")
        schema = [
            bigquery.SchemaField("analysis_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("farmer_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("gps_latitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("gps_longitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("crop_type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("crop_variety", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("ai_diagnosis", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("confidence_score", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("recommended_action", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("farmer_feedback", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("treatment_applied", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("outcome_image_id", "STRING", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        # exists_ok=True: tolerate a concurrent creator winning the race
        # between the lookup above and this call.
        table = client.create_table(table, exists_ok=True)
        print(f"✅ Created table {table.project}.{table.dataset_id}.{table.table_id}")
    else:
        print(f"ℹ️ Table {table_id} already exists.")
|
|
def upload_diagnosis_to_bigquery(diagnosis_data: dict):
    """Upload a single diagnosis record (a dictionary) to BigQuery.

    Makes sure the dataset and table exist, fills in ``analysis_id`` and
    ``timestamp`` when the caller did not supply them, and inserts the row
    with ``client.insert_rows_json``.

    Note:
        ``diagnosis_data`` is updated in place with the generated
        ``analysis_id`` / ``timestamp`` values.

    Args:
        diagnosis_data: Row to insert, keyed by column name.

    Returns:
        A status string: a success message, a "client not available"
        message, or an error message that embeds the insert errors.
    """
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload diagnosis.")
        return "BigQuery client not available."

    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    if "analysis_id" not in diagnosis_data:
        diagnosis_data["analysis_id"] = str(uuid.uuid4())
    if "timestamp" not in diagnosis_data:
        # Use an explicit UTC timestamp: BigQuery treats a TIMESTAMP string
        # without an offset as UTC, so a naive local time would be stored
        # shifted by the host's UTC offset.
        diagnosis_data["timestamp"] = datetime.now(timezone.utc).isoformat()

    errors = client.insert_rows_json(table_id, [diagnosis_data])
    if errors:
        print(f"❌ Encountered errors while inserting diagnosis record: {errors}")
        return f"Error uploading diagnosis: {errors}"

    print(f"✅ Diagnosis record {diagnosis_data.get('analysis_id')} uploaded successfully.")
    return "Diagnosis uploaded successfully."
|
|
|
|
def upload_csv_to_bigquery(csv_file_path: str):
    """Append the contents of a CSV file to the BigQuery analysis table.

    Makes sure the dataset and table exist, then runs a load job that skips
    the first (header) row of the file and appends the remaining rows.

    Args:
        csv_file_path (str): The local path to the CSV file.

    Returns:
        A status string: a success message, a "client not available"
        message, or an error message that embeds the exception text.
    """
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload CSV.")
        # Return a status string like every other path (and like
        # upload_diagnosis_to_bigquery) instead of an implicit None.
        return "BigQuery client not available."

    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # the first row is the header
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    print(f"🚀 Starting CSV upload from '{csv_file_path}' to table '{table_id}'...")

    try:
        with open(csv_file_path, "rb") as source_file:
            load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)

        # Block until the load job finishes; raises if the job failed.
        load_job.result()

        destination_table = client.get_table(table_id)
        rows_uploaded = load_job.output_rows
        print(f"✅ Job finished. Loaded {rows_uploaded} new rows. The table '{table_id}' now has a total of {destination_table.num_rows} rows.")
        return "CSV upload successful."
    except Exception as e:
        # Best-effort boundary: report file, network and job errors alike
        # as a status string rather than propagating them.
        print(f"❌ An error occurred during the CSV upload: {e}")
        return f"Error during CSV upload: {e}"
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: load a local CSV file into the BigQuery table.
    csv_file_to_upload = "farm_analysis_data.csv"

    print("--- Running BigQuery CSV Uploader Test ---")
    upload_csv_to_bigquery(csv_file_to_upload)
    print("--- Test complete ---")