Create a Python Kafka Stream

Modern data ecosystems support various patterns for data ingestion, but at their heart is real-time streaming. In real-time environments, data is treated less as static records and more as continuous signals: events emitted by applications, devices, users, and systems as they happen. These signals flow through streaming platforms (for example, event brokers and stream processors), where they can be filtered, enriched, joined, and acted upon in milliseconds.

Demos come to life when enriched with real-time data rather than static samples. This blog looks at creating a Python Kafka stream using Wikimedia EventStreams as the source. EventStreams is a public web service that exposes continuous streams of structured event data through an HTTP API. It's the perfect source for demoing real-time streaming.
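To get a feel for the stream before wiring in Kafka, you can open it directly and print a few events. A minimal peek sketch, assuming requests and sseclient-py are installed (the script name in the User-Agent is illustrative, and the email is a placeholder you should fill in):

import json
import requests
from sseclient import SSEClient  # pip install sseclient-py

resp = requests.get(
    "https://stream.wikimedia.org/v2/stream/recentchange",
    headers={"User-Agent": "StreamPeek/1.0 ([enter email])"},  # illustrative User-Agent
    stream=True,
)
for i, event in enumerate(SSEClient(resp).events()):
    change = json.loads(event.data)
    # Each event is a JSON document; wiki, user, title and type are among its fields
    print(change.get("wiki"), change.get("user"), change.get("title"), change.get("type"))
    if i >= 4:  # stop after five events
        break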

This demo shows how real-time data can be used to create something immediately tangible and engaging. By streaming live Wikimedia Recent Changes events via Server-Sent Events (SSE) and publishing them straight into Kafka, we turn a constantly changing public data source into a live event stream that can be consumed by downstream analytics, dashboards, or data platforms. Unlike static sample datasets, this approach lets demos react in real time—new edits appear instantly, patterns emerge as activity spikes, and enrichment or analytics can be layered on the fly—making the demo feel alive and far closer to real-world production behaviour.

The following Python code creates a Kafka stream from Wikimedia live updates. In this example I used a local Kafka broker (covered in a separate blog). If your broker doesn't auto-create topics, create the topic first, as shown in the sketch below.
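A minimal topic-creation sketch using kafka-python's admin client; the topic name and broker address match the script below, while one partition and a replication factor of 1 are assumptions suitable for a single-node local broker:

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the local broker (same address the producer uses)
admin = KafkaAdminClient(bootstrap_servers=["localhost:9092"])

# One partition, replication factor 1: enough for a single-node demo broker
topic = NewTopic(name="wikimedia.recentchange", num_partitions=1, replication_factor=1)
admin.create_topics([topic])
admin.close()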

"""

Wikimedia Recent Changes → Kafka (Real-Time Streaming Demo)

Streams live Wikimedia edit events via Server-Sent Events (SSE)

and publishes them to a Kafka topic for use in real-time demos,

analytics, and downstream stream processing.

Requirements:

- Kafka broker running and reachable

- Python packages: requests, sseclient-py, kafka-python

"""

import json

import sys

import requests

from sseclient import SSEClient  # pip install sseclient-py

from kafka import KafkaProducer  # pip install kafka-python

# Wikimedia requires a descriptive User-Agent

headers = {

    "User-Agent": "MyKafkaProducer/1.0 ([enter email])" # enter email here

}


# Wikimedia EventStreams endpoint for recent changes

STREAM_URL = "https://stream.wikimedia.org/v2/stream/recentchange"


# Kafka configuration

KAFKA_BROKER = "localhost:9092"         # Change if using Docker or remote broker

KAFKA_TOPIC = "wikimedia.recentchange"  # Topic name

# Set up a Kafka producer for streaming Wikimedia events

def create_kafka_producer():

    """Create a Kafka producer with JSON serialization."""

    return KafkaProducer(

        bootstrap_servers=[KAFKA_BROKER],

        value_serializer=lambda v: json.dumps(v).encode("utf-8"),

        retries=5

    )

# Read Wikimedia SSE events and push each event to a Kafka topic

def stream_recent_changes(producer):

    try:

        # Open a streaming connection

        response = requests.get(STREAM_URL, headers=headers, stream=True, timeout=30)

        response.raise_for_status()


        # Wrap the response with SSEClient to parse events

        client = SSEClient(response)


        print(f"Listening to Wikimedia recent changes... streaming to Kafka topic '{KAFKA_TOPIC}'\n")

        for event in client.events():

            try:

                change = json.loads(event.data)

              # Print for visibility

                print(f"[{change.get('wiki')}] {change.get('user')} edited {change.get('title')} ({change.get('type')})")


                # Send to Kafka

                producer.send(KAFKA_TOPIC, value=change)


            except json.JSONDecodeError:

                continue  # Skip malformed JSON


    except requests.exceptions.RequestException as e:

        print(f"Connection error: {e}", file=sys.stderr)

    except KeyboardInterrupt:

        print("\nStopped by user.")

    finally:

        producer.flush()

        producer.close()


if __name__ == "__main__":

    producer = create_kafka_producer()

    stream_recent_changes(producer)
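To verify that events are landing in the topic, run the producer in one terminal and a consumer in another. A minimal consumer sketch, assuming the same local broker and kafka-python (auto_offset_reset="latest" reads only events produced after the consumer starts):

import json
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "wikimedia.recentchange",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="latest",  # only read new events, not the topic's history
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    change = message.value
    print(f"[{change.get('wiki')}] {change.get('user')} edited {change.get('title')}")

New edits should appear almost immediately, mirroring what the producer prints.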



