Technical Implementation: Mapping PhD Theses to UN Sustainable Development Goals: A Global Knowledge Analysis

Technical Implementation Details

The project combines a thesis-repository ingestion layer, a multilingual NLP pipeline, a transformer-based SDG classifier, geospatial analysis, and an interactive visualization layer. Here's a detailed breakdown of the technical architecture:

Data Collection and Processing

Thesis Repository Integration

class ThesisRepository:
    def __init__(self, connection_params):
        self.connection = self._establish_connection(connection_params)
        self.metadata_cache = {}
        
    def fetch_theses(self, start_year, end_year, limit=None):
        """Fetch theses within the specified year range"""
        # Use a parameterised query rather than string interpolation
        # to avoid SQL injection (placeholder style depends on the DB driver).
        query = """
            SELECT id, title, abstract, year, institution, country,
                   language, keywords, subject_areas
            FROM theses
            WHERE year BETWEEN %s AND %s
            ORDER BY year DESC
        """
        params = [start_year, end_year]
        if limit:
            query += " LIMIT %s"
            params.append(limit)

        return self._execute_query(query, params)
        
    def _establish_connection(self, params):
        # Implementation of database connection logic
        pass
        
    def _execute_query(self, query, params=None):
        # Query execution and result processing (driver-specific)
        pass
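
A minimal usage sketch (the connection parameters shown are placeholders, not the project's actual configuration):

repo = ThesisRepository({'host': 'localhost', 'dbname': 'theses', 'user': 'analyst'})  # hypothetical params
recent_theses = repo.fetch_theses(start_year=2010, end_year=2023, limit=10000)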

Multilingual Processing Pipeline

To handle theses in multiple languages:

import pandas as pd

def process_multilingual_corpus(theses_df):
    """Process theses in multiple languages for NLP analysis"""
    processed_theses = []
    
    for idx, thesis in theses_df.iterrows():
        language = detect_language(thesis['abstract'])
        
        # Handle language-specific preprocessing
        if language == 'en':
            processed_text = preprocess_english(thesis['abstract'])
        elif language in SUPPORTED_LANGUAGES:
            processed_text = preprocess_non_english(thesis['abstract'], language)
            processed_text = translate_to_english(processed_text, language)
        else:
            # Default to English processing for unsupported languages
            processed_text = preprocess_english(thesis['abstract'])
            
        processed_theses.append({
            'id': thesis['id'],
            'processed_text': processed_text,
            'original_language': language,
            'metadata': {k: thesis[k] for k in thesis.keys() 
                         if k not in ['id', 'abstract']}
        })
    
    return pd.DataFrame(processed_theses)
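
The helper functions referenced above (detect_language, preprocess_english, preprocess_non_english, translate_to_english) and the SUPPORTED_LANGUAGES constant are assumed to be defined elsewhere in the pipeline. As a minimal sketch of two of them, assuming the langdetect package and plain regex cleanup (illustrative choices, not the project's confirmed implementation):

import re

from langdetect import detect, LangDetectException

SUPPORTED_LANGUAGES = {'en', 'es', 'fr', 'de', 'pt'}  # hypothetical coverage

def detect_language(text):
    """Return an ISO 639-1 language code, defaulting to English on failure."""
    try:
        return detect(text)
    except LangDetectException:
        return 'en'

def preprocess_english(text):
    """Lowercase, strip non-alphanumeric characters, and collapse whitespace."""
    text = re.sub(r'[^a-z0-9\s]', ' ', text.lower())
    return re.sub(r'\s+', ' ', text).strip()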

SDG Classification System

Model Architecture

The classification system is built on SciBERT and treats SDG assignment as a multi-label problem, scoring each thesis against all 17 goals:

import os

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

class SDGClassifier:
    def __init__(self, model_path=None):
        self.tokenizer = AutoTokenizer.from_pretrained('allenai/scibert_scivocab_uncased')
        
        if model_path and os.path.exists(model_path):
            self.model = AutoModelForSequenceClassification.from_pretrained(model_path)
        else:
            # Initialize new model
            self.model = AutoModelForSequenceClassification.from_pretrained(
                'allenai/scibert_scivocab_uncased', 
                num_labels=17  # 17 SDGs
            )
        
        self.model.eval()  # inference only: disable dropout
        self.sdg_descriptions = self._load_sdg_descriptions()
        self.label2sdg = {i: i + 1 for i in range(17)}  # Map model output index to SDG number (1-17)
        
    def classify_thesis(self, thesis_text, threshold=0.5):
        """Classify thesis text into relevant SDGs"""
        inputs = self.tokenizer(
            thesis_text,
            padding='max_length',
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits
            
        # Multi-label classification
        probs = torch.sigmoid(logits).numpy()[0]
        
        # Get SDGs above threshold
        relevant_sdgs = [
            {"sdg": self.label2sdg[i], "score": float(prob)}
            for i, prob in enumerate(probs) if prob > threshold
        ]
        
        return relevant_sdgs
        
    def _load_sdg_descriptions(self):
        # Load detailed SDG descriptions for zero-shot learning
        pass
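
In use, classify_thesis returns a list of {sdg, score} dictionaries per abstract. A freshly initialised classification head produces essentially random scores, so meaningful output requires the fine-tuned checkpoint produced by the training step below. A hypothetical example:

classifier = SDGClassifier(model_path='./results/sdg-scibert')  # hypothetical checkpoint path

abstract = ("This thesis examines decentralised solar micro-grids as a pathway to "
            "affordable and reliable electricity access in rural communities.")
print(classifier.classify_thesis(abstract, threshold=0.5))
# Hypothetical output: [{'sdg': 7, 'score': 0.91}, {'sdg': 11, 'score': 0.62}]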

Training and Validation Process

from sklearn.model_selection import train_test_split
from transformers import Trainer, TrainingArguments

def train_sdg_classifier(labeled_dataset, model, tokenizer, validation_split=0.2, epochs=5):
    """Train the SDG classifier model"""
    # Split dataset
    train_df, val_df = train_test_split(
        labeled_dataset, test_size=validation_split, random_state=42
    )
    
    # Create datasets
    train_dataset = SDGDataset(train_df, tokenizer)
    val_dataset = SDGDataset(val_df, tokenizer)
    
    # Training arguments
    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=epochs,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=16,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10,
        evaluation_strategy="epoch",
    )
    
    # Initialize the Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics
    )
    
    # Train model
    trainer.train()
    
    return trainer.model
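
SDGDataset and compute_metrics are assumed to be defined alongside the training function. One possible sketch, treating labels as 17-dimensional multi-hot vectors and reporting micro-averaged F1 at a 0.5 threshold (my assumptions about the data layout, not the project's confirmed schema):

import numpy as np
import torch
from sklearn.metrics import f1_score
from torch.utils.data import Dataset

class SDGDataset(Dataset):
    """Wraps a DataFrame with a 'processed_text' column and a 17-dim multi-hot 'labels' column."""

    def __init__(self, df, tokenizer, max_length=512):
        self.encodings = tokenizer(
            df['processed_text'].tolist(),
            padding='max_length', truncation=True,
            max_length=max_length, return_tensors='pt'
        )
        self.labels = torch.tensor(np.stack(df['labels'].values), dtype=torch.float)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

def compute_metrics(eval_pred):
    """Micro-averaged F1 across all 17 SDG labels at a 0.5 sigmoid threshold."""
    logits, labels = eval_pred
    preds = (1 / (1 + np.exp(-logits)) > 0.5).astype(int)
    return {'f1_micro': f1_score(labels.astype(int), preds, average='micro')}

For multi-label training, the underlying model should also be loaded with problem_type="multi_label_classification" so that the Trainer applies a binary cross-entropy loss rather than the default softmax cross-entropy.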

Geospatial Analysis

The geospatial analysis component aggregates classified theses by country and identifies spatial patterns of research expertise:

import esda
import geopandas as gpd
import libpysal

class GeospatialAnalyzer:
    def __init__(self, world_shapefile_path):
        self.crs = "EPSG:4326"  # WGS 84
        self.world_gdf = gpd.read_file(world_shapefile_path).to_crs(self.crs)
        
    def create_sdg_heatmap(self, thesis_data, sdg_number):
        """Create a heatmap for specified SDG"""
        # Filter for the specific SDG
        sdg_theses = thesis_data[
            thesis_data['sdgs'].apply(lambda x: sdg_number in [sdg['sdg'] for sdg in x])
        ]
        
        # Group by country
        country_counts = sdg_theses.groupby('country').size().reset_index(name='count')
        
        # Merge with geographical data (assumes the 'country' column holds
        # ISO alpha-3 codes matching the shapefile's ISO_A3 attribute)
        merged_data = self.world_gdf.merge(
            country_counts,
            left_on='ISO_A3',
            right_on='country',
            how='left'
        )
        
        # Fill NaN values with 0
        merged_data['count'] = merged_data['count'].fillna(0)
        
        # Create normalized column for choropleth
        if merged_data['count'].max() > 0:
            merged_data['normalized'] = merged_data['count'] / merged_data['count'].max()
        else:
            merged_data['normalized'] = 0
            
        return merged_data
        
    def identify_expertise_clusters(self, thesis_data, sdg_number, threshold=0.75):
        """Identify significant clusters of expertise"""
        heatmap_data = self.create_sdg_heatmap(thesis_data, sdg_number)
        
        # Identify high-density areas (copy so new columns can be added safely)
        high_density = heatmap_data[heatmap_data['normalized'] > threshold].copy()
        
        # Calculate spatial clustering
        if len(high_density) > 0:
            # Create spatial weights matrix
            w = libpysal.weights.Queen.from_dataframe(high_density)
            
            # Calculate Local Moran's I
            moran_loc = esda.Moran_Local(
                high_density['normalized'],
                w,
                transformation='r',
                permutations=99
            )
            
            # Identify statistically significant high-high clusters
            high_density['cluster_type'] = None
            significant = (moran_loc.q == 1) & (moran_loc.p_sim < 0.05)
            high_density.loc[significant, 'cluster_type'] = 'high-high'
            
            return high_density[high_density['cluster_type'] == 'high-high']
        else:
            return gpd.GeoDataFrame(columns=heatmap_data.columns)
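
A typical invocation, assuming a Natural Earth admin-0 countries shapefile (which provides the ISO_A3 and NAME attributes used above) and thesis records whose country field contains ISO alpha-3 codes:

analyzer = GeospatialAnalyzer('data/ne_110m_admin_0_countries.shp')  # hypothetical path

# Country-level intensity of SDG 7 (Affordable and Clean Energy) research
sdg7_heatmap = analyzer.create_sdg_heatmap(thesis_data, sdg_number=7)
print(sdg7_heatmap[['ISO_A3', 'count', 'normalized']].sort_values('count', ascending=False).head())

# Statistically significant high-high clusters of SDG 7 expertise
sdg7_clusters = analyzer.identify_expertise_clusters(thesis_data, sdg_number=7)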

Visualization System

The project includes a comprehensive visualization framework:

import folium
import pandas as pd

class SDGVisualization:
    def __init__(self, thesis_data, geospatial_analyzer):
        self.thesis_data = thesis_data
        self.geo_analyzer = geospatial_analyzer
        self.sdg_colors = self._define_sdg_colors()
        
    def generate_temporal_chart(self, start_year, end_year, sdgs=None):
        """Generate temporal analysis chart for specified SDGs"""
        # Filter thesis data by years
        filtered_data = self.thesis_data[
            (self.thesis_data['year'] >= start_year) & 
            (self.thesis_data['year'] <= end_year)
        ]
        
        # Group by year and SDG
        yearly_counts = {}
        
        for year in range(start_year, end_year + 1):
            year_data = filtered_data[filtered_data['year'] == year]
            
            sdg_counts = {}
            for sdg_num in range(1, 18):
                if sdgs and sdg_num not in sdgs:
                    continue
                    
                count = len(year_data[
                    year_data['sdgs'].apply(
                        lambda x: sdg_num in [sdg['sdg'] for sdg in x]
                    )
                ])
                sdg_counts[f'SDG {sdg_num}'] = count
                
            yearly_counts[year] = sdg_counts
            
        # Convert to DataFrame for plotting
        temporal_df = pd.DataFrame(yearly_counts).T
        
        return temporal_df
        
    def create_interactive_map(self, sdg_number=None):
        """Create an interactive map visualization"""
        if sdg_number:
            # Single SDG map
            heatmap_data = self.geo_analyzer.create_sdg_heatmap(
                self.thesis_data, sdg_number
            )
            
            # Create Folium map
            m = folium.Map(location=[0, 0], zoom_start=2, tiles='CartoDB positron')
            
            # Add choropleth layer
            folium.Choropleth(
                geo_data=heatmap_data.__geo_interface__,
                name=f'SDG {sdg_number}',
                data=heatmap_data,
                columns=['ISO_A3', 'normalized'],
                key_on='feature.properties.ISO_A3',
                fill_color='YlOrRd',
                fill_opacity=0.7,
                line_opacity=0.2,
                legend_name=f'SDG {sdg_number} Research Intensity'
            ).add_to(m)
            
            # Add expertise clusters
            clusters = self.geo_analyzer.identify_expertise_clusters(
                self.thesis_data, sdg_number
            )
            
            if len(clusters) > 0:
                for idx, row in clusters.iterrows():
                    folium.CircleMarker(
                        location=[row.geometry.centroid.y, row.geometry.centroid.x],
                        radius=10,
                        color='blue',
                        fill=True,
                        fill_color='blue',
                        fill_opacity=0.6,
                        tooltip=f"Expertise cluster: {row['NAME']}"
                    ).add_to(m)
            
            return m
        else:
            # Multiple SDG map implementation
            pass
    
    def _define_sdg_colors(self):
        # Define standard SDG colors
        return {
            1: '#e5243b',  # No Poverty
            2: '#DDA63A',  # Zero Hunger
            3: '#4C9F38',  # Good Health and Well-being
            # ... additional colors for other SDGs
        }
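
Putting the pieces together, a hedged end-to-end sketch (file names and the choice of SDGs are purely illustrative):

import matplotlib.pyplot as plt

viz = SDGVisualization(thesis_data, analyzer)

# Annual thesis counts for SDG 7 and SDG 13
temporal_df = viz.generate_temporal_chart(2010, 2023, sdgs=[7, 13])
temporal_df.plot(kind='line', figsize=(10, 5))
plt.ylabel('Theses per year')
plt.savefig('sdg_temporal_trends.png', dpi=150)

# Interactive choropleth for SDG 13 (Climate Action), saved as standalone HTML
sdg13_map = viz.create_interactive_map(sdg_number=13)
sdg13_map.save('sdg13_map.html')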

Performance Optimizations

The system includes several performance optimizations for handling large datasets:

  1. Batch Processing

    def process_large_dataset(dataset_path, batch_size=1000):
        """Process large datasets in batches"""
        total_processed = 0
        all_results = []
        
        # Process the data in batches
        for chunk in pd.read_csv(dataset_path, chunksize=batch_size):
            # Process the chunk
            results = process_chunk(chunk)
            all_results.append(results)
            
            total_processed += len(chunk)
            print(f"Processed {total_processed} records")
        
        # Combine results
        combined_results = pd.concat(all_results, ignore_index=True)
        return combined_results
    
  2. Parallel Classification

    import concurrent.futures

    def classify_parallel(theses_list, classifier, n_workers=4):
        """Classify theses in parallel"""
        # Each worker process receives its own copy of the classifier (and its
        # model weights), so memory usage grows with n_workers.
        with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
            # Map classification function to each thesis
            classification_results = list(executor.map(
                classifier.classify_thesis,
                [thesis['abstract'] for thesis in theses_list]
            ))
            
        # Combine results with original data
        for i, thesis in enumerate(theses_list):
            thesis['sdgs'] = classification_results[i]
            
        return theses_list
    

Future Technical Improvements

  1. Enhanced Language Model

    • Implementation of domain-specific pre-training
    • Fine-tuning with expert-labeled examples
    • Integration of SDG target-level classification
  2. Visualization Enhancements

    • Interactive dashboard with filtering capabilities
    • Time-lapse visualization of research evolution
    • Comparative analysis tools
  3. Scalability Improvements

    • Distributed processing for larger datasets
    • Optimized storage solutions for full-text analysis
    • API development for external data integration

This technical implementation provides a robust foundation for the SDG thesis mapping project while maintaining flexibility for future enhancements and refinements.