converted to glang for easier claude code usage

This commit is contained in:
2026-04-14 21:18:26 -04:00
parent 43935503fb
commit a18af39066
12 changed files with 761 additions and 1247 deletions

View File

@@ -1,5 +1,14 @@
# Vikunja MCP - Developed by acidvegas in Python (https://git.acid.vegas)
# Vikunja MCP - Developed by acidvegas in Go (https://git.acid.vegas)
# vikunja-mcp/.env.example
#
# The Go binary does not load this file itself — pass these as real environment
# variables (your MCP client's "env" block, systemd Environment=, or `export`
# them in your shell). This file is a reference for the variable names.
VIKUNJA_URL=http://localhost:3456
VIKUNJA_TOKEN=tk_your_api_token_here
# Optional: defaults for transport, host, and port when running the HTTP daemon.
# VIKUNJA_MCP_TRANSPORT=stdio
# VIKUNJA_MCP_HOST=127.0.0.1
# VIKUNJA_MCP_PORT=8000

10
.gitignore vendored
View File

@@ -1,5 +1,5 @@
.env
__pycache__/
*.pyc
.venv/
venv/
vikunja-mcp
/vikunja-mcp.exe
*.test
*.out
.env

187
README.md
View File

@@ -46,21 +46,21 @@ Vikunja MCP fixes both by moving the memory out of the model and into a structur
| Vikunja MCP |
+---------+---------+
|
+-----------------+-----------------+
| | |
stdio SSE Streamable HTTP
| | |
+------+-------+ +------+-------+ +------+--------+
| Claude Code | | Cursor, n8n, | | Claude Code, |
| Claude | | older MCP | | Cursor, and |
| Desktop | | clients | | modern MCP |
| Local LLMs | | | | clients |
+--------------+ +--------------+ +---------------+
+------------+------------+
| |
stdio Streamable HTTP
| |
+--------+--------+ +----------+----------+
| Claude Code | | Claude Code, |
| Claude Desktop | | Cursor, Claude |
| Cursor | | Desktop, modern |
| Local LLMs | | MCP clients |
+-----------------+ +---------------------+
```
The agent speaks MCP. The MCP server speaks the Vikunja REST API. Vikunja stores everything on disk in its own database. Any number of agents *(remote or local, paid or free)* can point at the same MCP server and share the same memory.
Vikunja MCP supports all three MCP transports — stdio for local subprocess use, SSE for legacy HTTP clients, and Streamable HTTP for modern HTTP clients. Pick whichever your editor or agent supports. See [Transports](#transports) for the details.
Vikunja MCP is a single self-contained Go binary and speaks two MCP transports — stdio for local subprocess use and Streamable HTTP for network clients. Pick whichever your editor or agent supports. See [Transports](#transports) for the details.
## How Information Is Stored
@@ -174,63 +174,64 @@ The payload lives in `instructions.txt` at the repo root. Edit it to customise c
## Setup
Clone the repository and install dependencies:
Vikunja MCP is a single Go program. The simplest install is zero install: you point your MCP client at `go run github.com/acidvegas/vikunja-mcp@latest` and pass your Vikunja URL and token through the client's `env` block. The Go toolchain downloads, builds, and runs the server on demand. No repo clone, no virtualenv, no build step.
```
git clone https://github.com/acidvegas/vikunja-mcp
cd vikunja-mcp
pip install -r requirements.txt
cp .env.example .env
```
Requirements:
Edit `.env`:
| Requirement | Version | Notes |
| ------------- | ------- | ---------------------------------------------------------- |
| Go | 1.25+ | Only needed on the machine running the MCP server |
| Vikunja | any | A running Vikunja instance reachable by the MCP server |
| API token | — | Generate in Vikunja under Settings → API Tokens *(`tk_...`)* |
The two environment variables the server always needs:
| Variable | Required | Description |
| --------------- | -------- | ------------------------------------------- |
| `VIKUNJA_URL` | yes | Base URL of your Vikunja server |
| `VIKUNJA_TOKEN` | yes | Personal API token *(`tk_...`)* |
Generate the token in the Vikunja web UI under Settings, API Tokens. Verify connectivity with the stdlib test from the repo root:
See [Client Configuration](#client-configuration) for the exact `env` shape per client.
**Optional local build.** If you would rather build a binary and run it yourself:
```
python3 test.py
git clone https://github.com/acidvegas/vikunja-mcp
cd vikunja-mcp
go build -o vikunja-mcp .
./vikunja-mcp --transport stdio
```
You should see `OK` for `/info`, `/user`, `/projects`, and `/tasks`.
## Transports
MCP defines three transports. Vikunja MCP speaks all three — pick the one your client supports. stdio is the default, SSE and Streamable HTTP are selected with a CLI flag.
Vikunja MCP speaks two MCP transports — pick the one your client supports. stdio is the default, Streamable HTTP is selected with a CLI flag.
| Transport | Use when | Launched by | Endpoint |
| --------------- | --------------------------------------------------------------------------------------- | -------------------- | ------------------- |
| stdio | The client runs the server as a subprocess on the same machine. Zero network exposure. | The MCP client | stdin / stdout |
| SSE | You want a network endpoint compatible with older HTTP clients *(Cursor, n8n, Continue)*. | You, as a daemon | `GET /sse` + `POST /messages/` |
| Streamable HTTP | You want the current MCP HTTP transport. Single endpoint, session aware, supports resumable streams. | You, as a daemon | `POST/GET/DELETE /mcp` |
| Transport | Use when | Launched by | Endpoint |
| --------------- | --------------------------------------------------------------------------------------- | -------------------- | ---------------- |
| stdio | The client runs the server as a subprocess on the same machine. Zero network exposure. | The MCP client | stdin / stdout |
| Streamable HTTP | You want the current MCP HTTP transport. Network accessible, multiple clients share one running daemon. | You, as a daemon | `http://host:port/` |
SSE is the legacy HTTP transport and is on the MCP spec's deprecation path, but it has the widest client support right now. Streamable HTTP is the current spec and is what new clients are adding. Running both gives you maximum compatibility.
stdio is the right default — it requires no daemon, no open ports, and no extra config. Streamable HTTP is what you use when the MCP server is on a different box, when you want multiple agents to share one running process, or when you want the server to persist across client restarts.
**Launching:**
```
python3 server.py # stdio (default)
python3 server.py --transport stdio # explicit stdio
python3 server.py --transport sse --host 0.0.0.0 --port 8000
python3 server.py --transport http --host 0.0.0.0 --port 8000
go run github.com/acidvegas/vikunja-mcp@latest # stdio (default)
go run github.com/acidvegas/vikunja-mcp@latest -t stdio # explicit stdio
go run github.com/acidvegas/vikunja-mcp@latest -t http --host 0.0.0.0 --port 8000
```
The `sse` and `http` modes bind a uvicorn HTTP server on `--host:--port` *(default `127.0.0.1:8000`)*. They can also be set via the `VIKUNJA_MCP_TRANSPORT`, `VIKUNJA_MCP_HOST`, and `VIKUNJA_MCP_PORT` environment variables.
To run both SSE and Streamable HTTP at once, start two processes on different ports *(or put them behind a reverse proxy at different paths)*:
Or with a local build:
```
python3 server.py --transport sse --port 8000 &
python3 server.py --transport http --port 8001 &
./vikunja-mcp # stdio
./vikunja-mcp -t stdio
./vikunja-mcp -t http --host 0.0.0.0 --port 8000
```
Each process is independent and talks to the same Vikunja backend, so memories written through one are immediately visible through the other.
The `http` mode binds on `--host:--port` *(default `127.0.0.1:8000`)*. Flags can also be set via the `VIKUNJA_MCP_TRANSPORT`, `VIKUNJA_MCP_HOST`, and `VIKUNJA_MCP_PORT` environment variables. Both `-t` and `--transport` are accepted.
### Running SSE or HTTP as a systemd service
### Running Streamable HTTP as a systemd service
If you want the network transports to be persistent, run them as a systemd unit. Minimal example *(`/etc/systemd/system/vikunja-mcp.service`)*:
If you want the network transport to be persistent, run it as a systemd unit. Minimal example *(`/etc/systemd/system/vikunja-mcp.service`)*:
```ini
[Unit]
Description=Vikunja MCP server
@@ -240,31 +241,40 @@ After=network-online.target
Type=simple
User=vikunja-mcp
WorkingDirectory=/opt/vikunja-mcp
EnvironmentFile=/opt/vikunja-mcp/.env
ExecStart=/usr/bin/python3 /opt/vikunja-mcp/server.py --transport http --host 127.0.0.1 --port 8000
Environment=VIKUNJA_URL=http://localhost:3456
Environment=VIKUNJA_TOKEN=tk_your_token_here
ExecStart=/opt/vikunja-mcp/vikunja-mcp -t http --host 127.0.0.1 --port 8000
Restart=on-failure
[Install]
WantedBy=multi-user.target
```
Then `systemctl enable --now vikunja-mcp` and point your clients at `http://127.0.0.1:8000/mcp`. Put it behind nginx or Caddy with TLS if you expose it beyond localhost.
Then `systemctl enable --now vikunja-mcp` and point your clients at `http://127.0.0.1:8000/`. Put it behind nginx or Caddy with TLS if you expose it beyond localhost.
## Client Configuration
The three transports use different config shapes. stdio clients launch the server as a subprocess. Network clients *(SSE, Streamable HTTP)* connect to an already running daemon by URL. Examples below cover Claude Code, Claude Desktop, Cursor, and local LLMs via Continue. The same principles apply to any other MCP client.
The two transports use different config shapes. stdio clients launch the server as a subprocess (`go run ...` or a local binary). Streamable HTTP clients connect to an already running daemon by URL. Examples below cover Claude Code, Claude Desktop, Cursor, and local LLMs via Continue. The same principles apply to any other MCP client.
### Stdio
### Stdio (recommended)
The client launches `server.py` as a subprocess and talks to it over the pipe. Simplest setup, zero network exposure.
The client launches the server as a subprocess and talks to it over the pipe. Simplest setup, zero network exposure.
**Claude Code** *(`~/.config/claude-code/mcp.json`)*:
**Claude Code** — one-liner install with `go run`, no clone required:
```
claude mcp add --transport stdio --scope user vikunja \
--env VIKUNJA_URL=http://localhost:3456 \
--env VIKUNJA_TOKEN=tk_your_token_here \
-- go run github.com/acidvegas/vikunja-mcp@latest -t stdio
```
The same setup expressed as a JSON file *(`~/.config/claude-code/mcp.json`)*:
```json
{
"mcpServers": {
"vikunja": {
"command": "python3",
"args": ["/absolute/path/to/vikunja-mcp/server.py"],
"command": "go",
"args": ["run", "github.com/acidvegas/vikunja-mcp@latest", "-t", "stdio"],
"env": {
"VIKUNJA_URL": "http://localhost:3456",
"VIKUNJA_TOKEN": "tk_your_token_here"
@@ -279,8 +289,8 @@ The client launches `server.py` as a subprocess and talks to it over the pipe. S
{
"mcpServers": {
"vikunja": {
"command": "python3",
"args": ["/absolute/path/to/vikunja-mcp/server.py"],
"command": "go",
"args": ["run", "github.com/acidvegas/vikunja-mcp@latest", "-t", "stdio"],
"env": {
"VIKUNJA_URL": "http://localhost:3456",
"VIKUNJA_TOKEN": "tk_your_token_here"
@@ -295,8 +305,8 @@ The client launches `server.py` as a subprocess and talks to it over the pipe. S
{
"mcpServers": {
"vikunja": {
"command": "python3",
"args": ["/absolute/path/to/vikunja-mcp/server.py"],
"command": "go",
"args": ["run", "github.com/acidvegas/vikunja-mcp@latest", "-t", "stdio"],
"env": {
"VIKUNJA_URL": "http://localhost:3456",
"VIKUNJA_TOKEN": "tk_your_token_here"
@@ -311,66 +321,27 @@ The client launches `server.py` as a subprocess and talks to it over the pipe. S
experimental:
mcpServers:
- name: vikunja
command: python3
command: go
args:
- /absolute/path/to/vikunja-mcp/server.py
- run
- github.com/acidvegas/vikunja-mcp@latest
- -t
- stdio
env:
VIKUNJA_URL: http://localhost:3456
VIKUNJA_TOKEN: tk_your_token_here
```
Any stdio capable MCP client *(Zed, LM Studio, generic MCP runners)* uses the same `command` + `args` + `env` shape. Restart the client after editing its config.
### SSE
Start the server as a daemon *(see [Transports](#transports))*, then point your client at the URL. No subprocess, no environment variables in the client config — the server already has `VIKUNJA_URL` and `VIKUNJA_TOKEN` from its own `.env`.
Start the server once:
```
python3 server.py --transport sse --host 127.0.0.1 --port 8000
```
**Claude Code** *(`~/.config/claude-code/mcp.json`)*:
```json
{
"mcpServers": {
"vikunja": {
"type": "sse",
"url": "http://127.0.0.1:8000/sse"
}
}
}
```
**Cursor** *(`~/.cursor/mcp.json`)*:
```json
{
"mcpServers": {
"vikunja": {
"url": "http://127.0.0.1:8000/sse"
}
}
}
```
**Continue** *(`config.yaml`)*:
```yaml
experimental:
mcpServers:
- name: vikunja
type: sse
url: http://127.0.0.1:8000/sse
```
Claude Desktop does not support SSE directly — use the Streamable HTTP config below, or wrap the daemon in a stdio shim with [`mcp-proxy`](https://github.com/sparfenyuk/mcp-proxy) if you specifically need SSE.
If you built a local binary instead, replace `command: go` and the `args` with `command: /absolute/path/to/vikunja-mcp` and `args: ["-t", "stdio"]`. Any stdio capable MCP client *(Zed, LM Studio, generic MCP runners)* uses the same `command` + `args` + `env` shape. Restart the client after editing its config.
### Streamable HTTP
Same pattern as SSE. Start the daemon, point the client at the `/mcp` URL. This is the current MCP HTTP transport and is what you should prefer when the client supports it.
Start the daemon, then point the client at its URL. This is the current MCP HTTP transport and is what you should prefer when the client supports it. No subprocess, no environment variables in the client config — the daemon already has `VIKUNJA_URL` and `VIKUNJA_TOKEN` in its own environment.
Start the server once:
```
python3 server.py --transport http --host 127.0.0.1 --port 8000
VIKUNJA_URL=http://localhost:3456 VIKUNJA_TOKEN=tk_your_token_here \
go run github.com/acidvegas/vikunja-mcp@latest -t http --host 127.0.0.1 --port 8000
```
**Claude Code** *(`~/.config/claude-code/mcp.json`)*:
@@ -379,7 +350,7 @@ python3 server.py --transport http --host 127.0.0.1 --port 8000
"mcpServers": {
"vikunja": {
"type": "http",
"url": "http://127.0.0.1:8000/mcp"
"url": "http://127.0.0.1:8000/"
}
}
}
@@ -391,7 +362,7 @@ python3 server.py --transport http --host 127.0.0.1 --port 8000
"mcpServers": {
"vikunja": {
"type": "streamable-http",
"url": "http://127.0.0.1:8000/mcp"
"url": "http://127.0.0.1:8000/"
}
}
}
@@ -402,7 +373,7 @@ python3 server.py --transport http --host 127.0.0.1 --port 8000
{
"mcpServers": {
"vikunja": {
"url": "http://127.0.0.1:8000/mcp"
"url": "http://127.0.0.1:8000/"
}
}
}
@@ -414,10 +385,10 @@ experimental:
mcpServers:
- name: vikunja
type: streamable-http
url: http://127.0.0.1:8000/mcp
url: http://127.0.0.1:8000/
```
For a local LLM setup where the local model runs on the same box, stdio is still the fastest path *(no TCP overhead, no daemon)*. Use SSE or Streamable HTTP when the client and server are on different machines, when multiple clients need to share one running server, or when you want the server to persist across client restarts.
For a local LLM setup where the local model runs on the same box, stdio is still the fastest path *(no TCP overhead, no daemon)*. Use Streamable HTTP when the client and server are on different machines, when multiple clients need to share one running server, or when you want the server to persist across client restarts.
## Usage Patterns

125
allowlist.go Normal file
View File

@@ -0,0 +1,125 @@
// Vikunja MCP - Developed by acidvegas in Go (https://git.acid.vegas)
// vikunja-mcp/allowlist.go
package main
type methodPath struct {
method string
path string
}
// Curated allowlist of endpoints the MCP exposes. Anything not in this set is
// dropped at build time. Keeps the tool surface small, predictable, and safe.
var allowlist = map[methodPath]bool{
// server / user
{"GET", "/info"}: true,
{"GET", "/user"}: true,
{"GET", "/users"}: true,
// projects
{"GET", "/projects"}: true,
{"PUT", "/projects"}: true,
{"GET", "/projects/{id}"}: true,
{"POST", "/projects/{id}"}: true,
{"DELETE", "/projects/{id}"}: true,
{"PUT", "/projects/{projectID}/duplicate"}: true,
{"GET", "/projects/{id}/projectusers"}: true,
// views
{"GET", "/projects/{project}/views"}: true,
{"PUT", "/projects/{project}/views"}: true,
{"GET", "/projects/{project}/views/{id}"}: true,
{"POST", "/projects/{project}/views/{id}"}: true,
{"DELETE", "/projects/{project}/views/{id}"}: true,
// buckets
{"GET", "/projects/{id}/views/{view}/buckets"}: true,
{"PUT", "/projects/{id}/views/{view}/buckets"}: true,
{"POST", "/projects/{projectID}/views/{view}/buckets/{bucketID}"}: true,
{"DELETE", "/projects/{projectID}/views/{view}/buckets/{bucketID}"}: true,
// tasks
{"GET", "/tasks"}: true,
{"GET", "/tasks/{id}"}: true,
{"POST", "/tasks/{id}"}: true,
{"DELETE", "/tasks/{id}"}: true,
{"PUT", "/projects/{id}/tasks"}: true,
{"POST", "/tasks/bulk"}: true,
{"POST", "/tasks/{id}/position"}: true,
{"POST", "/tasks/{projecttask}/read"}: true,
{"GET", "/projects/{id}/views/{view}/tasks"}: true,
{"POST", "/projects/{project}/views/{view}/buckets/{bucket}/tasks"}: true,
// task relations
{"PUT", "/tasks/{taskID}/relations"}: true,
{"DELETE", "/tasks/{taskID}/relations/{relationKind}/{otherTaskID}"}: true,
// assignees
{"GET", "/tasks/{taskID}/assignees"}: true,
{"PUT", "/tasks/{taskID}/assignees"}: true,
{"POST", "/tasks/{taskID}/assignees/bulk"}: true,
{"DELETE", "/tasks/{taskID}/assignees/{userID}"}: true,
// labels
{"GET", "/labels"}: true,
{"PUT", "/labels"}: true,
{"GET", "/labels/{id}"}: true,
{"POST", "/labels/{id}"}: true,
{"DELETE", "/labels/{id}"}: true,
{"GET", "/tasks/{task}/labels"}: true,
{"PUT", "/tasks/{task}/labels"}: true,
{"DELETE", "/tasks/{task}/labels/{label}"}: true,
{"POST", "/tasks/{taskID}/labels/bulk"}: true,
// comments
{"GET", "/tasks/{taskID}/comments"}: true,
{"GET", "/tasks/{taskID}/comments/{commentID}"}: true,
{"PUT", "/tasks/{taskID}/comments"}: true,
{"POST", "/tasks/{taskID}/comments/{commentID}"}: true,
{"DELETE", "/tasks/{taskID}/comments/{commentID}"}: true,
// attachments
{"GET", "/tasks/{id}/attachments"}: true,
{"GET", "/tasks/{id}/attachments/{attachmentID}"}: true,
{"PUT", "/tasks/{id}/attachments"}: true,
{"DELETE", "/tasks/{id}/attachments/{attachmentID}"}: true,
// reactions
{"GET", "/{kind}/{id}/reactions"}: true,
{"PUT", "/{kind}/{id}/reactions"}: true,
{"POST", "/{kind}/{id}/reactions/delete"}: true,
// filters
{"PUT", "/filters"}: true,
{"GET", "/filters/{id}"}: true,
{"POST", "/filters/{id}"}: true,
{"DELETE", "/filters/{id}"}: true,
// teams
{"GET", "/teams"}: true,
{"PUT", "/teams"}: true,
{"GET", "/teams/{id}"}: true,
{"POST", "/teams/{id}"}: true,
{"DELETE", "/teams/{id}"}: true,
{"PUT", "/teams/{id}/members"}: true,
{"POST", "/teams/{id}/members/{userID}/admin"}: true,
{"DELETE", "/teams/{id}/members/{username}"}: true,
// sharing
{"GET", "/projects/{id}/users"}: true,
{"PUT", "/projects/{id}/users"}: true,
{"POST", "/projects/{projectID}/users/{userID}"}: true,
{"DELETE", "/projects/{projectID}/users/{userID}"}: true,
{"GET", "/projects/{id}/teams"}: true,
{"PUT", "/projects/{id}/teams"}: true,
{"POST", "/projects/{projectID}/teams/{teamID}"}: true,
{"DELETE", "/projects/{projectID}/teams/{teamID}"}: true,
// link shares
{"GET", "/projects/{project}/shares"}: true,
{"GET", "/projects/{project}/shares/{share}"}: true,
{"PUT", "/projects/{project}/shares"}: true,
{"DELETE", "/projects/{project}/shares/{share}"}: true,
// subscriptions / notifications
{"PUT", "/subscriptions/{entity}/{entityID}"}: true,
{"DELETE", "/subscriptions/{entity}/{entityID}"}: true,
{"GET", "/notifications"}: true,
{"POST", "/notifications"}: true,
{"POST", "/notifications/{id}"}: true,
// webhooks
{"GET", "/projects/{id}/webhooks"}: true,
{"PUT", "/projects/{id}/webhooks"}: true,
{"POST", "/projects/{id}/webhooks/{webhookID}"}: true,
{"DELETE", "/projects/{id}/webhooks/{webhookID}"}: true,
{"GET", "/webhooks/events"}: true,
}
func isAllowlisted(method, path string) bool {
return allowlist[methodPath{method, path}]
}

14
go.mod Normal file
View File

@@ -0,0 +1,14 @@
module github.com/acidvegas/vikunja-mcp
go 1.25.0
require github.com/modelcontextprotocol/go-sdk v1.5.0
require (
github.com/google/jsonschema-go v0.4.2 // indirect
github.com/segmentio/asm v1.1.3 // indirect
github.com/segmentio/encoding v0.5.4 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
golang.org/x/oauth2 v0.35.0 // indirect
golang.org/x/sys v0.41.0 // indirect
)

20
go.sum Normal file
View File

@@ -0,0 +1,20 @@
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/jsonschema-go v0.4.2 h1:tmrUohrwoLZZS/P3x7ex0WAVknEkBZM46iALbcqoRA8=
github.com/google/jsonschema-go v0.4.2/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE=
github.com/modelcontextprotocol/go-sdk v1.5.0 h1:CHU0FIX9kpueNkxuYtfYQn1Z0slhFzBZuq+x6IiblIU=
github.com/modelcontextprotocol/go-sdk v1.5.0/go.mod h1:gggDIhoemhWs3BGkGwd1umzEXCEMMvAnhTrnbXJKKKA=
github.com/segmentio/asm v1.1.3 h1:WM03sfUOENvvKexOLp+pCqgb/WDjsi7EK8gIsICtzhc=
github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg=
github.com/segmentio/encoding v0.5.4 h1:OW1VRern8Nw6ITAtwSZ7Idrl3MXCFwXHPgqESYfvNt0=
github.com/segmentio/encoding v0.5.4/go.mod h1:HS1ZKa3kSN32ZHVZ7ZLPLXWvOVIiZtyJnO1gPH1sKt0=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ=
golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=

108
main.go Normal file
View File

@@ -0,0 +1,108 @@
// Vikunja MCP - Developed by acidvegas in Go (https://git.acid.vegas)
// vikunja-mcp/main.go
package main
import (
"context"
_ "embed"
"flag"
"fmt"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
//go:embed instructions.txt
var instructions string
type config struct {
baseURL string
token string
httpClient *http.Client
}
func main() {
log.SetFlags(0)
log.SetPrefix("vikunja-mcp: ")
vikunjaURL := strings.TrimRight(os.Getenv("VIKUNJA_URL"), "/")
if vikunjaURL == "" {
log.Fatal("VIKUNJA_URL is not set")
}
token := os.Getenv("VIKUNJA_TOKEN")
if token == "" {
log.Fatal("VIKUNJA_TOKEN is not set")
}
defaultTransport := envOr("VIKUNJA_MCP_TRANSPORT", "stdio")
defaultHost := envOr("VIKUNJA_MCP_HOST", "127.0.0.1")
defaultPort := 8000
if v := os.Getenv("VIKUNJA_MCP_PORT"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
defaultPort = n
}
}
var transport, host string
var port int
flag.StringVar(&transport, "transport", defaultTransport, "Transport: stdio or http")
flag.StringVar(&transport, "t", defaultTransport, "Transport (shorthand)")
flag.StringVar(&host, "host", defaultHost, "Host for http transport")
flag.IntVar(&port, "port", defaultPort, "Port for http transport")
flag.Parse()
cfg := &config{
baseURL: vikunjaURL + "/api/v1",
token: token,
httpClient: &http.Client{Timeout: 30 * time.Second},
}
spec, err := loadSpec(cfg.baseURL + "/docs.json")
if err != nil {
log.Fatalf("spec load failed: %v", err)
}
patchSpec(spec)
server := mcp.NewServer(
&mcp.Implementation{Name: "vikunja", Version: "1.0.0"},
&mcp.ServerOptions{Instructions: instructions},
)
n := registerTools(server, cfg, spec)
log.Printf("loaded %d tools from vikunja spec", n)
switch strings.ToLower(transport) {
case "stdio":
if err := server.Run(context.Background(), &mcp.StdioTransport{}); err != nil {
log.Fatal(err)
}
case "http":
handler := mcp.NewStreamableHTTPHandler(func(*http.Request) *mcp.Server { return server }, nil)
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/plain")
_, _ = w.Write([]byte("ok"))
})
mux.Handle("/", handler)
addr := fmt.Sprintf("%s:%d", host, port)
log.Printf("http listening on %s", addr)
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatal(err)
}
default:
log.Fatalf("unknown transport: %s (use stdio or http)", transport)
}
}
func envOr(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}

View File

@@ -1,8 +0,0 @@
# Vikunja MCP - Developed by acidvegas in Python (https://git.acid.vegas)
# vikunja/mcp/requirements.txt
aiohttp>=3.9
mcp>=1.0
python-dotenv>=1.0
starlette>=0.37
uvicorn>=0.30

550
server.py
View File

@@ -1,550 +0,0 @@
#!/usr/bin/env python3
# Vikunja MCP - Developed by acidvegas in Python (https://git.acid.vegas)
# vikunja-mcp/server.py
import argparse
import asyncio
import contextlib
import logging
import os
import re
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
except ImportError:
raise ImportError('missing mcp library (pip install mcp)')
try:
from dotenv import load_dotenv
except ImportError:
raise ImportError('missing python-dotenv library (pip install python-dotenv)')
load_dotenv()
log = logging.getLogger('vikunja-mcp')
_vikunja_url = os.getenv('VIKUNJA_URL', '').rstrip('/')
if not _vikunja_url:
raise SystemExit('VIKUNJA_URL is not set. Export it or add it to .env')
TOKEN = os.getenv('VIKUNJA_TOKEN', '')
if not TOKEN:
raise SystemExit('VIKUNJA_TOKEN is not set. Export it or add it to .env')
BASE_URL = _vikunja_url + '/api/v1'
SPEC_URL = BASE_URL + '/docs.json'
HEADERS = {'Authorization': f'Bearer {TOKEN}', 'Accept': 'application/json'}
TRANSPORT_CHOICES = ('stdio', 'sse', 'http')
DEFAULT_TRANSPORT = os.getenv('VIKUNJA_MCP_TRANSPORT', 'stdio').lower()
DEFAULT_HOST = os.getenv('VIKUNJA_MCP_HOST', '127.0.0.1')
DEFAULT_PORT = int(os.getenv('VIKUNJA_MCP_PORT', '8000'))
# Curated allowlist of endpoints the MCP exposes. Anything not in this set is
# dropped at build time. Keeps the tool surface small, predictable, and safe.
ALLOWLIST = frozenset({
# server / user
('GET', '/info'),
('GET', '/user'),
('GET', '/users'),
# projects
('GET', '/projects'),
('PUT', '/projects'),
('GET', '/projects/{id}'),
('POST', '/projects/{id}'),
('DELETE', '/projects/{id}'),
('PUT', '/projects/{projectID}/duplicate'),
('GET', '/projects/{id}/projectusers'),
# views
('GET', '/projects/{project}/views'),
('PUT', '/projects/{project}/views'),
('GET', '/projects/{project}/views/{id}'),
('POST', '/projects/{project}/views/{id}'),
('DELETE', '/projects/{project}/views/{id}'),
# buckets
('GET', '/projects/{id}/views/{view}/buckets'),
('PUT', '/projects/{id}/views/{view}/buckets'),
('POST', '/projects/{projectID}/views/{view}/buckets/{bucketID}'),
('DELETE', '/projects/{projectID}/views/{view}/buckets/{bucketID}'),
# tasks
('GET', '/tasks'),
('GET', '/tasks/{id}'),
('POST', '/tasks/{id}'),
('DELETE', '/tasks/{id}'),
('PUT', '/projects/{id}/tasks'),
('POST', '/tasks/bulk'),
('POST', '/tasks/{id}/position'),
('POST', '/tasks/{projecttask}/read'),
('GET', '/projects/{id}/views/{view}/tasks'),
('POST', '/projects/{project}/views/{view}/buckets/{bucket}/tasks'),
# task relations
('PUT', '/tasks/{taskID}/relations'),
('DELETE', '/tasks/{taskID}/relations/{relationKind}/{otherTaskID}'),
# assignees
('GET', '/tasks/{taskID}/assignees'),
('PUT', '/tasks/{taskID}/assignees'),
('POST', '/tasks/{taskID}/assignees/bulk'),
('DELETE', '/tasks/{taskID}/assignees/{userID}'),
# labels
('GET', '/labels'),
('PUT', '/labels'),
('GET', '/labels/{id}'),
('POST', '/labels/{id}'),
('DELETE', '/labels/{id}'),
('GET', '/tasks/{task}/labels'),
('PUT', '/tasks/{task}/labels'),
('DELETE', '/tasks/{task}/labels/{label}'),
('POST', '/tasks/{taskID}/labels/bulk'),
# comments
('GET', '/tasks/{taskID}/comments'),
('GET', '/tasks/{taskID}/comments/{commentID}'),
('PUT', '/tasks/{taskID}/comments'),
('POST', '/tasks/{taskID}/comments/{commentID}'),
('DELETE', '/tasks/{taskID}/comments/{commentID}'),
# attachments
('GET', '/tasks/{id}/attachments'),
('GET', '/tasks/{id}/attachments/{attachmentID}'),
('PUT', '/tasks/{id}/attachments'),
('DELETE', '/tasks/{id}/attachments/{attachmentID}'),
# reactions
('GET', '/{kind}/{id}/reactions'),
('PUT', '/{kind}/{id}/reactions'),
('POST', '/{kind}/{id}/reactions/delete'),
# filters
('PUT', '/filters'),
('GET', '/filters/{id}'),
('POST', '/filters/{id}'),
('DELETE', '/filters/{id}'),
# teams
('GET', '/teams'),
('PUT', '/teams'),
('GET', '/teams/{id}'),
('POST', '/teams/{id}'),
('DELETE', '/teams/{id}'),
('PUT', '/teams/{id}/members'),
('POST', '/teams/{id}/members/{userID}/admin'),
('DELETE', '/teams/{id}/members/{username}'),
# sharing
('GET', '/projects/{id}/users'),
('PUT', '/projects/{id}/users'),
('POST', '/projects/{projectID}/users/{userID}'),
('DELETE', '/projects/{projectID}/users/{userID}'),
('GET', '/projects/{id}/teams'),
('PUT', '/projects/{id}/teams'),
('POST', '/projects/{projectID}/teams/{teamID}'),
('DELETE', '/projects/{projectID}/teams/{teamID}'),
# link shares
('GET', '/projects/{project}/shares'),
('GET', '/projects/{project}/shares/{share}'),
('PUT', '/projects/{project}/shares'),
('DELETE', '/projects/{project}/shares/{share}'),
# subscriptions / notifications
('PUT', '/subscriptions/{entity}/{entityID}'),
('DELETE', '/subscriptions/{entity}/{entityID}'),
('GET', '/notifications'),
('POST', '/notifications'),
('POST', '/notifications/{id}'),
# webhooks
('GET', '/projects/{id}/webhooks'),
('PUT', '/projects/{id}/webhooks'),
('POST', '/projects/{id}/webhooks/{webhookID}'),
('DELETE', '/projects/{id}/webhooks/{webhookID}'),
('GET', '/webhooks/events'),
})
_instructions_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instructions.txt')
try:
with open(_instructions_path, 'r') as f:
INSTRUCTIONS = f.read()
except FileNotFoundError:
raise SystemExit(f'instructions file not found: {_instructions_path}')
async def build_tools(spec: dict) -> tuple:
'''Walk an OpenAPI 2 spec and build an MCP tool list and lookup table.
Applies the ALLOWLIST filter and patches known Vikunja spec bugs before
generating tools.
:param spec: Parsed OpenAPI document
'''
patch_spec(spec)
tools = []
index = {}
for path, methods in spec.get('paths', {}).items():
for method, op in methods.items():
if method not in ('get', 'post', 'put', 'delete', 'patch'):
continue
if (method.upper(), path) not in ALLOWLIST:
continue
name = sanitize_name(op.get('operationId') or f'{method}_{path}')
desc = (op.get('summary') or op.get('description') or f'{method.upper()} {path}').strip()[:1024]
schema, required = {}, []
for param in op.get('parameters', []) or []:
pname = param['name']
loc = param.get('in')
if loc == 'body':
body_schema = resolve_body_schema(param, spec)
body_schema.setdefault('description', f'Request body for {method.upper()} {path}')
schema[pname] = body_schema
else:
schema[pname] = {'type': openapi_to_json(param.get('type', 'string')), 'description': (param.get('description') or f'{loc} parameter').strip()}
if param.get('required'):
required.append(pname)
tools.append(Tool(name=name, description=desc, inputSchema={'type': 'object', 'properties': schema, 'required': required}))
index[name] = {'method': method.upper(), 'path': path, 'params': op.get('parameters', []) or []}
return tools, index
async def call_endpoint(session, spec_op: dict, args: dict) -> str:
'''Execute a Vikunja API call for a single MCP tool invocation.
:param session: Shared aiohttp client session
:param spec_op: Operation descriptor from the index returned by build_tools
:param args: Arguments supplied by the MCP client
'''
path = spec_op['path']
query = {}
body = None
for param in spec_op['params']:
pname = param['name']
if pname not in args or args[pname] is None:
continue
loc = param.get('in')
if loc == 'path':
path = path.replace('{' + pname + '}', str(args[pname]))
elif loc == 'query':
query[pname] = args[pname]
elif loc == 'body':
body = args[pname]
unresolved = re.findall(r'\{(\w+)\}', path)
if unresolved:
msg = f'error: missing required path parameters: {", ".join(unresolved)}'
log.warning(msg)
return msg
async with session.request(spec_op['method'], BASE_URL + path, params=query or None, json=body, headers=HEADERS) as resp:
text = await resp.text()
if resp.status >= 400:
log.warning('%s %s -> HTTP %d', spec_op['method'], path, resp.status)
return f'HTTP {resp.status}\n{text}'
async def load_spec() -> dict:
'''Fetch the Vikunja OpenAPI document from the running server.
Retries up to three times with exponential backoff when the server is
temporarily unreachable.
'''
for attempt in range(3):
try:
async with aiohttp.ClientSession() as session:
async with session.get(SPEC_URL, timeout=aiohttp.ClientTimeout(total=15)) as resp:
resp.raise_for_status()
return await resp.json(content_type=None)
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
if attempt == 2:
raise
delay = 2 ** (attempt + 1)
log.warning('spec load attempt %d failed (%s), retrying in %ds', attempt + 1, exc, delay)
await asyncio.sleep(delay)
async def build_server() -> tuple:
'''Build a configured MCP Server instance and its backing aiohttp session.
Returns a (server, aiohttp_session) tuple. The caller is responsible for
closing the aiohttp session when the server shuts down.
'''
spec = await load_spec()
tools, index = await build_tools(spec)
log.info('loaded %d tools from vikunja spec', len(tools))
server = Server('vikunja', instructions=INSTRUCTIONS)
session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
@server.list_tools()
async def _list_tools():
return tools
@server.call_tool()
async def _call_tool(name: str, arguments: dict):
op = index.get(name)
if op is None:
log.warning('unknown tool requested: %s', name)
return [TextContent(type='text', text=f'unknown tool: {name}')]
log.info('tool call: %s -> %s %s', name, op['method'], op['path'])
result = await call_endpoint(session, op, arguments or {})
return [TextContent(type='text', text=result)]
return server, session
async def run_stdio():
'''Serve the MCP protocol over stdio.'''
server, session = await build_server()
try:
async with stdio_server() as (read, write):
await server.run(read, write, server.create_initialization_options())
finally:
await session.close()
def run_sse(host: str, port: int):
'''Serve the MCP protocol over the legacy SSE HTTP transport.
Exposes two endpoints:
GET /sse - long-lived Server-Sent Events stream
POST /messages/ - client -> server JSON-RPC messages
:param host: Interface to bind the HTTP server to
:param port: TCP port to listen on
'''
try:
import uvicorn
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route
except ImportError:
raise ImportError('missing starlette/uvicorn (pip install starlette uvicorn)')
sse = SseServerTransport('/messages/')
state = {}
class SSEApp:
async def __call__(self, scope, receive, send):
server = state['server']
async with sse.connect_sse(scope, receive, send) as (read, write):
await server.run(read, write, server.create_initialization_options())
async def handle_health(_request):
return Response('ok', media_type='text/plain')
@contextlib.asynccontextmanager
async def lifespan(_app):
server, aio_session = await build_server()
state['server'] = server
state['session'] = aio_session
try:
yield
finally:
await aio_session.close()
app = Starlette(
debug=False,
routes=[
Route('/sse', endpoint=SSEApp()),
Route('/health', endpoint=handle_health, methods=['GET']),
Mount('/messages/', app=sse.handle_post_message),
],
lifespan=lifespan,
)
uvicorn.run(app, host=host, port=port, log_level='info')
def run_http(host: str, port: int):
'''Serve the MCP protocol over the Streamable HTTP transport.
Exposes a single endpoint at /mcp that handles both GET (SSE stream for
server -> client messages) and POST (client -> server JSON-RPC messages)
per the current MCP Streamable HTTP spec.
:param host: Interface to bind the HTTP server to
:param port: TCP port to listen on
'''
try:
import uvicorn
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Route
except ImportError:
raise ImportError('missing starlette/uvicorn (pip install starlette uvicorn)')
state = {}
class MCPApp:
'''ASGI wrapper so Starlette treats this as a mounted app rather than a request handler.'''
async def __call__(self, scope, receive, send):
await state['manager'].handle_request(scope, receive, send)
async def handle_health(_request):
return Response('ok', media_type='text/plain')
@contextlib.asynccontextmanager
async def lifespan(_app):
server, aio_session = await build_server()
manager = StreamableHTTPSessionManager(app=server, event_store=None, json_response=False, stateless=False)
state['manager'] = manager
state['session'] = aio_session
async with manager.run():
try:
yield
finally:
await aio_session.close()
app = Starlette(
debug=False,
routes=[
Route('/mcp', endpoint=MCPApp()),
Route('/health', endpoint=handle_health, methods=['GET']),
],
lifespan=lifespan,
)
uvicorn.run(app, host=host, port=port, log_level='info')
def parse_args():
'''Parse CLI arguments for transport selection.'''
parser = argparse.ArgumentParser(description='Vikunja MCP server', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--transport', choices=TRANSPORT_CHOICES, default=DEFAULT_TRANSPORT, help='Transport to expose the MCP server over')
parser.add_argument('--host', default=DEFAULT_HOST, help='Host interface for sse and http transports')
parser.add_argument('--port', default=DEFAULT_PORT, type=int, help='TCP port for sse and http transports')
return parser.parse_args()
def main():
'''Entry point. Dispatch to the selected transport.'''
logging.basicConfig(level=logging.INFO, format='%(levelname)s %(name)s: %(message)s')
args = parse_args()
if args.transport == 'stdio':
asyncio.run(run_stdio())
elif args.transport == 'sse':
run_sse(args.host, args.port)
elif args.transport == 'http':
run_http(args.host, args.port)
def openapi_to_json(t: str) -> str:
'''Map an OpenAPI 2 primitive type to a JSON Schema primitive type.
:param t: OpenAPI type name *(string, integer, number, boolean, array, file)*
'''
return {'integer': 'integer', 'number': 'number', 'boolean': 'boolean', 'array': 'array', 'file': 'string'}.get(t, 'string')
def resolve_ref(spec: dict, ref: str) -> dict:
'''Follow a JSON Reference pointer inside an OpenAPI spec.
:param spec: Full OpenAPI document
:param ref: Reference string, e.g. "#/definitions/models.Task"
'''
node = spec
for part in ref.lstrip('#/').split('/'):
node = node.get(part, {})
return node
def resolve_body_schema(param: dict, spec: dict) -> dict:
'''Build a JSON Schema dict for a body parameter, resolving $ref when present.
Extracts top-level property names and types from the referenced definition
so the LLM knows what fields to include in the request body.
:param param: OpenAPI body parameter object
:param spec: Full OpenAPI document for $ref resolution
'''
schema = param.get('schema', {})
if '$ref' in schema:
schema = resolve_ref(spec, schema['$ref'])
if schema.get('type') != 'object' or 'properties' not in schema:
return {'type': 'object'}
props = {}
for name, prop in schema.get('properties', {}).items():
entry = {'type': openapi_to_json(prop.get('type', 'string'))}
if 'description' in prop:
entry['description'] = prop['description'].strip()
if prop.get('type') == 'array' and 'items' in prop:
items = prop['items']
if '$ref' in items:
items = resolve_ref(spec, items['$ref'])
entry['items'] = {'type': openapi_to_json(items.get('type', 'string'))}
props[name] = entry
return {'type': 'object', 'properties': props}
SPEC_PATCHES = [
('/labels/{id}', 'put', 'post'),
]
def patch_spec(spec: dict):
'''Apply known fixes to the upstream Vikunja OpenAPI spec in place.
Each entry in SPEC_PATCHES is a (path, wrong_method, correct_method) tuple.
The operation is moved from the wrong method to the correct one only when
the correct method is not already present.
:param spec: Parsed OpenAPI document to mutate
'''
paths = spec.get('paths', {})
for path, wrong, correct in SPEC_PATCHES:
node = paths.get(path)
if node and wrong in node and correct not in node:
node[correct] = node.pop(wrong)
log.info('spec patch: %s %s -> %s', path, wrong.upper(), correct.upper())
def sanitize_name(raw: str) -> str:
'''Sanitize an OpenAPI operationId or generated name into an MCP tool name.
:param raw: Candidate tool name
'''
name = re.sub(r'[^a-zA-Z0-9_-]+', '_', raw).strip('_')[:64]
return name or 'op'
if __name__ == '__main__':
main()

150
spec.go Normal file
View File

@@ -0,0 +1,150 @@
// Vikunja MCP - Developed by acidvegas in Go (https://git.acid.vegas)
// vikunja-mcp/spec.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
)
// loadSpec fetches the Vikunja OpenAPI document from the running server.
// Retries up to three times with exponential backoff when the server is
// temporarily unreachable.
func loadSpec(url string) (map[string]any, error) {
client := &http.Client{Timeout: 15 * time.Second}
var lastErr error
for attempt := 0; attempt < 3; attempt++ {
resp, err := client.Get(url)
if err != nil {
lastErr = err
} else {
if resp.StatusCode >= 400 {
resp.Body.Close()
lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
} else {
var spec map[string]any
err := json.NewDecoder(resp.Body).Decode(&spec)
resp.Body.Close()
if err == nil {
return spec, nil
}
lastErr = err
}
}
if attempt < 2 {
delay := time.Duration(1<<(attempt+1)) * time.Second
log.Printf("spec load attempt %d failed (%v), retrying in %s", attempt+1, lastErr, delay)
time.Sleep(delay)
}
}
return nil, lastErr
}
var specPatches = []struct {
path string
wrong string
correct string
}{
{"/labels/{id}", "put", "post"},
}
// patchSpec applies known fixes to the upstream Vikunja OpenAPI spec in place.
// The operation is moved from the wrong method to the correct one only when
// the correct method is not already present.
func patchSpec(spec map[string]any) {
paths, _ := spec["paths"].(map[string]any)
for _, p := range specPatches {
node, ok := paths[p.path].(map[string]any)
if !ok {
continue
}
op, hasWrong := node[p.wrong]
if !hasWrong {
continue
}
if _, hasCorrect := node[p.correct]; hasCorrect {
continue
}
node[p.correct] = op
delete(node, p.wrong)
log.Printf("spec patch: %s %s -> %s", p.path, strings.ToUpper(p.wrong), strings.ToUpper(p.correct))
}
}
// resolveRef follows a JSON Reference pointer inside an OpenAPI spec.
func resolveRef(spec map[string]any, ref string) map[string]any {
var node any = spec
for _, part := range strings.Split(strings.TrimPrefix(ref, "#/"), "/") {
m, ok := node.(map[string]any)
if !ok {
return map[string]any{}
}
node = m[part]
}
if m, ok := node.(map[string]any); ok {
return m
}
return map[string]any{}
}
// openapiToJSON maps an OpenAPI 2 primitive type to a JSON Schema primitive type.
func openapiToJSON(t string) string {
switch t {
case "integer", "number", "boolean", "array":
return t
case "file":
return "string"
default:
return "string"
}
}
// resolveBodySchema builds a JSON Schema dict for a body parameter, resolving
// $ref when present. Extracts top-level property names and types from the
// referenced definition so the LLM knows what fields to include.
func resolveBodySchema(param map[string]any, spec map[string]any) map[string]any {
schema, _ := param["schema"].(map[string]any)
if schema == nil {
schema = map[string]any{}
}
if ref, ok := schema["$ref"].(string); ok {
schema = resolveRef(spec, ref)
}
if t, _ := schema["type"].(string); t != "object" {
return map[string]any{"type": "object"}
}
properties, ok := schema["properties"].(map[string]any)
if !ok {
return map[string]any{"type": "object"}
}
props := map[string]any{}
for name, prop := range properties {
propMap, ok := prop.(map[string]any)
if !ok {
continue
}
propType, _ := propMap["type"].(string)
entry := map[string]any{"type": openapiToJSON(propType)}
if desc, ok := propMap["description"].(string); ok {
entry["description"] = strings.TrimSpace(desc)
}
if propType == "array" {
if items, ok := propMap["items"].(map[string]any); ok {
if ref, ok := items["$ref"].(string); ok {
items = resolveRef(spec, ref)
}
itemType, _ := items["type"].(string)
entry["items"] = map[string]any{"type": openapiToJSON(itemType)}
}
}
props[name] = entry
}
return map[string]any{"type": "object", "properties": props}
}

575
test.py
View File

@@ -1,575 +0,0 @@
#!/usr/bin/env python3
# Vikunja MCP - Developed by acidvegas in Python (https://git.acid.vegas)
# vikunja-mcp/test.py
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
try:
from dotenv import load_dotenv
except ImportError:
raise ImportError('missing python-dotenv library (pip install python-dotenv)')
try:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
except ImportError:
raise ImportError('missing mcp library (pip install mcp)')
load_dotenv()
URL = os.getenv('VIKUNJA_URL', 'http://localhost:3456').rstrip('/')
TOKEN = os.getenv('VIKUNJA_TOKEN', '')
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SERVER_PATH = os.path.join(SCRIPT_DIR, 'server.py')
NOW = datetime.now(timezone.utc)
PROJECT_MEMORY = {
'title': 'Memory',
'description': 'Long term agent memory. Each task is a single memory entry, tagged with namespaced labels.',
'hex_color': '6366f1',
}
PROJECT_REPO = {
'title': 'vikunja-mcp',
'description': 'Per repository project tracking for the Vikunja MCP codebase.',
'hex_color': '10b981',
}
PROJECT_HOME = {
'title': 'home',
'description': 'Personal todos and reminders.',
'hex_color': 'f59e0b',
}
LABELS = {
'topic:postgres': '3b82f6',
'topic:docker': '3b82f6',
'topic:auth': '3b82f6',
'topic:deployment': '3b82f6',
'topic:infra': '3b82f6',
'person:alice': 'f59e0b',
'person:bob': 'f59e0b',
'source:slack': '6b7280',
'source:ops': '6b7280',
'source:meeting': '6b7280',
'kind:fact': '8b5cf6',
'kind:decision': '8b5cf6',
'kind:preference': '8b5cf6',
'kind:reference': '8b5cf6',
'bug': 'ef4444',
'feature': '10b981',
'refactor': '06b6d4',
'docs': '3b82f6',
'chore': '9ca3af',
'p0': 'dc2626',
'p1': 'f59e0b',
'p2': 'eab308',
}
MEMORIES = [
{
'title': 'Postgres maintenance window',
'description': 'The primary Postgres cluster fails over every Tuesday night at 02:00 UTC for patching. Do not start long migrations during this window.',
'labels': ['topic:postgres', 'kind:fact', 'source:ops'],
'priority': 3,
},
{
'title': 'Alice prefers async communication',
'description': 'Alice on the backend team prefers async updates in GitHub PR reviews over Slack pings. Batch questions and ping her no more than once a day.',
'labels': ['person:alice', 'kind:preference'],
'priority': 2,
},
{
'title': 'Bob on vacation',
'description': 'Bob is out of office from 2026-04-20 through 2026-05-02. Do not assign P0 issues to him during this window.',
'labels': ['person:bob', 'kind:fact'],
'priority': 2,
},
{
'title': 'JWT over server sessions',
'description': 'We chose JWT tokens instead of server side sessions for the REST API. Reasoning: horizontal scaling without a shared session store, cleaner cache semantics. Revisit if token revocation becomes a real concern.',
'labels': ['kind:decision', 'topic:auth'],
'priority': 3,
},
{
'title': 'Staging deploy needs force flag',
'description': 'The staging deploy script intentionally requires the force flag because of the symlinked config directory. Not a bug. Workaround until the config store migration ships.',
'labels': ['topic:deployment', 'kind:fact'],
'priority': 2,
},
{
'title': 'Oncall Slack channel',
'description': '#infra-oncall is the primary escalation channel. #general is unmonitored outside business hours. Page via PagerDuty if #infra-oncall is silent for more than 15 minutes on a P0.',
'labels': ['source:slack', 'kind:reference'],
'priority': 2,
},
{
'title': 'Docker socket proxy',
'description': 'Production hosts run a docker socket proxy at /var/run/docker-proxy.sock. The real docker socket is not exposed. All compose files and automation must target the proxy path.',
'labels': ['topic:docker', 'topic:infra', 'kind:fact'],
'priority': 3,
},
{
'title': 'SSL cert renewal hook',
'description': 'certbot renews SSL certs automatically via cron but the nginx reload hook is manual. Check /etc/letsencrypt/renewal-hooks/deploy/ after a renewal and reload nginx if needed.',
'labels': ['topic:infra', 'kind:fact', 'source:ops'],
'priority': 3,
},
]
REPO_TODOS = [
{
'title': 'Implement memory search tool',
'description': 'Add a higher level tool that takes a natural language query and returns the top matching memories. Should wrap get_tasks with a filter expression generator.',
'labels': ['feature', 'p1'],
'priority': 4,
'done': False,
},
{
'title': 'Write integration tests for MCP server',
'description': 'Cover the full tool list against a local Vikunja instance. Use pytest with an async fixture that starts a throwaway Vikunja container.',
'labels': ['feature', 'p2'],
'priority': 3,
'done': False,
},
{
'title': 'Document label namespace conventions',
'description': 'Write a short guide covering person:, topic:, source:, kind: and how to add your own namespaces. Target the README.',
'labels': ['docs', 'p2'],
'priority': 2,
'done': False,
},
{
'title': 'Stream responses for large queries',
'description': 'get_tasks with high per_page values returns a huge JSON blob. Investigate whether the MCP SDK can stream chunks to the client.',
'labels': ['feature', 'p2'],
'priority': 2,
'done': False,
},
{
'title': 'Fix path parameter substitution for kind',
'description': 'The reactions endpoints use a kind path parameter that is not always substituted correctly when the argument is missing. Add a validation step before firing the request.',
'labels': ['bug', 'p1'],
'priority': 4,
'done': False,
},
{
'title': 'Spec patch for labels update',
'description': 'Upstream spec documented PUT /labels/{id} but the server expects POST. Added a patch_spec step to rewrite the operation before build_tools runs.',
'labels': ['bug', 'p1'],
'priority': 4,
'done': True,
},
{
'title': 'Initial MCP server scaffolding',
'description': 'Set up the aiohttp transport, spec loader, tool builder, and stdio entry point.',
'labels': ['feature', 'p0'],
'priority': 5,
'done': True,
},
{
'title': 'Add allowlist filter',
'description': 'Hardcode the curated set of method and path tuples the MCP exposes. Drop everything else at build time.',
'labels': ['feature', 'p0'],
'priority': 5,
'done': True,
},
]
HOME_TODOS = [
{
'title': 'Pay electric bill',
'description': 'Autopay is off. Log in and pay manually.',
'priority': 4,
'done': False,
'due_offset': 3,
},
{
'title': 'Schedule dentist appointment',
'description': 'Last cleaning was eight months ago. Overdue.',
'priority': 2,
'done': False,
},
{
'title': 'Buy groceries',
'description': 'Check the fridge first. Low on eggs and milk.',
'priority': 3,
'done': False,
},
{
'title': 'Renew passport',
'description': 'Current one expires in October. Apply at least three months ahead.',
'priority': 3,
'done': False,
'due_offset': 30,
},
{
'title': 'Return library books',
'description': 'Three books on the shelf, due tomorrow.',
'priority': 3,
'done': False,
'due_offset': 1,
},
]
async def call(session, tool: str, **args):
'''Invoke an MCP tool and return the parsed JSON payload from the response.
The server wraps every reply as "HTTP {status}\\n{body}". Strip the status
line, then parse the body as JSON. Raise on non 2xx responses.
:param session: Open MCP ClientSession
:param tool: Tool name exposed by the MCP server
:param args: Keyword arguments forwarded as the tool arguments dict
'''
result = await session.call_tool(tool, arguments=args)
if result.isError:
text = result.content[0].text if result.content else '<empty>'
raise RuntimeError(f'mcp tool {tool} failed: {text}')
if not result.content:
return None
text = result.content[0].text
if text.startswith('HTTP '):
status_line, _, body = text.partition('\n')
status = int(status_line.split()[1])
if status >= 400:
raise RuntimeError(f'{tool} -> {status_line}\n{body[:400]}')
else:
body = text
if not body.strip():
return None
try:
return json.loads(body)
except ValueError:
return body
async def create_task(session, project_id: int, spec: dict, label_map: dict) -> dict:
'''Create a task through the MCP and attach its labels.
:param session: Open MCP ClientSession
:param project_id: Target project identifier
:param spec: Task definition *(title, description, priority, done, due_offset, labels)*
:param label_map: Name to label object map built by ensure_labels
'''
body = {'title': spec['title'], 'description': spec.get('description', '')}
if 'priority' in spec:
body['priority'] = spec['priority']
if spec.get('done'):
body['done'] = True
if 'due_offset' in spec:
body['due_date'] = (NOW + timedelta(days=spec['due_offset'])).isoformat().replace('+00:00', 'Z')
task = await call(session, 'put__projects_id_tasks', id=project_id, task=body)
for name in spec.get('labels', []):
label = label_map.get(name)
if label is None:
continue
await call(session, 'put__tasks_task_labels', task=task['id'], label={'label_id': label['id']})
return task
async def ensure_label(session, title: str, hex_color: str) -> dict:
'''Return an existing label by title, creating it if missing.
:param session: Open MCP ClientSession
:param title: Label title
:param hex_color: Label hex color, six chars without the hash
'''
existing = await call(session, 'get__labels', s=title, per_page=50) or []
for label in existing:
if label.get('title') == title:
return label
return await call(session, 'put__labels', label={'title': title, 'hex_color': hex_color})
async def ensure_labels(session) -> dict:
'''Ensure every label in LABELS exists and return a name to label map.
:param session: Open MCP ClientSession
'''
result = {}
for title, color in LABELS.items():
result[title] = await ensure_label(session, title, color)
print(f' label ready: {title}')
return result
async def ensure_project(session, spec: dict) -> dict:
'''Return an existing project by title, creating it if missing.
:param session: Open MCP ClientSession
:param spec: Project definition *(title, description, hex_color)*
'''
existing = await call(session, 'get__projects', s=spec['title'], per_page=50) or []
for project in existing:
if project.get('title') == spec['title']:
return project
return await call(session, 'put__projects', project=spec)
async def main():
'''Seed and validate a Vikunja instance through the MCP server end to end.'''
if not TOKEN:
print('error: VIKUNJA_TOKEN is not set in .env')
sys.exit(1)
if not os.path.isfile(SERVER_PATH):
print(f'error: mcp server not found at {SERVER_PATH}')
sys.exit(1)
print(f'mcp server : {SERVER_PATH}')
print(f'vikunja : {URL}')
print()
params = StdioServerParameters(
command = 'python3',
args = [SERVER_PATH],
env = {**os.environ, 'VIKUNJA_URL': URL, 'VIKUNJA_TOKEN': TOKEN},
)
async with stdio_client(params) as (read, write):
async with ClientSession(read, write) as session:
init = await session.initialize()
print(f'connected to : {init.serverInfo.name}')
listing = await session.list_tools()
print(f'tools exposed: {len(listing.tools)}')
print()
me = await call(session, 'get__user')
print(f'authenticated as {me["username"]} (id {me["id"]})')
print()
print('projects:')
memory = await ensure_project(session, PROJECT_MEMORY)
repo = await ensure_project(session, PROJECT_REPO)
home = await ensure_project(session, PROJECT_HOME)
print(f' Memory id={memory["id"]}')
print(f' vikunja-mcp id={repo["id"]}')
print(f' home id={home["id"]}')
print()
print('labels:')
label_map = await ensure_labels(session)
print()
print('wiping existing tasks in the seed projects...')
for pid in (memory['id'], repo['id'], home['id']):
await wipe_project_tasks(session, pid)
print()
print('seeding Memory:')
for spec in MEMORIES:
task = await create_task(session, memory['id'], spec, label_map)
print(f' {task["id"]:>4} {spec["title"]}')
print()
print('seeding vikunja-mcp:')
for spec in REPO_TODOS:
task = await create_task(session, repo['id'], spec, label_map)
status = 'done' if spec.get('done') else 'open'
print(f' {task["id"]:>4} [{status:>4}] {spec["title"]}')
print()
print('seeding home:')
for spec in HOME_TODOS:
task = await create_task(session, home['id'], spec, label_map)
print(f' {task["id"]:>4} {spec["title"]}')
print()
print('validation queries through the MCP:')
print()
postgres_id = label_map['topic:postgres']['id']
alice_id = label_map['person:alice']['id']
decision_id = label_map['kind:decision']['id']
p0_id = label_map['p0']['id']
p1_id = label_map['p1']['id']
await validate_query(session, 'memories tagged topic:postgres',
{'filter': f'labels = {postgres_id}'})
await validate_query(session, 'memories tagged person:alice',
{'filter': f'labels = {alice_id}'})
await validate_query(session, 'decisions recorded in memory',
{'filter': f'labels = {decision_id}'})
await validate_query(session, 'open vikunja-mcp work',
{'filter': f'project = {repo["id"]} && done = false'})
await validate_query(session, 'completed vikunja-mcp work',
{'filter': f'project = {repo["id"]} && done = true'})
await validate_query(session, 'open p0 or p1 tasks',
{'filter': f'labels in {p0_id},{p1_id} && done = false'})
await validate_query(session, 'title search "postgres"',
{'filter': 'title like "postgres"'})
await validate_query(session, 'home todos',
{'filter': f'project = {home["id"]}'})
print()
print('seed and validation complete.')
print()
print(f'open {URL} in a browser to inspect the three projects in the web UI.')
async def validate_query(session, label: str, params: dict):
'''Run a get__tasks query through the MCP and pretty print the hits.
:param session: Open MCP ClientSession
:param label: Short human label for the query
:param params: Arguments forwarded to the get__tasks tool
'''
print(f' {label}')
print(f' filter: {params.get("filter", "(none)")}')
try:
hits = await call(session, 'get__tasks', **params) or []
except RuntimeError as e:
print(f' error: {e}')
print()
return
if not hits:
print(' (no matches)')
else:
for h in hits:
print(f' - [{h.get("id")}] {h.get("title")}')
print()
async def wipe_project_tasks(session, project_id: int):
'''Delete every task currently assigned to a project via the MCP.
:param session: Open MCP ClientSession
:param project_id: Target project identifier
'''
tasks = await call(session, 'get__tasks', per_page=500) or []
for task in tasks:
if task.get('project_id') != project_id:
continue
try:
await call(session, 'delete__tasks_id', id=task['id'])
except RuntimeError:
pass
async def delete_by_title(session, kind: str, title: str):
'''Search for items by title and delete all exact matches via the MCP.
:param session: Open MCP ClientSession
:param kind: Resource type ("projects" or "labels")
:param title: Exact title to match
'''
items = await call(session, f'get__{kind}', s=title, per_page=50) or []
singular = kind.rstrip('s')
found = False
for item in items:
if item.get('title') != title:
continue
found = True
try:
await call(session, f'delete__{kind}_id', id=item['id'])
print(f' deleted {singular} \'{title}\' (id {item["id"]})')
except RuntimeError as e:
print(f' failed {singular} \'{title}\' (id {item["id"]}): {e}')
if not found:
print(f' {singular} \'{title}\' not found')
async def wipe():
'''Delete all seeded projects and labels through the MCP server.'''
if not TOKEN:
print('error: VIKUNJA_TOKEN is not set in .env')
sys.exit(1)
if not os.path.isfile(SERVER_PATH):
print(f'error: mcp server not found at {SERVER_PATH}')
sys.exit(1)
params = StdioServerParameters(
command = 'python3',
args = [SERVER_PATH],
env = {**os.environ, 'VIKUNJA_URL': URL, 'VIKUNJA_TOKEN': TOKEN},
)
project_titles = [PROJECT_MEMORY['title'], PROJECT_REPO['title'], PROJECT_HOME['title']]
label_titles = list(LABELS.keys())
async with stdio_client(params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
print('>>> deleting seeded projects (cascades to their tasks)')
for title in project_titles:
await delete_by_title(session, 'projects', title)
print()
print('>>> deleting seeded labels')
for title in label_titles:
await delete_by_title(session, 'labels', title)
print()
print('>>> done')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Seed and validate a Vikunja instance through the MCP server')
parser.add_argument('-w', '--wipe', action='store_true', help='Delete all seeded projects and labels instead of seeding')
args = parser.parse_args()
if args.wipe:
asyncio.run(wipe())
else:
asyncio.run(main())

250
tools.go Normal file
View File

@@ -0,0 +1,250 @@
// Vikunja MCP - Developed by acidvegas in Go (https://git.acid.vegas)
// vikunja-mcp/tools.go
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"regexp"
"strings"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
var (
nameSanitizer = regexp.MustCompile(`[^a-zA-Z0-9_-]+`)
pathParamRe = regexp.MustCompile(`\{(\w+)\}`)
)
// sanitizeName turns an OpenAPI operationId into a valid MCP tool name.
func sanitizeName(raw string) string {
name := nameSanitizer.ReplaceAllString(raw, "_")
name = strings.Trim(name, "_")
if len(name) > 64 {
name = name[:64]
}
if name == "" {
return "op"
}
return name
}
type opDescriptor struct {
method string
path string
params []map[string]any
}
// registerTools walks an OpenAPI 2 spec and registers an MCP tool for every
// allowlisted operation. Returns the number of tools registered.
func registerTools(server *mcp.Server, cfg *config, spec map[string]any) int {
paths, _ := spec["paths"].(map[string]any)
count := 0
for path, methodsAny := range paths {
methods, ok := methodsAny.(map[string]any)
if !ok {
continue
}
for method, opAny := range methods {
switch method {
case "get", "post", "put", "delete", "patch":
default:
continue
}
methodU := strings.ToUpper(method)
if !isAllowlisted(methodU, path) {
continue
}
op, ok := opAny.(map[string]any)
if !ok {
continue
}
name := toolNameFor(op, method, path)
desc := toolDescFor(op, methodU, path)
properties, required, params := buildSchema(op, spec, methodU, path)
schema := map[string]any{
"type": "object",
"properties": properties,
"required": required,
}
schemaBytes, err := json.Marshal(schema)
if err != nil {
log.Printf("skip %s %s: schema marshal failed: %v", methodU, path, err)
continue
}
descriptor := opDescriptor{method: methodU, path: path, params: params}
server.AddTool(&mcp.Tool{
Name: name,
Description: desc,
InputSchema: json.RawMessage(schemaBytes),
}, makeHandler(cfg, descriptor))
count++
}
}
return count
}
func toolNameFor(op map[string]any, method, path string) string {
raw, _ := op["operationId"].(string)
if raw == "" {
raw = fmt.Sprintf("%s_%s", method, path)
}
return sanitizeName(raw)
}
func toolDescFor(op map[string]any, methodU, path string) string {
desc, _ := op["summary"].(string)
if desc == "" {
desc, _ = op["description"].(string)
}
if desc == "" {
desc = fmt.Sprintf("%s %s", methodU, path)
}
desc = strings.TrimSpace(desc)
if len(desc) > 1024 {
desc = desc[:1024]
}
return desc
}
func buildSchema(op, spec map[string]any, methodU, path string) (map[string]any, []string, []map[string]any) {
properties := map[string]any{}
required := []string{}
var params []map[string]any
paramsAny, _ := op["parameters"].([]any)
for _, p := range paramsAny {
param, ok := p.(map[string]any)
if !ok {
continue
}
params = append(params, param)
pname, _ := param["name"].(string)
loc, _ := param["in"].(string)
if loc == "body" {
body := resolveBodySchema(param, spec)
if _, has := body["description"]; !has {
body["description"] = fmt.Sprintf("Request body for %s %s", methodU, path)
}
properties[pname] = body
} else {
pType, _ := param["type"].(string)
paramDesc, _ := param["description"].(string)
if paramDesc == "" {
paramDesc = fmt.Sprintf("%s parameter", loc)
}
properties[pname] = map[string]any{
"type": openapiToJSON(pType),
"description": strings.TrimSpace(paramDesc),
}
}
if req, _ := param["required"].(bool); req {
required = append(required, pname)
}
}
return properties, required, params
}
// makeHandler returns a ToolHandler that executes a single Vikunja API call
// using the supplied operation descriptor.
func makeHandler(cfg *config, op opDescriptor) mcp.ToolHandler {
return func(ctx context.Context, req *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
args := map[string]any{}
if len(req.Params.Arguments) > 0 {
_ = json.Unmarshal(req.Params.Arguments, &args)
}
path := op.path
query := map[string]string{}
var body any
for _, param := range op.params {
pname, _ := param["name"].(string)
val, ok := args[pname]
if !ok || val == nil {
continue
}
switch loc, _ := param["in"].(string); loc {
case "path":
path = strings.ReplaceAll(path, "{"+pname+"}", fmt.Sprintf("%v", val))
case "query":
query[pname] = fmt.Sprintf("%v", val)
case "body":
body = val
}
}
if matches := pathParamRe.FindAllStringSubmatch(path, -1); len(matches) > 0 {
missing := make([]string, 0, len(matches))
for _, m := range matches {
missing = append(missing, m[1])
}
msg := "error: missing required path parameters: " + strings.Join(missing, ", ")
log.Printf("%s %s -> %s", op.method, op.path, msg)
return toolErr(msg), nil
}
var reqBody io.Reader
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return toolErr("error: failed to marshal body: " + err.Error()), nil
}
reqBody = bytes.NewReader(b)
}
httpReq, err := http.NewRequestWithContext(ctx, op.method, cfg.baseURL+path, reqBody)
if err != nil {
return toolErr("error: " + err.Error()), nil
}
if len(query) > 0 {
q := httpReq.URL.Query()
for k, v := range query {
q.Set(k, v)
}
httpReq.URL.RawQuery = q.Encode()
}
httpReq.Header.Set("Authorization", "Bearer "+cfg.token)
httpReq.Header.Set("Accept", "application/json")
if body != nil {
httpReq.Header.Set("Content-Type", "application/json")
}
resp, err := cfg.httpClient.Do(httpReq)
if err != nil {
return toolErr("error: " + err.Error()), nil
}
defer resp.Body.Close()
respBytes, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
log.Printf("%s %s -> HTTP %d", op.method, path, resp.StatusCode)
}
text := fmt.Sprintf("HTTP %d\n%s", resp.StatusCode, string(respBytes))
return &mcp.CallToolResult{
Content: []mcp.Content{&mcp.TextContent{Text: text}},
}, nil
}
}
func toolErr(msg string) *mcp.CallToolResult {
return &mcp.CallToolResult{
Content: []mcp.Content{&mcp.TextContent{Text: msg}},
IsError: true,
}
}