01

Runtime Environment & Core Imports

Initializes the Python execution environment, handles UTF-8 stream buffering, and imports critical geospatial and deep learning libraries.

huggingface.py
# minerai_api_client.py """ MinerAI API Client - Uses the new API endpoints instead of Google Drive """ import os import io import sys import time import shutil import torch import numpy as np import requests from PIL import Image import cv2 from torchvision import transforms from datetime import datetime import gradio as gr import tempfile import torch.nn as nn import segmentation_models_pytorch as smp import base64 from io import BytesIO # Import your model functions from model import get_model, get_preprocessing_fn
02

API Configuration

Core logical unit within the MinerAI infrastructure.

huggingface.py
# ---------------- API Configuration ---------------- # # For production - use your deployed URL API_BASE_URL = os.environ.get("MINERAI_API_URL", "https://minerai.space/api/v1") API_KEY = os.environ.get("MINERAI_API_KEY") API_SECRET = os.environ.get("MINERAI_API_SECRET") API_HEADERS = { "X-API-Key": API_KEY, "X-API-Secret": API_SECRET }
03

Unbuffered output

Core logical unit within the MinerAI infrastructure.

huggingface.py
# ---------------- Unbuffered output ---------------- # sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', line_buffering=True) sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', line_buffering=True)
04

System Component

Core logical unit within the MinerAI infrastructure.

huggingface.py
def log_with_flush(message): print(message) sys.stdout.flush() sys.stderr.flush() time.sleep(0.01)
05

CONFIG

Core logical unit within the MinerAI infrastructure.

huggingface.py
# ---------------- CONFIG ---------------- # ENCODER = "resnet34" ENCODER_WEIGHTS = "imagenet" CLASSES = ["mine"] ACTIVATION = "sigmoid" DEVICE = "cuda" if torch.cuda.is_available() else "cpu" # Use your Hugging Face model repository MODEL_REPO = "unbeeten/mineraiv1.0" MODEL_FILENAME = "checkpoint_epoch_10.pth" MODEL_PATH = MODEL_FILENAME HF_TOKEN = os.environ.get("HF_TOKEN", "")
06

Architecture: UNet + ResNet34

Defines the specific UNet topology with a ResNet34 encoder and integrated 2D Dropout layers for robust planetary-scale feature extraction.

huggingface.py
# ---------------- MODEL ARCHITECTURE ---------------- # class UNetWithDropout(nn.Module): """ Custom UNet with dropout for regularization - matches training architecture exactly """ def __init__(self, encoder_name, encoder_weights, classes=1, dropout_rate=0.2): super().__init__() self.base_model = smp.Unet( encoder_name=encoder_name, encoder_weights=None, classes=classes, activation=None, decoder_use_batchnorm=True, decoder_attention_type=None ) self.dropout = nn.Dropout2d(p=dropout_rate) def forward(self, x): x = self.base_model(x) if self.training: x = self.dropout(x) return x
07

Runtime Environment & Core Imports

Initializes the Python execution environment, handles UTF-8 stream buffering, and imports critical geospatial and deep learning libraries.

huggingface.py
# ---------------- MODEL LOADING ---------------- # def download_model_from_hf(): """Download model from Hugging Face Hub if not exists""" if not os.path.exists(MODEL_PATH): log_with_flush("⬇️ Downloading model from Hugging Face Hub...") if not HF_TOKEN: log_with_flush("⚠️ HF_TOKEN not found. Trying without authentication...") try: from huggingface_hub import hf_hub_download model_path = hf_hub_download( repo_id=MODEL_REPO, filename=MODEL_FILENAME, token=HF_TOKEN if HF_TOKEN else None ) shutil.copy(model_path, MODEL_PATH) log_with_flush(f"✅ Model downloaded from HF Hub: {MODEL_PATH}") return True except Exception as e: log_with_flush(f"⚠️ Failed to download from HF Hub: {e}") try: headers = {} if HF_TOKEN: headers["Authorization"] = f"Bearer {HF_TOKEN}" url = f"https://huggingface.co/{MODEL_REPO}/resolve/main/{MODEL_FILENAME}" response = requests.get(url, headers=headers, stream=True) if response.status_code == 200: with open(MODEL_PATH, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) log_with_flush(f"✅ Model downloaded via direct link: {MODEL_PATH}") return True else: raise Exception(f"HTTP {response.status_code}") except Exception as e2: log_with_flush(f"❌ All download methods failed: {e2}") return False return True
08

Runtime Environment & Core Imports

Initializes the Python execution environment, handles UTF-8 stream buffering, and imports critical geospatial and deep learning libraries.

huggingface.py
def load_model(): """Load the model once at startup""" if not download_model_from_hf(): log_with_flush("❌ Failed to download model") return None, None, 0.5 log_with_flush("⚙️ Loading preprocessing function...") preprocessing_fn = get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS) log_with_flush("🧠 Loading model architecture with dropout...") model = UNetWithDropout( encoder_name=ENCODER, encoder_weights=None, classes=1, dropout_rate=0.2 ) best_threshold = 0.5 if os.path.exists(MODEL_PATH): try: log_with_flush(f"📂 Loading checkpoint from: {MODEL_PATH}") checkpoint = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=False) if isinstance(checkpoint, dict): log_with_flush("📦 Checkpoint is a dictionary with keys: " + ", ".join(checkpoint.keys())) if 'best_threshold' in checkpoint: best_threshold = checkpoint['best_threshold'] log_with_flush(f"📊 Using best threshold from training: {best_threshold:.2f}") elif 'best_th' in checkpoint: best_threshold = checkpoint['best_th'] log_with_flush(f"📊 Using best threshold from training: {best_threshold:.2f}") if 'model_state' in checkpoint: try: model.load_state_dict(checkpoint['model_state']) log_with_flush("✅ Model weights loaded successfully from 'model_state'") except Exception as e: log_with_flush(f"⚠️ Error loading from 'model_state': {e}") model.load_state_dict(checkpoint['model_state'], strict=False) log_with_flush("✅ Model weights loaded from 'model_state' with strict=False") if 'best_iou' in checkpoint: log_with_flush(f"📊 Best IoU from training: {checkpoint['best_iou']:.4f}") else: try: model.load_state_dict(checkpoint) log_with_flush("✅ Model weights loaded successfully (direct state dict)") except Exception as e: log_with_flush(f"⚠️ Error loading direct state dict: {e}") try: model.base_model.load_state_dict(checkpoint) log_with_flush("✅ Model weights loaded into base_model") except Exception as e2: log_with_flush(f"⚠️ Error loading into base_model: {e2}") model.load_state_dict(checkpoint, strict=False) log_with_flush("✅ Model weights loaded with strict=False") else: log_with_flush("📦 Checkpoint is a direct state dict") try: model.load_state_dict(checkpoint) log_with_flush("✅ Model weights loaded successfully") except Exception as e: log_with_flush(f"⚠️ Error loading state dict: {e}") try: model.base_model.load_state_dict(checkpoint) log_with_flush("✅ Model weights loaded into base_model") except Exception as e2: log_with_flush(f"⚠️ Error loading into base_model: {e2}") model.load_state_dict(checkpoint, strict=False) log_with_flush("✅ Model weights loaded with strict=False") except Exception as e: log_with_flush(f"❌ Failed to load model: {e}") import traceback traceback.print_exc() return None, None, 0.5 else: log_with_flush("❌ Model file not found after download attempts") return None, None, 0.5 model.to(DEVICE) model.eval() log_with_flush(f"✅ Model ready on {DEVICE} with threshold {best_threshold:.2f}") return model, preprocessing_fn, best_threshold # Load model at startup log_with_flush("🚀 Starting model loading...") model, preprocessing_fn, INFERENCE_THRESHOLD = load_model() if model is None: log_with_flush("❌ CRITICAL: Model loading failed! Check your model file.") else: log_with_flush(f"✅ Model loaded successfully! Using threshold: {INFERENCE_THRESHOLD}")
09

API: Job Orchestration

Polls the MinerAI core backend for active processing tasks assigned to the current researcher identity.

huggingface.py
# ============================================================================ # NEW API FUNCTIONS - Replace Google Drive operations # ============================================================================ def get_user_jobs(researcher_id=1): """Get list of jobs for a user using API""" try: response = requests.get( f"{API_BASE_URL}/jobs/list", headers=API_HEADERS, timeout=30 ) if response.status_code == 200: data = response.json() if data.get('success'): return data.get('jobs', []) log_with_flush(f"⚠️ Failed to get jobs: {response.text}") return [] except Exception as e: log_with_flush(f"⚠️ Error getting jobs: {e}") return []
10

Runtime Environment & Core Imports

Initializes the Python execution environment, handles UTF-8 stream buffering, and imports critical geospatial and deep learning libraries.

huggingface.py
def download_latest_images_as_zip(researcher_id=1, output_dir=None): """Download latest images as ZIP using API""" try: log_with_flush("⬇️ Downloading latest images from API...") response = requests.get( f"{API_BASE_URL}/download/latest", headers=API_HEADERS, timeout=120 ) if response.status_code == 200: if output_dir is None: output_dir = tempfile.mkdtemp() zip_path = os.path.join(output_dir, "latest_images.zip") with open(zip_path, 'wb') as f: f.write(response.content) log_with_flush(f"✅ Downloaded {len(response.content)} bytes to {zip_path}") # Extract ZIP import zipfile extract_dir = os.path.join(output_dir, "extracted") os.makedirs(extract_dir, exist_ok=True) with zipfile.ZipFile(zip_path, 'r') as zipf: zipf.extractall(extract_dir) log_with_flush(f"✅ Extracted to {extract_dir}") # Get list of extracted images images = [] for f in os.listdir(extract_dir): if f.endswith('.jpg') or f.endswith('.png'): images.append(os.path.join(extract_dir, f)) return images, extract_dir else: log_with_flush(f"❌ Failed to download: {response.text}") return [], None except Exception as e: log_with_flush(f"❌ Error downloading images: {e}") return [], None
11

System Component

Core logical unit within the MinerAI infrastructure.

huggingface.py
def get_presigned_urls(job_id=None): """Get presigned URLs for images (for parallel downloading)""" try: params = {} if job_id: params['job_id'] = job_id response = requests.get( f"{API_BASE_URL}/download/urls", headers=API_HEADERS, params=params, timeout=30 ) if response.status_code == 200: data = response.json() if data.get('success'): return data.get('files', []) return [] except Exception as e: log_with_flush(f"⚠️ Error getting presigned URLs: {e}") return []
12

Runtime Environment & Core Imports

Initializes the Python execution environment, handles UTF-8 stream buffering, and imports critical geospatial and deep learning libraries.

huggingface.py
def upload_mask_to_api(mask_image, original_filename=None, tile_id=None, job_id=None): """Upload mask to API with preserved naming convention""" try: # Generate proper mask filename based on original image name if original_filename: mask_filename = generate_mask_filename(original_filename) else: # Fallback to old naming if original filename not provided timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') unique_id = str(uuid.uuid4())[:8] mask_filename = f"mask_{timestamp}_{unique_id}_tile_{tile_id or 'unknown'}.png" # Convert PIL image to bytes if isinstance(mask_image, Image.Image): img_buffer = BytesIO() mask_image.save(img_buffer, format='PNG') img_buffer.seek(0) files = {'mask_image': (mask_filename, img_buffer, 'image/png')} else: files = {'mask_image': (mask_filename, mask_image, 'image/png')} # Prepare form data data = {} if tile_id: data['tile_id'] = tile_id if job_id: data['job_id'] = job_id # Add original filename as metadata (optional, for reference) if original_filename: data['original_filename'] = original_filename response = requests.post( f"{API_BASE_URL}/upload/mask", headers=API_HEADERS, files=files, data=data, timeout=60 ) if response.status_code == 200: result = response.json() if result.get('success'): log_with_flush(f"✅ Mask uploaded: {mask_filename}") return True else: log_with_flush(f"⚠️ Upload failed: {result.get('error', 'Unknown error')}") return False else: log_with_flush(f"⚠️ Upload failed with status {response.status_code}: {response.text}") return False except Exception as e: log_with_flush(f"⚠️ Error uploading mask: {e}") import traceback traceback.print_exc() return False
13

Runtime Environment & Core Imports

Initializes the Python execution environment, handles UTF-8 stream buffering, and imports critical geospatial and deep learning libraries.

huggingface.py
# ============================================================================ # SINGLE IMAGE PROCESSING # ============================================================================ def process_single_image(image, user_id="user"): """Process a single uploaded image and return the mask""" try: if model is None: log_with_flush("❌ Model not loaded!") return Image.fromarray(np.zeros((100, 100), dtype=np.uint8)) # Convert Gradio image to PIL Image if isinstance(image, np.ndarray): image = Image.fromarray(image) log_with_flush(f"🔍 Processing single image for user {user_id}") # Preprocess image image_array = np.array(image) processed_image = preprocessing_fn(image_array).astype('float32') image_tensor = transforms.ToTensor()(processed_image).unsqueeze(0).to(DEVICE) # Run inference with torch.no_grad(): output = model(image_tensor) # Generate mask using the optimal threshold mask = output.squeeze().cpu().numpy() mask = (mask > INFERENCE_THRESHOLD).astype(np.uint8) * 255 # Convert mask to PIL Image for display mask_pil = Image.fromarray(mask) log_with_flush(f"✅ Single image processed successfully") return mask_pil except Exception as e: log_with_flush(f"⚠️ Error processing image: {e}") import traceback traceback.print_exc() return Image.fromarray(np.zeros((100, 100), dtype=np.uint8))
14

System Component

Core logical unit within the MinerAI infrastructure.

huggingface.py
def generate_mask_filename(original_filename): """ Generate mask filename that preserves the original naming convention. Example: Input: researcher_1_job_20260403_095816_599_tile_tile_0ae1abf9__lat_-25.86039_-25.83765__lon_27.98552_28.01027.jpg Output: researcher_1_job_20260403_095816_599_tile_tile_0ae1abf9__lat_-25.86039_-25.83765__lon_27.98552_28.01027_mask.png """ import re # Remove the original extension name_without_ext = re.sub(r'\.(jpg|png|jpeg)$', '', original_filename, flags=re.IGNORECASE) # Check if filename already has _mask suffix if name_without_ext.endswith('_mask'): # Already has mask suffix, just change extension return f"{name_without_ext}.png" else: # Add _mask.png suffix return f"{name_without_ext}_mask.png"
15

Runtime Environment & Core Imports

Initializes the Python execution environment, handles UTF-8 stream buffering, and imports critical geospatial and deep learning libraries.

huggingface.py
# ============================================================================ # INFERENCE FUNCTION USING API (replaces Google Drive) # ============================================================================ def run_inference_for_user(user_id, user_type, email): """ Main inference function using the new API instead of Google Drive """ log_with_flush(f"👤 Inference triggered by {email} (ID={user_id}, Type={user_type})") # Check if model is loaded if model is None: log_with_flush("❌ Model not loaded!") return {"status": "error", "message": "Model not loaded"} researcher_id = int(user_id) if user_id.isdigit() else 1 # Step 1: Get jobs and find the latest log_with_flush("📋 Fetching jobs from API...") jobs = get_user_jobs(researcher_id) if not jobs: log_with_flush("⚠️ No jobs found for this user") return { "status": "completed", "message": "No jobs found", "user_id": researcher_id, "processed": 0, "total": 0 } latest_job = jobs[0] job_id = latest_job['job_id'] log_with_flush(f"📂 Latest job ID: {job_id} with {latest_job['file_count']} images") # Step 2: Download images from the latest job log_with_flush("⬇️ Downloading images from API...") images, temp_dir = download_latest_images_as_zip(researcher_id) if not images: log_with_flush("⚠️ No images downloaded") return { "status": "completed", "message": "No images found", "user_id": researcher_id, "processed": 0, "total": 0 } log_with_flush(f"🖼️ {len(images)} images found. Starting inference with threshold {INFERENCE_THRESHOLD:.2f}...") # Step 3: Process each image processed_count = 0 for idx, image_path in enumerate(images, 1): filename = os.path.basename(image_path) log_with_flush(f"\n🔍 Processing {filename} ({idx}/{len(images)})") try: # Load and process image image = Image.open(image_path).convert("RGB") image_array = np.array(image) processed_image = preprocessing_fn(image_array).astype('float32') image_tensor = transforms.ToTensor()(processed_image).unsqueeze(0).to(DEVICE) # Run inference with torch.no_grad(): output = model(image_tensor) # Generate mask using optimal threshold mask = output.squeeze().cpu().numpy() mask = (mask > INFERENCE_THRESHOLD).astype(np.uint8) * 255 mask_pil = Image.fromarray(mask) # Extract tile_id from filename (if needed) tile_id = extract_tile_id_from_filename(filename) # IMPORTANT: Pass the original filename and job_id if upload_mask_to_api(mask_pil, original_filename=filename, tile_id=tile_id, job_id=job_id): processed_count += 1 expected_mask_name = generate_mask_filename(filename) log_with_flush(f"✅ Processed and uploaded: {filename} -> {expected_mask_name}") else: log_with_flush(f"⚠️ Processed but failed to upload: {filename}") except Exception as e: log_with_flush(f"⚠️ Error processing {filename}: {e}") import traceback traceback.print_exc() # Progress update progress = (idx / len(images)) * 100 log_with_flush(f"📊 Progress: {progress:.1f}%") # Step 4: Cleanup temp directory try: import shutil shutil.rmtree(temp_dir) log_with_flush("🧹 Cleaned up temporary files") except: pass log_with_flush(f"\n🏁 RUN SCAN completed for user {researcher_id}!") log_with_flush(f"✅ Successfully processed {processed_count}/{len(images)} images") log_with_flush(f"🕓 Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") return { "status": "completed", "user_id": researcher_id, "job_id": job_id, "processed": processed_count, "total": len(images), "threshold_used": float(INFERENCE_THRESHOLD) }
16

System Component

Core logical unit within the MinerAI infrastructure.

huggingface.py
def extract_tile_id_from_filename(filename): """Extract tile_id from filename pattern""" import re # Pattern matches tile_tile_xxxxx or just tile_xxxxx match = re.search(r'tile_tile_([a-f0-9]+)', filename) if match: return match.group(1) match = re.search(r'tile_([a-f0-9]+)', filename) if match: return match.group(1) # Extract from the full pattern as fallback return 'unknown'
17

Reactive UI Interface

Constructs a Gradio-based sandbox for real-time model interaction and immediate visualization of segmentation results.

huggingface.py
# ============================================================================ # GRADIO INTERFACE # ============================================================================ def create_gradio_interface(): """Create the Gradio web interface""" with gr.Blocks(theme=gr.themes.Soft(), title="Mine Detection AI") as demo: gr.Markdown("# 🎯 Mine Detection AI") gr.Markdown("Upload images to detect mine areas using AI segmentation") # Show model and API status if model is None: gr.Markdown("⚠️ **WARNING: Model not loaded! Check logs.**") else: gr.Markdown(f"✅ **Model loaded | Threshold: {INFERENCE_THRESHOLD:.2f}**") # Check API status try: response = requests.get(f"{API_BASE_URL}/rate-limit/status", headers=API_HEADERS, timeout=5) if response.status_code == 200: data = response.json() tier = data.get('subscription_tier', 'free') gr.Markdown(f"🔑 **API Status: Connected | Tier: {tier.upper()}**") else: gr.Markdown("⚠️ **API Status: Connection issue**") except: gr.Markdown("⚠️ **API Status: Cannot connect**") with gr.Tab("Single Image"): with gr.Row(): with gr.Column(): single_image_input = gr.Image(label="Upload Image", type="pil") single_user_id = gr.Textbox(label="User ID (optional)", value="user") single_submit_btn = gr.Button("Process Image") with gr.Column(): single_output = gr.Image(label="Detection Mask") single_log = gr.Textbox(label="Processing Log", lines=5) single_submit_btn.click( fn=process_single_image, inputs=[single_image_input, single_user_id], outputs=[single_output] ).then( fn=lambda: f"✅ Processing completed! (Threshold: {INFERENCE_THRESHOLD:.2f})", outputs=[single_log] ) with gr.Tab("API Trigger (Run on Latest Job)"): with gr.Row(): with gr.Column(): api_user_id = gr.Textbox(label="Researcher ID", value="1") api_user_type = gr.Textbox(label="User Type", value="researcher") api_user_email = gr.Textbox(label="Email", value="user@example.com") api_trigger_btn = gr.Button("Run Inference on Latest Job") with gr.Column(): api_log = gr.Textbox(label="Processing Log", lines=15) api_result = gr.JSON(label="Result") def update_log_from_result(result): if result.get('status') == 'error': return f"❌ Error: {result.get('message', 'Unknown error')}" elif result.get('status') == 'completed': return f"✅ Inference completed!\nProcessed: {result.get('processed', 0)}/{result.get('total', 0)} images\nJob ID: {result.get('job_id', 'N/A')}\nThreshold used: {result.get('threshold_used', 0.5):.2f}" else: return f"Status: {result.get('status', 'Unknown')}" api_trigger_btn.click( fn=run_inference_for_user, inputs=[api_user_id, api_user_type, api_user_email], outputs=[api_result] ).then( fn=update_log_from_result, inputs=[api_result], outputs=[api_log] ) with gr.Tab("API Info"): gr.Markdown("### 📊 API Rate Limits") gr.Markdown(f"- **API Base URL**: {API_BASE_URL}") gr.Markdown("- **Free Tier**: 10 req/min, 100 req/hr, 500 req/day") gr.Markdown("- **Pro Tier**: 60 req/min, 1000 req/hr, 10000 req/day") gr.Markdown("- **Enterprise**: 300 req/min, 5000 req/hr, 50000 req/day") def get_rate_limits(): try: response = requests.get(f"{API_BASE_URL}/rate-limit/status", headers=API_HEADERS, timeout=10) if response.status_code == 200: data = response.json() return f"**Current Tier**: {data.get('subscription_tier', 'free').upper()}\n\n**Rate Limits**:\n- {data.get('rate_limits', {}).get('requests_per_minute', 0)} requests/minute\n- {data.get('rate_limits', {}).get('requests_per_hour', 0)} requests/hour\n- {data.get('rate_limits', {}).get('requests_per_day', 0)} requests/day\n- {data.get('rate_limits', {}).get('concurrent_downloads', 0)} concurrent downloads" return "Could not fetch rate limits" except Exception as e: return f"Error: {e}" rate_limit_info = gr.Markdown("Click refresh to load...") refresh_btn = gr.Button("Refresh Rate Limit Info") refresh_btn.click(fn=get_rate_limits, outputs=[rate_limit_info]) gr.Markdown("---") gr.Markdown("### ℹ️ About") gr.Markdown("This AI model detects mine areas in images using semantic segmentation.") gr.Markdown(f"🔧 Powered by MinerAI API") gr.Markdown(f"🎯 Inference threshold: **{INFERENCE_THRESHOLD:.2f}** (optimized from training)") return demo
18

FLASK COMPATIBILITY

Core logical unit within the MinerAI infrastructure.

huggingface.py
# ---------------- FLASK COMPATIBILITY ---------------- # def flask_compatible_inference(user_id, user_type, email): """Wrapper for Flask backend compatibility""" return run_inference_for_user(user_id, user_type, email)
19

Execution Entry Point

Bootstrap logic for launching the standalone pipeline worker.

huggingface.py
# ---------------- MAIN ENTRY POINT ---------------- # if __name__ == "__main__": log_with_flush(f"🔗 Using MinerAI API at: {API_BASE_URL}") # Check API connection try: response = requests.get(f"{API_BASE_URL}/rate-limit/status", headers=API_HEADERS, timeout=10) if response.status_code == 200: data = response.json() log_with_flush(f"✅ API Connected! Tier: {data.get('subscription_tier', 'free')}") else: log_with_flush(f"⚠️ API returned status: {response.status_code}") except Exception as e: log_with_flush(f"⚠️ Cannot connect to API: {e}") log_with_flush("Make sure the API server is running and the URL is correct") # Check if model loaded successfully if model is None: log_with_flush("⚠️ WARNING: Model failed to load! The app may not function correctly.") demo = create_gradio_interface() log_with_flush("🚀 Launching Gradio interface on port 7860...") demo.launch(server_name="0.0.0.0", server_port=7860)

Security & API Integration

Crucial authentication parameters used within the MinerAI ecosystem.

MINERAI_API_KEY & MINERAI_API_SECRET

These credentials authorize the standalone worker to communicate with the MinerAI central hub. They are used to fetch assigned processing jobs, download latest satellite imagery, and push generated segmentation masks back to the database. These should never be hardcoded and must be provided via environment variables for security.

HF_TOKEN

The Hugging Face Read Token allows the worker to automatically synchronize with private or restricted model repositories. This ensures the worker always has access to the latest optimized weight checkpoints (e.g., checkpoint_epoch_10.pth) from the global hub.

API_BASE_URL

The endpoint pointer for the MinerAI API. By default, it points to the production cluster but can be overridden for local development or testing against staging environments.

Community Discussion

Ask questions, share your implementations, and get help from the MinerAI team.

MinerAI Dev Channel
Connecting...