Technical Implementation: Mapping PhD Theses to UN Sustainable Development Goals: A Global Knowledge Analysis
Technical Implementation Details
This project combines data-processing pipelines with machine learning models for multilingual text classification and geospatial analysis. Here is a detailed breakdown of the technical architecture:
Data Collection and Processing
Thesis Repository Integration
class ThesisRepository:
    def __init__(self, connection_params):
        self.connection = self._establish_connection(connection_params)
        self.metadata_cache = {}

    def fetch_theses(self, start_year, end_year, limit=None):
        """Fetch theses within the specified year range."""
        # Cast to int so the interpolated values cannot inject SQL; with a
        # real driver, a parameterised query would be preferable.
        start_year, end_year = int(start_year), int(end_year)
        query = f"""
            SELECT id, title, abstract, year, institution, country,
                   language, keywords, subject_areas
            FROM theses
            WHERE year BETWEEN {start_year} AND {end_year}
            ORDER BY year DESC
        """
        if limit:
            query += f" LIMIT {int(limit)}"
        return self._execute_query(query)

    def _establish_connection(self, params):
        # Database connection logic (driver-specific)
        pass

    def _execute_query(self, query):
        # Query execution and result processing
        pass
Multilingual Processing Pipeline
To handle theses in multiple languages:
import pandas as pd

def process_multilingual_corpus(theses_df):
    """Process theses in multiple languages for NLP analysis."""
    processed_theses = []
    for idx, thesis in theses_df.iterrows():
        language = detect_language(thesis['abstract'])
        # Handle language-specific preprocessing
        if language == 'en':
            processed_text = preprocess_english(thesis['abstract'])
        elif language in SUPPORTED_LANGUAGES:
            processed_text = preprocess_non_english(thesis['abstract'], language)
            processed_text = translate_to_english(processed_text, language)
        else:
            # Default to English processing for unsupported languages
            processed_text = preprocess_english(thesis['abstract'])
        processed_theses.append({
            'id': thesis['id'],
            'processed_text': processed_text,
            'original_language': language,
            'metadata': {k: thesis[k] for k in thesis.keys()
                         if k not in ['id', 'abstract']}
        })
    return pd.DataFrame(processed_theses)
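detect_language, SUPPORTED_LANGUAGES, the preprocess_* helpers and translate_to_english are assumed to be defined elsewhere in the pipeline. As a rough sketch only, language detection could be backed by the langdetect package (the supported-language set below is illustrative):

from langdetect import detect

# Illustrative subset; the real pipeline may support more languages
SUPPORTED_LANGUAGES = {'de', 'es', 'fr', 'pt', 'it'}

def detect_language(text):
    """Return an ISO 639-1 code such as 'en'; fall back to 'en' on failure."""
    try:
        return detect(text)
    except Exception:
        # langdetect raises an exception on empty or very short input
        return 'en'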
SDG Classification System
Model Architecture
The classification system uses a hierarchical approach:
import os

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

class SDGClassifier:
    def __init__(self, model_path=None):
        self.tokenizer = AutoTokenizer.from_pretrained('allenai/scibert_scivocab_uncased')
        if model_path and os.path.exists(model_path):
            # Load a previously fine-tuned model
            self.model = AutoModelForSequenceClassification.from_pretrained(model_path)
        else:
            # Initialize a new model on top of SciBERT
            self.model = AutoModelForSequenceClassification.from_pretrained(
                'allenai/scibert_scivocab_uncased',
                num_labels=17,  # 17 SDGs
                problem_type="multi_label_classification"  # BCE-with-logits loss during fine-tuning
            )
        self.sdg_descriptions = self._load_sdg_descriptions()
        self.label2sdg = {i: i + 1 for i in range(17)}  # Map model output index to SDG number

    def classify_thesis(self, thesis_text, threshold=0.5):
        """Classify thesis text into relevant SDGs (multi-label)."""
        inputs = self.tokenizer(
            thesis_text,
            padding='max_length',
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits
        # Multi-label classification: independent sigmoid per SDG
        probs = torch.sigmoid(logits).numpy()[0]
        # Keep the SDGs whose probability exceeds the threshold
        relevant_sdgs = [
            {"sdg": self.label2sdg[i], "score": float(prob)}
            for i, prob in enumerate(probs) if prob > threshold
        ]
        return relevant_sdgs

    def _load_sdg_descriptions(self):
        # Load detailed SDG descriptions for zero-shot learning
        pass
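A minimal usage sketch (the model path and abstract are illustrative):

classifier = SDGClassifier(model_path='./models/sdg_scibert')  # hypothetical path
abstract = "This thesis studies decentralised solar micro-grids for rural electrification."
print(classifier.classify_thesis(abstract, threshold=0.5))
# Illustrative output: [{'sdg': 7, 'score': 0.91}, {'sdg': 13, 'score': 0.62}]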
Training and Validation Process
from sklearn.model_selection import train_test_split
from transformers import Trainer, TrainingArguments

def train_sdg_classifier(model, tokenizer, labeled_dataset, validation_split=0.2, epochs=5):
    """Train the SDG classifier model."""
    # Split the labeled data into training and validation sets
    train_df, val_df = train_test_split(
        labeled_dataset, test_size=validation_split, random_state=42
    )
    # Wrap the splits in torch-compatible datasets
    train_dataset = SDGDataset(train_df, tokenizer)
    val_dataset = SDGDataset(val_df, tokenizer)
    # Training arguments
    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=epochs,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=16,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10,
        evaluation_strategy="epoch",
    )
    # Initialize the Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics
    )
    # Train the model
    trainer.train()
    return trainer.model
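SDGDataset and compute_metrics are referenced above but not defined in this section. A minimal sketch, assuming the labeled DataFrame has a processed_text column and a labels column holding 17-element multi-hot vectors (both column names are assumptions):

import numpy as np
import torch
from sklearn.metrics import f1_score
from torch.utils.data import Dataset

class SDGDataset(Dataset):
    def __init__(self, df, tokenizer, max_length=512):
        # Tokenize all abstracts up front
        self.encodings = tokenizer(
            df['processed_text'].tolist(),
            padding='max_length', truncation=True, max_length=max_length
        )
        self.labels = df['labels'].tolist()  # each entry: 17 values in {0, 1}

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {k: torch.tensor(v[idx]) for k, v in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
        return item

def compute_metrics(eval_pred):
    # Trainer passes (predictions, label_ids); apply a sigmoid and a 0.5 threshold
    logits, labels = eval_pred
    probs = 1 / (1 + np.exp(-logits))
    preds = (probs > 0.5).astype(int)
    labels = np.asarray(labels).astype(int)
    return {'micro_f1': f1_score(labels, preds, average='micro', zero_division=0)}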
Geospatial Analysis
The geospatial analysis system handles mapping and spatial patterns:
import esda
import geopandas as gpd
import libpysal

class GeospatialAnalyzer:
    def __init__(self, world_shapefile_path):
        self.world_gdf = gpd.read_file(world_shapefile_path)
        self.crs = "EPSG:4326"  # WGS 84

    def create_sdg_heatmap(self, thesis_data, sdg_number):
        """Create a choropleth-ready heatmap for the specified SDG."""
        # Keep only theses classified under the requested SDG
        sdg_theses = thesis_data[
            thesis_data['sdgs'].apply(lambda x: sdg_number in [sdg['sdg'] for sdg in x])
        ]
        # Count theses per country (ISO alpha-3 codes)
        country_counts = sdg_theses.groupby('country').size().reset_index(name='count')
        # Merge counts onto the world geometries
        merged_data = self.world_gdf.merge(
            country_counts,
            left_on='ISO_A3',
            right_on='country',
            how='left'
        )
        # Countries with no matching theses get a count of 0
        merged_data['count'] = merged_data['count'].fillna(0)
        # Normalize counts for the choropleth colour scale
        if merged_data['count'].max() > 0:
            merged_data['normalized'] = merged_data['count'] / merged_data['count'].max()
        else:
            merged_data['normalized'] = 0
        return merged_data

    def identify_expertise_clusters(self, thesis_data, sdg_number, threshold=0.75):
        """Identify significant clusters of expertise."""
        heatmap_data = self.create_sdg_heatmap(thesis_data, sdg_number)
        # Restrict to high-density areas
        high_density = heatmap_data[heatmap_data['normalized'] > threshold].copy()
        if len(high_density) > 0:
            # Spatial weights matrix based on Queen contiguity
            w = libpysal.weights.Queen.from_dataframe(high_density)
            # Local Moran's I with row-standardised weights
            moran_loc = esda.Moran_Local(
                high_density['normalized'],
                w,
                transformation='r',
                permutations=99
            )
            # Label high-high clusters (quadrant 1 of the Moran scatterplot)
            high_density['cluster_type'] = None
            high_density.loc[moran_loc.q == 1, 'cluster_type'] = 'high-high'
            return high_density[high_density['cluster_type'] == 'high-high']
        else:
            # No high-density countries: return an empty GeoDataFrame with the same schema
            return gpd.GeoDataFrame(columns=heatmap_data.columns)
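A brief usage sketch of the heatmap output, where thesis_data is the classified DataFrame from the previous steps and the shapefile path is hypothetical (plotting relies on geopandas' built-in matplotlib integration):

import matplotlib.pyplot as plt

analyzer = GeospatialAnalyzer('data/ne_110m_admin_0_countries.shp')  # hypothetical path
heatmap = analyzer.create_sdg_heatmap(thesis_data, sdg_number=13)  # SDG 13: Climate Action

# Static choropleth of normalised research intensity per country
ax = heatmap.plot(column='normalized', cmap='YlOrRd', legend=True, figsize=(12, 6))
ax.set_title('SDG 13 research intensity by country')
plt.show()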
Visualization System
The project includes a comprehensive visualization framework:
import folium
import pandas as pd

class SDGVisualization:
    def __init__(self, thesis_data, geospatial_analyzer):
        self.thesis_data = thesis_data
        self.geo_analyzer = geospatial_analyzer
        self.sdg_colors = self._define_sdg_colors()

    def generate_temporal_chart(self, start_year, end_year, sdgs=None):
        """Generate temporal analysis data for the specified SDGs."""
        # Filter thesis data by year range
        filtered_data = self.thesis_data[
            (self.thesis_data['year'] >= start_year) &
            (self.thesis_data['year'] <= end_year)
        ]
        # Count theses per year and SDG
        yearly_counts = {}
        for year in range(start_year, end_year + 1):
            year_data = filtered_data[filtered_data['year'] == year]
            sdg_counts = {}
            for sdg_num in range(1, 18):
                if sdgs and sdg_num not in sdgs:
                    continue
                count = len(year_data[
                    year_data['sdgs'].apply(
                        lambda x: sdg_num in [sdg['sdg'] for sdg in x]
                    )
                ])
                sdg_counts[f'SDG {sdg_num}'] = count
            yearly_counts[year] = sdg_counts
        # Convert to a DataFrame (years as rows, SDGs as columns) for plotting
        temporal_df = pd.DataFrame(yearly_counts).T
        return temporal_df

    def create_interactive_map(self, sdg_number=None):
        """Create an interactive map visualization."""
        if sdg_number:
            # Single-SDG map
            heatmap_data = self.geo_analyzer.create_sdg_heatmap(
                self.thesis_data, sdg_number
            )
            # Create the base Folium map
            m = folium.Map(location=[0, 0], zoom_start=2, tiles='CartoDB positron')
            # Add the choropleth layer
            folium.Choropleth(
                geo_data=heatmap_data.__geo_interface__,
                name=f'SDG {sdg_number}',
                data=heatmap_data,
                columns=['ISO_A3', 'normalized'],
                key_on='feature.properties.ISO_A3',
                fill_color='YlOrRd',
                fill_opacity=0.7,
                line_opacity=0.2,
                legend_name=f'SDG {sdg_number} Research Intensity'
            ).add_to(m)
            # Mark expertise clusters identified by the spatial analysis
            clusters = self.geo_analyzer.identify_expertise_clusters(
                self.thesis_data, sdg_number
            )
            if len(clusters) > 0:
                for idx, row in clusters.iterrows():
                    folium.CircleMarker(
                        location=[row.geometry.centroid.y, row.geometry.centroid.x],
                        radius=10,
                        color='blue',
                        fill=True,
                        fill_color='blue',
                        fill_opacity=0.6,
                        tooltip=f"Expertise cluster: {row['NAME']}"
                    ).add_to(m)
            return m
        else:
            # Multi-SDG map implementation
            pass

    def _define_sdg_colors(self):
        # Standard SDG colour palette (hex codes)
        return {
            1: '#e5243b',  # No Poverty
            2: '#DDA63A',  # Zero Hunger
            3: '#4C9F38',  # Good Health and Well-being
            # ... additional colors for other SDGs
        }
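Putting the pieces together, a hedged usage sketch (variable names, the year range and file names are illustrative):

viz = SDGVisualization(classified_theses, analyzer)  # objects from the steps above

# Thesis counts per year for selected SDGs, plotted via pandas/matplotlib
temporal_df = viz.generate_temporal_chart(2000, 2020, sdgs=[7, 13])
temporal_df.plot(kind='line', figsize=(10, 5), title='Theses per year: SDG 7 vs SDG 13')

# Interactive choropleth for a single SDG, saved as a standalone HTML file
sdg13_map = viz.create_interactive_map(sdg_number=13)
sdg13_map.save('sdg13_map.html')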
Performance Optimizations
The system includes several performance optimizations for handling large datasets:
- Batch Processing

def process_large_dataset(dataset_path, batch_size=1000):
    """Process large datasets in batches."""
    total_processed = 0
    all_results = []
    # Process the data in batches
    for chunk in pd.read_csv(dataset_path, chunksize=batch_size):
        # Process the chunk (a sketch of process_chunk appears after this list)
        results = process_chunk(chunk)
        all_results.append(results)
        total_processed += len(chunk)
        print(f"Processed {total_processed} records")
    # Combine results
    combined_results = pd.concat(all_results, ignore_index=True)
    return combined_results
- Parallel Classification

import concurrent.futures

def classify_parallel(theses_list, classifier, n_workers=4):
    """Classify theses in parallel."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
        # Map the classification function over every abstract
        classification_results = list(executor.map(
            classifier.classify_thesis,
            [thesis['abstract'] for thesis in theses_list]
        ))
    # Attach the predicted SDGs back onto the original records
    for i, thesis in enumerate(theses_list):
        thesis['sdgs'] = classification_results[i]
    return theses_list
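The process_chunk helper used in the batch-processing example is not shown in the source. A minimal sketch, assuming each CSV chunk has the same columns as the thesis DataFrame and reusing the multilingual pipeline together with an SDGClassifier instance named classifier (both assumptions):

def process_chunk(chunk):
    # Hypothetical glue code: preprocess the chunk, classify every thesis,
    # and return the enriched DataFrame
    processed = process_multilingual_corpus(chunk)
    processed['sdgs'] = processed['processed_text'].apply(classifier.classify_thesis)
    return processed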
Future Technical Improvements
- Enhanced Language Model
  - Implementation of domain-specific pre-training
  - Fine-tuning with expert-labeled examples
  - Integration of SDG target-level classification
- Visualization Enhancements
  - Interactive dashboard with filtering capabilities
  - Time-lapse visualization of research evolution
  - Comparative analysis tools
- Scalability Improvements
  - Distributed processing for larger datasets
  - Optimized storage solutions for full-text analysis
  - API development for external data integration
This technical implementation provides a robust foundation for the SDG thesis mapping project while maintaining flexibility for future enhancements and refinements.