Python para Análise de Dados em Tempo Real
A análise de dados em tempo real tornou-se um componente crítico para empresas que precisam tomar decisões rápidas baseadas em informações atualizadas. O Python estabeleceu-se como uma das linguagens mais poderosas para implementar soluções de processamento de dados em tempo real, combinando facilidade de uso com bibliotecas robustas. Este artigo explora como utilizar Python para análise em tempo real, focando nas principais tecnologias e técnicas.
O Python consolidou sua posição como linguagem preferida para análise de dados em tempo real por diversas razões:
Essas vantagens vem se tornando ainda mais evidentes, com o ecossistema Python para análise em tempo real evoluindo para atender às crescentes demandas de velocidade e volume de dados.
Antes de mergulharmos nas ferramentas específicas, é importante entender as arquiteturas comuns para análise de dados em tempo real com Python:
A arquitetura Lambda combina processamento em lote e em tempo real. Útil quando preciso de dados imediatos e históricos para análise:
[Fontes de Dados] → [Camada de Velocidade (Tempo Real)] → [Camada de Serviço] → [Aplicações] → [Camada de Lote] →
A arquitetura Kappa simplifica o modelo tratando tudo como streams. Ideal quando o foco está totalmente em dados em tempo real e eventos contínuos:
[Fontes de Dados] → [Sistema de Streaming] → [Processamento de Stream] → [Armazenamento] → [Aplicações]
A stack SMACK (Spark, Mesos, Akka, Cassandra, Kafka) tornou-se popular para aplicações de dados em tempo real:
[Kafka (Ingestão)] → [Spark Streaming (Processamento)] → [Cassandra (Armazenamento)] → [Aplicações]
↑
[Mesos/Kubernetes (Orquestração)]
↑
[Akka (Mensageria)]
O ecossistema Python para análise em tempo real oferece diversas ferramentas poderosas. Vamos explorar as mais importantes.
O Apache Kafka continua sendo a plataforma de streaming distribuído mais popular, e sua integração com Python é excelente.
O Apache Kafka é uma das principais opções quando precisamos de uma fila de mensagens distribuída de alta performance. Podemos utilizar a biblioteca confluent-kafka para integrar com Python.
Antes de mostrar o código, é importante entender que estou simulando um cenário de sensores que enviam temperatura e umidade para um tópico Kafka. O produtor envia essas leituras, enquanto o consumidor processa e identifica, por exemplo, alertas de temperatura alta.
# Exemplo de produtor Kafka com confluent-kafka
from confluent_kafka import Producer
import json
import time
import random
# Configuração do produtor
config = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'python-producer'
}
producer = Producer(config)
# Função de callback para confirmação de entrega
def delivery_report(err, msg):
if err is not None:
print(f'Falha na entrega da mensagem: {err}')
else:
print(f'Mensagem entregue ao tópico {msg.topic()} [partição {msg.partition()}]')
# Gerar e enviar dados simulados
for i in range(100):
# Criar dados simulados de sensor
data = {
'sensor_id': f'sensor-{random.randint(1, 10)}',
'temperature': round(random.uniform(20.0, 35.0), 2),
'humidity': round(random.uniform(30.0, 80.0), 2),
'timestamp': int(time.time())
}
# Serializar para JSON
payload = json.dumps(data)
# Enviar para o tópico Kafka
producer.produce('sensor-data',
key=data['sensor_id'],
value=payload,
callback=delivery_report)
# Liberar buffer periodicamente
producer.poll(0)
time.sleep(0.5) # Simular intervalo entre leituras
# Garantir que todas as mensagens sejam enviadas
producer.flush()
Esse consumidor lê os dados enviados, converte de JSON e analisa se a temperatura está acima de um limite pré-definido. É uma base simples para lógica de detecção de anomalias.
# Exemplo de consumidor Kafka com confluent-kafka
from confluent_kafka import Consumer, KafkaError
import json
# Configuração do consumidor
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'python-consumer-group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(config)
consumer.subscribe(['sensor-data'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f'Chegou ao fim da partição {msg.partition()}')
else:
print(f'Erro: {msg.error()}')
else:
# Processar mensagem recebida
try:
data = json.loads(msg.value())
print(f"Sensor: {data['sensor_id']}, Temperatura: {data['temperature']}°C, Umidade: {data['humidity']}%")
# Aqui você poderia implementar lógica de processamento em tempo real
if data['temperature'] > 30.0:
print(f"ALERTA: Temperatura alta detectada no sensor {data['sensor_id']}!")
except json.JSONDecodeError:
print("Erro ao decodificar JSON")
except KeyboardInterrupt:
pass
finally:
# Fechar consumidor
consumer.close()
O Apache Spark com sua API Python (PySpark) oferece capacidades poderosas para processamento de streams.
Quando precisamos de maior capacidade de agregação e análises em janela de tempo, o Spark Structured Streaming com PySpark entra como ferramenta essencial.
No exemplo abaixo, conecto um stream Kafka com Spark para calcular estatísticas por sensor em janelas deslizantes de 5 minutos e detectar anomalias.
# Exemplo de Spark Structured Streaming com PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Criar sessão Spark
spark = SparkSession.builder \
.appName("RealTimeAnalytics") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
.getOrCreate()
# Definir esquema dos dados
schema = StructType([
StructField("sensor_id", StringType(), True),
StructField("temperature", FloatType(), True),
StructField("humidity", FloatType(), True),
StructField("timestamp", TimestampType(), True)
])
# Ler stream do Kafka
kafka_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sensor-data") \
.option("startingOffsets", "latest") \
.load()
# Extrair e transformar dados
parsed_stream = kafka_stream \
.select(from_json(col("value").cast("string"), schema).alias("data")) \
.select("data.*") \
.withWatermark("timestamp", "10 seconds")
# Calcular estatísticas em janelas de tempo
window_stats = parsed_stream \
.groupBy(
window(col("timestamp"), "5 minutes", "1 minute"),
col("sensor_id")
) \
.agg(
avg("temperature").alias("avg_temp"),
max("temperature").alias("max_temp"),
min("temperature").alias("min_temp"),
avg("humidity").alias("avg_humidity")
)
# Detectar anomalias
anomalies = parsed_stream \
.filter(col("temperature") > 32.0) \
.select(
col("sensor_id"),
col("temperature"),
col("timestamp")
)
# Saída para console (para desenvolvimento)
query1 = window_stats.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.start()
query2 = anomalies.writeStream \
.outputMode("append") \
.format("console") \
.start()
# Saída para banco de dados (para produção)
# Exemplo com PostgreSQL
postgres_output = window_stats.writeStream \
.foreachBatch(lambda df, epoch_id: df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/sensordb") \
.option("dbtable", "sensor_stats") \
.option("user", "username") \
.option("password", "password") \
.mode("append") \
.save()
) \
.outputMode("update") \
.start()
# Aguardar término (em produção, isso seria controlado externamente)
spark.streams.awaitAnyTermination()
O Apache Flink ganhou popularidade por seu processamento de streams de baixa latência.
O Apache Flink é uma ótima escolha quando precisamos de latência extremamente baixa e alto controle sobre eventos de stream. O PyFlink oferece uma API poderosa baseada em SQL e DataStream API.
Antes de mostrar o código, saiba que aqui configuro uma fonte Kafka, processo eventos por minuto com agregações e envio os resultados de volta para outro tópico Kafka.
# Exemplo de PyFlink para processamento de streams
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, Kafka, Json
from pyflink.table.window import Tumble
# Criar ambiente de execução
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
# Configurar conexão com Kafka
t_env.connect(
Kafka()
.version("universal")
.topic("sensor-data")
.start_from_latest()
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "pyflink-consumer")
) \
.with_format(
Json()
.fail_on_missing_field(False)
.schema(
Schema()
.field("sensor_id", "STRING")
.field("temperature", "DOUBLE")
.field("humidity", "DOUBLE")
.field("timestamp", "BIGINT")
)
) \
.with_schema(
Schema()
.field("sensor_id", "STRING")
.field("temperature", "DOUBLE")
.field("humidity", "DOUBLE")
.field("event_time", "TIMESTAMP(3)")
.proctime()
) \
.create_temporary_table("sensor_source")
# Definir consulta SQL para processamento em tempo real
result_table = t_env.sql_query("""
SELECT
sensor_id,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
COUNT(*) AS reading_count,
AVG(temperature) AS avg_temp,
MAX(temperature) AS max_temp,
MIN(temperature) AS min_temp,
AVG(humidity) AS avg_humidity
FROM sensor_source
GROUP BY
TUMBLE(event_time, INTERVAL '1' MINUTE),
sensor_id
""")
# Configurar saída para console (desenvolvimento)
t_env.connect(
Kafka()
.version("universal")
.topic("sensor-stats")
.property("bootstrap.servers", "localhost:9092")
.start_from_latest()
) \
.with_format(
Json()
.derive_schema()
) \
.with_schema(
Schema()
.field("sensor_id", "STRING")
.field("window_start", "TIMESTAMP(3)")
.field("window_end", "TIMESTAMP(3)")
.field("reading_count", "BIGINT")
.field("avg_temp", "DOUBLE")
.field("max_temp", "DOUBLE")
.field("min_temp", "DOUBLE")
.field("avg_humidity", "DOUBLE")
) \
.create_temporary_table("sensor_sink")
# Executar inserção na tabela de saída
result_table.execute_insert("sensor_sink").wait()
Para casos de uso mais simples, bibliotecas Python nativas oferecem soluções eficientes.
Quando precisamos de uma solução leve, rápida e fácil de prototipar, a biblioteca streamz nos ajuda a criar pipelines de streaming direto com Python puro. Ideal para testes locais ou aplicações menores, especialmente quando não uso clusters distribuídos.
No exemplo a seguir, simulo um fluxo de dados de sensores, aplico parsing, filtros e análise básica de estatísticas com Pandas:
# Exemplo com streamz para processamento de streams em Python puro
from streamz import Stream
import json
import time
import random
import pandas as pd
from datetime import datetime
# Criar stream
source = Stream()
# Definir funções de processamento
def parse_data(message):
try:
return json.loads(message)
except json.JSONDecodeError:
return None
def filter_valid_readings(data):
if data is None:
return False
return 'sensor_id' in data and 'temperature' in data and 'humidity' in data
def detect_anomalies(data):
if data['temperature'] > 30.0:
print(f"ALERTA: Temperatura alta ({data['temperature']}°C) detectada no sensor {data['sensor_id']}!")
return data
def add_timestamp(data):
data['processed_at'] = datetime.now().isoformat()
return data
def batch_to_dataframe(batch):
df = pd.DataFrame(batch)
return df
# Construir pipeline de processamento
processed = source \
.map(parse_data) \
.filter(filter_valid_readings) \
.map(detect_anomalies) \
.map(add_timestamp)
# Criar janelas deslizantes para análise
windowed = processed \
.sliding_window(10) \
.map(batch_to_dataframe) \
.map(lambda df: df.describe())
# Adicionar saídas
processed.sink(print)
windowed.sink(print)
# Simular fonte de dados
for i in range(100):
data = {
'sensor_id': f'sensor-{random.randint(1, 5)}',
'temperature': round(random.uniform(20.0, 35.0), 2),
'humidity': round(random.uniform(30.0, 80.0), 2),
'timestamp': int(time.time())
}
source.emit(json.dumps(data))
time.sleep(0.5)
A análise de dados em tempo real com Python tem aplicações em diversos setores. Vamos explorar alguns casos de uso populares:
Um caso de uso interessante é em projetos de IoT. Aqui, sensores espalhados enviam dados para um pipeline que detecta padrões fora do normal. Utilizamos Scikit-learn para aplicar modelos simples como Isolation Forest.
Antes de mostrar o código, vale entender que o modelo é treinado com dados históricos e depois aplicado em tempo real conforme as leituras chegam pelo Kafka.
# Exemplo de detecção de anomalias em tempo real com Scikit-learn e Kafka
from confluent_kafka import Consumer
import json
import numpy as np
from sklearn.ensemble import IsolationForest
import pandas as pd
import time
# Configurar modelo de detecção de anomalias
model = IsolationForest(contamination=0.05, random_state=42)
# Histórico de dados para treinamento inicial
historical_data = []
# Configurar consumidor Kafka
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'anomaly-detector',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['sensor-data'])
# Função para retreinar o modelo
def retrain_model(data_points):
if len(data_points) < 100:
return False
# Converter para DataFrame
df = pd.DataFrame(data_points)
# Selecionar apenas colunas numéricas para treinamento
features = df[['temperature', 'humidity']].values
# Treinar modelo
model.fit(features)
print(f"Modelo retreinado com {len(data_points)} pontos de dados")
return True
# Função para detectar anomalias
def detect_anomaly(data_point):
# Extrair features
features = np.array([[data_point['temperature'], data_point['humidity']]])
# Prever
prediction = model.predict(features)
score = model.decision_function(features)
# -1 indica anomalia, 1 indica normal
is_anomaly = prediction[0] == -1
return is_anomaly, score[0]
# Processar stream
try:
# Fase inicial: coletar dados para treinamento
print("Coletando dados iniciais para treinamento...")
start_time = time.time()
while len(historical_data) < 100 and time.time() - start_time < 60:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Erro: {msg.error()}")
continue
try:
data = json.loads(msg.value())
historical_data.append(data)
print(f"Coletado: {len(historical_data)}/100 pontos de dados")
except json.JSONDecodeError:
print("Erro ao decodificar JSON")
# Treinar modelo inicial
if len(historical_data) >= 50:
retrain_model(historical_data)
print("Modelo inicial treinado. Iniciando detecção de anomalias...")
else:
print("Dados insuficientes para treinamento inicial.")
exit(1)
# Fase de detecção
retrain_counter = 0
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Erro: {msg.error()}")
continue
try:
data = json.loads(msg.value())
# Detectar anomalia
is_anomaly, score = detect_anomaly(data)
# Adicionar ao histórico
historical_data.append(data)
if len(historical_data) > 1000: # Manter janela deslizante
historical_data.pop(0)
# Reportar resultado
if is_anomaly:
print(f"ANOMALIA DETECTADA! Sensor: {data['sensor_id']}, Temp: {data['temperature']}°C, Umidade: {data['humidity']}%, Score: {score:.4f}")
else:
print(f"Normal - Sensor: {data['sensor_id']}, Score: {score:.4f}")
# Retreinar periodicamente
retrain_counter += 1
if retrain_counter >= 100:
retrain_model(historical_data)
retrain_counter = 0
except json.JSONDecodeError:
print("Erro ao decodificar JSON")
except KeyboardInterrupt:
pass
finally:
consumer.close()
Outro caso muito interessante onde podemos aplicar o Python para análise de dados em tempo real é no monitoramento de redes sociais. Aqui, o objetivo é classificar sentimentos de posts conforme eles são publicados, usando modelos pré-treinados de NLP como o distilbert-base-uncased-finetuned-sst-2-english.
A ideia é consumir os posts de um tópico Kafka, aplicar o modelo de análise de sentimento com a biblioteca transformers e reenviar os resultados para outro tópico para visualização ou armazenamento.
# Exemplo de análise de sentimento em tempo real para tweets
from confluent_kafka import Consumer, Producer
import json
from transformers import pipeline
import time
# Inicializar modelo de análise de sentimento
sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
# Configurar consumidor Kafka
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'sentiment-analyzer',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['social-media-posts'])
# Configurar produtor Kafka para resultados
producer = Producer({
'bootstrap.servers': 'localhost:9092'
})
# Função para analisar sentimento
def analyze_sentiment(text):
try:
result = sentiment_analyzer(text)[0]
return {
'label': result['label'],
'score': float(result['score'])
}
except Exception as e:
print(f"Erro na análise de sentimento: {e}")
return {
'label': 'ERROR',
'score': 0.0
}
# Processar stream
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Erro: {msg.error()}")
continue
try:
# Decodificar mensagem
post = json.loads(msg.value())
# Extrair texto
text = post.get('text', '')
if not text:
continue
# Analisar sentimento
sentiment = analyze_sentiment(text)
# Adicionar resultado ao post
post['sentiment'] = sentiment
# Enviar resultado para outro tópico
producer.produce(
'analyzed-posts',
key=post.get('id', str(time.time())),
value=json.dumps(post)
)
# Imprimir resultado
print(f"Post: '{text[:50]}...' - Sentimento: {sentiment['label']} ({sentiment['score']:.4f})")
# Liberar buffer periodicamente
producer.poll(0)
except json.JSONDecodeError:
print("Erro ao decodificar JSON")
except KeyboardInterrupt:
pass
finally:
consumer.close()
producer.flush()
Em muitos projetos, especialmente no contexto corporativo, precisamos visualizar os dados processados em tempo real de forma clara e interativa. Para isso, utilizamos o framework Dash em conjunto com Plotly, criando painéis que se atualizam automaticamente com as últimas leituras recebidas via Kafka.
O exemplo abaixo mostra como construo um dashboard em tempo real para visualizar temperatura e umidade de sensores, consumindo dados de um tópico Kafka e exibindo gráficos atualizados a cada segundo.
# Exemplo de dashboard em tempo real com Dash e Kafka
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
from confluent_kafka import Consumer
import json
import pandas as pd
from collections import deque
import threading
import time
# Configurar armazenamento de dados em memória
max_length = 100
times = deque(maxlen=max_length)
temperatures = {f'sensor-{i}': deque(maxlen=max_length) for i in range(1, 6)}
humidities = {f'sensor-{i}': deque(maxlen=max_length) for i in range(1, 6)}
# Função para consumir dados do Kafka em thread separada
def consume_kafka_data():
# Configurar consumidor
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'dashboard-consumer',
'auto.offset.reset': 'latest'
})
consumer.subscribe(['sensor-data'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Erro: {msg.error()}")
continue
try:
# Processar mensagem
data = json.loads(msg.value())
# Extrair dados
sensor_id = data.get('sensor_id')
temperature = data.get('temperature')
humidity = data.get('humidity')
timestamp = data.get('timestamp')
# Adicionar aos deques
if sensor_id in temperatures and temperature is not None:
current_time = pd.to_datetime(timestamp, unit='s')
times.append(current_time)
temperatures[sensor_id].append(temperature)
humidities[sensor_id].append(humidity)
except json.JSONDecodeError:
print("Erro ao decodificar JSON")
except Exception as e:
print(f"Erro no consumidor Kafka: {e}")
finally:
consumer.close()
# Iniciar thread do consumidor
kafka_thread = threading.Thread(target=consume_kafka_data, daemon=True)
kafka_thread.start()
# Criar aplicação Dash
app = dash.Dash(__name__)
app.layout = html.Div([
html.H1("Dashboard de Sensores em Tempo Real"),
html.Div([
html.H2("Temperatura"),
dcc.Graph(id='temperature-graph'),
dcc.Interval(
id='temperature-update',
interval=1000, # Atualizar a cada segundo
n_intervals=0
)
]),
html.Div([
html.H2("Umidade"),
dcc.Graph(id='humidity-graph'),
dcc.Interval(
id='humidity-update',
interval=1000, # Atualizar a cada segundo
n_intervals=0
)
])
])
@app.callback(
Output('temperature-graph', 'figure'),
Input('temperature-update', 'n_intervals')
)
def update_temperature_graph(n):
traces = []
for sensor_id, temp_data in temperatures.items():
if len(temp_data) > 0:
traces.append(go.Scatter(
x=list(times)[-len(temp_data):],
y=list(temp_data),
name=sensor_id,
mode='lines+markers'
))
return {
'data': traces,
'layout': go.Layout(
xaxis=dict(title='Tempo'),
yaxis=dict(title='Temperatura (°C)'),
title='Temperatura dos Sensores em Tempo Real',
height=400
)
}
@app.callback(
Output('humidity-graph', 'figure'),
Input('humidity-update', 'n_intervals')
)
def update_humidity_graph(n):
traces = []
for sensor_id, humidity_data in humidities.items():
if len(humidity_data) > 0:
traces.append(go.Scatter(
x=list(times)[-len(humidity_data):],
y=list(humidity_data),
name=sensor_id,
mode='lines+markers'
))
return {
'data': traces,
'layout': go.Layout(
xaxis=dict(title='Tempo'),
yaxis=dict(title='Umidade (%)'),
title='Umidade dos Sensores em Tempo Real',
height=400
)
}
# Executar servidor
if __name__ == '__main__':
app.run_server(debug=True, host='0.0.0.0')
Essa abordagem visual torna muito mais fácil acompanhar comportamentos inesperados ou tendências nos dados.
Para implementar soluções eficientes de análise de dados em tempo real com Python, considere estas melhores práticas:
Ao processar grandes volumes de mensagens, agrupar os dados por lote ou janelas de tempo reduz o overhead e melhora a performance. Bibliotecas como Spark e Beam facilitam esse tipo de agregação nativamente.
# Exemplo de otimização com processamento em lotes
from confluent_kafka import Consumer
import json
import time
# Configurar consumidor
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'batch-processor',
'auto.offset.reset': 'earliest',
'max.poll.records': 500 # Processar até 500 registros por vez
})
consumer.subscribe(['high-volume-data'])
# Processar em lotes
try:
while True:
batch = []
start_time = time.time()
# Coletar lote por tempo ou tamanho
while time.time() - start_time < 5 and len(batch) < 1000:
msg = consumer.poll(0.1)
if msg is None:
continue
if msg.error():
print(f"Erro: {msg.error()}")
continue
try:
data = json.loads(msg.value())
batch.append(data)
except json.JSONDecodeError:
print("Erro ao decodificar JSON")
# Processar lote
if batch:
print(f"Processando lote de {len(batch)} mensagens")
# Aqui você implementaria o processamento em lote
# Por exemplo, usando pandas para análise eficiente
# df = pd.DataFrame(batch)
# resultados = df.groupby('categoria').agg({'valor': ['mean', 'sum', 'count']})
print(f"Lote processado em {time.time() - start_time:.2f} segundos")
except KeyboardInterrupt:
pass
finally:
consumer.close()
Erros acontecem, principalmente ao consumir dados externos ou trabalhar com streams instáveis. Implementar padrões como retries com backoff, circuit breakers e logs estruturados mantem a robustez.
# Exemplo de padrão de circuit breaker para APIs externas
import requests
import time
from functools import wraps
class CircuitBreaker:
def __init__(self, max_failures=3, reset_timeout=60):
self.max_failures = max_failures
self.reset_timeout = reset_timeout
self.failures = 0
self.state = "CLOSED" # CLOSED, OPEN, HALF-OPEN
self.last_failure_time = None
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == "OPEN":
# Verificar se o tempo de reset passou
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "HALF-OPEN"
print(f"Circuit breaker mudou para HALF-OPEN após {self.reset_timeout}s")
else:
raise Exception(f"Circuit breaker aberto. Tentando novamente em {self.reset_timeout - (time.time() - self.last_failure_time):.1f}s")
try:
result = func(*args, **kwargs)
# Sucesso em estado HALF-OPEN, resetar
if self.state == "HALF-OPEN":
self.failures = 0
self.state = "CLOSED"
print("Circuit breaker resetado para CLOSED após sucesso")
return result
except Exception as e:
self.failures += 1
self.last_failure_time = time.time()
if self.state == "CLOSED" and self.failures >= self.max_failures:
self.state = "OPEN"
print(f"Circuit breaker aberto após {self.failures} falhas")
raise e
return wrapper
# Uso do circuit breaker
@CircuitBreaker(max_failures=3, reset_timeout=30)
def call_external_api(url):
response = requests.get(url, timeout=5)
response.raise_for_status()
return response.json()
# Exemplo de uso em processamento de stream
def process_stream_with_resilience():
while True:
try:
# Obter dados do stream
data = get_next_data_point()
# Chamar API externa com circuit breaker
try:
enriched_data = call_external_api(f"https://api.example.com/enrich?id={data['id']}")
data.update(enriched_data)
except Exception as e:
print(f"Erro ao enriquecer dados: {e}")
# Continuar com dados parciais
# Processar e salvar dados
process_and_save(data)
except Exception as e:
print(f"Erro no processamento: {e}")
# Implementar backoff exponencial
time.sleep(retry_delay)
retry_delay = min(retry_delay * 2, max_retry_delay)
Ferramentas como Prometheus, Grafana ou OpenTelemetry nos permitem monitorar a saúde dos pipelines. Métricas como latência, throughput e erros por segundo ajudam a diagnosticar problemas antes que afetem o negócio.
# Exemplo de instrumentação com OpenTelemetry
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace.status import Status, StatusCode
import time
import random
# Configurar tracer
resource = Resource(attributes={
SERVICE_NAME: "real-time-data-processor"
})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
# Função para processar mensagem com tracing
def process_message(message_id, payload):
with tracer.start_as_current_span("process_message") as span:
span.set_attribute("message.id", message_id)
span.set_attribute("message.size", len(payload))
try:
# Simular etapas de processamento
with tracer.start_as_current_span("parse_data"):
time.sleep(random.uniform(0.01, 0.05)) # Simular trabalho
data = json.loads(payload)
with tracer.start_as_current_span("transform_data"):
time.sleep(random.uniform(0.05, 0.1)) # Simular trabalho
# Transformar dados...
with tracer.start_as_current_span("save_results"):
time.sleep(random.uniform(0.1, 0.2)) # Simular trabalho
# Salvar resultados...
span.set_status(Status(StatusCode.OK))
return True
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
return False
# Uso em processador de stream
def process_stream_with_tracing():
for i in range(100):
message_id = f"msg-{i}"
payload = json.dumps({
"sensor_id": f"sensor-{random.randint(1, 5)}",
"temperature": random.uniform(20, 35),
"humidity": random.uniform(30, 80),
"timestamp": time.time()
})
success = process_message(message_id, payload)
print(f"Mensagem {message_id} processada: {'sucesso' if success else 'falha'}")
time.sleep(0.5) # Simular intervalo entre mensagens
# Executar processador
process_stream_with_tracing()
O campo de análise em tempo real com Python continua evoluindo rapidamente. Algumas tendências notáveis para 2025 incluem:
A integração de modelos de IA em pipelines de streaming está transformando a análise em tempo real.
Não basta mais apenas monitorar dados; a demanda agora é por interpretação inteligente dos eventos. Modelos de Machine Learning e Deep Learning estão sendo incorporados diretamente nos pipelines de streaming, usando bibliotecas como torch, transformers e onnxruntime para inferência em tempo real.
# Exemplo conceitual de detecção de objetos em stream de vídeo
import cv2
import numpy as np
from confluent_kafka import Consumer, Producer
import json
import base64
import time
from transformers import DetrImageProcessor, DetrForObjectDetection
import torch
from PIL import Image
import io
# Carregar modelo de detecção de objetos
processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")
model = DetrForObjectDetection.from_pretrained("facebook/detr-resnet-50")
# Configurar consumidor Kafka
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'video-analyzer',
'auto.offset.reset': 'latest'
})
consumer.subscribe(['video-frames'])
# Configurar produtor Kafka
producer = Producer({
'bootstrap.servers': 'localhost:9092'
})
# Função para detectar objetos
def detect_objects(image_bytes):
try:
# Converter bytes para imagem
image = Image.open(io.BytesIO(image_bytes))
# Preparar imagem para o modelo
inputs = processor(images=image, return_tensors="pt")
# Fazer predição
with torch.no_grad():
outputs = model(**inputs)
# Processar resultados
target_sizes = torch.tensor([image.size[::-1]])
results = processor.post_process_object_detection(
outputs, target_sizes=target_sizes, threshold=0.7
)[0]
detections = []
for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
box = [round(i, 2) for i in box.tolist()]
detections.append({
"label": model.config.id2label[label.item()],
"score": round(score.item(), 3),
"box": box
})
return detections
except Exception as e:
print(f"Erro na detecção de objetos: {e}")
return []
# Processar stream de vídeo
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Erro: {msg.error()}")
continue
try:
# Decodificar mensagem
frame_data = json.loads(msg.value())
# Extrair metadados e imagem
frame_id = frame_data.get('frame_id')
timestamp = frame_data.get('timestamp')
camera_id = frame_data.get('camera_id')
image_b64 = frame_data.get('image')
if not image_b64:
continue
# Decodificar imagem
image_bytes = base64.b64decode(image_b64)
# Detectar objetos
start_time = time.time()
detections = detect_objects(image_bytes)
processing_time = time.time() - start_time
# Criar resultado
result = {
'frame_id': frame_id,
'timestamp': timestamp,
'camera_id': camera_id,
'detections': detections,
'processing_time': round(processing_time, 3),
'processed_at': time.time()
}
# Enviar resultado para outro tópico
producer.produce(
'video-analysis',
key=str(frame_id),
value=json.dumps(result)
)
# Imprimir resultado
objects_found = [f"{d['label']} ({d['score']:.2f})" for d in detections]
print(f"Frame {frame_id}, Câmera {camera_id}: {len(detections)} objetos detectados: {', '.join(objects_found)}")
# Liberar buffer periodicamente
producer.poll(0)
except json.JSONDecodeError:
print("Erro ao decodificar JSON")
except KeyboardInterrupt:
pass
finally:
consumer.close()
producer.flush()
O processamento na borda está ganhando importância para reduzir latência.
Com o aumento dos dispositivos conectados, mover parte do processamento para próximo da fonte de dados (como gateways IoT ou sensores inteligentes) reduz latência e largura de banda.
# Exemplo conceitual de processamento na borda com sincronização para nuvem
import numpy as np
import pandas as pd
import time
import json
import requests
from datetime import datetime
import threading
import queue
# Simular sensor IoT na borda
class EdgeDevice:
def __init__(self, device_id, sync_interval=60):
self.device_id = device_id
self.sync_interval = sync_interval
self.local_buffer = []
self.cloud_queue = queue.Queue()
self.last_sync = time.time()
self.running = True
# Iniciar threads
self.sensor_thread = threading.Thread(target=self._sensor_loop)
self.processing_thread = threading.Thread(target=self._processing_loop)
self.sync_thread = threading.Thread(target=self._sync_loop)
def start(self):
print(f"Dispositivo {self.device_id} iniciando...")
self.sensor_thread.start()
self.processing_thread.start()
self.sync_thread.start()
def stop(self):
print(f"Dispositivo {self.device_id} parando...")
self.running = False
self.sensor_thread.join()
self.processing_thread.join()
self.sync_thread.join()
print(f"Dispositivo {self.device_id} parado.")
def _sensor_loop(self):
"""Simula leituras de sensor"""
while self.running:
# Simular leitura de sensor
reading = {
'device_id': self.device_id,
'timestamp': datetime.now().isoformat(),
'temperature': np.random.uniform(20, 35),
'humidity': np.random.uniform(30, 80),
'pressure': np.random.uniform(980, 1020)
}
# Adicionar ao buffer local
self.local_buffer.append(reading)
# Simular intervalo entre leituras
time.sleep(1)
def _processing_loop(self):
"""Processa dados localmente"""
while self.running:
if len(self.local_buffer) >= 10:
# Copiar buffer para processamento
to_process = self.local_buffer.copy()
# Processar localmente
df = pd.DataFrame(to_process)
# Detectar anomalias localmente (exemplo simplificado)
mean_temp = df['temperature'].mean()
std_temp = df['temperature'].std()
for reading in to_process:
# Marcar anomalias
temp = reading['temperature']
if abs(temp - mean_temp) > 2 * std_temp:
reading['anomaly'] = True
print(f"Anomalia detectada localmente! Temperatura: {temp:.2f}°C")
# Enviar anomalias imediatamente para a nuvem
self.cloud_queue.put(reading)
else:
reading['anomaly'] = False
# Calcular estatísticas
stats = {
'device_id': self.device_id,
'timestamp': datetime.now().isoformat(),
'window_start': to_process[0]['timestamp'],
'window_end': to_process[-1]['timestamp'],
'reading_count': len(to_process),
'temperature_mean': float(mean_temp),
'temperature_min': float(df['temperature'].min()),
'temperature_max': float(df['temperature'].max()),
'humidity_mean': float(df['humidity'].mean()),
'pressure_mean': float(df['pressure'].mean()),
'anomalies_detected': int(df['anomaly'].sum() if 'anomaly' in df else 0)
}
# Adicionar estatísticas à fila de sincronização
self.cloud_queue.put(stats)
time.sleep(1)
def _sync_loop(self):
"""Sincroniza dados com a nuvem"""
while self.running:
current_time = time.time()
# Sincronizar periodicamente ou quando houver muitos dados
if current_time - self.last_sync >= self.sync_interval or self.cloud_queue.qsize() > 50:
batch = []
# Coletar dados da fila
try:
while not self.cloud_queue.empty() and len(batch) < 100:
batch.append(self.cloud_queue.get_nowait())
except queue.Empty:
pass
if batch:
# Enviar para a nuvem
try:
print(f"Sincronizando {len(batch)} itens com a nuvem...")
# Em um caso real, isso seria uma chamada de API
# response = requests.post(
# "https://api.example.com/device-data",
# json={'device_id': self.device_id, 'data': batch},
# timeout=10
# )
# response.raise_for_status()
# Simular envio bem-sucedido
time.sleep(0.5)
print(f"Sincronização concluída: {len(batch)} itens enviados")
self.last_sync = current_time
except Exception as e:
print(f"Erro na sincronização: {e}")
# Devolver itens para a fila
for item in batch:
self.cloud_queue.put(item)
time.sleep(1)
# Criar e iniciar dispositivo de borda
edge_device = EdgeDevice("sensor-edge-01")
try:
edge_device.start()
# Executar por um tempo
time.sleep(120)
finally:
edge_device.stop()
Linguagens declarativas estão simplificando a análise em tempo real.
Frameworks como Apache Flink, Beam e KSQLDB estão popularizando o uso de consultas SQL contínuas para analisar dados em tempo real. A tendência é que menos código seja necessário para construir pipelines complexos, aumentando a produtividade.
# Exemplo com SQLStreamBuilder (conceitual)
from sqlstreambuilder import StreamBuilder, Schema, Field, WindowType
import time
# Definir esquema
sensor_schema = Schema([
Field("sensor_id", "STRING"),
Field("temperature", "DOUBLE"),
Field("humidity", "DOUBLE"),
Field("timestamp", "TIMESTAMP")
])
# Criar builder
builder = StreamBuilder()
# Registrar fonte de dados
builder.register_stream(
name="sensor_data",
schema=sensor_schema,
source={
"type": "kafka",
"topic": "sensor-data",
"bootstrap.servers": "localhost:9092",
"group.id": "sql-processor"
}
)
# Definir consultas SQL
builder.create_view(
name="temperature_stats",
sql="""
SELECT
sensor_id,
TUMBLE_START(timestamp, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_END(timestamp, INTERVAL '1' MINUTE) AS window_end,
COUNT(*) AS reading_count,
AVG(temperature) AS avg_temp,
MAX(temperature) AS max_temp,
MIN(temperature) AS min_temp
FROM sensor_data
GROUP BY
TUMBLE(timestamp, INTERVAL '1' MINUTE),
sensor_id
"""
)
builder.create_view(
name="humidity_stats",
sql="""
SELECT
sensor_id,
TUMBLE_START(timestamp, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_END(timestamp, INTERVAL '1' MINUTE) AS window_end,
AVG(humidity) AS avg_humidity
FROM sensor_data
GROUP BY
TUMBLE(timestamp, INTERVAL '1' MINUTE),
sensor_id
"""
)
builder.create_view(
name="high_temperature_alerts",
sql="""
SELECT
sensor_id,
temperature,
timestamp
FROM sensor_data
WHERE temperature > 30.0
"""
)
# Definir saídas
builder.create_sink(
name="stats_sink",
source="temperature_stats",
sink={
"type": "kafka",
"topic": "temperature-stats",
"bootstrap.servers": "localhost:9092"
}
)
builder.create_sink(
name="humidity_sink",
source="humidity_stats",
sink={
"type": "kafka",
"topic": "humidity-stats",
"bootstrap.servers": "localhost:9092"
}
)
builder.create_sink(
name="alerts_sink",
source="high_temperature_alerts",
sink={
"type": "kafka",
"topic": "temperature-alerts",
"bootstrap.servers": "localhost:9092"
}
)
# Executar pipeline
job = builder.build()
job.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
job.stop()
A convergência de processamento em lote e em tempo real está simplificando arquiteturas.
Unificação de batch e stream em uma única arquitetura de processamento. Ferramentas como Apache Beam e Delta Lake estão facilitando esse modelo híbrido, onde escrevemos uma vez e executamos em diferentes modos de execução.
# Exemplo conceitual com Apache Beam para processamento unificado
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
import json
import datetime
# Definir transformações
class ParseJson(beam.DoFn):
def process(self, element):
try:
record = json.loads(element.decode('utf-8'))
return [record]
except Exception as e:
print(f"Erro ao analisar JSON: {e}")
return []
class AddTimestamp(beam.DoFn):
def process(self, element):
# Adicionar timestamp para windowing
timestamp = element.get('timestamp')
if timestamp:
# Converter para timestamp do Beam
event_time = datetime.datetime.fromtimestamp(timestamp)
yield beam.window.TimestampedValue(element, event_time.timestamp())
else:
yield element
class CalculateStats(beam.DoFn):
def process(self, window_pair):
key, readings = window_pair
if not readings:
return []
temperatures = [r.get('temperature', 0) for r in readings if 'temperature' in r]
humidities = [r.get('humidity', 0) for r in readings if 'humidity' in r]
if not temperatures:
return []
stats = {
'sensor_id': key,
'window_timestamp': datetime.datetime.now().isoformat(),
'reading_count': len(readings),
'temperature_avg': sum(temperatures) / len(temperatures),
'temperature_min': min(temperatures),
'temperature_max': max(temperatures)
}
if humidities:
stats['humidity_avg'] = sum(humidities) / len(humidities)
return [json.dumps(stats).encode('utf-8')]
# Configurar pipeline
pipeline_options = PipelineOptions([
'--runner=DirectRunner',
'--streaming'
])
with beam.Pipeline(options=pipeline_options) as pipeline:
# Ler de Kafka
readings = (
pipeline
| 'ReadFromKafka' >> ReadFromKafka(
consumer_config={
'bootstrap.servers': 'localhost:9092',
'auto.offset.reset': 'latest'
},
topics=['sensor-data']
)
| 'ParseJson' >> beam.ParDo(ParseJson())
| 'AddEventTimestamps' >> beam.ParDo(AddTimestamp())
)
# Processar em janelas de tempo
windowed_stats = (
readings
| 'WindowByMinute' >> beam.WindowInto(FixedWindows(60)) # Janelas de 1 minuto
| 'ExtractSensorId' >> beam.Map(lambda x: (x.get('sensor_id', 'unknown'), x))
| 'GroupBySensor' >> beam.GroupByKey()
| 'CalculateStats' >> beam.ParDo(CalculateStats())
)
# Detectar anomalias
anomalies = (
readings
| 'FilterHighTemperature' >> beam.Filter(lambda x: x.get('temperature', 0) > 30.0)
| 'FormatAnomaly' >> beam.Map(lambda x: json.dumps({
'sensor_id': x.get('sensor_id', 'unknown'),
'temperature': x.get('temperature', 0),
'timestamp': x.get('timestamp', 0),
'alert_type': 'HIGH_TEMPERATURE',
'detected_at': datetime.datetime.now().isoformat()
}).encode('utf-8'))
)
# Escrever resultados em Kafka
windowed_stats | 'WriteStatsToKafka' >> WriteToKafka(
producer_config={'bootstrap.servers': 'localhost:9092'},
topic='sensor-stats'
)
anomalies | 'WriteAnomaliesToKafka' >> WriteToKafka(
producer_config={'bootstrap.servers': 'localhost:9092'},
topic='sensor-anomalies'
)
A análise de dados em tempo real com Python continuará evoluindo rapidamente nos próximos anos. As ferramentas e técnicas apresentadas neste artigo fornecem uma base sólida para implementar soluções de streaming de dados que podem processar informações em tempo real, extrair insights valiosos e acionar ações imediatas.
Em 2025, as organizações que dominam a análise em tempo real têm uma vantagem competitiva significativa, sendo capazes de responder rapidamente a mudanças no mercado, detectar anomalias antes que se tornem problemas e oferecer experiências personalizadas aos clientes.
Ao combinar o poder e a flexibilidade do Python com as tecnologias modernas de streaming como Kafka, Spark e Flink, você pode construir sistemas robustos de processamento em tempo real que escalam para atender às demandas do seu negócio, independentemente do volume, velocidade ou variedade dos seus dados.
O desenvolvimento web evoluiu significativamente nos últimos anos, e os frameworks Python estão na vanguarda…
A inteligência artificial está revolucionando todos os setores da sociedade, desde aplicações empresariais até soluções…
Nesta aula do minicurso de Python, quero abordar dois tipos de coleção que são usadas…
Entre as estruturas de dados mais versáteis do Python, os dicionários se destacam. Eles nos…
Nesta nona aula do mini curso, quero falar sobre um dos tipos de dados mais…
Entender o escopo de variáveis em Python é essencial para evitar erros e criar programas…
Este blog utiliza cookies. Se você continuar assumiremos que você está satisfeito com ele.
Leia Mais...