Compare commits
11 Commits
AI-AGENT
...
a7af27d064
| Author | SHA1 | Date | |
|---|---|---|---|
| a7af27d064 | |||
| e1210e6e20 | |||
| 12d28170d2 | |||
| 0ba1caaa23 | |||
| e1782a6e3b | |||
| 4a272f373d | |||
| f4fa01ef7e | |||
| a50efd0082 | |||
| a4010e1173 | |||
| 6e2155d8bd | |||
| cd842eb7ac |
3
.env_example
Normal file
3
.env_example
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
OLLAMA_URL=https://ollama.host.com
|
||||||
|
OLLAMA_AUTH="Basic <BASE64 Auth string>"
|
||||||
|
#OLLAMA_AUTH="Bearer <TOKEN>"
|
||||||
60
.github/workflows/docker-publish-agent.yml
vendored
Normal file
60
.github/workflows/docker-publish-agent.yml
vendored
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
name: Publish Metadata Agent Image
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- '**'
|
||||||
|
tags:
|
||||||
|
- 'v*.*.*'
|
||||||
|
|
||||||
|
env:
|
||||||
|
REGISTRY: docker.io
|
||||||
|
IMAGE_NAME: ${{ secrets.DOCKERHUB_USERNAME }}/furumi-metadata-agent
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build-and-push-image:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
permissions:
|
||||||
|
contents: read
|
||||||
|
packages: write
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
|
- name: Log in to Docker Hub
|
||||||
|
uses: docker/login-action@v3
|
||||||
|
with:
|
||||||
|
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||||
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
||||||
|
- name: Determine version and tags
|
||||||
|
id: info
|
||||||
|
run: |
|
||||||
|
IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}"
|
||||||
|
SHORT_SHA="$(echo '${{ github.sha }}' | cut -c1-7)"
|
||||||
|
|
||||||
|
if [[ "${{ github.ref }}" == refs/tags/v* ]]; then
|
||||||
|
TAG="${{ github.ref_name }}"
|
||||||
|
VERSION="${TAG#v}"
|
||||||
|
echo "tags=${IMAGE}:${VERSION},${IMAGE}:latest" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "version=${VERSION}" >> "$GITHUB_OUTPUT"
|
||||||
|
else
|
||||||
|
echo "tags=${IMAGE}:trunk,${IMAGE}:${SHORT_SHA}" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "version=${SHORT_SHA}" >> "$GITHUB_OUTPUT"
|
||||||
|
fi
|
||||||
|
|
||||||
|
- name: Build and push Docker image
|
||||||
|
uses: docker/build-push-action@v5
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
file: Dockerfile.agent
|
||||||
|
push: true
|
||||||
|
tags: ${{ steps.info.outputs.tags }}
|
||||||
|
build-args: |
|
||||||
|
FURUMI_VERSION=${{ steps.info.outputs.version }}
|
||||||
|
cache-from: type=gha
|
||||||
|
cache-to: type=gha,mode=max
|
||||||
60
.github/workflows/docker-publish-player.yml
vendored
Normal file
60
.github/workflows/docker-publish-player.yml
vendored
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
name: Publish Web Player Image
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- '**'
|
||||||
|
tags:
|
||||||
|
- 'v*.*.*'
|
||||||
|
|
||||||
|
env:
|
||||||
|
REGISTRY: docker.io
|
||||||
|
IMAGE_NAME: ${{ secrets.DOCKERHUB_USERNAME }}/furumi-web-player
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build-and-push-image:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
permissions:
|
||||||
|
contents: read
|
||||||
|
packages: write
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
|
- name: Log in to Docker Hub
|
||||||
|
uses: docker/login-action@v3
|
||||||
|
with:
|
||||||
|
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||||
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
||||||
|
- name: Determine version and tags
|
||||||
|
id: info
|
||||||
|
run: |
|
||||||
|
IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}"
|
||||||
|
SHORT_SHA="$(echo '${{ github.sha }}' | cut -c1-7)"
|
||||||
|
|
||||||
|
if [[ "${{ github.ref }}" == refs/tags/v* ]]; then
|
||||||
|
TAG="${{ github.ref_name }}"
|
||||||
|
VERSION="${TAG#v}"
|
||||||
|
echo "tags=${IMAGE}:${VERSION},${IMAGE}:latest" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "version=${VERSION}" >> "$GITHUB_OUTPUT"
|
||||||
|
else
|
||||||
|
echo "tags=${IMAGE}:trunk,${IMAGE}:${SHORT_SHA}" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "version=${SHORT_SHA}" >> "$GITHUB_OUTPUT"
|
||||||
|
fi
|
||||||
|
|
||||||
|
- name: Build and push Docker image
|
||||||
|
uses: docker/build-push-action@v5
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
file: Dockerfile.web-player
|
||||||
|
push: true
|
||||||
|
tags: ${{ steps.info.outputs.tags }}
|
||||||
|
build-args: |
|
||||||
|
FURUMI_VERSION=${{ steps.info.outputs.version }}
|
||||||
|
cache-from: type=gha
|
||||||
|
cache-to: type=gha,mode=max
|
||||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1 +1,4 @@
|
|||||||
/target
|
/target
|
||||||
|
/inbox
|
||||||
|
/storage
|
||||||
|
.env
|
||||||
|
|||||||
32
Dockerfile.agent
Normal file
32
Dockerfile.agent
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
FROM rust:1.88.0-bookworm AS builder
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y \
|
||||||
|
pkg-config \
|
||||||
|
libssl-dev \
|
||||||
|
protobuf-compiler \
|
||||||
|
cmake \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
WORKDIR /usr/src/app
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
ARG FURUMI_VERSION=dev
|
||||||
|
RUN FURUMI_VERSION=${FURUMI_VERSION} cargo build --release --bin furumi-agent
|
||||||
|
|
||||||
|
FROM debian:bookworm-slim
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y \
|
||||||
|
ca-certificates \
|
||||||
|
libssl-dev \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
RUN useradd -ms /bin/bash appuser
|
||||||
|
WORKDIR /home/appuser
|
||||||
|
|
||||||
|
COPY --from=builder /usr/src/app/target/release/furumi-agent /usr/local/bin/furumi-agent
|
||||||
|
|
||||||
|
USER appuser
|
||||||
|
|
||||||
|
EXPOSE 8090
|
||||||
|
|
||||||
|
ENTRYPOINT ["furumi-agent"]
|
||||||
32
Dockerfile.web-player
Normal file
32
Dockerfile.web-player
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
FROM rust:1.88.0-bookworm AS builder
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y \
|
||||||
|
pkg-config \
|
||||||
|
libssl-dev \
|
||||||
|
protobuf-compiler \
|
||||||
|
cmake \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
WORKDIR /usr/src/app
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
ARG FURUMI_VERSION=dev
|
||||||
|
RUN FURUMI_VERSION=${FURUMI_VERSION} cargo build --release --bin furumi-web-player
|
||||||
|
|
||||||
|
FROM debian:bookworm-slim
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y \
|
||||||
|
ca-certificates \
|
||||||
|
libssl-dev \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
RUN useradd -ms /bin/bash appuser
|
||||||
|
WORKDIR /home/appuser
|
||||||
|
|
||||||
|
COPY --from=builder /usr/src/app/target/release/furumi-web-player /usr/local/bin/furumi-web-player
|
||||||
|
|
||||||
|
USER appuser
|
||||||
|
|
||||||
|
EXPOSE 8080
|
||||||
|
|
||||||
|
ENTRYPOINT ["furumi-web-player"]
|
||||||
221
README.md
221
README.md
@@ -1,54 +1,117 @@
|
|||||||
# Furumi-ng
|
# Furumi-ng
|
||||||
|
|
||||||
Remote filesystem over encrypted gRPC. Mount a directory from a remote server as a local folder via FUSE.
|
A music platform consisting of a remote filesystem, an AI-powered metadata agent, and a database-backed web player.
|
||||||
|
|
||||||
Designed for streaming media (video, music) over the network.
|
## Components
|
||||||
|
|
||||||
|
```
|
||||||
|
furumi-server gRPC remote filesystem with TLS and auth
|
||||||
|
furumi-client-core Cross-platform gRPC client library
|
||||||
|
furumi-mount-linux FUSE mount for Linux
|
||||||
|
furumi-mount-macos NFS mount for macOS
|
||||||
|
furumi-agent AI metadata agent (LLM-powered ingest pipeline + admin UI)
|
||||||
|
furumi-web-player Database-backed web music player with OIDC auth
|
||||||
|
```
|
||||||
|
|
||||||
|
### furumi-server
|
||||||
|
|
||||||
|
Exposes a local directory over encrypted gRPC. Supports Bearer token auth, OIDC/SSO, Prometheus metrics, and a built-in web player for direct filesystem browsing.
|
||||||
|
|
||||||
|
### furumi-agent
|
||||||
|
|
||||||
|
Background service that watches an inbox folder for new music files, extracts metadata, normalizes it using a local LLM (via Ollama), and stores canonical metadata in PostgreSQL. Features:
|
||||||
|
|
||||||
|
- Automatic metadata extraction (Symphonia) and file path parsing
|
||||||
|
- LLM normalization with RAG (queries existing artists/albums in DB for consistency)
|
||||||
|
- Featured artist detection (`feat.`, `п.у.`, `&`, etc.)
|
||||||
|
- Album cover image processing
|
||||||
|
- Auto-approval for high-confidence results, review queue for uncertain ones
|
||||||
|
- Admin web UI with batch operations, inline editing, album grouping
|
||||||
|
- Organized file storage (`Artist/Album/Track.ext`)
|
||||||
|
- Configurable system prompt (built-in default or external file)
|
||||||
|
- Database migrations via sqlx
|
||||||
|
|
||||||
|
### furumi-web-player
|
||||||
|
|
||||||
|
Web music player that reads exclusively from the database populated by the agent. Features:
|
||||||
|
|
||||||
|
- Browse by Artists, Albums, Tracks
|
||||||
|
- Full-text search across the library
|
||||||
|
- Audio streaming with HTTP Range request support
|
||||||
|
- Album cover art (from DB or embedded in audio files)
|
||||||
|
- Queue management with shuffle, repeat, drag-and-drop reorder
|
||||||
|
- Media Session API (hardware controls, lock screen integration)
|
||||||
|
- OIDC/SSO authentication
|
||||||
|
- Deep linking (`?t=<track_slug>`)
|
||||||
|
- Relative URL paths (works behind any reverse proxy prefix)
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
```
|
```
|
||||||
furumi-server (gRPC + TLS) ←→ furumi-client-core ←→ furumi-mount-{linux,macos} (FUSE / NFS)
|
┌─────────────────┐
|
||||||
|
│ Ollama (LLM) │
|
||||||
|
└────────┬────────┘
|
||||||
|
│
|
||||||
|
┌──────────┐ ┌────────────┴────────────┐ ┌──────────────────┐
|
||||||
|
│ Inbox │───→│ furumi-agent │───→│ Storage (files) │
|
||||||
|
│ folder │ │ (ingest + admin UI) │ └────────┬─────────┘
|
||||||
|
└──────────┘ └────────────┬────────────┘ │
|
||||||
|
│ │
|
||||||
|
┌──────┴──────┐ ┌───────┴────────┐
|
||||||
|
│ PostgreSQL │←──────────│ furumi-web- │
|
||||||
|
│ (metadata) │ │ player │
|
||||||
|
└─────────────┘ └────────────────┘
|
||||||
```
|
```
|
||||||
|
|
||||||
- **furumi-server** — exposes a directory over gRPC with auto-TLS, Bearer token auth, and Prometheus metrics
|
|
||||||
- **furumi-client-core** — cross-platform gRPC client library with attribute caching
|
|
||||||
- **furumi-mount-linux** — mounts the remote directory locally via FUSE (read-only)
|
|
||||||
- **furumi-mount-macos** — mounts the remote directory locally via a local NFS server (read-only)
|
|
||||||
|
|
||||||
## Quick Start
|
## Quick Start
|
||||||
|
|
||||||
|
### Remote Filesystem (FUSE/NFS mount)
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Build
|
|
||||||
cargo build --release --workspace
|
cargo build --release --workspace
|
||||||
|
|
||||||
# Server — auto-generates TLS certificate, saves it for client
|
# Server
|
||||||
./target/release/furumi-server \
|
./target/release/furumi-server \
|
||||||
--root /path/to/media \
|
--root /path/to/media \
|
||||||
--token mysecrettoken \
|
--token mysecrettoken
|
||||||
--tls-cert-out /tmp/furumi-ca.pem
|
|
||||||
|
|
||||||
# Client
|
# Client (Linux)
|
||||||
./target/release/furumi-mount-macos \
|
./target/release/furumi-mount-linux \
|
||||||
--server server-ip:50051 \
|
--server server-ip:50051 \
|
||||||
--token mysecrettoken \
|
--token mysecrettoken \
|
||||||
--mount /Volumes/remote
|
--mount /mnt/remote
|
||||||
|
|
||||||
# Use it
|
|
||||||
ls /mnt/remote
|
|
||||||
mpv /mnt/remote/video.mkv
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Linux FUSE3
|
### Music Platform (Agent + Player)
|
||||||
Linux client uses FUSE. Install with:
|
|
||||||
```
|
|
||||||
sudo add-apt-repository universe
|
|
||||||
sudo apt install libfuse3-dev
|
|
||||||
```
|
|
||||||
## Encryption
|
|
||||||
|
|
||||||
TLS is enabled by default. The server auto-generates a self-signed certificate on each start — no manual cert management required. The client automatically trusts the server's certificate for encryption.
|
Requires PostgreSQL with `pg_trgm` extension and Ollama for LLM.
|
||||||
|
|
||||||
To disable TLS (not recommended): `--no-tls` on both server and client.
|
```bash
|
||||||
|
# 1. Start PostgreSQL
|
||||||
|
docker run -d --name furumi-pg \
|
||||||
|
-e POSTGRES_DB=furumi -e POSTGRES_USER=furumi -e POSTGRES_PASSWORD=furumi \
|
||||||
|
-p 5432:5432 postgres:17
|
||||||
|
|
||||||
|
# 2. Create directories
|
||||||
|
mkdir -p /music/inbox /music/storage
|
||||||
|
|
||||||
|
# 3. Start the agent (runs migrations automatically)
|
||||||
|
./target/release/furumi-agent \
|
||||||
|
--inbox-dir /music/inbox \
|
||||||
|
--storage-dir /music/storage \
|
||||||
|
--database-url "postgres://furumi:furumi@localhost:5432/furumi" \
|
||||||
|
--ollama-url "http://localhost:11434" \
|
||||||
|
--ollama-model "qwen3:14b"
|
||||||
|
|
||||||
|
# 4. Start the web player
|
||||||
|
./target/release/furumi-web-player \
|
||||||
|
--storage-dir /music/storage \
|
||||||
|
--database-url "postgres://furumi:furumi@localhost:5432/furumi"
|
||||||
|
|
||||||
|
# 5. Drop music files into /music/inbox — agent processes them automatically
|
||||||
|
# 6. Open http://localhost:8080 to play music
|
||||||
|
# 7. Open http://localhost:8090 for the agent admin UI
|
||||||
|
```
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
@@ -62,9 +125,11 @@ All options can be set via CLI flags or environment variables.
|
|||||||
| `--root` | `FURUMI_ROOT` | `.` | Directory to expose |
|
| `--root` | `FURUMI_ROOT` | `.` | Directory to expose |
|
||||||
| `--token` | `FURUMI_TOKEN` | *(empty, auth off)* | Bearer token |
|
| `--token` | `FURUMI_TOKEN` | *(empty, auth off)* | Bearer token |
|
||||||
| `--metrics-bind` | `FURUMI_METRICS_BIND` | `0.0.0.0:9090` | Prometheus endpoint |
|
| `--metrics-bind` | `FURUMI_METRICS_BIND` | `0.0.0.0:9090` | Prometheus endpoint |
|
||||||
|
| `--web-bind` | `FURUMI_WEB_BIND` | `0.0.0.0:8080` | Built-in web player |
|
||||||
|
| `--no-web` | — | `false` | Disable built-in web player |
|
||||||
| `--no-tls` | — | `false` | Disable TLS |
|
| `--no-tls` | — | `false` | Disable TLS |
|
||||||
|
|
||||||
### Client
|
### Client (Linux / macOS)
|
||||||
|
|
||||||
| Flag | Env | Default | Description |
|
| Flag | Env | Default | Description |
|
||||||
|------|-----|---------|-------------|
|
|------|-----|---------|-------------|
|
||||||
@@ -73,22 +138,110 @@ All options can be set via CLI flags or environment variables.
|
|||||||
| `--mount` | `FURUMI_MOUNT` | — | Mount point directory |
|
| `--mount` | `FURUMI_MOUNT` | — | Mount point directory |
|
||||||
| `--no-tls` | — | `false` | Disable TLS |
|
| `--no-tls` | — | `false` | Disable TLS |
|
||||||
|
|
||||||
|
### Metadata Agent
|
||||||
|
|
||||||
|
| Flag | Env | Default | Description |
|
||||||
|
|------|-----|---------|-------------|
|
||||||
|
| `--bind` | `FURUMI_AGENT_BIND` | `0.0.0.0:8090` | Admin UI address |
|
||||||
|
| `--inbox-dir` | `FURUMI_AGENT_INBOX_DIR` | — | Watch folder for new files |
|
||||||
|
| `--storage-dir` | `FURUMI_AGENT_STORAGE_DIR` | — | Permanent storage folder |
|
||||||
|
| `--database-url` | `FURUMI_AGENT_DATABASE_URL` | — | PostgreSQL URL |
|
||||||
|
| `--ollama-url` | `FURUMI_AGENT_OLLAMA_URL` | `http://localhost:11434` | Ollama API endpoint |
|
||||||
|
| `--ollama-model` | `FURUMI_AGENT_OLLAMA_MODEL` | `qwen3:14b` | LLM model name |
|
| `--ollama-auth` | `FURUMI_AGENT_OLLAMA_AUTH` | *(none)* | Authorization header for Ollama (e.g. `Bearer <token>` or `Basic <base64>`) |
|
||||||
|
| `--poll-interval-secs` | `FURUMI_AGENT_POLL_INTERVAL_SECS` | `30` | Inbox scan interval |
|
||||||
|
| `--confidence-threshold` | `FURUMI_AGENT_CONFIDENCE_THRESHOLD` | `0.85` | Auto-approve threshold |
|
||||||
|
| `--system-prompt-file` | `FURUMI_AGENT_SYSTEM_PROMPT_FILE` | *(built-in)* | Custom LLM prompt |
|
||||||
|
|
||||||
|
### Web Player
|
||||||
|
|
||||||
|
| Flag | Env | Default | Description |
|
||||||
|
|------|-----|---------|-------------|
|
||||||
|
| `--bind` | `FURUMI_PLAYER_BIND` | `0.0.0.0:8080` | Player address |
|
||||||
|
| `--database-url` | `FURUMI_PLAYER_DATABASE_URL` | — | PostgreSQL URL |
|
||||||
|
| `--storage-dir` | `FURUMI_PLAYER_STORAGE_DIR` | — | Storage folder (for streaming) |
|
||||||
|
| `--oidc-issuer-url` | `FURUMI_PLAYER_OIDC_ISSUER_URL` | *(disabled)* | OIDC issuer |
|
||||||
|
| `--oidc-client-id` | `FURUMI_PLAYER_OIDC_CLIENT_ID` | — | OIDC client ID |
|
||||||
|
| `--oidc-client-secret` | `FURUMI_PLAYER_OIDC_CLIENT_SECRET` | — | OIDC client secret |
|
||||||
|
| `--oidc-redirect-url` | `FURUMI_PLAYER_OIDC_REDIRECT_URL` | — | OIDC redirect URL |
|
||||||
|
| `--oidc-session-secret` | `FURUMI_PLAYER_OIDC_SESSION_SECRET` | *(random)* | Session HMAC secret |
|
||||||
|
|
||||||
|
## Docker Compose
|
||||||
|
|
||||||
|
The easiest way to run the entire backend stack (PostgreSQL, Agent, Web Player, and gRPC Server) is using Docker Compose.
|
||||||
|
|
||||||
|
### Quick Start
|
||||||
|
|
||||||
|
1. **Prepare directories**:
|
||||||
|
```bash
|
||||||
|
mkdir -p inbox storage
|
||||||
|
```
|
||||||
|
2. **Start the services**:
|
||||||
|
```bash
|
||||||
|
docker compose up -d
|
||||||
|
```
|
||||||
|
3. **Check logs**:
|
||||||
|
```bash
|
||||||
|
docker compose logs -f
|
||||||
|
```
|
||||||
|
|
||||||
|
The following services will be available:
|
||||||
|
- **Web Player**: [http://localhost:8085](http://localhost:8085)
|
||||||
|
- **Agent Admin UI**: [http://localhost:8090](http://localhost:8090)
|
||||||
|
- **Metrics**: [http://localhost:9090/metrics](http://localhost:9090/metrics)
|
||||||
|
|
||||||
|
> [!NOTE]
|
||||||
|
> The Agent expects Ollama to be running. By default, it tries to connect to the host at `http://localhost:11434`.
|
||||||
|
|
||||||
|
### Reference Commands
|
||||||
|
|
||||||
|
- **Start**: `docker compose up -d`
|
||||||
|
- **Stop**: `docker compose stop`
|
||||||
|
- **Stop and remove containers**: `docker compose down`
|
||||||
|
- **Clear database and storage**: `docker compose down -v`
|
||||||
|
|
||||||
|
### Environment Variables
|
||||||
|
|
||||||
|
To configure the Agent (especially for remote Ollama or private models) and database, create an `.env` file in the root directory:
|
||||||
|
|
||||||
|
```env
|
||||||
|
# Database
|
||||||
|
POSTGRES_PASSWORD=secure-password
|
||||||
|
|
||||||
|
# LLM (Ollama)
|
||||||
|
OLLAMA_URL=http://your-ollama-host:11434
|
||||||
|
OLLAMA_AUTH="Bearer your-token"
|
||||||
|
|
||||||
|
# Server Security
|
||||||
|
FURUMI_TOKEN=secure-server-token
|
||||||
|
```
|
||||||
|
|
||||||
|
For more options, refer to the [Configuration](#configuration) section.
|
||||||
|
|
||||||
|
## Docker
|
||||||
|
|
||||||
|
Pre-built images are available on Docker Hub:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker pull <user>/furumi-server
|
||||||
|
docker pull <user>/furumi-web-player
|
||||||
|
docker pull <user>/furumi-metadata-agent
|
||||||
|
```
|
||||||
|
|
||||||
## Prometheus Metrics
|
## Prometheus Metrics
|
||||||
|
|
||||||
Available at `http://<metrics-bind>/metrics`:
|
Available at `http://<metrics-bind>/metrics` (server only):
|
||||||
|
|
||||||
- `furumi_grpc_requests_total` — request count by method and status
|
- `furumi_grpc_requests_total` — request count by method and status
|
||||||
- `furumi_grpc_request_duration_seconds` — request latency histogram
|
- `furumi_grpc_request_duration_seconds` — request latency histogram
|
||||||
- `furumi_bytes_read_total` — total bytes streamed
|
- `furumi_bytes_read_total` — total bytes streamed
|
||||||
- `furumi_active_streams` — current streaming connections
|
- `furumi_active_streams` — current streaming connections
|
||||||
- `furumi_file_open_errors_total` — file access errors
|
|
||||||
- `furumi_auth_failures_total` — authentication failures
|
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
- Linux with `libfuse3-dev` and `pkg-config` (for client)
|
|
||||||
- macOS (uses built-in NFS client)
|
|
||||||
- Rust 2024 edition
|
- Rust 2024 edition
|
||||||
|
- PostgreSQL 14+ with `pg_trgm` extension (for agent and web player)
|
||||||
|
- Ollama with a local LLM (for agent)
|
||||||
|
- Linux with `libfuse3-dev` (for FUSE client only)
|
||||||
|
|
||||||
## License
|
## License
|
||||||
|
|
||||||
|
|||||||
61
docker-compose.yml
Normal file
61
docker-compose.yml
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
services:
|
||||||
|
db:
|
||||||
|
image: postgres:17-alpine
|
||||||
|
container_name: furumi-db
|
||||||
|
environment:
|
||||||
|
POSTGRES_DB: ${POSTGRES_DB:-furumi}
|
||||||
|
POSTGRES_USER: ${POSTGRES_USER:-furumi}
|
||||||
|
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-furumi}
|
||||||
|
volumes:
|
||||||
|
- pgdata:/var/lib/postgresql/data
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-furumi} -d ${POSTGRES_DB:-furumi}"]
|
||||||
|
interval: 5s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 5
|
||||||
|
|
||||||
|
agent:
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: Dockerfile.agent
|
||||||
|
container_name: furumi-agent
|
||||||
|
depends_on:
|
||||||
|
db:
|
||||||
|
condition: service_healthy
|
||||||
|
ports:
|
||||||
|
- "8090:8090"
|
||||||
|
environment:
|
||||||
|
FURUMI_AGENT_DATABASE_URL: "postgres://${POSTGRES_USER:-furumi}:${POSTGRES_PASSWORD:-furumi}@db:5432/${POSTGRES_DB:-furumi}"
|
||||||
|
FURUMI_AGENT_INBOX_DIR: "/inbox"
|
||||||
|
FURUMI_AGENT_STORAGE_DIR: "/storage"
|
||||||
|
FURUMI_AGENT_OLLAMA_URL: "${OLLAMA_URL:-http://host.docker.internal:11434}"
|
||||||
|
FURUMI_AGENT_OLLAMA_AUTH: "${OLLAMA_AUTH:-CHANGE-ME}"
|
||||||
|
FURUMI_AGENT_BIND: "0.0.0.0:8090"
|
||||||
|
FURUMI_AGENT_POLL_INTERVAL_SECS: 5
|
||||||
|
volumes:
|
||||||
|
- ./inbox:/inbox
|
||||||
|
- ./storage:/storage
|
||||||
|
extra_hosts:
|
||||||
|
- "host.docker.internal:host-gateway"
|
||||||
|
restart: always
|
||||||
|
|
||||||
|
web-player:
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: Dockerfile.web-player
|
||||||
|
container_name: furumi-web-player
|
||||||
|
depends_on:
|
||||||
|
db:
|
||||||
|
condition: service_healthy
|
||||||
|
ports:
|
||||||
|
- "8085:8085"
|
||||||
|
environment:
|
||||||
|
FURUMI_PLAYER_DATABASE_URL: "postgres://${POSTGRES_USER:-furumi}:${POSTGRES_PASSWORD:-furumi}@db:5432/${POSTGRES_DB:-furumi}"
|
||||||
|
FURUMI_PLAYER_STORAGE_DIR: "/storage"
|
||||||
|
FURUMI_PLAYER_BIND: "0.0.0.0:8085"
|
||||||
|
volumes:
|
||||||
|
- ./storage:/storage
|
||||||
|
restart: always
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
pgdata:
|
||||||
10
furumi-agent/migrations/0003_artist_merges.sql
Normal file
10
furumi-agent/migrations/0003_artist_merges.sql
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
CREATE TABLE artist_merges (
|
||||||
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
source_artist_ids TEXT NOT NULL,
|
||||||
|
proposal TEXT,
|
||||||
|
llm_notes TEXT,
|
||||||
|
error_message TEXT,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
65
furumi-agent/prompts/merge.txt
Normal file
65
furumi-agent/prompts/merge.txt
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
You are a music library artist merge assistant. You will receive a list of artists (with their albums and tracks, each with database IDs) that have been identified as potential duplicates. Your job is to analyze them and produce a merge plan.
|
||||||
|
|
||||||
|
## Input format
|
||||||
|
|
||||||
|
You will receive a structured list like:
|
||||||
|
|
||||||
|
### Artist ID 42: "pink floyd"
|
||||||
|
Album ID 10: "the wall" (1979)
|
||||||
|
- 01. "In the Flesh?" [track_id=100]
|
||||||
|
- 02. "The Thin Ice" [track_id=101]
|
||||||
|
|
||||||
|
### Artist ID 43: "Pink Floyd"
|
||||||
|
Album ID 11: "Wish You Were Here" (1975)
|
||||||
|
- 01. "Shine On You Crazy Diamond (Parts I-V)" [track_id=200]
|
||||||
|
|
||||||
|
## Your task
|
||||||
|
|
||||||
|
Determine if the artists are duplicates and produce a merge plan.
|
||||||
|
|
||||||
|
## Rules
|
||||||
|
|
||||||
|
### 1. Canonical artist name
|
||||||
|
- Use correct capitalization and canonical spelling (e.g., "pink floyd" → "Pink Floyd", "AC DC" → "AC/DC").
|
||||||
|
- If the database already contains an artist with a well-formed name, prefer that exact form.
|
||||||
|
- If one artist has clearly more tracks or albums, their name spelling may be more authoritative.
|
||||||
|
- Fix obvious typos or casing errors.
|
||||||
|
|
||||||
|
### 2. Winner artist
|
||||||
|
- `winner_artist_id` must be the ID of one of the provided artists — the one whose identity (ID) will survive in the database.
|
||||||
|
- All other artists are "losers" and will be deleted after their albums and tracks are moved to the winner.
|
||||||
|
- Prefer the artist ID that has the most tracks/albums, or the one with the most correct canonical name.
|
||||||
|
|
||||||
|
### 3. Canonical album names
|
||||||
|
- Use correct capitalization (title case for English, preserve language for non-English).
|
||||||
|
- Fix slug-like names: "new-songs" → "New Songs", "the_dark_side" → "The Dark Side".
|
||||||
|
- Fix all-lowercase or all-uppercase: "WISH YOU WERE HERE" → "Wish You Were Here".
|
||||||
|
- Preserve creative/intentional stylization (e.g., "OK Computer" stays as-is, "(What's the Story) Morning Glory?" stays as-is).
|
||||||
|
- If the database already contains the album under another artist with a well-formed name, use that exact name.
|
||||||
|
|
||||||
|
### 4. Album deduplication
|
||||||
|
- If two albums (across the artists being merged) have the same or very similar name, they are the same album. In that case, pick the better-formed one as the "winner album".
|
||||||
|
- Set `merge_into_album_id` to the winner album's ID for the duplicate album. This means all tracks from the duplicate will be moved into the winner album, and the duplicate album will be deleted.
|
||||||
|
- If an album is unique (no duplicate exists), set `merge_into_album_id` to null — the album will simply be renamed and moved to the winner artist.
|
||||||
|
- When comparing album names for similarity, ignore case, punctuation, and common suffixes like "(Remastered)" for the purpose of duplicate detection. However, treat remastered editions as separate albums unless both albums are clearly the same remaster.
|
||||||
|
|
||||||
|
### 5. Album mappings coverage
|
||||||
|
- `album_mappings` must include an entry for EVERY album across ALL source artists, not just duplicates.
|
||||||
|
- Every album (from every artist being merged) needs a canonical name, even if it is not being merged into another album.
|
||||||
|
|
||||||
|
### 6. Notes
|
||||||
|
- The `notes` field should briefly explain: which artist was chosen as winner and why, which albums were renamed, which albums were deduplicated and into what.
|
||||||
|
|
||||||
|
## Response format
|
||||||
|
|
||||||
|
You MUST respond with a single JSON object, no markdown fences, no extra text:
|
||||||
|
|
||||||
|
{"canonical_artist_name": "...", "winner_artist_id": 42, "album_mappings": [{"source_album_id": 10, "canonical_name": "The Wall", "merge_into_album_id": null}, {"source_album_id": 11, "canonical_name": "Wish You Were Here", "merge_into_album_id": null}], "notes": "..."}
|
||||||
|
|
||||||
|
- `canonical_artist_name`: the single correct name for this artist after merging.
|
||||||
|
- `winner_artist_id`: the integer ID of the artist whose record survives (must be one of the IDs provided).
|
||||||
|
- `album_mappings`: an array covering ALL albums from ALL source artists. Each entry:
|
||||||
|
- `source_album_id`: the integer ID of this album (as provided in the input).
|
||||||
|
- `canonical_name`: the corrected canonical name for this album.
|
||||||
|
- `merge_into_album_id`: null if this album is just renamed/moved to the winner artist; or the integer ID of another album (the winner album) if this album's tracks should be merged into that album and this album deleted. Never set merge_into_album_id to the same album's own ID.
|
||||||
|
- `notes`: brief explanation of the decisions made.
|
||||||
@@ -43,6 +43,7 @@ You are a music metadata normalization assistant. Your job is to take raw metada
|
|||||||
- Preserve original language for non-English albums.
|
- Preserve original language for non-English albums.
|
||||||
- If the database already contains a matching album under the same artist, use the existing name exactly.
|
- If the database already contains a matching album under the same artist, use the existing name exactly.
|
||||||
- Do not alter the creative content of album names (same principle as track titles).
|
- Do not alter the creative content of album names (same principle as track titles).
|
||||||
|
- **Remastered editions**: A remastered release is a separate album entity, even if it shares the same title and tracks as the original. If the tags or path indicate a remaster (e.g., "Remastered", "Remaster", "REMASTERED" anywhere in tags, filename, or path), append " (Remastered)" to the album name if not already present, and use the year of the remaster release (not the original). Example: original album "The Wall" (1979) remastered in 2011 → album: "The Wall (Remastered)", year: 2011.
|
||||||
|
|
||||||
4. **Track titles** must use correct capitalization, but their content must be preserved exactly.
|
4. **Track titles** must use correct capitalization, but their content must be preserved exactly.
|
||||||
- Use title case for English titles.
|
- Use title case for English titles.
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ use clap::Parser;
|
|||||||
/// Default system prompt, compiled into the binary as a fallback.
|
/// Default system prompt, compiled into the binary as a fallback.
|
||||||
const DEFAULT_SYSTEM_PROMPT: &str = include_str!("../prompts/normalize.txt");
|
const DEFAULT_SYSTEM_PROMPT: &str = include_str!("../prompts/normalize.txt");
|
||||||
|
|
||||||
|
const DEFAULT_MERGE_PROMPT: &str = include_str!("../prompts/merge.txt");
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
#[command(version, about = "Furumi Agent: music metadata ingest and normalization")]
|
#[command(version, about = "Furumi Agent: music metadata ingest and normalization")]
|
||||||
pub struct Args {
|
pub struct Args {
|
||||||
@@ -32,6 +34,10 @@ pub struct Args {
|
|||||||
#[arg(long, env = "FURUMI_AGENT_OLLAMA_MODEL", default_value = "qwen3:14b")]
|
#[arg(long, env = "FURUMI_AGENT_OLLAMA_MODEL", default_value = "qwen3:14b")]
|
||||||
pub ollama_model: String,
|
pub ollama_model: String,
|
||||||
|
|
||||||
|
/// Authorization header value for Ollama API (e.g. "Bearer <token>" or "Basic <base64>")
|
||||||
|
#[arg(long, env = "FURUMI_AGENT_OLLAMA_AUTH")]
|
||||||
|
pub ollama_auth: Option<String>,
|
||||||
|
|
||||||
/// Inbox scan interval in seconds
|
/// Inbox scan interval in seconds
|
||||||
#[arg(long, env = "FURUMI_AGENT_POLL_INTERVAL_SECS", default_value_t = 30)]
|
#[arg(long, env = "FURUMI_AGENT_POLL_INTERVAL_SECS", default_value_t = 30)]
|
||||||
pub poll_interval_secs: u64,
|
pub poll_interval_secs: u64,
|
||||||
@@ -43,6 +49,10 @@ pub struct Args {
|
|||||||
/// Path to a custom system prompt file (overrides the built-in default)
|
/// Path to a custom system prompt file (overrides the built-in default)
|
||||||
#[arg(long, env = "FURUMI_AGENT_SYSTEM_PROMPT_FILE")]
|
#[arg(long, env = "FURUMI_AGENT_SYSTEM_PROMPT_FILE")]
|
||||||
pub system_prompt_file: Option<PathBuf>,
|
pub system_prompt_file: Option<PathBuf>,
|
||||||
|
|
||||||
|
/// Path to a custom merge prompt file (overrides the built-in default)
|
||||||
|
#[arg(long, env = "FURUMI_AGENT_MERGE_PROMPT_FILE")]
|
||||||
|
pub merge_prompt_file: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Args {
|
impl Args {
|
||||||
@@ -72,4 +82,14 @@ impl Args {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn load_merge_prompt(&self) -> Result<String, Box<dyn std::error::Error>> {
|
||||||
|
match &self.merge_prompt_file {
|
||||||
|
Some(path) => {
|
||||||
|
tracing::info!("Loading merge prompt from {:?}", path);
|
||||||
|
Ok(std::fs::read_to_string(path)?)
|
||||||
|
}
|
||||||
|
None => Ok(DEFAULT_MERGE_PROMPT.to_owned()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,6 +88,7 @@ pub struct SimilarAlbum {
|
|||||||
pub similarity: f32,
|
pub similarity: f32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||||
pub struct AlbumImage {
|
pub struct AlbumImage {
|
||||||
pub id: i64,
|
pub id: i64,
|
||||||
@@ -326,6 +327,18 @@ pub async fn approve_and_finalize(
|
|||||||
.fetch_one(pool)
|
.fetch_one(pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Check if track already exists (e.g. previously approved but pending not cleaned up)
|
||||||
|
let existing: Option<(i64,)> = sqlx::query_as("SELECT id FROM tracks WHERE file_hash = $1")
|
||||||
|
.bind(&pt.file_hash)
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some((track_id,)) = existing {
|
||||||
|
// Already finalized — just mark pending as approved
|
||||||
|
update_pending_status(pool, pending_id, "approved", None).await?;
|
||||||
|
return Ok(track_id);
|
||||||
|
}
|
||||||
|
|
||||||
let artist_name = pt.norm_artist.as_deref().unwrap_or("Unknown Artist");
|
let artist_name = pt.norm_artist.as_deref().unwrap_or("Unknown Artist");
|
||||||
let artist_id = upsert_artist(pool, artist_name).await?;
|
let artist_id = upsert_artist(pool, artist_name).await?;
|
||||||
|
|
||||||
@@ -404,6 +417,7 @@ pub async fn insert_album_image(
|
|||||||
Ok(row.0)
|
Ok(row.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
pub async fn get_album_images(pool: &PgPool, album_id: i64) -> Result<Vec<AlbumImage>, sqlx::Error> {
|
pub async fn get_album_images(pool: &PgPool, album_id: i64) -> Result<Vec<AlbumImage>, sqlx::Error> {
|
||||||
sqlx::query_as::<_, AlbumImage>("SELECT * FROM album_images WHERE album_id = $1 ORDER BY image_type")
|
sqlx::query_as::<_, AlbumImage>("SELECT * FROM album_images WHERE album_id = $1 ORDER BY image_type")
|
||||||
.bind(album_id)
|
.bind(album_id)
|
||||||
@@ -425,6 +439,16 @@ pub async fn find_album_id(pool: &PgPool, artist_name: &str, album_name: &str) -
|
|||||||
Ok(row.map(|r| r.0))
|
Ok(row.map(|r| r.0))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetch pending tracks that need (re-)processing by the LLM pipeline.
|
||||||
|
pub async fn list_pending_for_processing(pool: &PgPool, limit: i64) -> Result<Vec<PendingTrack>, sqlx::Error> {
|
||||||
|
sqlx::query_as::<_, PendingTrack>(
|
||||||
|
"SELECT * FROM pending_tracks WHERE status = 'pending' ORDER BY created_at ASC LIMIT $1"
|
||||||
|
)
|
||||||
|
.bind(limit)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
// --- DTOs for insert helpers ---
|
// --- DTOs for insert helpers ---
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
@@ -541,6 +565,8 @@ pub struct Stats {
|
|||||||
pub pending_count: i64,
|
pub pending_count: i64,
|
||||||
pub review_count: i64,
|
pub review_count: i64,
|
||||||
pub error_count: i64,
|
pub error_count: i64,
|
||||||
|
pub merged_count: i64,
|
||||||
|
pub active_merges: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_stats(pool: &PgPool) -> Result<Stats, sqlx::Error> {
|
pub async fn get_stats(pool: &PgPool) -> Result<Stats, sqlx::Error> {
|
||||||
@@ -550,5 +576,410 @@ pub async fn get_stats(pool: &PgPool) -> Result<Stats, sqlx::Error> {
|
|||||||
let (pending_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'pending'").fetch_one(pool).await?;
|
let (pending_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'pending'").fetch_one(pool).await?;
|
||||||
let (review_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'review'").fetch_one(pool).await?;
|
let (review_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'review'").fetch_one(pool).await?;
|
||||||
let (error_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'error'").fetch_one(pool).await?;
|
let (error_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'error'").fetch_one(pool).await?;
|
||||||
Ok(Stats { total_tracks, total_artists, total_albums, pending_count, review_count, error_count })
|
let (merged_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'merged'").fetch_one(pool).await?;
|
||||||
|
let (active_merges,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_merges WHERE status IN ('pending','processing')").fetch_one(pool).await?;
|
||||||
|
Ok(Stats { total_tracks, total_artists, total_albums, pending_count, review_count, error_count, merged_count, active_merges })
|
||||||
|
}
|
||||||
|
|
||||||
|
// =================== Library search ===================
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||||
|
pub struct TrackRow {
|
||||||
|
pub id: i64,
|
||||||
|
pub title: String,
|
||||||
|
pub artist_name: String,
|
||||||
|
pub album_id: Option<i64>,
|
||||||
|
pub album_name: Option<String>,
|
||||||
|
pub year: Option<i32>,
|
||||||
|
pub track_number: Option<i32>,
|
||||||
|
pub duration_secs: Option<f64>,
|
||||||
|
pub genre: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||||
|
pub struct AlbumRow {
|
||||||
|
pub id: i64,
|
||||||
|
pub name: String,
|
||||||
|
pub artist_name: String,
|
||||||
|
pub year: Option<i32>,
|
||||||
|
pub track_count: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||||
|
pub struct ArtistRow {
|
||||||
|
pub id: i64,
|
||||||
|
pub name: String,
|
||||||
|
pub album_count: i64,
|
||||||
|
pub track_count: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn search_tracks(
|
||||||
|
pool: &PgPool,
|
||||||
|
q: &str, artist: &str, album: &str,
|
||||||
|
limit: i64, offset: i64,
|
||||||
|
) -> Result<Vec<TrackRow>, sqlx::Error> {
|
||||||
|
sqlx::query_as::<_, TrackRow>(
|
||||||
|
r#"SELECT t.id, t.title, ar.name AS artist_name, t.album_id, al.name AS album_name,
|
||||||
|
al.year, t.track_number, t.duration_secs, t.genre
|
||||||
|
FROM tracks t
|
||||||
|
JOIN track_artists ta ON ta.track_id = t.id AND ta.role = 'primary'
|
||||||
|
JOIN artists ar ON ar.id = ta.artist_id
|
||||||
|
LEFT JOIN albums al ON al.id = t.album_id
|
||||||
|
WHERE ($1 = '' OR t.title ILIKE '%' || $1 || '%')
|
||||||
|
AND ($2 = '' OR ar.name ILIKE '%' || $2 || '%')
|
||||||
|
AND ($3 = '' OR al.name ILIKE '%' || $3 || '%')
|
||||||
|
ORDER BY ar.name, al.name NULLS LAST, t.track_number NULLS LAST, t.title
|
||||||
|
LIMIT $4 OFFSET $5"#,
|
||||||
|
)
|
||||||
|
.bind(q).bind(artist).bind(album).bind(limit).bind(offset)
|
||||||
|
.fetch_all(pool).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn count_tracks(pool: &PgPool, q: &str, artist: &str, album: &str) -> Result<i64, sqlx::Error> {
|
||||||
|
let (n,): (i64,) = sqlx::query_as(
|
||||||
|
r#"SELECT COUNT(*) FROM tracks t
|
||||||
|
JOIN track_artists ta ON ta.track_id = t.id AND ta.role = 'primary'
|
||||||
|
JOIN artists ar ON ar.id = ta.artist_id
|
||||||
|
LEFT JOIN albums al ON al.id = t.album_id
|
||||||
|
WHERE ($1 = '' OR t.title ILIKE '%' || $1 || '%')
|
||||||
|
AND ($2 = '' OR ar.name ILIKE '%' || $2 || '%')
|
||||||
|
AND ($3 = '' OR al.name ILIKE '%' || $3 || '%')"#,
|
||||||
|
)
|
||||||
|
.bind(q).bind(artist).bind(album)
|
||||||
|
.fetch_one(pool).await?;
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn search_albums(
|
||||||
|
pool: &PgPool,
|
||||||
|
q: &str, artist: &str,
|
||||||
|
limit: i64, offset: i64,
|
||||||
|
) -> Result<Vec<AlbumRow>, sqlx::Error> {
|
||||||
|
sqlx::query_as::<_, AlbumRow>(
|
||||||
|
r#"SELECT a.id, a.name, ar.name AS artist_name, a.year,
|
||||||
|
COUNT(t.id) AS track_count
|
||||||
|
FROM albums a
|
||||||
|
JOIN artists ar ON ar.id = a.artist_id
|
||||||
|
LEFT JOIN tracks t ON t.album_id = a.id
|
||||||
|
WHERE ($1 = '' OR a.name ILIKE '%' || $1 || '%')
|
||||||
|
AND ($2 = '' OR ar.name ILIKE '%' || $2 || '%')
|
||||||
|
GROUP BY a.id, a.name, ar.name, a.year
|
||||||
|
ORDER BY ar.name, a.year NULLS LAST, a.name
|
||||||
|
LIMIT $3 OFFSET $4"#,
|
||||||
|
)
|
||||||
|
.bind(q).bind(artist).bind(limit).bind(offset)
|
||||||
|
.fetch_all(pool).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn count_albums(pool: &PgPool, q: &str, artist: &str) -> Result<i64, sqlx::Error> {
|
||||||
|
let (n,): (i64,) = sqlx::query_as(
|
||||||
|
r#"SELECT COUNT(*) FROM albums a
|
||||||
|
JOIN artists ar ON ar.id = a.artist_id
|
||||||
|
WHERE ($1 = '' OR a.name ILIKE '%' || $1 || '%')
|
||||||
|
AND ($2 = '' OR ar.name ILIKE '%' || $2 || '%')"#,
|
||||||
|
)
|
||||||
|
.bind(q).bind(artist)
|
||||||
|
.fetch_one(pool).await?;
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn search_artists_lib(
|
||||||
|
pool: &PgPool,
|
||||||
|
q: &str,
|
||||||
|
limit: i64, offset: i64,
|
||||||
|
) -> Result<Vec<ArtistRow>, sqlx::Error> {
|
||||||
|
sqlx::query_as::<_, ArtistRow>(
|
||||||
|
r#"SELECT ar.id, ar.name,
|
||||||
|
COUNT(DISTINCT al.id) AS album_count,
|
||||||
|
COUNT(DISTINCT ta.track_id) AS track_count
|
||||||
|
FROM artists ar
|
||||||
|
LEFT JOIN albums al ON al.artist_id = ar.id
|
||||||
|
LEFT JOIN track_artists ta ON ta.artist_id = ar.id AND ta.role = 'primary'
|
||||||
|
WHERE ($1 = '' OR ar.name ILIKE '%' || $1 || '%')
|
||||||
|
GROUP BY ar.id, ar.name
|
||||||
|
ORDER BY ar.name
|
||||||
|
LIMIT $2 OFFSET $3"#,
|
||||||
|
)
|
||||||
|
.bind(q).bind(limit).bind(offset)
|
||||||
|
.fetch_all(pool).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn count_artists_lib(pool: &PgPool, q: &str) -> Result<i64, sqlx::Error> {
|
||||||
|
let (n,): (i64,) = sqlx::query_as(
|
||||||
|
"SELECT COUNT(*) FROM artists WHERE ($1 = '' OR name ILIKE '%' || $1 || '%')"
|
||||||
|
)
|
||||||
|
.bind(q)
|
||||||
|
.fetch_one(pool).await?;
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Track full details ---
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct TrackFull {
|
||||||
|
pub id: i64,
|
||||||
|
pub title: String,
|
||||||
|
pub artist_id: i64,
|
||||||
|
pub artist_name: String,
|
||||||
|
pub album_id: Option<i64>,
|
||||||
|
pub album_name: Option<String>,
|
||||||
|
pub track_number: Option<i32>,
|
||||||
|
pub duration_secs: Option<f64>,
|
||||||
|
pub genre: Option<String>,
|
||||||
|
pub file_hash: String,
|
||||||
|
pub file_size: i64,
|
||||||
|
pub storage_path: String,
|
||||||
|
pub featured_artists: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_track_full(pool: &PgPool, id: i64) -> Result<Option<TrackFull>, sqlx::Error> {
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct Row {
|
||||||
|
id: i64, title: String, artist_id: i64, artist_name: String,
|
||||||
|
album_id: Option<i64>, album_name: Option<String>,
|
||||||
|
track_number: Option<i32>, duration_secs: Option<f64>,
|
||||||
|
genre: Option<String>, file_hash: String, file_size: i64, storage_path: String,
|
||||||
|
}
|
||||||
|
let row: Option<Row> = sqlx::query_as(
|
||||||
|
r#"SELECT t.id, t.title,
|
||||||
|
ta_p.artist_id, ar.name AS artist_name,
|
||||||
|
t.album_id, al.name AS album_name,
|
||||||
|
t.track_number, t.duration_secs, t.genre,
|
||||||
|
t.file_hash, t.file_size, t.storage_path
|
||||||
|
FROM tracks t
|
||||||
|
JOIN track_artists ta_p ON ta_p.track_id = t.id AND ta_p.role = 'primary'
|
||||||
|
JOIN artists ar ON ar.id = ta_p.artist_id
|
||||||
|
LEFT JOIN albums al ON al.id = t.album_id
|
||||||
|
WHERE t.id = $1"#,
|
||||||
|
).bind(id).fetch_optional(pool).await?;
|
||||||
|
|
||||||
|
let row = match row { Some(r) => r, None => return Ok(None) };
|
||||||
|
|
||||||
|
let feat: Vec<(String,)> = sqlx::query_as(
|
||||||
|
"SELECT ar.name FROM track_artists ta JOIN artists ar ON ar.id=ta.artist_id WHERE ta.track_id=$1 AND ta.role='featured' ORDER BY ta.id"
|
||||||
|
).bind(id).fetch_all(pool).await?;
|
||||||
|
|
||||||
|
Ok(Some(TrackFull {
|
||||||
|
id: row.id, title: row.title, artist_id: row.artist_id, artist_name: row.artist_name,
|
||||||
|
album_id: row.album_id, album_name: row.album_name, track_number: row.track_number,
|
||||||
|
duration_secs: row.duration_secs, genre: row.genre, file_hash: row.file_hash,
|
||||||
|
file_size: row.file_size, storage_path: row.storage_path,
|
||||||
|
featured_artists: feat.into_iter().map(|(n,)| n).collect(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct TrackUpdateFields {
|
||||||
|
pub title: String,
|
||||||
|
pub artist_id: i64,
|
||||||
|
pub album_id: Option<i64>,
|
||||||
|
pub track_number: Option<i32>,
|
||||||
|
pub genre: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub featured_artists: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_track_metadata(pool: &PgPool, id: i64, f: &TrackUpdateFields) -> Result<(), sqlx::Error> {
|
||||||
|
sqlx::query("UPDATE tracks SET title=$2, album_id=$3, track_number=$4, genre=$5 WHERE id=$1")
|
||||||
|
.bind(id).bind(&f.title).bind(f.album_id).bind(f.track_number).bind(&f.genre)
|
||||||
|
.execute(pool).await?;
|
||||||
|
sqlx::query("UPDATE track_artists SET artist_id=$2 WHERE track_id=$1 AND role='primary'")
|
||||||
|
.bind(id).bind(f.artist_id).execute(pool).await?;
|
||||||
|
// Rebuild featured artists
|
||||||
|
sqlx::query("DELETE FROM track_artists WHERE track_id=$1 AND role='featured'")
|
||||||
|
.bind(id).execute(pool).await?;
|
||||||
|
for name in &f.featured_artists {
|
||||||
|
let feat_id = upsert_artist(pool, name).await?;
|
||||||
|
link_track_artist(pool, id, feat_id, "featured").await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Album full details ---
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct AlbumDetails {
|
||||||
|
pub id: i64,
|
||||||
|
pub name: String,
|
||||||
|
pub year: Option<i32>,
|
||||||
|
pub artist_id: i64,
|
||||||
|
pub artist_name: String,
|
||||||
|
pub tracks: Vec<AlbumTrackRow>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||||
|
pub struct AlbumTrackRow {
|
||||||
|
pub id: i64,
|
||||||
|
pub title: String,
|
||||||
|
pub track_number: Option<i32>,
|
||||||
|
pub duration_secs: Option<f64>,
|
||||||
|
pub artist_name: String,
|
||||||
|
pub genre: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_album_details(pool: &PgPool, id: i64) -> Result<Option<AlbumDetails>, sqlx::Error> {
|
||||||
|
let row: Option<(i64, String, Option<i32>, i64, String)> = sqlx::query_as(
|
||||||
|
"SELECT a.id, a.name, a.year, ar.id, ar.name FROM albums a JOIN artists ar ON ar.id=a.artist_id WHERE a.id=$1"
|
||||||
|
).bind(id).fetch_optional(pool).await?;
|
||||||
|
let (aid, aname, ayear, artist_id, artist_name) = match row { Some(r) => r, None => return Ok(None) };
|
||||||
|
let tracks: Vec<AlbumTrackRow> = sqlx::query_as(
|
||||||
|
r#"SELECT t.id, t.title, t.track_number, t.duration_secs, ar.name AS artist_name, t.genre
|
||||||
|
FROM tracks t
|
||||||
|
JOIN track_artists ta ON ta.track_id=t.id AND ta.role='primary'
|
||||||
|
JOIN artists ar ON ar.id=ta.artist_id
|
||||||
|
WHERE t.album_id=$1 ORDER BY t.track_number NULLS LAST, t.title"#
|
||||||
|
).bind(id).fetch_all(pool).await?;
|
||||||
|
Ok(Some(AlbumDetails { id: aid, name: aname, year: ayear, artist_id, artist_name, tracks }))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_album_full(pool: &PgPool, id: i64, name: &str, year: Option<i32>, artist_id: i64) -> Result<(), sqlx::Error> {
|
||||||
|
sqlx::query("UPDATE albums SET name=$2, year=$3, artist_id=$4 WHERE id=$1")
|
||||||
|
.bind(id).bind(name).bind(year).bind(artist_id).execute(pool).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn reorder_tracks(pool: &PgPool, orders: &[(i64, i32)]) -> Result<(), sqlx::Error> {
|
||||||
|
for &(track_id, track_number) in orders {
|
||||||
|
sqlx::query("UPDATE tracks SET track_number=$2 WHERE id=$1")
|
||||||
|
.bind(track_id).bind(track_number).execute(pool).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_album_cover(pool: &PgPool, album_id: i64) -> Result<Option<(String, String)>, sqlx::Error> {
|
||||||
|
let row: Option<(String, String)> = sqlx::query_as(
|
||||||
|
"SELECT file_path, mime_type FROM album_images WHERE album_id=$1 LIMIT 1"
|
||||||
|
).bind(album_id).fetch_optional(pool).await?;
|
||||||
|
Ok(row)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn search_albums_for_artist(pool: &PgPool, q: &str, artist_id: Option<i64>) -> Result<Vec<(i64, String)>, sqlx::Error> {
|
||||||
|
if let Some(aid) = artist_id {
|
||||||
|
let rows: Vec<(i64, String)> = sqlx::query_as(
|
||||||
|
"SELECT id, name FROM albums WHERE artist_id=$1 AND ($2='' OR name ILIKE '%'||$2||'%') ORDER BY year NULLS LAST, name LIMIT 15"
|
||||||
|
).bind(aid).bind(q).fetch_all(pool).await?;
|
||||||
|
Ok(rows)
|
||||||
|
} else {
|
||||||
|
let rows: Vec<(i64, String)> = sqlx::query_as(
|
||||||
|
"SELECT id, name FROM albums WHERE $1='' OR name ILIKE '%'||$1||'%' ORDER BY name LIMIT 15"
|
||||||
|
).bind(q).fetch_all(pool).await?;
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// =================== Artist Merges ===================
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||||
|
pub struct ArtistMerge {
|
||||||
|
pub id: Uuid,
|
||||||
|
pub status: String,
|
||||||
|
pub source_artist_ids: String,
|
||||||
|
pub proposal: Option<String>,
|
||||||
|
pub llm_notes: Option<String>,
|
||||||
|
pub error_message: Option<String>,
|
||||||
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
pub updated_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct ArtistFullData {
|
||||||
|
pub id: i64,
|
||||||
|
pub name: String,
|
||||||
|
pub albums: Vec<AlbumFullData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct AlbumFullData {
|
||||||
|
pub id: i64,
|
||||||
|
pub name: String,
|
||||||
|
pub year: Option<i32>,
|
||||||
|
pub tracks: Vec<TrackBasic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||||
|
pub struct TrackBasic {
|
||||||
|
pub id: i64,
|
||||||
|
pub title: String,
|
||||||
|
pub track_number: Option<i32>,
|
||||||
|
pub storage_path: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, sqlx::FromRow)]
|
||||||
|
pub struct TrackWithAlbum {
|
||||||
|
pub id: i64,
|
||||||
|
pub storage_path: String,
|
||||||
|
pub album_name: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn insert_artist_merge(pool: &PgPool, source_artist_ids: &[i64]) -> Result<Uuid, sqlx::Error> {
|
||||||
|
let ids_json = serde_json::to_string(source_artist_ids).unwrap_or_default();
|
||||||
|
let row: (Uuid,) = sqlx::query_as(
|
||||||
|
"INSERT INTO artist_merges (source_artist_ids) VALUES ($1) RETURNING id"
|
||||||
|
).bind(&ids_json).fetch_one(pool).await?;
|
||||||
|
Ok(row.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_artist_merges(pool: &PgPool) -> Result<Vec<ArtistMerge>, sqlx::Error> {
|
||||||
|
sqlx::query_as::<_, ArtistMerge>("SELECT * FROM artist_merges ORDER BY created_at DESC")
|
||||||
|
.fetch_all(pool).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_artist_merge(pool: &PgPool, id: Uuid) -> Result<Option<ArtistMerge>, sqlx::Error> {
|
||||||
|
sqlx::query_as::<_, ArtistMerge>("SELECT * FROM artist_merges WHERE id = $1")
|
||||||
|
.bind(id).fetch_optional(pool).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_merge_status(pool: &PgPool, id: Uuid, status: &str, error: Option<&str>) -> Result<(), sqlx::Error> {
|
||||||
|
sqlx::query("UPDATE artist_merges SET status = $2, error_message = $3, updated_at = NOW() WHERE id = $1")
|
||||||
|
.bind(id).bind(status).bind(error).execute(pool).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_merge_proposal(pool: &PgPool, id: Uuid, proposal_json: &str, notes: &str) -> Result<(), sqlx::Error> {
|
||||||
|
sqlx::query("UPDATE artist_merges SET proposal = $2, llm_notes = $3, status = 'review', error_message = NULL, updated_at = NOW() WHERE id = $1")
|
||||||
|
.bind(id).bind(proposal_json).bind(notes).execute(pool).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_pending_merges_for_processing(pool: &PgPool) -> Result<Vec<Uuid>, sqlx::Error> {
|
||||||
|
let rows: Vec<(Uuid,)> = sqlx::query_as(
|
||||||
|
"SELECT id FROM artist_merges WHERE status = 'pending' ORDER BY created_at ASC LIMIT 5"
|
||||||
|
).fetch_all(pool).await?;
|
||||||
|
Ok(rows.into_iter().map(|(id,)| id).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_artists_full_data(pool: &PgPool, ids: &[i64]) -> Result<Vec<ArtistFullData>, sqlx::Error> {
|
||||||
|
let mut result = Vec::new();
|
||||||
|
for &id in ids {
|
||||||
|
let artist: Artist = sqlx::query_as("SELECT id, name FROM artists WHERE id = $1")
|
||||||
|
.bind(id).fetch_one(pool).await?;
|
||||||
|
let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE artist_id = $1 ORDER BY year NULLS LAST, name")
|
||||||
|
.bind(id).fetch_all(pool).await?;
|
||||||
|
let mut album_data = Vec::new();
|
||||||
|
for album in albums {
|
||||||
|
let tracks: Vec<TrackBasic> = sqlx::query_as(
|
||||||
|
"SELECT id, title, track_number, storage_path FROM tracks WHERE album_id = $1 ORDER BY track_number NULLS LAST, title"
|
||||||
|
).bind(album.id).fetch_all(pool).await?;
|
||||||
|
album_data.push(AlbumFullData { id: album.id, name: album.name, year: album.year, tracks });
|
||||||
|
}
|
||||||
|
result.push(ArtistFullData { id, name: artist.name, albums: album_data });
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_tracks_with_albums_for_artist(pool: &PgPool, artist_id: i64) -> Result<Vec<TrackWithAlbum>, sqlx::Error> {
|
||||||
|
sqlx::query_as::<_, TrackWithAlbum>(
|
||||||
|
r#"SELECT t.id, t.storage_path, a.name as album_name
|
||||||
|
FROM tracks t
|
||||||
|
LEFT JOIN albums a ON a.id = t.album_id
|
||||||
|
WHERE t.id IN (
|
||||||
|
SELECT track_id FROM track_artists WHERE artist_id = $1 AND role = 'primary'
|
||||||
|
)"#
|
||||||
|
).bind(artist_id).fetch_all(pool).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_track_storage_path(pool: &PgPool, track_id: i64, new_path: &str) -> Result<(), sqlx::Error> {
|
||||||
|
sqlx::query("UPDATE tracks SET storage_path = $2 WHERE id = $1")
|
||||||
|
.bind(track_id).bind(new_path).execute(pool).await?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,24 @@ pub async fn run(state: Arc<AppState>) {
|
|||||||
Ok(count) => tracing::info!(count, "processed new files"),
|
Ok(count) => tracing::info!(count, "processed new files"),
|
||||||
Err(e) => tracing::error!(?e, "inbox scan failed"),
|
Err(e) => tracing::error!(?e, "inbox scan failed"),
|
||||||
}
|
}
|
||||||
|
// Re-process pending tracks (e.g. retried from admin UI)
|
||||||
|
match reprocess_pending(&state).await {
|
||||||
|
Ok(0) => {}
|
||||||
|
Ok(count) => tracing::info!(count, "re-processed pending tracks"),
|
||||||
|
Err(e) => tracing::error!(?e, "pending re-processing failed"),
|
||||||
|
}
|
||||||
|
// Process pending merge proposals
|
||||||
|
match db::get_pending_merges_for_processing(&state.pool).await {
|
||||||
|
Ok(merge_ids) => {
|
||||||
|
for merge_id in merge_ids {
|
||||||
|
if let Err(e) = crate::merge::propose_merge(&state, merge_id).await {
|
||||||
|
tracing::error!(id = %merge_id, ?e, "Merge proposal failed");
|
||||||
|
let _ = db::update_merge_status(&state.pool, merge_id, "error", Some(&e.to_string())).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => tracing::error!(?e, "Failed to load pending merges"),
|
||||||
|
}
|
||||||
tokio::time::sleep(interval).await;
|
tokio::time::sleep(interval).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -61,6 +79,143 @@ async fn scan_inbox(state: &Arc<AppState>) -> anyhow::Result<usize> {
|
|||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Re-process pending tracks from DB (e.g. tracks retried via admin UI).
|
||||||
|
/// These already have raw metadata and path hints stored — just need RAG + LLM.
|
||||||
|
async fn reprocess_pending(state: &Arc<AppState>) -> anyhow::Result<usize> {
|
||||||
|
let pending = db::list_pending_for_processing(&state.pool, 10).await?;
|
||||||
|
if pending.is_empty() {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut count = 0;
|
||||||
|
for pt in &pending {
|
||||||
|
tracing::info!(id = %pt.id, title = pt.raw_title.as_deref().unwrap_or("?"), "Re-processing pending track");
|
||||||
|
|
||||||
|
db::update_pending_status(&state.pool, pt.id, "processing", None).await?;
|
||||||
|
|
||||||
|
// Build raw metadata and hints from stored DB fields
|
||||||
|
let raw_meta = metadata::RawMetadata {
|
||||||
|
title: pt.raw_title.clone(),
|
||||||
|
artist: pt.raw_artist.clone(),
|
||||||
|
album: pt.raw_album.clone(),
|
||||||
|
track_number: pt.raw_track_number.map(|n| n as u32),
|
||||||
|
year: pt.raw_year.map(|n| n as u32),
|
||||||
|
genre: pt.raw_genre.clone(),
|
||||||
|
duration_secs: pt.duration_secs,
|
||||||
|
};
|
||||||
|
|
||||||
|
let hints = db::PathHints {
|
||||||
|
title: pt.path_title.clone(),
|
||||||
|
artist: pt.path_artist.clone(),
|
||||||
|
album: pt.path_album.clone(),
|
||||||
|
year: pt.path_year,
|
||||||
|
track_number: pt.path_track_number,
|
||||||
|
};
|
||||||
|
|
||||||
|
// RAG lookup
|
||||||
|
let artist_query = raw_meta.artist.as_deref()
|
||||||
|
.or(hints.artist.as_deref())
|
||||||
|
.unwrap_or("");
|
||||||
|
let album_query = raw_meta.album.as_deref()
|
||||||
|
.or(hints.album.as_deref())
|
||||||
|
.unwrap_or("");
|
||||||
|
|
||||||
|
let similar_artists = if !artist_query.is_empty() {
|
||||||
|
db::find_similar_artists(&state.pool, artist_query, 5).await.unwrap_or_default()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
let similar_albums = if !album_query.is_empty() {
|
||||||
|
db::find_similar_albums(&state.pool, album_query, 5).await.unwrap_or_default()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
// LLM normalization
|
||||||
|
match normalize::normalize(state, &raw_meta, &hints, &similar_artists, &similar_albums).await {
|
||||||
|
Ok(normalized) => {
|
||||||
|
let confidence = normalized.confidence.unwrap_or(0.0);
|
||||||
|
let status = if confidence >= state.config.confidence_threshold {
|
||||||
|
"approved"
|
||||||
|
} else {
|
||||||
|
"review"
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
id = %pt.id,
|
||||||
|
norm_artist = normalized.artist.as_deref().unwrap_or("-"),
|
||||||
|
norm_title = normalized.title.as_deref().unwrap_or("-"),
|
||||||
|
confidence,
|
||||||
|
status,
|
||||||
|
"Re-processing complete"
|
||||||
|
);
|
||||||
|
|
||||||
|
db::update_pending_normalized(&state.pool, pt.id, status, &normalized, None).await?;
|
||||||
|
|
||||||
|
if status == "approved" {
|
||||||
|
let artist = normalized.artist.as_deref().unwrap_or("Unknown Artist");
|
||||||
|
let album = normalized.album.as_deref().unwrap_or("Unknown Album");
|
||||||
|
let title = normalized.title.as_deref().unwrap_or("Unknown Title");
|
||||||
|
let source = std::path::Path::new(&pt.inbox_path);
|
||||||
|
let ext = source.extension().and_then(|e| e.to_str()).unwrap_or("flac");
|
||||||
|
let track_num = normalized.track_number.unwrap_or(0);
|
||||||
|
|
||||||
|
let dest_filename = if track_num > 0 {
|
||||||
|
format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext)
|
||||||
|
} else {
|
||||||
|
format!("{}.{}", sanitize_filename(title), ext)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check if already moved
|
||||||
|
let dest = state.config.storage_dir
|
||||||
|
.join(sanitize_filename(artist))
|
||||||
|
.join(sanitize_filename(album))
|
||||||
|
.join(&dest_filename);
|
||||||
|
|
||||||
|
let (storage_path, was_merged) = if dest.exists() && !source.exists() {
|
||||||
|
(dest.to_string_lossy().to_string(), false)
|
||||||
|
} else if source.exists() {
|
||||||
|
match mover::move_to_storage(
|
||||||
|
&state.config.storage_dir, artist, album, &dest_filename, source,
|
||||||
|
).await {
|
||||||
|
Ok(mover::MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false),
|
||||||
|
Ok(mover::MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(id = %pt.id, ?e, "Failed to move file");
|
||||||
|
db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::error!(id = %pt.id, "Source file missing: {:?}", source);
|
||||||
|
db::update_pending_status(&state.pool, pt.id, "error", Some("Source file missing")).await?;
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
match db::approve_and_finalize(&state.pool, pt.id, &storage_path).await {
|
||||||
|
Ok(track_id) => {
|
||||||
|
if was_merged {
|
||||||
|
let _ = db::update_pending_status(&state.pool, pt.id, "merged", None).await;
|
||||||
|
}
|
||||||
|
tracing::info!(id = %pt.id, track_id, "Track finalized");
|
||||||
|
}
|
||||||
|
Err(e) => tracing::error!(id = %pt.id, ?e, "Failed to finalize"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(id = %pt.id, ?e, "LLM normalization failed");
|
||||||
|
db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|
||||||
/// Recursively remove empty directories inside the inbox.
|
/// Recursively remove empty directories inside the inbox.
|
||||||
/// Does not remove the inbox root itself.
|
/// Does not remove the inbox root itself.
|
||||||
async fn cleanup_empty_dirs(dir: &std::path::Path) -> bool {
|
async fn cleanup_empty_dirs(dir: &std::path::Path) -> bool {
|
||||||
@@ -335,10 +490,17 @@ async fn process_file(state: &Arc<AppState>, file_path: &std::path::Path) -> any
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(storage_path) => {
|
Ok(outcome) => {
|
||||||
|
let (storage_path, was_merged) = match outcome {
|
||||||
|
mover::MoveOutcome::Moved(p) => (p, false),
|
||||||
|
mover::MoveOutcome::Merged(p) => (p, true),
|
||||||
|
};
|
||||||
let rel_path = storage_path.to_string_lossy().to_string();
|
let rel_path = storage_path.to_string_lossy().to_string();
|
||||||
match db::approve_and_finalize(&state.pool, pending_id, &rel_path).await {
|
match db::approve_and_finalize(&state.pool, pending_id, &rel_path).await {
|
||||||
Ok(track_id) => {
|
Ok(track_id) => {
|
||||||
|
if was_merged {
|
||||||
|
let _ = db::update_pending_status(&state.pool, pending_id, "merged", None).await;
|
||||||
|
}
|
||||||
tracing::info!(file = filename, track_id, storage = %rel_path, "Track finalized in database");
|
tracing::info!(file = filename, track_id, storage = %rel_path, "Track finalized in database");
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -1,18 +1,27 @@
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
|
pub enum MoveOutcome {
|
||||||
|
/// File was moved/renamed to destination.
|
||||||
|
Moved(PathBuf),
|
||||||
|
/// Destination already existed; inbox duplicate was removed.
|
||||||
|
Merged(PathBuf),
|
||||||
|
}
|
||||||
|
|
||||||
/// Move a file from inbox to the permanent storage directory.
|
/// Move a file from inbox to the permanent storage directory.
|
||||||
///
|
///
|
||||||
/// Creates the directory structure: `storage_dir/artist/album/filename`
|
/// Creates the directory structure: `storage_dir/artist/album/filename`
|
||||||
/// Returns the full path of the moved file.
|
/// Returns the full path of the moved file.
|
||||||
///
|
///
|
||||||
/// If `rename` fails (cross-device), falls back to copy + remove.
|
/// If `rename` fails (cross-device), falls back to copy + remove.
|
||||||
|
/// If the destination already exists the inbox copy is removed and
|
||||||
|
/// `MoveOutcome::Merged` is returned instead of an error.
|
||||||
pub async fn move_to_storage(
|
pub async fn move_to_storage(
|
||||||
storage_dir: &Path,
|
storage_dir: &Path,
|
||||||
artist: &str,
|
artist: &str,
|
||||||
album: &str,
|
album: &str,
|
||||||
filename: &str,
|
filename: &str,
|
||||||
source: &Path,
|
source: &Path,
|
||||||
) -> anyhow::Result<PathBuf> {
|
) -> anyhow::Result<MoveOutcome> {
|
||||||
let artist_dir = sanitize_dir_name(artist);
|
let artist_dir = sanitize_dir_name(artist);
|
||||||
let album_dir = sanitize_dir_name(album);
|
let album_dir = sanitize_dir_name(album);
|
||||||
|
|
||||||
@@ -21,9 +30,13 @@ pub async fn move_to_storage(
|
|||||||
|
|
||||||
let dest = dest_dir.join(filename);
|
let dest = dest_dir.join(filename);
|
||||||
|
|
||||||
// Avoid overwriting existing files
|
// File already at destination — remove the inbox duplicate
|
||||||
if dest.exists() {
|
if dest.exists() {
|
||||||
anyhow::bail!("Destination already exists: {:?}", dest);
|
if source.exists() {
|
||||||
|
tokio::fs::remove_file(source).await?;
|
||||||
|
tracing::info!(from = ?source, to = ?dest, "merged duplicate into existing storage file");
|
||||||
|
}
|
||||||
|
return Ok(MoveOutcome::Merged(dest));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try atomic rename first (same filesystem)
|
// Try atomic rename first (same filesystem)
|
||||||
@@ -37,7 +50,7 @@ pub async fn move_to_storage(
|
|||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(from = ?source, to = ?dest, "moved file to storage");
|
tracing::info!(from = ?source, to = ?dest, "moved file to storage");
|
||||||
Ok(dest)
|
Ok(MoveOutcome::Moved(dest))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove characters that are unsafe for directory names.
|
/// Remove characters that are unsafe for directory names.
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ pub async fn normalize(
|
|||||||
&state.config.ollama_model,
|
&state.config.ollama_model,
|
||||||
&state.system_prompt,
|
&state.system_prompt,
|
||||||
&user_message,
|
&user_message,
|
||||||
|
state.config.ollama_auth.as_deref(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -120,11 +121,12 @@ struct OllamaResponseMessage {
|
|||||||
content: String,
|
content: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn call_ollama(
|
pub async fn call_ollama(
|
||||||
base_url: &str,
|
base_url: &str,
|
||||||
model: &str,
|
model: &str,
|
||||||
system_prompt: &str,
|
system_prompt: &str,
|
||||||
user_message: &str,
|
user_message: &str,
|
||||||
|
auth: Option<&str>,
|
||||||
) -> anyhow::Result<String> {
|
) -> anyhow::Result<String> {
|
||||||
let client = reqwest::Client::builder()
|
let client = reqwest::Client::builder()
|
||||||
.timeout(std::time::Duration::from_secs(120))
|
.timeout(std::time::Duration::from_secs(120))
|
||||||
@@ -151,7 +153,11 @@ async fn call_ollama(
|
|||||||
tracing::info!(%url, model, prompt_len = user_message.len(), "Calling Ollama API...");
|
tracing::info!(%url, model, prompt_len = user_message.len(), "Calling Ollama API...");
|
||||||
|
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
let resp = client.post(&url).json(&request).send().await?;
|
let mut req = client.post(&url).json(&request);
|
||||||
|
if let Some(auth_header) = auth {
|
||||||
|
req = req.header("Authorization", auth_header);
|
||||||
|
}
|
||||||
|
let resp = req.send().await?;
|
||||||
let elapsed = start.elapsed();
|
let elapsed = start.elapsed();
|
||||||
|
|
||||||
if !resp.status().is_success() {
|
if !resp.status().is_success() {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
mod config;
|
mod config;
|
||||||
mod db;
|
mod db;
|
||||||
mod ingest;
|
mod ingest;
|
||||||
|
mod merge;
|
||||||
mod web;
|
mod web;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -24,6 +25,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let system_prompt = args.load_system_prompt()?;
|
let system_prompt = args.load_system_prompt()?;
|
||||||
tracing::info!("System prompt loaded: {} chars", system_prompt.len());
|
tracing::info!("System prompt loaded: {} chars", system_prompt.len());
|
||||||
|
|
||||||
|
let merge_prompt = args.load_merge_prompt()?;
|
||||||
|
tracing::info!("Merge prompt loaded: {} chars", merge_prompt.len());
|
||||||
|
|
||||||
tracing::info!("Connecting to database...");
|
tracing::info!("Connecting to database...");
|
||||||
let pool = db::connect(&args.database_url).await?;
|
let pool = db::connect(&args.database_url).await?;
|
||||||
tracing::info!("Running database migrations...");
|
tracing::info!("Running database migrations...");
|
||||||
@@ -34,6 +38,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
pool: pool.clone(),
|
pool: pool.clone(),
|
||||||
config: Arc::new(args),
|
config: Arc::new(args),
|
||||||
system_prompt: Arc::new(system_prompt),
|
system_prompt: Arc::new(system_prompt),
|
||||||
|
merge_prompt: Arc::new(merge_prompt),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Spawn the ingest pipeline as a background task
|
// Spawn the ingest pipeline as a background task
|
||||||
|
|||||||
325
furumi-agent/src/merge.rs
Normal file
325
furumi-agent/src/merge.rs
Normal file
@@ -0,0 +1,325 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::db;
|
||||||
|
use crate::web::AppState;
|
||||||
|
use crate::ingest::normalize::call_ollama;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct MergeProposal {
|
||||||
|
pub canonical_artist_name: String,
|
||||||
|
pub winner_artist_id: i64,
|
||||||
|
pub album_mappings: Vec<AlbumMapping>,
|
||||||
|
pub notes: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct AlbumMapping {
|
||||||
|
pub source_album_id: i64,
|
||||||
|
pub canonical_name: String,
|
||||||
|
pub merge_into_album_id: Option<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn propose_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Result<()> {
|
||||||
|
db::update_merge_status(&state.pool, merge_id, "processing", None).await?;
|
||||||
|
|
||||||
|
let merge = db::get_artist_merge(&state.pool, merge_id).await?
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("Merge not found: {}", merge_id))?;
|
||||||
|
|
||||||
|
let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Invalid source_artist_ids: {}", e))?;
|
||||||
|
|
||||||
|
let artists_data = db::get_artists_full_data(&state.pool, &source_ids).await?;
|
||||||
|
|
||||||
|
let user_message = build_merge_message(&artists_data);
|
||||||
|
|
||||||
|
let response = call_ollama(
|
||||||
|
&state.config.ollama_url,
|
||||||
|
&state.config.ollama_model,
|
||||||
|
&state.merge_prompt,
|
||||||
|
&user_message,
|
||||||
|
state.config.ollama_auth.as_deref(),
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
let proposal = parse_merge_response(&response)?;
|
||||||
|
let notes = proposal.notes.clone();
|
||||||
|
let proposal_json = serde_json::to_string(&proposal)?;
|
||||||
|
|
||||||
|
db::update_merge_proposal(&state.pool, merge_id, &proposal_json, ¬es).await?;
|
||||||
|
tracing::info!(id = %merge_id, "Merge proposal generated");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_merge_message(artists: &[db::ArtistFullData]) -> String {
|
||||||
|
let mut msg = String::from("## Artists to merge\n\n");
|
||||||
|
for artist in artists {
|
||||||
|
msg.push_str(&format!("### Artist ID {}: \"{}\"\n", artist.id, artist.name));
|
||||||
|
if artist.albums.is_empty() {
|
||||||
|
msg.push_str(" (no albums)\n");
|
||||||
|
}
|
||||||
|
for album in &artist.albums {
|
||||||
|
let year_str = album.year.map(|y| format!(" ({})", y)).unwrap_or_default();
|
||||||
|
msg.push_str(&format!(" Album ID {}: \"{}\"{}\n", album.id, album.name, year_str));
|
||||||
|
for track in &album.tracks {
|
||||||
|
let num = track.track_number.map(|n| format!("{:02}. ", n)).unwrap_or_default();
|
||||||
|
msg.push_str(&format!(" - {}\"{}\" [track_id={}]\n", num, track.title, track.id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg.push('\n');
|
||||||
|
}
|
||||||
|
msg
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_merge_response(response: &str) -> anyhow::Result<MergeProposal> {
|
||||||
|
let cleaned = response.trim();
|
||||||
|
let json_str = if cleaned.starts_with("```") {
|
||||||
|
let start = cleaned.find('{').unwrap_or(0);
|
||||||
|
let end = cleaned.rfind('}').map(|i| i + 1).unwrap_or(cleaned.len());
|
||||||
|
&cleaned[start..end]
|
||||||
|
} else {
|
||||||
|
cleaned
|
||||||
|
};
|
||||||
|
serde_json::from_str(json_str)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to parse merge LLM response: {} — raw: {}", e, response))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Result<()> {
|
||||||
|
let merge = db::get_artist_merge(&state.pool, merge_id).await?
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("Merge not found"))?;
|
||||||
|
|
||||||
|
let proposal_str = merge.proposal.ok_or_else(|| anyhow::anyhow!("No proposal to execute"))?;
|
||||||
|
let proposal: MergeProposal = serde_json::from_str(&proposal_str)?;
|
||||||
|
|
||||||
|
let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids)?;
|
||||||
|
let loser_ids: Vec<i64> = source_ids.iter().copied()
|
||||||
|
.filter(|&id| id != proposal.winner_artist_id).collect();
|
||||||
|
|
||||||
|
// Execute all DB mutations in a single atomic transaction.
|
||||||
|
// On error the transaction rolls back automatically (dropped without commit).
|
||||||
|
let mut tx = state.pool.begin().await?;
|
||||||
|
if let Err(e) = merge_db(&mut tx, &proposal, &loser_ids).await {
|
||||||
|
// tx is dropped here → auto-rollback
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
tx.commit().await?;
|
||||||
|
|
||||||
|
// Move files after commit (best-effort; storage_path updated per file)
|
||||||
|
let tracks = db::get_tracks_with_albums_for_artist(&state.pool, proposal.winner_artist_id).await?;
|
||||||
|
for track in &tracks {
|
||||||
|
let current = std::path::Path::new(&track.storage_path);
|
||||||
|
let filename = match current.file_name() {
|
||||||
|
Some(f) => f.to_string_lossy().to_string(),
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
let album_name = track.album_name.as_deref().unwrap_or("Unknown Album");
|
||||||
|
let new_path = state.config.storage_dir
|
||||||
|
.join(sanitize(&proposal.canonical_artist_name))
|
||||||
|
.join(sanitize(album_name))
|
||||||
|
.join(&filename);
|
||||||
|
|
||||||
|
if current != new_path.as_path() {
|
||||||
|
if current.exists() {
|
||||||
|
if let Some(parent) = new_path.parent() {
|
||||||
|
let _ = tokio::fs::create_dir_all(parent).await;
|
||||||
|
}
|
||||||
|
let moved = tokio::fs::rename(current, &new_path).await;
|
||||||
|
if moved.is_err() {
|
||||||
|
if let Ok(_) = tokio::fs::copy(current, &new_path).await {
|
||||||
|
let _ = tokio::fs::remove_file(current).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
db::update_track_storage_path(&state.pool, track.id, &new_path.to_string_lossy()).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
db::update_merge_status(&state.pool, merge_id, "approved", None).await?;
|
||||||
|
tracing::info!(id = %merge_id, "Merge executed successfully");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// All DB mutations for a merge, executed inside a single transaction.
|
||||||
|
/// `tx` is a `Transaction<'_, Postgres>` which derefs to `PgConnection`.
|
||||||
|
async fn merge_db(
|
||||||
|
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||||
|
proposal: &MergeProposal,
|
||||||
|
loser_ids: &[i64],
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// 1. Rename winner artist to canonical name
|
||||||
|
sqlx::query("UPDATE artists SET name = $2 WHERE id = $1")
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.bind(&proposal.canonical_artist_name)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
// 2. Process album mappings from the proposal
|
||||||
|
for mapping in &proposal.album_mappings {
|
||||||
|
// Skip if source was already processed (idempotent retry support)
|
||||||
|
let src_exists: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)")
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.fetch_one(&mut **tx).await?;
|
||||||
|
if !src_exists.0 { continue; }
|
||||||
|
|
||||||
|
if let Some(target_id) = mapping.merge_into_album_id {
|
||||||
|
album_merge_into(tx, mapping.source_album_id, target_id).await?;
|
||||||
|
} else {
|
||||||
|
// Rename first
|
||||||
|
sqlx::query("UPDATE albums SET name = $2 WHERE id = $1")
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.bind(&mapping.canonical_name)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
// Check if winner already has an album with this canonical name (excluding self)
|
||||||
|
let conflict: Option<(i64,)> = sqlx::query_as(
|
||||||
|
"SELECT id FROM albums WHERE artist_id = $1 AND name = $2 AND id != $3"
|
||||||
|
)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.bind(&mapping.canonical_name)
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.fetch_optional(&mut **tx).await?;
|
||||||
|
|
||||||
|
if let Some((existing_id,)) = conflict {
|
||||||
|
album_merge_into(tx, mapping.source_album_id, existing_id).await?;
|
||||||
|
} else {
|
||||||
|
// Just move to winner artist (only if not already there)
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE albums SET artist_id = $2 WHERE id = $1 AND artist_id != $2"
|
||||||
|
)
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Move all remaining albums from each loser to winner, merging name conflicts
|
||||||
|
for &loser_id in loser_ids {
|
||||||
|
loop {
|
||||||
|
// Fetch one album at a time; loop because merging changes the set
|
||||||
|
let album: Option<(i64, String)> = sqlx::query_as(
|
||||||
|
"SELECT id, name FROM albums WHERE artist_id = $1 LIMIT 1"
|
||||||
|
)
|
||||||
|
.bind(loser_id)
|
||||||
|
.fetch_optional(&mut **tx).await?;
|
||||||
|
|
||||||
|
let (album_id, album_name) = match album {
|
||||||
|
Some(a) => a,
|
||||||
|
None => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
let conflict: Option<(i64,)> = sqlx::query_as(
|
||||||
|
"SELECT id FROM albums WHERE artist_id = $1 AND name = $2"
|
||||||
|
)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.bind(&album_name)
|
||||||
|
.fetch_optional(&mut **tx).await?;
|
||||||
|
|
||||||
|
if let Some((existing_id,)) = conflict {
|
||||||
|
// Merge loser album into winner album
|
||||||
|
album_merge_into(tx, album_id, existing_id).await?;
|
||||||
|
} else {
|
||||||
|
sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1")
|
||||||
|
.bind(album_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Move track_artists from losers to winner
|
||||||
|
for &loser_id in loser_ids {
|
||||||
|
// Remove winner's entries that would conflict after the update
|
||||||
|
sqlx::query(
|
||||||
|
r#"DELETE FROM track_artists
|
||||||
|
WHERE artist_id = $2
|
||||||
|
AND (track_id, role) IN (
|
||||||
|
SELECT track_id, role FROM track_artists WHERE artist_id = $1
|
||||||
|
)"#
|
||||||
|
)
|
||||||
|
.bind(loser_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1")
|
||||||
|
.bind(loser_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Delete loser artists (should be empty of albums/tracks by now)
|
||||||
|
for &loser_id in loser_ids {
|
||||||
|
sqlx::query("DELETE FROM artists WHERE id = $1")
|
||||||
|
.bind(loser_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Merge source album into target within an open transaction:
|
||||||
|
/// deduplicate by file_hash, move the rest, delete source.
|
||||||
|
async fn album_merge_into(
|
||||||
|
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||||
|
source_id: i64,
|
||||||
|
target_id: i64,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// Verify target exists
|
||||||
|
let target_ok: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)")
|
||||||
|
.bind(target_id)
|
||||||
|
.fetch_one(&mut **tx).await?;
|
||||||
|
if !target_ok.0 {
|
||||||
|
anyhow::bail!("Target album {} does not exist", target_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete duplicate tracks from source (same file_hash already in target)
|
||||||
|
let dups: Vec<(i64,)> = sqlx::query_as(
|
||||||
|
r#"SELECT t1.id FROM tracks t1
|
||||||
|
JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2
|
||||||
|
WHERE t1.album_id = $1"#
|
||||||
|
)
|
||||||
|
.bind(source_id)
|
||||||
|
.bind(target_id)
|
||||||
|
.fetch_all(&mut **tx).await?;
|
||||||
|
|
||||||
|
for (dup_id,) in dups {
|
||||||
|
// Retrieve path for later file deletion (non-fatal if missing)
|
||||||
|
let path: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = $1")
|
||||||
|
.bind(dup_id).fetch_optional(&mut **tx).await?;
|
||||||
|
if let Some((p,)) = path {
|
||||||
|
// Schedule physical deletion after commit — store in a side channel;
|
||||||
|
// here we do a best-effort remove outside the tx scope via tokio::spawn.
|
||||||
|
let p = p.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = tokio::fs::remove_file(&p).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(dup_id).execute(&mut **tx).await?;
|
||||||
|
sqlx::query("DELETE FROM tracks WHERE id = $1").bind(dup_id).execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move remaining tracks from source to target
|
||||||
|
sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1")
|
||||||
|
.bind(source_id)
|
||||||
|
.bind(target_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
// Delete the now-empty source album
|
||||||
|
sqlx::query("DELETE FROM albums WHERE id = $1")
|
||||||
|
.bind(source_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sanitize(name: &str) -> String {
|
||||||
|
name.chars()
|
||||||
|
.map(|c| match c {
|
||||||
|
'/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' | '\0' => '_',
|
||||||
|
_ => c,
|
||||||
|
})
|
||||||
|
.collect::<String>()
|
||||||
|
.trim()
|
||||||
|
.trim_matches('.')
|
||||||
|
.to_owned()
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -82,21 +82,30 @@ pub async fn approve_queue_item(State(state): State<S>, Path(id): Path<Uuid>) ->
|
|||||||
format!("{}.{}", sanitize_filename(title), ext)
|
format!("{}.{}", sanitize_filename(title), ext)
|
||||||
};
|
};
|
||||||
|
|
||||||
match crate::ingest::mover::move_to_storage(
|
let artist_dir = sanitize_filename(artist);
|
||||||
&state.config.storage_dir,
|
let album_dir = sanitize_filename(album);
|
||||||
artist,
|
let dest = state.config.storage_dir.join(&artist_dir).join(&album_dir).join(&filename);
|
||||||
album,
|
|
||||||
&filename,
|
use crate::ingest::mover::MoveOutcome;
|
||||||
source,
|
let (storage_path, was_merged) = if dest.exists() && !source.exists() {
|
||||||
)
|
// File already moved (e.g. auto-approved earlier but DB not finalized)
|
||||||
.await
|
(dest.to_string_lossy().to_string(), false)
|
||||||
{
|
} else {
|
||||||
Ok(storage_path) => {
|
match crate::ingest::mover::move_to_storage(
|
||||||
let rel_path = storage_path.to_string_lossy().to_string();
|
&state.config.storage_dir, artist, album, &filename, source,
|
||||||
match db::approve_and_finalize(&state.pool, id, &rel_path).await {
|
).await {
|
||||||
Ok(track_id) => (StatusCode::OK, Json(serde_json::json!({"track_id": track_id}))).into_response(),
|
Ok(MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false),
|
||||||
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
Ok(MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true),
|
||||||
|
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match db::approve_and_finalize(&state.pool, id, &storage_path).await {
|
||||||
|
Ok(track_id) => {
|
||||||
|
if was_merged {
|
||||||
|
let _ = db::update_pending_status(&state.pool, id, "merged", None).await;
|
||||||
}
|
}
|
||||||
|
(StatusCode::OK, Json(serde_json::json!({"track_id": track_id}))).into_response()
|
||||||
}
|
}
|
||||||
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
}
|
}
|
||||||
@@ -144,6 +153,105 @@ pub async fn update_queue_item(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Retry ---
|
||||||
|
|
||||||
|
pub async fn retry_queue_item(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
|
||||||
|
match db::update_pending_status(&state.pool, id, "pending", None).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Batch operations ---
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct BatchIds {
|
||||||
|
pub ids: Vec<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn batch_approve(State(state): State<S>, Json(body): Json<BatchIds>) -> impl IntoResponse {
|
||||||
|
let mut ok = 0u32;
|
||||||
|
let mut errors = Vec::new();
|
||||||
|
for id in &body.ids {
|
||||||
|
let pt = match db::get_pending(&state.pool, *id).await {
|
||||||
|
Ok(Some(pt)) => pt,
|
||||||
|
Ok(None) => { errors.push(format!("{}: not found", id)); continue; }
|
||||||
|
Err(e) => { errors.push(format!("{}: {}", id, e)); continue; }
|
||||||
|
};
|
||||||
|
|
||||||
|
let artist = pt.norm_artist.as_deref().unwrap_or("Unknown Artist");
|
||||||
|
let album = pt.norm_album.as_deref().unwrap_or("Unknown Album");
|
||||||
|
let title = pt.norm_title.as_deref().unwrap_or("Unknown Title");
|
||||||
|
let source = std::path::Path::new(&pt.inbox_path);
|
||||||
|
let ext = source.extension().and_then(|e| e.to_str()).unwrap_or("flac");
|
||||||
|
let track_num = pt.norm_track_number.unwrap_or(0);
|
||||||
|
|
||||||
|
let filename = if track_num > 0 {
|
||||||
|
format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext)
|
||||||
|
} else {
|
||||||
|
format!("{}.{}", sanitize_filename(title), ext)
|
||||||
|
};
|
||||||
|
|
||||||
|
let artist_dir = sanitize_filename(artist);
|
||||||
|
let album_dir = sanitize_filename(album);
|
||||||
|
let dest = state.config.storage_dir.join(&artist_dir).join(&album_dir).join(&filename);
|
||||||
|
|
||||||
|
use crate::ingest::mover::MoveOutcome;
|
||||||
|
let (rel_path, was_merged) = if dest.exists() && !source.exists() {
|
||||||
|
(dest.to_string_lossy().to_string(), false)
|
||||||
|
} else {
|
||||||
|
match crate::ingest::mover::move_to_storage(
|
||||||
|
&state.config.storage_dir, artist, album, &filename, source,
|
||||||
|
).await {
|
||||||
|
Ok(MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false),
|
||||||
|
Ok(MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true),
|
||||||
|
Err(e) => { errors.push(format!("{}: {}", id, e)); continue; }
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match db::approve_and_finalize(&state.pool, *id, &rel_path).await {
|
||||||
|
Ok(_) => {
|
||||||
|
if was_merged {
|
||||||
|
let _ = db::update_pending_status(&state.pool, *id, "merged", None).await;
|
||||||
|
}
|
||||||
|
ok += 1;
|
||||||
|
}
|
||||||
|
Err(e) => errors.push(format!("{}: {}", id, e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(StatusCode::OK, Json(serde_json::json!({"approved": ok, "errors": errors}))).into_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn batch_reject(State(state): State<S>, Json(body): Json<BatchIds>) -> impl IntoResponse {
|
||||||
|
let mut ok = 0u32;
|
||||||
|
for id in &body.ids {
|
||||||
|
if db::update_pending_status(&state.pool, *id, "rejected", None).await.is_ok() {
|
||||||
|
ok += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(StatusCode::OK, Json(serde_json::json!({"rejected": ok}))).into_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn batch_retry(State(state): State<S>, Json(body): Json<BatchIds>) -> impl IntoResponse {
|
||||||
|
let mut ok = 0u32;
|
||||||
|
for id in &body.ids {
|
||||||
|
if db::update_pending_status(&state.pool, *id, "pending", None).await.is_ok() {
|
||||||
|
ok += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(StatusCode::OK, Json(serde_json::json!({"retried": ok}))).into_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn batch_delete(State(state): State<S>, Json(body): Json<BatchIds>) -> impl IntoResponse {
|
||||||
|
let mut ok = 0u32;
|
||||||
|
for id in &body.ids {
|
||||||
|
if db::delete_pending(&state.pool, *id).await.unwrap_or(false) {
|
||||||
|
ok += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(StatusCode::OK, Json(serde_json::json!({"deleted": ok}))).into_response()
|
||||||
|
}
|
||||||
|
|
||||||
// --- Artists ---
|
// --- Artists ---
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
@@ -218,6 +326,254 @@ pub async fn update_album(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Merges ---
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct CreateMergeBody {
|
||||||
|
pub artist_ids: Vec<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_merge(State(state): State<S>, Json(body): Json<CreateMergeBody>) -> impl IntoResponse {
|
||||||
|
if body.artist_ids.len() < 2 {
|
||||||
|
return error_response(StatusCode::BAD_REQUEST, "need at least 2 artists to merge");
|
||||||
|
}
|
||||||
|
match db::insert_artist_merge(&state.pool, &body.artist_ids).await {
|
||||||
|
Ok(id) => (StatusCode::OK, Json(serde_json::json!({"id": id}))).into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_merges(State(state): State<S>) -> impl IntoResponse {
|
||||||
|
match db::list_artist_merges(&state.pool).await {
|
||||||
|
Ok(items) => (StatusCode::OK, Json(serde_json::to_value(items).unwrap())).into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
|
||||||
|
let merge = match db::get_artist_merge(&state.pool, id).await {
|
||||||
|
Ok(Some(m)) => m,
|
||||||
|
Ok(None) => return error_response(StatusCode::NOT_FOUND, "not found"),
|
||||||
|
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids).unwrap_or_default();
|
||||||
|
let artists = match db::get_artists_full_data(&state.pool, &source_ids).await {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let proposal: Option<serde_json::Value> = merge.proposal.as_deref()
|
||||||
|
.and_then(|p| serde_json::from_str(p).ok());
|
||||||
|
|
||||||
|
(StatusCode::OK, Json(serde_json::json!({
|
||||||
|
"merge": {
|
||||||
|
"id": merge.id,
|
||||||
|
"status": merge.status,
|
||||||
|
"source_artist_ids": source_ids,
|
||||||
|
"llm_notes": merge.llm_notes,
|
||||||
|
"error_message": merge.error_message,
|
||||||
|
"created_at": merge.created_at,
|
||||||
|
"updated_at": merge.updated_at,
|
||||||
|
},
|
||||||
|
"artists": artists,
|
||||||
|
"proposal": proposal,
|
||||||
|
}))).into_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct UpdateMergeBody {
|
||||||
|
pub proposal: serde_json::Value,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_merge(
|
||||||
|
State(state): State<S>,
|
||||||
|
Path(id): Path<Uuid>,
|
||||||
|
Json(body): Json<UpdateMergeBody>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let notes = body.proposal.get("notes")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("")
|
||||||
|
.to_owned();
|
||||||
|
let proposal_json = match serde_json::to_string(&body.proposal) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => return error_response(StatusCode::BAD_REQUEST, &e.to_string()),
|
||||||
|
};
|
||||||
|
match db::update_merge_proposal(&state.pool, id, &proposal_json, ¬es).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn approve_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
|
||||||
|
match crate::merge::execute_merge(&state, id).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => {
|
||||||
|
let msg = e.to_string();
|
||||||
|
let _ = db::update_merge_status(&state.pool, id, "error", Some(&msg)).await;
|
||||||
|
error_response(StatusCode::INTERNAL_SERVER_ERROR, &msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn reject_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
|
||||||
|
match db::update_merge_status(&state.pool, id, "rejected", None).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn retry_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
|
||||||
|
match db::update_merge_status(&state.pool, id, "pending", None).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Library search ---
|
||||||
|
|
||||||
|
/// Query-string parameters shared by the /library/* search endpoints.
/// All filters default to empty (= no filtering); paging defaults to
/// limit 50 / offset 0.
#[derive(Deserialize)]
pub struct LibraryQuery {
    /// Free-text search term; empty string means "no text filter".
    #[serde(default)]
    pub q: String,
    /// Artist-name filter; empty string means "any artist".
    #[serde(default)]
    pub artist: String,
    /// Album-name filter; empty string means "any album".
    #[serde(default)]
    pub album: String,
    /// Page size; defaults to 50 via `default_lib_limit`.
    #[serde(default = "default_lib_limit")]
    pub limit: i64,
    /// Paging offset; defaults to 0.
    #[serde(default)]
    pub offset: i64,
}

/// Serde default for `LibraryQuery::limit`.
fn default_lib_limit() -> i64 { 50 }
|
||||||
|
|
||||||
|
pub async fn library_tracks(State(state): State<S>, Query(q): Query<LibraryQuery>) -> impl IntoResponse {
|
||||||
|
let (tracks, total) = tokio::join!(
|
||||||
|
db::search_tracks(&state.pool, &q.q, &q.artist, &q.album, q.limit, q.offset),
|
||||||
|
db::count_tracks(&state.pool, &q.q, &q.artist, &q.album),
|
||||||
|
);
|
||||||
|
match (tracks, total) {
|
||||||
|
(Ok(rows), Ok(n)) => (StatusCode::OK, Json(serde_json::json!({"total": n, "items": rows}))).into_response(),
|
||||||
|
(Err(e), _) | (_, Err(e)) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn library_albums(State(state): State<S>, Query(q): Query<LibraryQuery>) -> impl IntoResponse {
|
||||||
|
let (albums, total) = tokio::join!(
|
||||||
|
db::search_albums(&state.pool, &q.q, &q.artist, q.limit, q.offset),
|
||||||
|
db::count_albums(&state.pool, &q.q, &q.artist),
|
||||||
|
);
|
||||||
|
match (albums, total) {
|
||||||
|
(Ok(rows), Ok(n)) => (StatusCode::OK, Json(serde_json::json!({"total": n, "items": rows}))).into_response(),
|
||||||
|
(Err(e), _) | (_, Err(e)) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn library_artists(State(state): State<S>, Query(q): Query<LibraryQuery>) -> impl IntoResponse {
|
||||||
|
let (artists, total) = tokio::join!(
|
||||||
|
db::search_artists_lib(&state.pool, &q.q, q.limit, q.offset),
|
||||||
|
db::count_artists_lib(&state.pool, &q.q),
|
||||||
|
);
|
||||||
|
match (artists, total) {
|
||||||
|
(Ok(rows), Ok(n)) => (StatusCode::OK, Json(serde_json::json!({"total": n, "items": rows}))).into_response(),
|
||||||
|
(Err(e), _) | (_, Err(e)) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Track / Album detail & edit ---
|
||||||
|
|
||||||
|
pub async fn get_track(State(state): State<S>, Path(id): Path<i64>) -> impl IntoResponse {
|
||||||
|
match db::get_track_full(&state.pool, id).await {
|
||||||
|
Ok(Some(t)) => (StatusCode::OK, Json(serde_json::to_value(t).unwrap())).into_response(),
|
||||||
|
Ok(None) => error_response(StatusCode::NOT_FOUND, "not found"),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_track(
|
||||||
|
State(state): State<S>,
|
||||||
|
Path(id): Path<i64>,
|
||||||
|
Json(body): Json<db::TrackUpdateFields>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match db::update_track_metadata(&state.pool, id, &body).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_album_full(State(state): State<S>, Path(id): Path<i64>) -> impl IntoResponse {
|
||||||
|
match db::get_album_details(&state.pool, id).await {
|
||||||
|
Ok(Some(a)) => (StatusCode::OK, Json(serde_json::to_value(a).unwrap())).into_response(),
|
||||||
|
Ok(None) => error_response(StatusCode::NOT_FOUND, "not found"),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Request body for PUT /albums/:id/edit.
#[derive(Deserialize)]
pub struct AlbumUpdateBody {
    /// New album title.
    pub name: String,
    /// Release year; `None` clears/omits the year.
    pub year: Option<i32>,
    /// Id of the artist the album should belong to.
    pub artist_id: i64,
}
|
||||||
|
|
||||||
|
pub async fn update_album_full(
|
||||||
|
State(state): State<S>,
|
||||||
|
Path(id): Path<i64>,
|
||||||
|
Json(body): Json<AlbumUpdateBody>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match db::update_album_full(&state.pool, id, &body.name, body.year, body.artist_id).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Request body for PUT /albums/:id/reorder.
#[derive(Deserialize)]
pub struct ReorderBody {
    /// `(track_id, new_position)` pairs to apply.
    pub orders: Vec<(i64, i32)>,
}
|
||||||
|
|
||||||
|
pub async fn reorder_album_tracks(
|
||||||
|
State(state): State<S>,
|
||||||
|
Path(_id): Path<i64>,
|
||||||
|
Json(body): Json<ReorderBody>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
match db::reorder_tracks(&state.pool, &body.orders).await {
|
||||||
|
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn album_cover(State(state): State<S>, Path(id): Path<i64>) -> impl IntoResponse {
|
||||||
|
let cover = match db::get_album_cover(&state.pool, id).await {
|
||||||
|
Ok(Some(c)) => c,
|
||||||
|
Ok(None) => return StatusCode::NOT_FOUND.into_response(),
|
||||||
|
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
};
|
||||||
|
match tokio::fs::read(&cover.0).await {
|
||||||
|
Ok(bytes) => (
|
||||||
|
[(axum::http::header::CONTENT_TYPE, cover.1)],
|
||||||
|
bytes,
|
||||||
|
).into_response(),
|
||||||
|
Err(_) => StatusCode::NOT_FOUND.into_response(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Query-string parameters for GET /albums/search.
#[derive(Deserialize)]
pub struct AlbumSearchQuery {
    /// Free-text album-name search; empty string means "no text filter".
    #[serde(default)]
    pub q: String,
    /// Optional artist id to restrict the search to.
    pub artist_id: Option<i64>,
}
|
||||||
|
|
||||||
|
pub async fn search_albums_for_artist(State(state): State<S>, Query(q): Query<AlbumSearchQuery>) -> impl IntoResponse {
|
||||||
|
match db::search_albums_for_artist(&state.pool, &q.q, q.artist_id).await {
|
||||||
|
Ok(items) => (StatusCode::OK, Json(serde_json::to_value(
|
||||||
|
items.iter().map(|(id, name)| serde_json::json!({"id": id, "name": name})).collect::<Vec<_>>()
|
||||||
|
).unwrap())).into_response(),
|
||||||
|
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// --- Helpers ---
|
// --- Helpers ---
|
||||||
|
|
||||||
fn error_response(status: StatusCode, message: &str) -> axum::response::Response {
|
fn error_response(status: StatusCode, message: &str) -> axum::response::Response {
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ pub struct AppState {
|
|||||||
pub pool: PgPool,
|
pub pool: PgPool,
|
||||||
pub config: Arc<Args>,
|
pub config: Arc<Args>,
|
||||||
pub system_prompt: Arc<String>,
|
pub system_prompt: Arc<String>,
|
||||||
|
pub merge_prompt: Arc<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build_router(state: Arc<AppState>) -> Router {
|
pub fn build_router(state: Arc<AppState>) -> Router {
|
||||||
@@ -21,12 +22,31 @@ pub fn build_router(state: Arc<AppState>) -> Router {
|
|||||||
.route("/queue/:id", get(api::get_queue_item).delete(api::delete_queue_item))
|
.route("/queue/:id", get(api::get_queue_item).delete(api::delete_queue_item))
|
||||||
.route("/queue/:id/approve", post(api::approve_queue_item))
|
.route("/queue/:id/approve", post(api::approve_queue_item))
|
||||||
.route("/queue/:id/reject", post(api::reject_queue_item))
|
.route("/queue/:id/reject", post(api::reject_queue_item))
|
||||||
|
.route("/queue/:id/retry", post(api::retry_queue_item))
|
||||||
.route("/queue/:id/update", put(api::update_queue_item))
|
.route("/queue/:id/update", put(api::update_queue_item))
|
||||||
|
.route("/queue/batch/approve", post(api::batch_approve))
|
||||||
|
.route("/queue/batch/reject", post(api::batch_reject))
|
||||||
|
.route("/queue/batch/retry", post(api::batch_retry))
|
||||||
|
.route("/queue/batch/delete", post(api::batch_delete))
|
||||||
.route("/artists/search", get(api::search_artists))
|
.route("/artists/search", get(api::search_artists))
|
||||||
.route("/artists", get(api::list_artists))
|
.route("/artists", get(api::list_artists))
|
||||||
.route("/artists/:id", put(api::update_artist))
|
.route("/artists/:id", put(api::update_artist))
|
||||||
.route("/artists/:id/albums", get(api::list_albums))
|
.route("/artists/:id/albums", get(api::list_albums))
|
||||||
.route("/albums/:id", put(api::update_album));
|
.route("/tracks/:id", get(api::get_track).put(api::update_track))
|
||||||
|
.route("/albums/search", get(api::search_albums_for_artist))
|
||||||
|
.route("/albums/:id/cover", get(api::album_cover))
|
||||||
|
.route("/albums/:id/full", get(api::get_album_full))
|
||||||
|
.route("/albums/:id/reorder", put(api::reorder_album_tracks))
|
||||||
|
.route("/albums/:id/edit", put(api::update_album_full))
|
||||||
|
.route("/albums/:id", put(api::update_album))
|
||||||
|
.route("/merges", get(api::list_merges).post(api::create_merge))
|
||||||
|
.route("/merges/:id", get(api::get_merge).put(api::update_merge))
|
||||||
|
.route("/merges/:id/approve", post(api::approve_merge))
|
||||||
|
.route("/merges/:id/reject", post(api::reject_merge))
|
||||||
|
.route("/merges/:id/retry", post(api::retry_merge))
|
||||||
|
.route("/library/tracks", get(api::library_tracks))
|
||||||
|
.route("/library/albums", get(api::library_albums))
|
||||||
|
.route("/library/artists", get(api::library_artists));
|
||||||
|
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/", get(admin_html))
|
.route("/", get(admin_html))
|
||||||
|
|||||||
@@ -250,8 +250,10 @@ audio.addEventListener('pause', () => document.getElementById('btnPlayPause').in
|
|||||||
audio.addEventListener('error', () => { showToast('Playback error'); nextTrack(); });
|
audio.addEventListener('error', () => { showToast('Playback error'); nextTrack(); });
|
||||||
|
|
||||||
// --- API helper ---
|
// --- API helper ---
|
||||||
|
const _base = location.pathname.replace(/\/+$/, '');
|
||||||
|
const API = _base + '/api';
|
||||||
async function api(path) {
|
async function api(path) {
|
||||||
const r = await fetch('/api' + path);
|
const r = await fetch(API + path);
|
||||||
if (!r.ok) return null;
|
if (!r.ok) return null;
|
||||||
return r.json();
|
return r.json();
|
||||||
}
|
}
|
||||||
@@ -323,7 +325,7 @@ async function showAlbumTracks(albumSlug, albumName, artistSlug, artistName) {
|
|||||||
allBtn.onclick = () => addAlbumToQueue(albumSlug, true);
|
allBtn.onclick = () => addAlbumToQueue(albumSlug, true);
|
||||||
el.appendChild(allBtn);
|
el.appendChild(allBtn);
|
||||||
|
|
||||||
const coverUrl = '/api/albums/' + albumSlug + '/cover';
|
const coverUrl = API + '/albums/' + albumSlug + '/cover';
|
||||||
|
|
||||||
tracks.forEach(t => {
|
tracks.forEach(t => {
|
||||||
const div = document.createElement('div');
|
const div = document.createElement('div');
|
||||||
@@ -392,7 +394,7 @@ function playIndex(i) {
|
|||||||
if (i < 0 || i >= queue.length) return;
|
if (i < 0 || i >= queue.length) return;
|
||||||
queueIndex = i;
|
queueIndex = i;
|
||||||
const track = queue[i];
|
const track = queue[i];
|
||||||
audio.src = '/api/stream/' + track.slug;
|
audio.src = API + '/stream/' + track.slug;
|
||||||
audio.play().catch(() => {});
|
audio.play().catch(() => {});
|
||||||
updateNowPlaying(track);
|
updateNowPlaying(track);
|
||||||
renderQueue();
|
renderQueue();
|
||||||
@@ -407,7 +409,7 @@ function updateNowPlaying(track) {
|
|||||||
document.title = track.title + ' \u2014 Furumi';
|
document.title = track.title + ' \u2014 Furumi';
|
||||||
|
|
||||||
const cover = document.getElementById('npCover');
|
const cover = document.getElementById('npCover');
|
||||||
const coverUrl = '/api/tracks/' + track.slug + '/cover';
|
const coverUrl = API + '/tracks/' + track.slug + '/cover';
|
||||||
cover.innerHTML = `<img src="${coverUrl}" alt="" onerror="this.parentElement.innerHTML='🎵'">`;
|
cover.innerHTML = `<img src="${coverUrl}" alt="" onerror="this.parentElement.innerHTML='🎵'">`;
|
||||||
|
|
||||||
if ('mediaSession' in navigator) {
|
if ('mediaSession' in navigator) {
|
||||||
@@ -434,7 +436,7 @@ function renderQueue() {
|
|||||||
const div = document.createElement('div');
|
const div = document.createElement('div');
|
||||||
div.className = 'queue-item' + (isPlaying ? ' playing' : '');
|
div.className = 'queue-item' + (isPlaying ? ' playing' : '');
|
||||||
|
|
||||||
const coverSrc = t.album_slug ? `/api/tracks/${t.slug}/cover` : '';
|
const coverSrc = t.album_slug ? `${API}/tracks/${t.slug}/cover` : '';
|
||||||
const coverHtml = coverSrc
|
const coverHtml = coverSrc
|
||||||
? `<img src="${coverSrc}" alt="" onerror="this.parentElement.innerHTML='🎵'">`
|
? `<img src="${coverSrc}" alt="" onerror="this.parentElement.innerHTML='🎵'">`
|
||||||
: '🎵';
|
: '🎵';
|
||||||
|
|||||||
Reference in New Issue
Block a user