PLAYER: improved jam feature
Build and Publish / Build and Publish Docker Image (push) Successful in 3m3s

This commit is contained in:
2026-05-29 13:08:33 +03:00
parent 97c82b4ba2
commit dedddc7cd8
6 changed files with 495 additions and 28 deletions
+19
View File
@@ -183,6 +183,16 @@ pub(super) struct PlayerJamDto {
pub(super) member_count: i64,
pub(super) host_last_seen_ms: i64,
pub(super) host_device_online: bool,
pub(super) members: Vec<PlayerJamMemberDto>,
}
#[derive(Debug, Serialize, JsonSchema)]
pub(super) struct PlayerJamMemberDto {
pub(super) user_id: i64,
pub(super) name: String,
pub(super) is_joined: bool,
pub(super) is_current_user: bool,
pub(super) last_seen_ms: i64,
}
#[derive(Debug, Deserialize, JsonSchema)]
@@ -192,6 +202,14 @@ pub(super) struct PlayerJamCreateRequest {
pub(super) invitee_user_ids: Vec<i64>,
}
#[derive(Debug, Deserialize, JsonSchema)]
pub(super) struct PlayerJamInviteRequest {
pub(super) jam_id: String,
pub(super) device_id: String,
#[serde(default)]
pub(super) invitee_user_ids: Vec<i64>,
}
#[derive(Debug, Deserialize, JsonSchema)]
pub(super) struct PlayerJamJoinRequest {
pub(super) jam_id: String,
@@ -286,6 +304,7 @@ pub(super) struct UserStats {
#[derive(Debug, Serialize, JsonSchema)]
pub(super) struct UserProfile {
pub(super) id: i64,
pub(super) name: String,
pub(super) role: String,
pub(super) stats: UserStats,
+178 -15
View File
@@ -81,6 +81,7 @@ enum PlayerJamMemberStatus {
#[derive(Debug, Clone)]
struct PlayerJamMember {
name: String,
status: PlayerJamMemberStatus,
last_seen_ms: i64,
}
@@ -393,6 +394,10 @@ impl PlayerDeviceHub {
let mut state = self.state.lock().expect("player device hub lock");
self.prune_locked(&mut state, now);
if self.user_has_joined_jam_locked(&state, host_user_id) {
return Err("leave the current jam before creating a new one");
}
let devices = state
.devices_by_user
.get(&host_user_id)
@@ -410,19 +415,21 @@ impl PlayerDeviceHub {
members.insert(
host_user_id,
PlayerJamMember {
name: host_name.to_string(),
status: PlayerJamMemberStatus::Joined,
last_seen_ms: now,
},
);
seen.insert(host_user_id);
for (user_id, _name) in invitees.into_iter().take(PLAYER_JAM_MAX_INVITEES) {
for (user_id, name) in invitees.into_iter().take(PLAYER_JAM_MAX_INVITEES) {
if !seen.insert(user_id) {
continue;
}
members.insert(
user_id,
PlayerJamMember {
name,
status: PlayerJamMemberStatus::Invited,
last_seen_ms: 0,
},
@@ -444,6 +451,7 @@ impl PlayerDeviceHub {
fn join_jam(
&self,
user_id: i64,
user_name: &str,
device_id: &str,
jam_id: &str,
) -> Result<PlayerDevicesResponse, &'static str> {
@@ -457,6 +465,7 @@ impl PlayerDeviceHub {
let Some(member) = jam.members.get_mut(&user_id) else {
return Err("jam is not available");
};
member.name = user_name.to_string();
member.status = PlayerJamMemberStatus::Joined;
member.last_seen_ms = now;
if user_id == jam.host_user_id {
@@ -466,6 +475,51 @@ impl PlayerDeviceHub {
Ok(self.snapshot_locked(&state, user_id, device_id, Some(jam_id), now))
}
fn invite_to_jam(
&self,
inviter_user_id: i64,
device_id: &str,
jam_id: &str,
invitees: Vec<(i64, String)>,
) -> Result<PlayerDevicesResponse, &'static str> {
let now = current_millis();
let mut state = self.state.lock().expect("player device hub lock");
self.prune_locked(&mut state, now);
let Some(jam) = state.jams_by_id.get_mut(jam_id) else {
return Err("jam is not available");
};
let Some(inviter) = jam.members.get(&inviter_user_id) else {
return Err("jam is not available");
};
if inviter.status != PlayerJamMemberStatus::Joined {
return Err("join the jam first");
}
if let Some(inviter) = jam.members.get_mut(&inviter_user_id) {
inviter.last_seen_ms = now;
}
if inviter_user_id == jam.host_user_id {
jam.host_last_seen_ms = now;
}
let available_slots = PLAYER_JAM_MAX_INVITEES.saturating_sub(jam.members.len());
for (user_id, name) in invitees.into_iter().take(available_slots) {
if user_id == inviter_user_id || jam.members.contains_key(&user_id) {
continue;
}
jam.members.insert(
user_id,
PlayerJamMember {
name,
status: PlayerJamMemberStatus::Invited,
last_seen_ms: 0,
},
);
}
Ok(self.snapshot_locked(&state, inviter_user_id, device_id, Some(jam_id), now))
}
fn leave_jam(
&self,
user_id: i64,
@@ -561,6 +615,14 @@ impl PlayerDeviceHub {
!require_joined || member.status == PlayerJamMemberStatus::Joined
}
fn user_has_joined_jam_locked(&self, state: &PlayerDeviceHubState, user_id: i64) -> bool {
state.jams_by_id.values().any(|jam| {
jam.members
.get(&user_id)
.is_some_and(|member| member.status == PlayerJamMemberStatus::Joined)
})
}
fn jam_target_device_id_locked(
&self,
state: &PlayerDeviceHubState,
@@ -612,6 +674,23 @@ impl PlayerDeviceHub {
.values()
.filter(|member| member.status == PlayerJamMemberStatus::Joined)
.count() as i64;
let mut members = jam
.members
.iter()
.map(|(member_user_id, member)| PlayerJamMemberDto {
user_id: *member_user_id,
name: member.name.clone(),
is_joined: member.status == PlayerJamMemberStatus::Joined,
is_current_user: *member_user_id == user_id,
last_seen_ms: now.saturating_sub(member.last_seen_ms),
})
.collect::<Vec<_>>();
members.sort_by(|a, b| {
b.is_joined
.cmp(&a.is_joined)
.then_with(|| b.is_current_user.cmp(&a.is_current_user))
.then_with(|| a.name.cmp(&b.name))
});
let host_device_online = self.jam_target_device_id_locked(state, jam).is_some();
Some(PlayerJamDto {
id: jam.id.clone(),
@@ -625,6 +704,7 @@ impl PlayerDeviceHub {
member_count,
host_last_seen_ms: now.saturating_sub(jam.host_last_seen_ms),
host_device_online,
members,
})
})
.collect();
@@ -849,6 +929,7 @@ async fn me_handler(
.map_err(|e| cot::Error::internal(e.to_string()))?;
Json(UserProfile {
id: user.id,
name: user.name,
role: user.role.code().to_string(),
stats: UserStats {
@@ -3899,18 +3980,42 @@ async fn devices_command_handler(
None => None,
};
let mut payload = dto.payload;
if jam_id.is_some() && matches!(command, "queue_add_end" | "queue_add_next") {
stamp_jam_queue_tracks(&mut payload, user.id, &user.name);
}
match hub.enqueue_command(
user.id,
target_device_id.as_deref(),
jam_id.as_deref(),
command,
dto.payload,
payload,
) {
Ok(()) => Json(serde_json::json!({"ok": true})).into_response(),
Err(message) => Ok(json_error(StatusCode::BAD_REQUEST, message)),
}
}
fn stamp_jam_queue_tracks(payload: &mut serde_json::Value, user_id: i64, user_name: &str) {
let Some(tracks) = payload.get_mut("tracks").and_then(serde_json::Value::as_array_mut) else {
return;
};
for track in tracks {
let Some(track_object) = track.as_object_mut() else {
continue;
};
track_object.insert(
"added_by_user_id".to_string(),
serde_json::Value::Number(user_id.into()),
);
track_object.insert(
"added_by_user_name".to_string(),
serde_json::Value::String(user_name.to_string()),
);
}
}
async fn jam_users_search_handler(
session: Session,
db: Database,
@@ -3984,19 +4089,31 @@ async fn jam_create_handler(
return Ok(json_error(StatusCode::BAD_REQUEST, "invalid device id"));
};
let mut invitee_ids = dto
.invitee_user_ids
let invitees = load_jam_invitees(pool, user.id, dto.invitee_user_ids).await?;
match hub.create_jam(user.id, &user.name, &device_id, invitees) {
Ok(response) => Json(response).into_response(),
Err(message) => Ok(json_error(StatusCode::BAD_REQUEST, message)),
}
}
async fn load_jam_invitees(
pool: &sqlx::PgPool,
current_user_id: i64,
invitee_user_ids: Vec<i64>,
) -> cot::Result<Vec<(i64, String)>> {
let mut invitee_ids = invitee_user_ids
.into_iter()
.filter(|id| *id > 0 && *id != user.id)
.filter(|id| *id > 0 && *id != current_user_id)
.collect::<Vec<_>>();
invitee_ids.sort_unstable();
invitee_ids.dedup();
invitee_ids.truncate(PLAYER_JAM_MAX_INVITEES);
let invitees = if invitee_ids.is_empty() {
Vec::new()
if invitee_ids.is_empty() {
Ok(Vec::new())
} else {
sqlx::query_as::<_, PlayerJamUserRow>(
let invitees = sqlx::query_as::<_, PlayerJamUserRow>(
r#"SELECT id, username::text AS username, display_name, email
FROM furumusic__user
WHERE is_active = true AND id = ANY($1)"#,
@@ -4015,12 +4132,8 @@ async fn jam_create_handler(
.to_string();
(row.id, name)
})
.collect::<Vec<_>>()
};
match hub.create_jam(user.id, &user.name, &device_id, invitees) {
Ok(response) => Json(response).into_response(),
Err(message) => Ok(json_error(StatusCode::BAD_REQUEST, message)),
.collect::<Vec<_>>();
Ok(invitees)
}
}
@@ -4040,7 +4153,31 @@ async fn jam_join_handler(
return Ok(json_error(StatusCode::BAD_REQUEST, "invalid device id"));
};
match hub.join_jam(user.id, &device_id, &jam_id) {
match hub.join_jam(user.id, &user.name, &device_id, &jam_id) {
Ok(response) => Json(response).into_response(),
Err(message) => Ok(json_error(StatusCode::BAD_REQUEST, message)),
}
}
async fn jam_invite_handler(
session: Session,
db: Database,
pool: &sqlx::PgPool,
hub: Arc<PlayerDeviceHub>,
Json(dto): Json<PlayerJamInviteRequest>,
) -> cot::Result<cot::response::Response> {
let Some(user) = auth::get_session_user(&session, &db).await else {
return Ok(json_error(StatusCode::UNAUTHORIZED, "not authenticated"));
};
let Some(jam_id) = normalize_device_id(&dto.jam_id) else {
return Ok(json_error(StatusCode::BAD_REQUEST, "invalid jam id"));
};
let Some(device_id) = normalize_device_id(&dto.device_id) else {
return Ok(json_error(StatusCode::BAD_REQUEST, "invalid device id"));
};
let invitees = load_jam_invitees(pool, user.id, dto.invitee_user_ids).await?;
match hub.invite_to_jam(user.id, &device_id, &jam_id, invitees) {
Ok(response) => Json(response).into_response(),
Err(message) => Ok(json_error(StatusCode::BAD_REQUEST, message)),
}
@@ -6571,6 +6708,32 @@ impl App for PlayerApp {
}),
"player_jams_join",
),
Route::with_handler_and_name(
"/jams/invite",
post({
let pool = Arc::clone(&pool);
let pool_config = Arc::clone(&pool_config);
let device_hub = Arc::clone(&device_hub);
move |session: Session, db: Database, json: Json<PlayerJamInviteRequest>| {
let pool = Arc::clone(&pool);
let pool_config = Arc::clone(&pool_config);
let device_hub = Arc::clone(&device_hub);
async move {
let pg_pool = pool
.get_or_init(|| async {
sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect(&pool_config.database_url)
.await
.expect("player pool")
})
.await;
jam_invite_handler(session, db, pg_pool, device_hub, json).await
}
}
}),
"player_jams_invite",
),
Route::with_handler_and_name(
"/jams/leave",
post({