# -*- coding: utf-8 -*-
"""
IPM Empirical Validation – Regularity R1 (C > 0) – Versão rápida
Domínios: SOI, VIX, terremotos, ECG, EEG, manchas solares
Apenas cálculos de C e p-valor (sem loops pesados)
"""
import numpy as np
import pandas as pd
import warnings
import requests
import urllib.request
import os
from scipy.signal import find_peaks
from datetime import datetime
import time
warnings.filterwarnings('ignore')
# Instala pacotes necessários (se não tiver)
!pip install -q yfinance scikit-learn wfdb
# ==================================================
# Função C discreta (estável)
# ==================================================
def compute_C_discrete(tau, bins='auto'):
if len(tau) < 3:
return np.nan
if bins == 'auto':
bins = max(2, int(np.ceil(np.log2(len(tau)) + 1)))
tau_clipped = np.clip(tau, np.percentile(tau, 1), np.percentile(tau, 99))
try:
tau_disc = np.digitize(tau, bins=np.percentile(tau_clipped, np.linspace(0, 100, bins+1)[1:-1]))
except:
tau_disc = np.digitize(tau, bins=np.linspace(tau.min(), tau.max(), bins+1)[1:-1])
probs = np.bincount(tau_disc) / len(tau_disc)
probs = probs[probs > 0]
if len(probs) == 0:
return np.nan
H_marg = -np.sum(probs * np.log2(probs))
pairs = list(zip(tau_disc[:-1], tau_disc[1:]))
if len(pairs) == 0:
return np.nan
cond_entropy = 0.0
total = len(pairs)
for x in np.unique(tau_disc):
y_vals = [p[1] for p in pairs if p[0] == x]
if len(y_vals) == 0:
continue
probs_y = np.bincount(y_vals) / len(y_vals)
probs_y = probs_y[probs_y > 0]
H_cond_x = -np.sum(probs_y * np.log2(probs_y))
cond_entropy += H_cond_x * (len(y_vals) / total)
return H_marg - cond_entropy
# ==================================================
# Surrogate test (block bootstrap)
# ==================================================
def block_bootstrap_surrogates(tau, block_size=5, n_surr=30):
if len(tau) < block_size*2:
return []
n = len(tau)
surrogates = []
for _ in range(n_surr):
idx = np.arange(n)
np.random.shuffle(idx)
tau_surr = tau[idx].copy()
surrogates.append(tau_surr)
return surrogates
def test_C_significance(tau, n_surr=30):
C_orig = compute_C_discrete(tau)
if np.isnan(C_orig) or len(tau) < 8:
return np.nan, np.nan
surrogates = block_bootstrap_surrogates(tau, block_size=5, n_surr=n_surr)
C_surr = []
for s in surrogates:
Cs = compute_C_discrete(s)
if not np.isnan(Cs):
C_surr.append(Cs)
if len(C_surr) < 5:
return C_orig, np.nan
p = np.mean(np.array(C_surr) >= C_orig)
return C_orig, p
# ==================================================
# 1. SOI
# ==================================================
def get_soi():
url = "https://psl.noaa.gov/data/correlation/soi.data"
resp = requests.get(url, timeout=20)
lines = resp.text.split('\n')[1:]
records = []
for line in lines:
if not line.strip(): continue
parts = line.split()
if len(parts) >= 13:
for i in range(1,13):
val = parts[i]
if val not in ('-99.99','-9.99'):
records.append([int(parts[0]), i, float(val)])
df = pd.DataFrame(records, columns=['year','month','soi'])
df['date'] = pd.to_datetime(df['year'].astype(str)+'-'+df['month'].astype(str)+'-01')
return df.set_index('date')['soi']
print("\n1. CLIMATE: SOI")
soi = get_soi()
split = int(len(soi)*0.8)
test = soi.iloc[split:]
events = test[test < -1.0]
tau = (events.index[1:] - events.index[:-1]).days.values if len(events)>=3 else []
if len(tau)>=8:
C, p = test_C_significance(tau)
print(f"Events: {len(events)} | C = {C:.4f}, p = {p:.3f}")
else:
C, p = np.nan, np.nan
print(f"Insufficient events: {len(events)}")
results = [("Climate (SOI)", C, p, len(events))]
# ==================================================
# 2. VIX
# ==================================================
import yfinance as yf
print("\n2. FINANCE: VIX")
vix = yf.download("^VIX", start="1990-01-01", end="2026-06-13", progress=False)['Close'].dropna()
split = int(len(vix)*0.8)
test = vix.iloc[split:]
events = test[test > 30.0]
tau = (events.index[1:] - events.index[:-1]).days.values if len(events)>=3 else []
if len(tau)>=8:
C, p = test_C_significance(tau)
print(f"Events: {len(events)} | C = {C:.4f}, p = {p:.3f}")
else:
C, p = np.nan, np.nan
print(f"Insufficient events: {len(events)}")
results.append(("Finance (VIX)", C, p, len(events)))
# ==================================================
# 3. Earthquakes M≥6.5
# ==================================================
print("\n3. SEISMOLOGY: Earthquakes M≥6.5")
url_eq = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime=1990-01-01&endtime=2026-06-13&minmagnitude=6.5&limit=5000"
resp = requests.get(url_eq, timeout=20)
data = resp.json()
mags = [f['properties']['mag'] for f in data['features'] if f['properties']['mag'] is not None]
times = [datetime.fromtimestamp(f['properties']['time']/1000) for f in data['features']]
eq = pd.Series(mags, index=times, name='mag').sort_index()
split = int(len(eq)*0.8)
test = eq.iloc[split:]
tau = (test.index[1:] - test.index[:-1]).total_seconds().values
if len(tau)>=8:
C, p = test_C_significance(tau)
print(f"Intervals: {len(tau)} | C = {C:.4f}, p = {p:.3f}")
else:
C, p = np.nan, np.nan
print(f"Insufficient intervals: {len(tau)}")
results.append(("Seismology (M≥6.5)", C, p, len(tau)))
# ==================================================
# 4. ECG (MIT-BIH)
# ==================================================
print("\n4. HUMAN BIOLOGY: ECG")
try:
import wfdb
for f in ['100.dat', '100.hea']:
if not os.path.exists(f):
urllib.request.urlretrieve(f"https://physionet.org/files/mitdb/1.0.0/{f}", f)
record = wfdb.rdrecord('100', sampto=30000)
sig = record.p_signal[:, 0]
fs = record.fs
peaks, _ = find_peaks(sig, height=np.percentile(sig, 80), distance=int(fs*0.4))
rr = np.diff(peaks) / fs
split = int(len(rr)*0.8)
test_rr = rr[split:]
mean_rr = np.mean(rr[:split])
std_rr = np.std(rr[:split])
events_idx = np.where(np.abs(test_rr - mean_rr) > 0.8 * std_rr)[0]
tau = np.diff(events_idx) if len(events_idx) > 1 else []
print(f"RR deviations: {len(events_idx)}")
if len(tau) >= 8:
C, p = test_C_significance(tau)
print(f"C = {C:.4f}, p = {p:.3f}")
else:
C, p = np.nan, np.nan
print(f"Insufficient intervals: {len(tau)}")
except Exception as e:
print(f"ECG error: {e}")
C, p = np.nan, np.nan
results.append(("Human biology (ECG)", C, p, len(tau) if 'tau' in locals() else 0))
# ==================================================
# 5. EEG (UCI)
# ==================================================
print("\n5. NEUROSCIENCE: EEG")
try:
from sklearn.datasets import fetch_openml
eeg_data = fetch_openml('eeg-eye-state', version=1, parser='pandas')
eeg = eeg_data.data.iloc[:, 0].values
sfreq = 250
peaks, _ = find_peaks(np.abs(eeg), height=np.percentile(np.abs(eeg), 70), distance=int(sfreq*0.2))
if len(peaks) > 10:
tau = np.diff(peaks) / sfreq
split = int(len(tau)*0.8)
test_tau = tau[split:]
if len(test_tau) >= 8:
C, p = test_C_significance(test_tau)
print(f"Intervals: {len(test_tau)} | C = {C:.4f}, p = {p:.3f}")
else:
C, p = np.nan, np.nan
print(f"Insufficient intervals: {len(test_tau)}")
else:
C, p = np.nan, np.nan
print("Insufficient peaks")
except Exception as e:
print(f"EEG error: {e}")
C, p = np.nan, np.nan
results.append(("Neuroscience (EEG)", C, p, len(test_tau) if 'test_tau' in locals() else 0))
# ==================================================
# 6. Sunspots
# ==================================================
print("\n6. ASTRONOMY: Sunspots")
url_sun = "https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json"
try:
resp = requests.get(url_sun, timeout=20)
data = resp.json()
dates, sun = [], []
for entry in data:
if 'time-tag' in entry and 'ssn' in entry and entry['ssn'] is not None:
dates.append(pd.to_datetime(entry['time-tag']))
sun.append(float(entry['ssn']))
series = pd.Series(sun, index=dates).dropna().sort_index()
series = series[series.index <= '2026-06-13']
split = int(len(series)*0.8)
test = series.iloc[split:]
events = test[test > 100]
tau = (events.index[1:] - events.index[:-1]).days.values if len(events)>=3 else []
if len(tau) >= 8:
C, p = test_C_significance(tau)
print(f"Events: {len(events)} | C = {C:.4f}, p = {p:.3f}")
else:
C, p = np.nan, np.nan
print(f"Insufficient events: {len(events)}")
except Exception as e:
print(f"Sunspot error: {e}")
C, p = np.nan, np.nan
results.append(("Astronomy (sunspots)", C, p, len(events) if 'events' in locals() else 0))
# ==================================================
# Final table
# ==================================================
print("\n" + "="*70)
print("EMPIRICAL VALIDATION OF IPM – REGULARITY R1")
print("="*70)
print("\n{:<30} {:>8} {:>12} {:>10} {:>6} {:>8}".format(
"Domain", "C", "p-value", "Events", "C>0?", "p<0.05?"))
print("-"*76)
for name, C, p, ev in results:
if np.isnan(C):
c_str = "NaN"
p_str = "NaN"
c_pos = "?"
p_sig = "?"
else:
c_str = f"{C:.4f}"
p_str = f"{p:.3f}"
c_pos = "✓" if C > 0 else "✗"
p_sig = "✓" if (p < 0.05 and not np.isnan(p)) else "✗"
print("{:<30} {:>8} {:>12} {:>10} {:>6} {:>8}".format(
name, c_str, p_str, ev, c_pos, p_sig))
print("\nInterpretation:")
print("- C > 0 indicates structural memory (non-Markovianity).")
print("- p < 0.05 indicates memory not explainable by linear surrogates (nonlinearity).")
print("Regularity R1 is supported if C > 0 is observed.\n")