The premise for this article is that you want to set up a robust messaging system that can handle real-time data with proper structure and timestamps. On top of that, you want to introduce “Data Contracts” to your org’s systems. To do this, we will leverage Protocol Buffers as the contract. It might all feel intimidating at first, but after this article anyone working with Data should feel empowered to do it. All scripts to get you started are located HERE. It's time to skill up and understand this critical piece of the Full Data Stack!
Let's Get Started:
Today we are going to build a Protocol Buffer message system that publishes structured data to Google Cloud Pub/Sub. This simple application is meant to mimic real-time event data you could use for analysis and derive insights. The beauty here is that each message contains properly structured data with timestamps, making it perfect for downstream analytics and real-time processing.
Here's what it will look like:
A Python script that randomly selects a US state name and its abbreviation, packages them into Protocol Buffer messages with timestamps, and publishes them to a Pub/Sub topic. Clean, structured, and ready for consumption by any downstream service.
Here's what you need:
1. A Google Cloud Project with Pub/Sub enabled
2. Python environment with virtual environment support
3. Protocol Buffer compiler (protoc) via grpcio-tools
4. The required Google Cloud libraries
A Quick Note About Why This Matters:
Protocol Buffers aren't just a fancy kind of JSON. They're actually incredibly efficient for real-time data streaming. They're smaller, faster to serialize/deserialize, and provide strong typing that prevents the data quality issues we see all the time with loose JSON schemas. For Data Teams building real-time pipelines, this is the foundation you want. To do this at scale, it would require buy-in from software engineers (or whoever is in charge of the data source). For a review of how you would even pitch this idea to your org, see my previous newsletter HERE.
What are Protocol Buffers?
From the official documentation:
> Protocol buffers are a language-neutral, platform-neutral extensible mechanism for serializing structured data.
So essentially, Protocol Buffers let you define your data structure once in a `.proto` file, then generate code for any language you want. The nice thing about Protocol Buffers is that they enforce structure. So no more wondering if that timestamp field is going to be a string, integer, or some weird date format depending on who sent the message.
This differs from sending raw JSON where you're constantly dealing with type mismatches and missing fields. With Protocol Buffers, you define the contract up front and everyone plays by the same rules.
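To make that concrete, here's a tiny sketch of what "playing by the same rules" looks like in practice. It uses the generated `us_states_pb2` module we create a little later in this article, so come back to it once you've run the compiler:
```python
import us_states_pb2  # generated from us-states.proto (see below)

state = us_states_pb2.StateProto()
state.publish_time = 1700000000   # OK: publish_time is an int64
state.name = "California"         # OK: name is a string

try:
    state.publish_time = "tomorrow"  # wrong type: int64 field, string value
except TypeError as err:
    print(f"Rejected at assignment time: {err}")

try:
    state.nickname = "The Golden State"  # field that isn't in the schema
except AttributeError as err:
    print(f"Unknown field rejected: {err}")
```
With raw JSON, both of those mistakes would sail through and blow up somewhere downstream; here they fail loudly at the source.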
NOTE: For all the following scripts, you can clone the repo HERE instead of making the project yourself. But I show the scripts below so you can just read the article as needed.
Let's Build Our Message Schema
First, we need to define what our message looks like. We're going to create a simple schema for US state data with a timestamp:
us-states.proto
```proto
syntax = "proto3";

message StateProto {
  int64 publish_time = 1;  // Unix timestamp in seconds
  string name = 2;
  string post_abbr = 3;
}
```
This schema defines three fields:
1. `publish_time` - Unix timestamp when the message was created
2. `name` - The full state name (like "California")
3. `post_abbr` - The postal abbreviation (like "CA")
The numbers (1, 2, 3) are field numbers that Protocol Buffers uses for efficient serialization: on the wire, each value is tagged by its field number rather than its name. Once you pick these numbers, don't change them. That's how backward compatibility works.
Setting Up Our Dependencies
We need a few Python packages to make this work. Here's our `requirements.txt`:
requirements.txt
```
google-cloud-pubsub>=2.18.0
protobuf>=4.24.0
google-api-core>=2.11.0
grpcio-tools>=1.59.0
```
The `grpcio-tools` package gives us the Protocol Buffer compiler so we can generate Python code from our `.proto` file.
Install everything:
```bash
pip install -r requirements.txt
```
Then generate the Python code from our schema:
```bash
python -m grpc_tools.protoc -I. --python_out=. us-states.proto
```
This creates `us_states_pb2.py` with all the Python classes we need.
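Before wiring in Pub/Sub, you can sanity-check the generated module on its own. Here's a minimal sketch (no GCP resources involved) that builds a `StateProto`, serializes it, and parses it back:
```python
import time

import us_states_pb2  # generated by the protoc command above

# Build and populate a message.
state = us_states_pb2.StateProto()
state.publish_time = int(time.time())
state.name = "California"
state.post_abbr = "CA"

# Serialize to the compact binary wire format.
payload = state.SerializeToString()
print(f"Serialized size: {len(payload)} bytes")

# Parse it back into a fresh object; the round trip preserves types and values.
decoded = us_states_pb2.StateProto()
decoded.ParseFromString(payload)
print(decoded.name, decoded.post_abbr, decoded.publish_time)
```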
Building the Publisher
Now for the script that actually creates the message. We want something that randomly selects states and publishes them with real timestamps:
The Complete Publisher Script:
publish-proto.py
```python
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding

import datetime
import random
import time

import us_states_pb2  # type: ignore

# Dictionary of US states and their abbreviations
US_STATES = {
    "Alabama": "AL", "Alaska": "AK", "Arizona": "AZ", "Arkansas": "AR", "California": "CA",
    "Colorado": "CO", "Connecticut": "CT", "Delaware": "DE", "Florida": "FL", "Georgia": "GA",
    "Hawaii": "HI", "Idaho": "ID", "Illinois": "IL", "Indiana": "IN", "Iowa": "IA",
    "Kansas": "KS", "Kentucky": "KY", "Louisiana": "LA", "Maine": "ME", "Maryland": "MD",
    "Massachusetts": "MA", "Michigan": "MI", "Minnesota": "MN", "Mississippi": "MS", "Missouri": "MO",
    "Montana": "MT", "Nebraska": "NE", "Nevada": "NV", "New Hampshire": "NH", "New Jersey": "NJ",
    "New Mexico": "NM", "New York": "NY", "North Carolina": "NC", "North Dakota": "ND", "Ohio": "OH",
    "Oklahoma": "OK", "Oregon": "OR", "Pennsylvania": "PA", "Rhode Island": "RI", "South Carolina": "SC",
    "South Dakota": "SD", "Tennessee": "TN", "Texas": "TX", "Utah": "UT", "Vermont": "VT",
    "Virginia": "VA", "Washington": "WA", "West Virginia": "WV", "Wisconsin": "WI", "Wyoming": "WY",
}

# TODO(developer): Replace these variables before running the sample.
project_id = "PROJECT_ID"
topic_id = "TOPIC_ID"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Randomly select a state
    state_name = random.choice(list(US_STATES.keys()))
    state_abbr = US_STATES[state_name]

    # Instantiate a protoc-generated class defined in `us-states.proto`.
    state = us_states_pb2.StateProto()

    # Set current timestamp as Unix epoch time (first attribute)
    state.publish_time = int(time.time())
    state.name = state_name
    state.post_abbr = state_abbr

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        data = state.SerializeToString()
        print(f"Preparing a binary-encoded message:\n{data}")
    elif encoding == Encoding.JSON:
        json_object = MessageToJson(state)
        data = str(json_object).encode("utf-8")
        print(f"Preparing a JSON-encoded message:\n{data}")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message for state {state_name} ({state_abbr}) with ID: {future.result()}")
    print(f"Message timestamp: {datetime.datetime.fromtimestamp(state.publish_time)}")
except NotFound:
    print(f"{topic_id} not found.")
```
What Makes This Powerful
The beauty of this setup is in the details:
1. Strong Typing: Protocol Buffers ensure that `publish_time` is always an integer, `name` is always a string, etc. No more data quality surprises.
2. Efficient Serialization: Protocol Buffers are much smaller than equivalent JSON, which matters when you're publishing thousands of messages per second (there's a quick size check right after this list).
3. Dual Encoding Support: The script handles both binary and JSON encoding depending on your Pub/Sub topic schema settings.
4. Real Timestamps: Every message gets a Unix timestamp at the moment of creation, perfect for time-series analysis downstream.
5. Random Data Generation: Great for testing, load testing, or generating synthetic data for development environments.
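On point 2, you don't have to take the size claim on faith. Here's a quick check you can run yourself once `us_states_pb2.py` is generated; the exact byte counts will vary with your data, but it gives you a feel for the gap:
```python
import json
import time

import us_states_pb2  # generated earlier

state = us_states_pb2.StateProto()
state.publish_time = int(time.time())
state.name = "California"
state.post_abbr = "CA"

# Compare the binary wire format against an equivalent hand-built JSON payload.
proto_bytes = state.SerializeToString()
json_bytes = json.dumps(
    {"publish_time": state.publish_time, "name": state.name, "post_abbr": state.post_abbr}
).encode("utf-8")

print(f"Protocol Buffer payload: {len(proto_bytes)} bytes")
print(f"Equivalent JSON payload: {len(json_bytes)} bytes")
```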
Running Your Publisher
Before you run this, make sure you have:
- A Google Cloud project with the Pub/Sub API enabled
- A Pub/Sub schema created (docs to do that are HERE)
  - Choose the “Protocol Buffer” radio button
  - Paste the contents of `us-states.proto` into the text box
  - Click the “Validate Definition” button to check that the syntax is correct
- A Pub/Sub topic and subscription created (docs to do that are HERE)
  - Check the “Add a default subscription” checkbox
  - Check the “Use a schema” checkbox
  - Select the Pub/Sub schema you created above
  - Leave the revision range at its default
- Proper authentication set up (docs to do that are HERE)
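If you'd rather script that console setup, here's a rough sketch using the Pub/Sub admin clients from the same `google-cloud-pubsub` package. Treat it as a starting point rather than a finished tool: the `schema_id` and `subscription_id` values are placeholders I made up, it assumes `us-states.proto` sits next to the script, and it doesn't handle resources that already exist.
```python
from google.cloud.pubsub import PublisherClient, SchemaServiceClient, SubscriberClient
from google.pubsub_v1.types import Encoding, Schema

# TODO(developer): replace these placeholder IDs with your own.
project_id = "PROJECT_ID"
schema_id = "us-states-schema"
topic_id = "TOPIC_ID"
subscription_id = "TOPIC_ID-sub"

project_path = f"projects/{project_id}"

# 1. Create the schema from the .proto definition.
with open("us-states.proto", "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema_client.create_schema(
    request={
        "parent": project_path,
        "schema": Schema(
            name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
        ),
        "schema_id": schema_id,
    }
)

# 2. Create a topic that enforces the schema with binary encoding.
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
publisher_client.create_topic(
    request={
        "name": topic_path,
        "schema_settings": {"schema": schema_path, "encoding": Encoding.BINARY},
    }
)

# 3. Create a pull subscription on that topic.
subscriber_client = SubscriberClient()
subscription_path = subscriber_client.subscription_path(project_id, subscription_id)
subscriber_client.create_subscription(
    request={"name": subscription_path, "topic": topic_path}
)

print("Schema, topic, and subscription created.")
```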
Then just update your project and topic IDs in the script and run:
```bash
python publish-proto.py
```
Each time you run it, you'll get a different random state published to your topic with a fresh timestamp. You can validate that the message made it through by going to your Pub/Sub subscription page, clicking “Messages” on the page, and then clicking the “Pull” button. You should see the message show up there.
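If you'd prefer to verify from code instead of the console, here's a hedged sketch that does a one-off synchronous pull against the subscription and decodes the payload. It assumes the topic uses binary encoding, and the `project_id` and `subscription_id` values are placeholders to replace:
```python
from google.cloud.pubsub import SubscriberClient

import us_states_pb2  # generated earlier

# TODO(developer): replace these placeholder IDs with your own.
project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Pull a single message synchronously (assumes binary-encoded payloads).
response = subscriber.pull(request={"subscription": subscription_path, "max_messages": 1})

for received in response.received_messages:
    state = us_states_pb2.StateProto()
    state.ParseFromString(received.message.data)
    print(f"Got {state.name} ({state.post_abbr}) published at {state.publish_time}")

# Acknowledge what we pulled so it isn't redelivered.
if response.received_messages:
    subscriber.acknowledge(
        request={
            "subscription": subscription_path,
            "ack_ids": [m.ack_id for m in response.received_messages],
        }
    )
```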
Why This Matters for Data Teams
This isn't just about getting messages into Pub/Sub. This is about building the foundation for real-time data systems that actually work at scale. When you have strongly-typed, timestamped messages flowing through your system, you can build reliable downstream consumers, accurate analytics, and real-time dashboards without constantly debugging data quality issues.
The time has come for Data Teams to stop treating messaging systems as an afterthought. In the new world of real-time analytics and AI applications, having clean, structured data flowing through your system is what separates the teams that ship reliable products from the ones that spend all their time debugging data pipeline failures.
What's Next?
This publisher is just the beginning. Next steps could include:
- Building consumers that process these messages (a starter consumer sketch follows this list)
- Adding schema evolution as your data requirements change
- Setting up monitoring and alerting for your message flows
- Integrating with your data warehouse for batch processing
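For that first next step, here's a rough starting point for a streaming-pull consumer. Same caveats as before: placeholder IDs to swap in, and it assumes binary encoding on the topic.
```python
from concurrent.futures import TimeoutError

from google.cloud.pubsub import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message

import us_states_pb2  # generated earlier

# TODO(developer): replace these placeholder IDs with your own.
project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)


def callback(message: Message) -> None:
    # Decode the binary-encoded StateProto payload, then ack it.
    state = us_states_pb2.StateProto()
    state.ParseFromString(message.data)
    print(f"Received {state.name} ({state.post_abbr}) at {state.publish_time}")
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")

# Block for a while, then shut down cleanly.
try:
    streaming_pull_future.result(timeout=60)
except TimeoutError:
    streaming_pull_future.cancel()
    streaming_pull_future.result()
```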
But for now, you have a solid foundation for structured, real-time messaging. Let's start getting comfortable with these tools and build data systems that actually work reliably.
---
Want to see more content like this? Subscribe to The Full Data Stack for practical tutorials on building production data systems.
I also have a YouTube channel where I go over my favorite Data Stack topics.
You can follow me on LinkedIn as well for smaller, more consistent tidbits of info about Data and Technical Product Management in Data.