Time-Series Analysis¶
ParquetFrame provides comprehensive time-series analysis capabilities through the .ts accessor. This guide covers datetime handling, resampling, rolling operations, and advanced time-series workflows.
Getting Started¶
Automatic Datetime Detection¶
ParquetFrame can automatically detect datetime columns in your data:
import parquetframe as pf
# Load data with potential datetime columns
df = pf.read("sensor_data.csv")
# Detect all datetime columns
datetime_columns = df.ts.detect_datetime_columns()
print(f"Found datetime columns: {datetime_columns}")
The detection supports various formats; a quick way to sanity-check them with plain pandas is sketched after this list:
- ISO 8601: 2023-01-01, 2023-01-01T10:30:00, 2023-01-01 10:30:00+00:00
- US Format: 01/15/2023, 1/15/2023 2:30 PM
- European Format: 15/01/2023, 15.01.2023 14:30
- Timestamp: Unix timestamps (seconds or milliseconds)
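The sketch below reproduces that check with pd.to_datetime directly. It is not ParquetFrame's internal detector, just a way to see how pandas treats each string; errors="coerce" yields NaT for unparseable values instead of raising:
import pandas as pd
samples = ["2023-01-01T10:30:00", "01/15/2023", "not a date"]
for value in samples:
# NaT means the value would not be treated as a datetime
print(value, "->", pd.to_datetime(value, errors="coerce"))
# European day-first strings need an explicit hint:
print(pd.to_datetime("15.01.2023 14:30", dayfirst=True))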
Manual Datetime Parsing¶
For specific datetime formats, use manual parsing:
# Parse with specific format
df_parsed = df.ts.parse_datetime('date_string', format='%Y-%m-%d %H:%M:%S')
# Parse with automatic inference
df_inferred = df.ts.parse_datetime('timestamp_col', infer=True)
# Parse in-place to modify original DataFrame
df.ts.parse_datetime('date_col', inplace=True)
Resampling Operations¶
Resampling changes the frequency of your time-series data: you can aggregate high-frequency data down to a coarser grid, or upsample low-frequency data and fill the gaps.
Basic Resampling¶
# Load time-series data
sales_data = pf.read("daily_sales.csv")
# Resample to weekly averages
weekly_avg = sales_data.ts.resample('1W').mean()
# Resample to monthly totals
monthly_total = sales_data.ts.resample('1M').sum()
# Resample to quarterly statistics
quarterly_stats = sales_data.ts.resample('1Q').agg(['mean', 'std', 'count'])
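Resampling also works in the upsampling direction mentioned above: moving sparse data onto a finer grid and filling the gaps. Whether the .ts accessor exposes fill methods directly is not shown here, so this is a minimal plain-pandas sketch (the 'date' column name is assumed):
pdf = sales_data.pandas_df.set_index('date')
# Upsample weekly figures to a daily grid; new rows appear as NaN
daily = pdf.resample('1D').asfreq()
# Carry the last known observation forward to fill the gaps
daily_filled = daily.ffill()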
Frequency Aliases¶
ParquetFrame supports pandas frequency aliases:
| Alias | Description | Example |
|---|---|---|
| S | Seconds | 30S (30 seconds) |
| T, min | Minutes | 15T (15 minutes) |
| H | Hours | 2H (2 hours) |
| D | Days | 1D (daily) |
| W | Weeks | 2W (bi-weekly) |
| M | Month end | 1M (monthly) |
| MS | Month start | 1MS (month start) |
| Q | Quarter end | 1Q (quarterly) |
| Y | Year end | 1Y (yearly) |
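Because these aliases are plain pandas frequency strings, you can check how any of them will be interpreted before resampling; a quick sketch:
from pandas.tseries.frequencies import to_offset
print(to_offset('15T'))  # <15 * Minutes>
print(to_offset('2W'))   # <2 * Weeks: weekday=6>
print(to_offset('1Q'))   # <QuarterEnd: startingMonth=3>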
Advanced Resampling¶
# Resample with custom aggregations
custom_agg = sales_data.ts.resample('1W').agg({
    'sales': ['sum', 'mean', 'std'],
    'profit': ['sum', 'mean'],
    'customers': 'nunique',  # Count unique customers per week
})
# Resample with multiple methods
multi_resample = (sales_data.ts.resample('1M')
                  .agg(['sum', 'mean', 'median', 'std', 'min', 'max']))
Rolling Window Operations¶
Rolling operations calculate statistics over a moving window of your time-series data.
Basic Rolling Operations¶
# 7-day rolling average (smoothing)
rolling_avg = sales_data.ts.rolling(7).mean()
# 30-day rolling standard deviation (volatility)
rolling_vol = sales_data.ts.rolling(30).std()
# 14-day rolling sum
rolling_sum = sales_data.ts.rolling(14).sum()
# Rolling minimum and maximum
rolling_range = sales_data.ts.rolling(21).agg(['min', 'max'])
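Note that the first window - 1 rows of a rolling result are NaN because the window is not yet full. Plain pandas exposes min_periods to allow partial windows; whether .ts.rolling forwards that keyword is an assumption here, so the sketch drops to pandas directly:
# Allow partial windows at the start of the series instead of NaN
partial_avg = sales_data.pandas_df['sales'].rolling(7, min_periods=1).mean()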
Time-Based Windows¶
Use time-based windows for irregular time series:
# 7-day time window (regardless of data frequency)
time_rolling = sales_data.ts.rolling('7D').mean()
# 30-day rolling statistics
monthly_rolling = sales_data.ts.rolling('30D').agg(['mean', 'std'])
# Business-day windows: offset-based windows must have a fixed frequency,
# so pandas rejects non-fixed aliases like '5B'. Drop weekends first and
# use a fixed 5-row window instead.
weekday_data = sales_data.query("dayofweek < 5")
business_rolling = weekday_data.ts.rolling(5).mean()
Advanced Rolling Operations¶
# Custom rolling function
import numpy as np
import pandas as pd

def window_sharpe(window, risk_free_rate=0.02):
    """Annualized Sharpe ratio for a single window of prices."""
    returns = pd.Series(window).pct_change().dropna()
    ann_return = returns.mean() * 252       # Annualized mean return
    ann_vol = returns.std() * np.sqrt(252)  # Annualized volatility
    return (ann_return - risk_free_rate) / ann_vol

# Apply the custom function to each 30-period window of prices
stock_data = pf.read("stock_prices.csv")
sharpe_ratios = stock_data.ts.rolling(30).apply(window_sharpe)
Time-Based Filtering¶
Filter your time-series data based on time of day, day of week, or specific time ranges.
Time of Day Filtering¶
# Filter business hours (9 AM to 5 PM)
business_hours = sales_data.ts.between_time('09:00', '17:00')
# Filter lunch hours
lunch_time = sales_data.ts.between_time('12:00', '14:00')
# Filter specific time points
opening_bell = stock_data.ts.at_time('09:30') # Market open
closing_bell = stock_data.ts.at_time('16:00') # Market close
Advanced Time Filtering¶
# Combine with other operations
weekend_patterns = (sales_data
                    .ts.between_time('10:00', '18:00')  # Daytime hours only
                    .query("dayofweek >= 5")            # Weekends only
                    .ts.resample('1H')                  # Hourly aggregation
                    .mean())
# Morning vs evening comparison
morning_sales = sales_data.ts.between_time('06:00', '12:00')
evening_sales = sales_data.ts.between_time('18:00', '23:00')
morning_avg = morning_sales.groupby('day_of_week')['sales'].mean()
evening_avg = evening_sales.groupby('day_of_week')['sales'].mean()
Lag and Lead Operations¶
Create lagged or leading versions of your time series for analysis and modeling.
Basic Shift Operations¶
# Create 1-period lag
lagged_sales = sales_data.ts.lag(1)
# Create 2-period lead (future values)
leading_sales = sales_data.ts.lead(2)
# Manual shift: positive values lag the series (push values forward in
# time); negative values lead it (pull future values back)
shifted_back = sales_data.ts.shift(-5)    # Lead by 5 periods
shifted_forward = sales_data.ts.shift(3)  # Lag by 3 periods
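Shift direction is easy to get backwards, so here is the behaviour on a toy pandas Series; lag(1) corresponds to shift(1) and lead(1) to shift(-1):
import pandas as pd
s = pd.Series([10, 20, 30])
print(s.shift(1).tolist())   # [nan, 10.0, 20.0] -- past values (lag)
print(s.shift(-1).tolist())  # [20.0, 30.0, nan] -- future values (lead)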
Multiple Lags for Modeling¶
# Create multiple lags for time series modeling
def create_lags(df, column, max_lags=7):
    """Create multiple lag features for modeling."""
    result_df = df.copy()
    for lag in range(1, max_lags + 1):
        lag_col = f"{column}_lag_{lag}"
        result_df[lag_col] = df.ts.lag(lag).pandas_df[column]
    return result_df
# Create lagged features
modeling_data = create_lags(sales_data, 'sales', max_lags=7)
# Remove rows with NaN values (due to lagging)
clean_data = modeling_data.dropna()
Lead-Lag Analysis¶
# Compare current values with future and past values
analysis_df = sales_data.copy()
# Add lag and lead columns
analysis_df['sales_lag_1'] = sales_data.ts.lag(1).pandas_df['sales']
analysis_df['sales_lead_1'] = sales_data.ts.lead(1).pandas_df['sales']
# Calculate momentum indicators
analysis_df['momentum'] = (analysis_df['sales'] - analysis_df['sales_lag_1']) / analysis_df['sales_lag_1']
analysis_df['future_change'] = (analysis_df['sales_lead_1'] - analysis_df['sales']) / analysis_df['sales']
# Analyze relationships
correlation = analysis_df[['momentum', 'future_change']].corr()
print(f"Momentum-Future Change Correlation: {correlation.iloc[0, 1]:.3f}")
Advanced Time-Series Workflows¶
Seasonal Analysis¶
# Decompose time series into trend, seasonal, and residual components
import pandas as pd

def seasonal_decompose_simple(df, column, period=7):
    """Simple additive seasonal decomposition (assumes a DatetimeIndex)."""
    pdf = df.pandas_df
    # Moving average serves as the trend estimate
    trend = df.ts.rolling(period).mean().pandas_df[column]
    # Remove the trend, leaving seasonal + noise
    detrended = pdf[column] - trend
    # Average the detrended values by day of week for the seasonal component
    seasonal_means = detrended.groupby(pdf.index.dayofweek).mean()
    seasonal = pd.Series(pdf.index.dayofweek, index=pdf.index).map(seasonal_means)
    # Residual is whatever trend and seasonality do not explain
    residual = detrended - seasonal
    return {
        'original': pdf[column],
        'trend': trend,
        'seasonal': seasonal,
        'residual': residual,
    }

# Apply seasonal decomposition
decomposition = seasonal_decompose_simple(sales_data, 'sales')
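For anything beyond a quick look you may prefer a tested implementation. statsmodels ships one; this sketch assumes the series has a regular frequency and that a weekly period of 7 fits your data:
from statsmodels.tsa.seasonal import seasonal_decompose
result = seasonal_decompose(sales_data.pandas_df['sales'], model='additive', period=7)
trend, seasonal, residual = result.trend, result.seasonal, result.resid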
Anomaly Detection in Time Series¶
# Time-series anomaly detection using rolling statistics
import numpy as np

def detect_time_series_anomalies(df, column, window=30, threshold=3):
    """Detect anomalies using a rolling Z-score."""
    # Rolling mean and std define the local "normal" band
    rolling_mean = df.ts.rolling(window).mean().pandas_df[column]
    rolling_std = df.ts.rolling(window).std().pandas_df[column]
    # Z-score: distance from the local mean in local standard deviations
    z_scores = np.abs((df.pandas_df[column] - rolling_mean) / rolling_std)
    # Points beyond the threshold are flagged as anomalies
    anomalies = z_scores > threshold
    return {
        'anomalies': anomalies,
        'z_scores': z_scores,
        'anomaly_values': df.pandas_df[column][anomalies],
    }
# Detect anomalies
anomaly_results = detect_time_series_anomalies(sales_data, 'sales')
print(f"Found {anomaly_results['anomalies'].sum()} anomalies")
Multi-Series Analysis¶
# Analyze multiple time series together
import pandas as pd

def correlation_analysis(df, date_col, value_cols, freq='1D'):
    """Analyze correlations between multiple time series."""
    # A datetime index is required for pandas resampling
    df_indexed = df.set_index(date_col)
    results = {}
    # Resample every series onto the same frequency grid
    for col in value_cols:
        results[col] = df_indexed[col].resample(freq).mean()
    # Combine the aligned series and correlate them
    combined_df = pd.DataFrame(results)
    correlation_matrix = combined_df.corr()
    return correlation_matrix, combined_df
# Example with multiple metrics
metrics = ['sales', 'profit', 'customers', 'advertising_spend']
corr_matrix, daily_data = correlation_analysis(
sales_data.pandas_df, 'date', metrics, '1D'
)
print("Cross-correlation matrix:")
print(corr_matrix)
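Beyond same-time correlation, it is often worth checking whether one series leads another. A minimal sketch on the combined daily frame from above (column names taken from the example; the 7-day horizon is arbitrary):
# Correlate advertising spend against sales k days later
for k in range(8):
lagged_corr = daily_data['advertising_spend'].corr(daily_data['sales'].shift(-k))
print(f"ad spend vs sales {k} day(s) later: {lagged_corr:.3f}")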
Performance Optimization¶
Dask Integration¶
For large time-series datasets, ParquetFrame automatically uses Dask:
# Large time-series dataset
large_ts = pf.read("large_timeseries.parquet", islazy=True)
# Operations are automatically optimized for Dask
daily_avg = large_ts.ts.resample('1D').mean() # Computed in parallel
rolling_stats = large_ts.ts.rolling(30).mean() # Memory-efficient rolling
# Convert to pandas only when needed
final_result = rolling_stats.to_pandas()
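If you need to drop down to Dask yourself, the same resampling pattern works on a frame indexed by time; a sketch, with the file, index, and value column names assumed:
import dask.dataframe as dd
ddf = dd.read_parquet("large_timeseries.parquet").set_index("timestamp")
# Lazy graph; nothing is computed yet
hourly = ddf["value"].resample("1H").mean()
result = hourly.compute()  # Triggers the parallel computation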
Memory-Efficient Operations¶
# Chain operations to minimize memory usage
efficient_pipeline = (pf.read("huge_sensor_data.parquet")
                      .query("sensor_id == 'TEMP_01'")       # Filter early
                      .ts.resample('1H')                     # Reduce frequency
                      .mean()                                # Aggregate
                      .ts.rolling(24)                        # 24-hour rolling window
                      .mean()                                # Smooth
                      .save("processed_temp_data.parquet"))  # Save result
CLI Integration¶
All time-series operations are available through the command line:
# Detect datetime columns
pframe timeseries sensor_data.csv --detect-datetime
# Resample to hourly averages
pframe timeseries stock_prices.parquet --resample 1H --agg mean --output hourly_prices.parquet
# Apply 7-day rolling average
pframe timeseries sales_daily.csv --rolling 7 --agg mean --datetime-col date
# Filter business hours
pframe timeseries trading_data.json --between-time 09:30 16:00 --output business_hours.csv
# Create lagged version
pframe timeseries time_series.parquet --shift 1 --output lagged_series.parquet
# Combine operations with output
pframe timeseries raw_data.csv --resample 1D --agg mean --rolling 7 --output smoothed_daily.parquet
Integration with Other Features¶
SQL Integration¶
Combine time-series operations with SQL queries:
# Time-series processing with SQL analysis
ts_processed = (sales_data
                .ts.resample('1W')
                .mean()
                .sql("""
                    SELECT
                        strftime('%Y-%m', date) as year_month,
                        AVG(sales) as avg_weekly_sales,
                        STDDEV(sales) as sales_volatility
                    FROM df
                    GROUP BY year_month
                    ORDER BY year_month
                """))
Statistical Analysis Integration¶
# Combine with statistical analysis
resampled_data = sales_data.ts.resample('1D').mean()
# Detect outliers in resampled data
outliers = resampled_data.stats.detect_outliers('sales', method='iqr')
# Analyze outlier patterns by day of week
outlier_analysis = outliers.sql("""
    SELECT
        strftime('%w', date) as day_of_week,
        COUNT(*) as total_days,
        SUM(CAST(sales_outlier_iqr as INTEGER)) as outlier_days,
        AVG(sales) as avg_sales
    FROM df
    GROUP BY day_of_week
    ORDER BY day_of_week
""")
This comprehensive time-series functionality makes ParquetFrame ideal for temporal data analysis, from simple resampling to complex multi-series analysis and anomaly detection.