Deploying Personal Projects

Tags:
deployment faust forgejo kafka saltstack

Over time, I’ve been slowly refining how I automatically deploy things to my development environment. This is an improvement on my previous deploy-from-GitHub setup.

Push Code to Forgejo

I’m running a Forgejo instance, which lets me better customize how I organize repos and handle webhooks. I have set up a global webhook that pushes to my Kafka instance. Recent versions of Kafka support KRaft, which makes it much easier to self-host: it can be done simply with containers and any kind of small home server.
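For reference, a single-node KRaft broker really can be this small; a minimal sketch using the official apache/kafka image (image tag and port are assumptions, pin whatever you actually run):

```yaml
# docker-compose.yml — single-node KRaft broker, no ZooKeeper.
# The apache/kafka image ships a working single-node KRaft config
# out of the box; for LAN access you would override the advertised
# listeners to the server's hostname.
services:
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
```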

Webhook to Kafka with Faust

The first step is to get the webhooks into Kafka. For this, I am using a small Faust container to handle them; some other kind of REST-to-Kafka bridge would likely work as well. At this stage I mostly submit the webhooks as-is and do further processing later.

@app.page("/forgejo")
class forgejo(View):
    __topic = app.topic(Topics.Forgjo.WEBHOOK, value_serializer="json")

    async def post(self, request: Request) -> Response:
        await self.__topic.send(
            value={
                "payload": await request.json(),
                "headers": dict(request.headers),
                "@timestamp": time.time(),
            }
        )
        return self.json({"status": "ok"})
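One thing worth adding at this edge is signature verification. Forgejo, following Gitea, can sign webhook payloads with an HMAC-SHA256 of the raw request body (delivered in an `X-Gitea-Signature`-style header); the sketch below is stdlib-only and hedged on that header convention:

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_hex: str) -> bool:
    """Check a Forgejo/Gitea-style HMAC-SHA256 webhook signature.

    `signature_hex` is the hex digest taken from the webhook's
    signature header; `body` must be the raw, unparsed request body.
    """
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking a timing side channel
    return hmac.compare_digest(expected, signature_hex)
```

Inside the Faust view this would run against the raw body before the payload is forwarded to the topic, rejecting anything that doesn't match the configured webhook secret.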

Process events with Faust

Using Faust, I can easily process various Kafka streams into whatever form I want. This lets me push all kinds of events from anything I can think of, then process those streams into new, derived streams. Here, some code processes the full, unmodified webhook payload into something smaller.

topic_forgejo = app.topic(Topics.Forgjo.WEBHOOK, value_serializer="json")
topic_repo_push = app.topic(Topics.Forgjo.REPO, value_serializer="json")

@app.agent(topic_forgejo)
async def process_forgejo(stream):
    async for event in stream.events():
        headers = event.value["headers"]
        payload = event.value["payload"]

        match headers["X-GitHub-Event-Type"]:
            case "push":
                repository = payload["repository"]
                logger.warning("Push for %s", repository["full_name"])

                await topic_repo_push.send(
                    value={
                        "@timestamp": event.message.timestamp,
                        "ref": payload["ref"],
                        "full_name": repository["full_name"],
                        "html_url": repository["html_url"],
                    },
                    timestamp=event.message.timestamp,
                )
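The transformation itself is plain Python and can be exercised outside Faust. A minimal sketch with a trimmed-down push payload (the field names `ref`, `repository.full_name`, and `repository.html_url` match the Forgejo/Gitea push webhook schema; the sample values are placeholders):

```python
import time


def slim_push(payload: dict, timestamp: float) -> dict:
    """Reduce a full push payload to the fields the deploy pipeline needs."""
    repository = payload["repository"]
    return {
        "@timestamp": timestamp,
        "ref": payload["ref"],
        "full_name": repository["full_name"],
        "html_url": repository["html_url"],
    }


payload = {
    "ref": "refs/heads/main",
    "repository": {
        "full_name": "org1/repo1",
        "html_url": "https://forgejo.example.com/org1/repo1",
    },
}
event = slim_push(payload, time.time())
```

Everything downstream only ever sees this slim shape, so the webhook schema can change without touching the Salt side.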

Kafka to Salt

Now that I have a stream of push events converted to a simple form, I want to act on those events from my salt-master. I have written a salt-kafka module that lets the salt-master consume from a Kafka topic. This means I can read each processed repo-push event and publish it onto the Salt event bus under its own namespace.

# From salt-master configuration
engines:
  - kafka_consumer:
      broker: kafka.example.com:9092
      subscribe:
        # Read from Kafka Topic and write to Salt Tag
        push-repo: kafka/repo
        push-package: kafka/container
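When wiring this up, it helps to confirm events are actually landing on the master's bus. Salt ships a runner for tailing the bus, which should show each Kafka message arriving under its mapped tag:

```shell
# Tail the salt-master event bus; pushes consumed from Kafka should
# appear under the tags configured above, e.g. kafka/repo
salt-run state.event pretty=True
```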

Salt Reactor

Now that my salt-master is receiving a stream of push events, I can map them to the Salt states I want to deploy. Each of the Salt tags above maps to a Salt reactor state that I want to run.

# From salt-master configuration
reactor:
  - "kafka/repo":
      - salt://_reactor/kafka/repo.sls
  - "kafka/container":
      - salt://_reactor/kafka/container.sls

I chose to write these as reactors instead of engines because it makes them (somewhat) easier to develop: Salt automatically loads the reactor each time it fires, whereas with a salt-engine I would need to restart the engine after every change.

To simplify the syntax for a local state deploy, I wrote a small helper function.

def format(tgt, state, tgt_type="glob", ret="highstate"):
    log.warning("Deploying %s to %s", state, tgt)
    return {
        "deploy": {
            "local.state.sls": [
                {"tgt": tgt},
                {"tgt_type": tgt_type},
                {"ret": ret},
                {"args": [{"mods": state}]},
            ]
        }
    }
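Stripped of logging, the helper is easy to sanity-check on its own; the target and state below are placeholders (and the name `format` shadows the built-in, which is harmless inside an isolated reactor file):

```python
def format(tgt, state, tgt_type="glob", ret="highstate"):
    # Shape of a reactor return: one named job ("deploy") invoking
    # local.state.sls against the given target with the given state.
    return {
        "deploy": {
            "local.state.sls": [
                {"tgt": tgt},
                {"tgt_type": tgt_type},
                {"ret": ret},
                {"args": [{"mods": state}]},
            ]
        }
    }


job = format("host.example.com", "some.state")
```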

I default to the highstate_return returner so that I get an email at the end of each deploy. I can then use the helper within my reactor's run() function to map repository pushes to the states I want to deploy.

def run():
    if data["ref"] in ["refs/heads/master", "refs/heads/main"]:
        match data["full_name"]:
            case "org1/repo1" | "org1/repo2":
                return format("host.example.com", "some.state")
            case "org2/repoA":
                return format("host.example.com", "another.state")