Compare commits

...

39 commits
mopidy ... main

Author SHA1 Message Date
Geoffrey Frogeye 441177a263
zytempo: Report availability 2024-10-19 13:34:56 +02:00
Geoffrey Frogeye cde7df57eb
Also doing C dev on cranberry 2024-10-09 19:00:28 +02:00
Geoffrey Frogeye 1615af8730
Remove fetchGit for performance 2024-10-09 19:00:17 +02:00
Geoffrey Frogeye 9389d1d284
git-sync: Use proper thing 2024-10-06 22:07:49 +02:00
Geoffrey Frogeye a7e2b49bea
git-sync supports jj
So much nicer!
2024-10-06 22:02:00 +02:00
Geoffrey Frogeye d2dbb5bbde
More dev things 2024-10-06 21:51:55 +02:00
Geoffrey Frogeye 64ab21f7dd
I'm working on micro-controllers again, can you tell? 2024-10-04 14:41:43 +02:00
Geoffrey Frogeye c2068a30ff
vm: Fix virtio 2024-10-03 14:20:46 +02:00
Geoffrey Frogeye e305c6b1de
Add rustdesk 2024-10-03 13:55:28 +02:00
Geoffrey Frogeye 5804ff31ff
Convert Virtualbox to KVM 2024-10-03 12:49:36 +02:00
Geoffrey Frogeye 6618fbee9d
rssVideos: Handle corrupt metadata better 2024-10-01 15:17:06 +02:00
Geoffrey Frogeye dee90d9a96
vim: Another nix lsp
Might be a tad too powerful for its own good, let's see
2024-10-01 14:35:28 +02:00
Geoffrey Frogeye 812e5acf6c
Update
Cups or something (I do print from time to time ><")
2024-10-01 14:24:16 +02:00
Geoffrey Frogeye 768b38b87f
Merge curacao and cranberry work 2024-10-01 14:22:16 +02:00
Geoffrey Frogeye 6644e85c30
frobarng: Some refactor 2024-10-01 14:12:24 +02:00
Geoffrey Frogeye 1ae7d6b447
frobarng: NetworkProvider done 2024-09-27 23:06:06 +02:00
Geoffrey Frogeye 01934563a5
Label printer support
Such a good name, Labelle. Glad they didn't go with the AI-generated one.
2024-09-15 22:31:01 +02:00
Geoffrey Frogeye 171232cb2e
jj config from curacao 2024-09-15 22:19:12 +02:00
Geoffrey Frogeye 36df032ecd
Hmm, jujutsu merge commit I guess? 2024-09-15 15:42:14 +02:00
Geoffrey Frogeye a43209a902
Add JJ 2024-09-15 00:41:42 +02:00
Geoffrey Frogeye 38ff39bc78
Remove pager by default
Hopefully this should remove the pesky auto pager for stuff I don't need
it for.
2024-09-11 00:38:02 +02:00
Geoffrey Frogeye fc744fd73b Merge remote-tracking branch 'origin/main' 2024-09-10 23:16:46 +02:00
Geoffrey Frogeye f633761d54
Add pdfgrep 2024-09-10 23:16:18 +02:00
Geoffrey Frogeye ae61f0c6d4 Merge remote-tracking branch 'origin/main' 2024-09-02 16:28:40 +02:00
Geoffrey Frogeye 532b3628d3
frobar: More pulseaudio device descriptions
Is there not a standard thing I can check?
2024-09-02 16:26:40 +02:00
Geoffrey Frogeye 83c24f2fb2
scripts: Fix overpdf in Nix 2024-09-02 16:26:21 +02:00
Geoffrey Frogeye 830552f8c3
Upgrade 2024-09-01 14:17:35 +02:00
Geoffrey Frogeye 4d39ac0769
displaylink: Use upstream 2024-08-25 10:43:05 +02:00
Geoffrey Frogeye c375cb2e11
frobarng: Forgotten dev 2024-08-25 09:48:36 +02:00
Geoffrey Frogeye f81fd6bfd2
frobarng: Even more dev 2024-08-17 00:57:06 +02:00
Geoffrey Frogeye c7535d8ed8
frobarng: Graceful exit 2024-08-14 03:29:34 +02:00
Geoffrey Frogeye 6690f3aa0d
frobarng: Mirroring and more 2024-08-14 02:45:25 +02:00
Geoffrey Frogeye 7d60269b49
frobarng: animations 2024-08-14 00:28:17 +02:00
Geoffrey Frogeye 139d3a3ea8
frobar: Some dev 2024-08-13 01:58:51 +02:00
Geoffrey Frogeye 86f9f75bd7
Add mpris support 2024-08-11 14:54:51 +02:00
Geoffrey Frogeye f011b376bb
Fix wol 2024-08-11 10:01:36 +02:00
Geoffrey Frogeye ce636f08ff
mpd: Fix remote volume control 2024-08-11 10:01:20 +02:00
Geoffrey Frogeye 0fc6a51cb0
Upgrade 2024-07-30 22:36:55 +02:00
Geoffrey Frogeye c90590640f
Attempt at stuff 2024-07-30 20:24:44 +02:00
32 changed files with 1146 additions and 846 deletions

View file

@ -1,8 +1,12 @@
{ pkgs, lib, config, ... }:
{ ... }:
{
config = {
frogeye = {
desktop.xorg = true;
dev = {
c = true;
vm = true;
};
extra = true;
};
};

View file

@ -1,10 +1,11 @@
{ pkgs, lib, config, ... }:
let
zytemp_mqtt_src = pkgs.fetchFromGitHub {
owner = "patrislav1";
# owner = "patrislav1";
owner = "GeoffreyFrogeye";
repo = "zytemp_mqtt";
rev = "a6be5e3082e1e10dee435cfb9643fb13e9a71c34"; # PR that adds humidity
sha256 = "sha256-cMWDi20isnbB6jlMzut7YyYB4te4bVFYXSgCEQWQnts=";
rev = "push-nurpouorqoyr"; # Humidity + availability support
sha256 = "sha256-nOhyBAgvjeQh9ys3cBJOVR67SDs96zBzxIRGpaq4yoA=";
};
zytemp_mqtt = pkgs.python3Packages.buildPythonPackage
rec {

View file

@ -145,6 +145,9 @@ class Desk:
delta_s = now - self.last_est
if delta_s > self.MAX_EST_INTERVAL:
# Attempt at fixing the issue of
# the service not working after the night
self._initialize()
self.log.warning(
"Too long without getting a report, "
"assuming the desk might be anywhere now."
@ -262,7 +265,7 @@ class Desk:
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
desk = Desk()

View file

@ -5,17 +5,12 @@
xorg = true;
};
dev = {
c = true;
docker = true;
vm = true;
};
extra = true;
gaming = true;
storageSize = "big";
};
# Shenanigans
# nixpkgs.config.allowUnfree = true;
# virtualisation.virtualbox.host.enable = true;
# virtualisation.virtualbox.host.enableExtensionPack = true;
# users.extraGroups.vboxusers.members = [ "geoffrey" ];
# TODO Convert Windows VM from virtualbox to libvirt
}

View file

@ -1,4 +1,4 @@
{ pkgs, lib, config, nixos-hardware, displaylinknixpkgs, ... }:
{ pkgs, lib, nixos-hardware, unixpkgs, ... }:
let
displays = {
embedded = {
@ -65,7 +65,10 @@ in
};
nixpkgs.overlays = [
(self: super: {
displaylink = (import displaylinknixpkgs { inherit (super) system; config.allowUnfree = true; }).displaylink;
displaylink = (import unixpkgs {
inherit (super) system;
config.allowUnfree = true;
}).displaylink;
})
];
services = {

View file

@ -9,5 +9,6 @@
'';
interfaces.enp3s0.wakeOnLan.enable = true;
};
services.tlp.settings.WOL_DISABLE = false;
};
}

View file

@ -116,18 +116,17 @@
},
"devshell": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": [
"nixvim",
"nixpkgs"
]
},
"locked": {
"lastModified": 1717408969,
"narHash": "sha256-Q0OEFqe35fZbbRPPRdrjTUUChKVhhWXz3T9ZSKmaoVY=",
"lastModified": 1722113426,
"narHash": "sha256-Yo/3loq572A8Su6aY5GP56knpuKYRvM2a1meP9oJZCw=",
"owner": "numtide",
"repo": "devshell",
"rev": "1ebbe68d57457c8cae98145410b164b5477761f4",
"rev": "67cce7359e4cd3c45296fb4aaf6a19e2a9c757ae",
"type": "github"
},
"original": {
@ -143,11 +142,11 @@
]
},
"locked": {
"lastModified": 1719864345,
"narHash": "sha256-e4Pw+30vFAxuvkSTaTypd9zYemB/QlWcH186dsGT+Ms=",
"lastModified": 1727531434,
"narHash": "sha256-b+GBgCWd2N6pkiTkRZaMFOPztPO4IVTaclYPrQl2uLk=",
"owner": "nix-community",
"repo": "disko",
"rev": "544a80a69d6e2da04e4df7ec8210a858de8c7533",
"rev": "b709e1cc33fcde71c7db43850a55ebe6449d0959",
"type": "github"
},
"original": {
@ -155,22 +154,6 @@
"type": "indirect"
}
},
"displaylinknixpkgs": {
"locked": {
"lastModified": 1717533296,
"narHash": "sha256-TOxOpOYy/tQB+eYQOTPQXNeUmkMghLVDBO0Gc2nj/vs=",
"owner": "GeoffreyFrogeye",
"repo": "nixpkgs",
"rev": "99006b6f4cd24796b1ff6b6981b8f44c9cebd301",
"type": "github"
},
"original": {
"owner": "GeoffreyFrogeye",
"ref": "displaylink-600",
"repo": "nixpkgs",
"type": "github"
}
},
"flake-compat": {
"locked": {
"lastModified": 1696426674,
@ -225,11 +208,11 @@
]
},
"locked": {
"lastModified": 1719877454,
"narHash": "sha256-g5N1yyOSsPNiOlFfkuI/wcUjmtah+nxdImJqrSATjOU=",
"lastModified": 1725234343,
"narHash": "sha256-+ebgonl3NbiKD2UD0x4BszCZQ6sTfL4xioaM49o5B3Y=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "4e3583423212f9303aa1a6337f8dffb415920e4f",
"rev": "567b938d64d4b4112ee253b9274472dc3a346eb6",
"type": "github"
},
"original": {
@ -243,29 +226,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"lastModified": 1726560853,
"narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a",
"type": "github"
},
"original": {
@ -304,11 +269,11 @@
]
},
"locked": {
"lastModified": 1719259945,
"narHash": "sha256-F1h+XIsGKT9TkGO3omxDLEb/9jOOsI6NnzsXFsZhry4=",
"lastModified": 1724857454,
"narHash": "sha256-Qyl9Q4QMTLZnnBb/8OuQ9LSkzWjBU1T5l5zIzTxkkhk=",
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "0ff4381bbb8f7a52ca4a851660fc7a437a4c6e07",
"rev": "4509ca64f1084e73bc7a721b20c669a8d4c5ebe6",
"type": "github"
},
"original": {
@ -363,11 +328,11 @@
]
},
"locked": {
"lastModified": 1719827385,
"narHash": "sha256-qs+nU20Sm8czHg3bhGCqiH+8e13BJyRrKONW34g3i50=",
"lastModified": 1726989464,
"narHash": "sha256-Vl+WVTJwutXkimwGprnEtXc/s/s8sMuXzqXaspIGlwM=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "391ca6e950c2525b4f853cbe29922452c14eda82",
"rev": "2f23fa308a7c067e52dfcc30a0758f47043ec176",
"type": "github"
},
"original": {
@ -384,11 +349,11 @@
]
},
"locked": {
"lastModified": 1719827385,
"narHash": "sha256-qs+nU20Sm8czHg3bhGCqiH+8e13BJyRrKONW34g3i50=",
"lastModified": 1720042825,
"narHash": "sha256-A0vrUB6x82/jvf17qPCpxaM+ulJnD8YZwH9Ci0BsAzE=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "391ca6e950c2525b4f853cbe29922452c14eda82",
"rev": "e1391fb22e18a36f57e6999c7a9f966dc80ac073",
"type": "github"
},
"original": {
@ -427,11 +392,11 @@
]
},
"locked": {
"lastModified": 1719845423,
"narHash": "sha256-ZLHDmWAsHQQKnmfyhYSHJDlt8Wfjv6SQhl2qek42O7A=",
"lastModified": 1725189302,
"narHash": "sha256-IhXok/kwQqtusPsoguQLCHA+h6gKvgdCrkhIaN+kByA=",
"owner": "lnl7",
"repo": "nix-darwin",
"rev": "ec12b88104d6c117871fad55e931addac4626756",
"rev": "7c4b53a7d9f3a3df902b3fddf2ae245ef20ebcda",
"type": "github"
},
"original": {
@ -446,7 +411,10 @@
"nix-on-droid",
"nixpkgs"
],
"nmd": "nmd",
"nmd": [
"nix-on-droid",
"nmd"
],
"nmt": "nmt"
},
"locked": {
@ -474,14 +442,14 @@
],
"nixpkgs-docs": "nixpkgs-docs",
"nixpkgs-for-bootstrap": "nixpkgs-for-bootstrap",
"nmd": "nmd_2"
"nmd": "nmd"
},
"locked": {
"lastModified": 1719772177,
"narHash": "sha256-XJOxCLWQUn+3VQTQGRwMoz6nDjT+GoL0f4mVrUQdzfk=",
"lastModified": 1725658585,
"narHash": "sha256-P29z4Gt89n5ps1U7+qmIrj0BuRXGZQSIaOe2+tsPgfw=",
"owner": "nix-community",
"repo": "nix-on-droid",
"rev": "45fcd2da39a70a96752e17f85d7a843b3c4f67f4",
"rev": "5d88ff2519e4952f8d22472b52c531bb5f1635fc",
"type": "github"
},
"original": {
@ -492,11 +460,11 @@
},
"nixos-hardware": {
"locked": {
"lastModified": 1719895800,
"narHash": "sha256-xNbjISJTFailxass4LmdWeV4jNhAlmJPwj46a/GxE6M=",
"lastModified": 1727665282,
"narHash": "sha256-oKtfbQB1MBypqIyzkC8QCQcVGOa1soaXaGgcBIoh14o=",
"owner": "NixOS",
"repo": "nixos-hardware",
"rev": "6e253f12b1009053eff5344be5e835f604bb64cd",
"rev": "11c43c830e533dad1be527ecce379fcf994fbbb5",
"type": "github"
},
"original": {
@ -506,11 +474,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1719838683,
"narHash": "sha256-Zw9rQjHz1ilNIimEXFeVa1ERNRBF8DoXDhLAZq5B4pE=",
"lastModified": 1727672256,
"narHash": "sha256-9/79hjQc9+xyH+QxeMcRsA6hDyw6Z9Eo1/oxjvwirLk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "d032c1a6dfad4eedec7e35e91986becc699d7d69",
"rev": "1719f27dd95fd4206afb9cec9f415b539978827e",
"type": "github"
},
"original": {
@ -537,17 +505,17 @@
},
"nixpkgs-for-bootstrap": {
"locked": {
"lastModified": 1708105575,
"narHash": "sha256-sS4AItZeUnAei6v8FqxNlm+/27MPlfoGym/TZP0rmH0=",
"lastModified": 1720244366,
"narHash": "sha256-WrDV0FPMVd2Sq9hkR5LNHudS3OSMmUrs90JUTN+MXpA=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "1d1817869c47682a6bee85b5b0a6537b6c0fba26",
"rev": "49ee0e94463abada1de470c9c07bfc12b36dcf40",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "1d1817869c47682a6bee85b5b0a6537b6c0fba26",
"rev": "49ee0e94463abada1de470c9c07bfc12b36dcf40",
"type": "github"
}
},
@ -565,11 +533,11 @@
"treefmt-nix": "treefmt-nix"
},
"locked": {
"lastModified": 1719923896,
"narHash": "sha256-/hfE2x9NbT13d53o9uq6MuMipV19pJUQzpsZIhlvsiM=",
"lastModified": 1725350106,
"narHash": "sha256-TaMMlI2KPJ3wCyxJk6AShOLhNuTeabHCnvYRkLBlEFs=",
"owner": "nix-community",
"repo": "nixvim",
"rev": "d384cf656cb1b21d90eee1b007a6ade6f90768f5",
"rev": "0f2c31e6a57a83ed4e6fa3adc76749620231055d",
"type": "github"
},
"original": {
@ -580,22 +548,6 @@
}
},
"nmd": {
"flake": false,
"locked": {
"lastModified": 1666190571,
"narHash": "sha256-Z1hc7M9X6L+H83o9vOprijpzhTfOBjd0KmUTnpHAVjA=",
"owner": "rycee",
"repo": "nmd",
"rev": "b75d312b4f33bd3294cd8ae5c2ca8c6da2afc169",
"type": "gitlab"
},
"original": {
"owner": "rycee",
"repo": "nmd",
"type": "gitlab"
}
},
"nmd_2": {
"inputs": {
"nixpkgs": [
"nix-on-droid",
@ -635,11 +587,11 @@
},
"nur": {
"locked": {
"lastModified": 1719925604,
"narHash": "sha256-cLmqi+P1sn+7497GS4fNWayoTDa0ZiJecjmEM4iQN9U=",
"lastModified": 1727779756,
"narHash": "sha256-KLeROOi6VYct8lP1TIAPABlKOsisecKZLOozD5W54PE=",
"owner": "nix-community",
"repo": "NUR",
"rev": "574405811547dcec59e912c5e82bfd224648bd5e",
"rev": "01c22fb0d5c07a4a2f9ffc4b132cfa4ee4e1cce2",
"type": "github"
},
"original": {
@ -651,7 +603,6 @@
"root": {
"inputs": {
"disko": "disko",
"displaylinknixpkgs": "displaylinknixpkgs",
"flake-utils": "flake-utils",
"home-manager": "home-manager",
"nix-on-droid": "nix-on-droid",
@ -659,7 +610,8 @@
"nixpkgs": "nixpkgs",
"nixvim": "nixvim",
"nur": "nur",
"stylix": "stylix"
"stylix": "stylix",
"unixpkgs": "unixpkgs"
}
},
"scss-reset": {
@ -724,21 +676,6 @@
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
@ -747,11 +684,11 @@
]
},
"locked": {
"lastModified": 1719887753,
"narHash": "sha256-p0B2r98UtZzRDM5miGRafL4h7TwGRC4DII+XXHDHqek=",
"lastModified": 1724833132,
"narHash": "sha256-F4djBvyNRAXGusJiNYInqR6zIMI3rvlp6WiKwsRISos=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "bdb6355009562d8f9313d9460c0d3860f525bc6c",
"rev": "3ffd842a5f50f435d3e603312eefa4790db46af5",
"type": "github"
},
"original": {
@ -759,6 +696,21 @@
"repo": "treefmt-nix",
"type": "github"
}
},
"unixpkgs": {
"locked": {
"lastModified": 1727785308,
"narHash": "sha256-t2PqANZPqbtiqOjiQFaHEXuMeG1laa1g+4OcQuo+MjE=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "1c9c0eabb80d35c24eb4e7968c9ee15641a3e0fd",
"type": "github"
},
"original": {
"id": "nixpkgs",
"ref": "master",
"type": "indirect"
}
}
},
"root": "root",

View file

@ -4,7 +4,7 @@
inputs = {
# Packages
nixpkgs.url = "nixpkgs/nixos-24.05";
displaylinknixpkgs.url = "github:GeoffreyFrogeye/nixpkgs/displaylink-600";
unixpkgs.url = "nixpkgs/master";
# OS
disko = {
url = "disko";

View file

@ -190,6 +190,7 @@
usbutils
dmidecode
lshw
labelle # Label printer
# Locker
(pkgs.writeShellApplication {

View file

@ -52,7 +52,7 @@
hwdec = "auto-safe";
profile = "gpu-hq";
};
scripts = with pkgs.mpvScripts; [ thumbnail ];
scripts = with pkgs.mpvScripts; [ thumbnail mpris ];
scriptOpts = {
mpv_thumbnail_script = {
autogenerate = false; # TODO It creates too many processes at once, crashing the system

View file

@ -1,94 +0,0 @@
#!/usr/bin/env python3
import typing
import subprocess
import time
# CORE
class Notifier:
pass
class Section:
def __init__(self) -> None:
self.text = b"(Loading)"
class Module:
def __init__(self) -> None:
self.bar: "Bar"
self.section = Section()
self.sections = [self.section]
class Alignment:
def __init__(self, *modules: Module) -> None:
self.bar: "Bar"
self.modules = modules
for module in modules:
module.bar = self.bar
class Screen:
def __init__(self, left: Alignment = Alignment(), right: Alignment = Alignment()) -> None:
self.bar: "Bar"
self.left = left
self.left.bar = self.bar
self.right = right or Alignment()
self.right.bar = self.bar
class Bar:
def __init__(self, *screens: Screen) -> None:
self.screens = screens
for screen in screens:
screen.bar = self
self.process = subprocess.Popen(["lemonbar"], stdin=subprocess.PIPE)
def display(self) -> None:
string = b""
for s, screen in enumerate(self.screens):
string += b"%%{S%d}" % s
for control, alignment in [(b'%{l}', screen.left), (b'%{r}', screen.right)]:
string += control
for module in alignment.modules:
for section in module.sections:
string += b"<%b> |" % section.text
string += b"\n"
print(string)
assert self.process.stdin
self.process.stdin.write(string)
self.process.stdin.flush()
def run(self) -> None:
while True:
self.display()
time.sleep(1)
# REUSABLE
class ClockNotifier(Notifier):
def run(self) -> None:
while True:
def __init__(self, text: bytes):
super().__init__()
self.section.text = text
class StaticModule(Module):
def __init__(self, text: bytes):
super().__init__()
self.section.text = text
# USER
if __name__ == "__main__":
bar = Bar(
Screen(Alignment(StaticModule(b"A"))),
Screen(Alignment(StaticModule(b"B"))),
)
bar.run()

View file

@ -0,0 +1,759 @@
#!/usr/bin/env python3
import asyncio
import datetime
import enum
import ipaddress
import logging
import random
import signal
import socket
import typing
import coloredlogs
import i3ipc
import i3ipc.aio
import psutil
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int
def humanSize(numi: int) -> str:
"""
Returns a string of width 3+3
"""
num = float(numi)
for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
if abs(num) < 1000:
if num >= 10:
return "{:3d}{}".format(int(num), unit)
else:
return "{:.1f}{}".format(num, unit)
num /= 1024
return "{:d}YiB".format(numi)
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
def setParent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent
self.parent.updateMarkup()
def unsetParent(self) -> None:
assert self.parent
self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent = None
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
parent = self
while not isinstance(parent, typ):
assert parent.parent, f"{self} doesn't have a parent of {typ}"
parent = parent.parent
return parent
def updateMarkup(self) -> None:
self.bar.refresh.set()
# TODO OPTI See if worth caching the output
def generateMarkup(self) -> str:
raise NotImplementedError(f"{self} cannot generate markup")
def getMarkup(self) -> str:
return self.generateMarkup()
def randomColor(seed: int | bytes | None = None) -> str:
if seed is not None:
random.seed(seed)
return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3))
class Button(enum.Enum):
CLICK_LEFT = "1"
CLICK_MIDDLE = "2"
CLICK_RIGHT = "3"
SCROLL_UP = "4"
SCROLL_DOWN = "5"
class Section(ComposableText):
"""
Colorable block separated by chevrons
"""
def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
self.parent: "Module"
self.color = randomColor()
self.desiredText: str | None = None
self.text = ""
self.targetSize = -1
self.size = -1
self.animationTask: asyncio.Task | None = None
self.actions: dict[Button, str] = dict()
def isHidden(self) -> bool:
return self.size < 0
# Geometric series, with a cap
ANIM_A = 0.025
ANIM_R = 0.9
ANIM_MIN = 0.001
async def animate(self) -> None:
increment = 1 if self.size < self.targetSize else -1
loop = asyncio.get_running_loop()
frameTime = loop.time()
animTime = self.ANIM_A
while self.size != self.targetSize:
self.size += increment
self.updateMarkup()
animTime *= self.ANIM_R
animTime = max(self.ANIM_MIN, animTime)
frameTime += animTime
sleepTime = frameTime - loop.time()
# In case of stress, skip refreshing by not awaiting
if sleepTime > 0:
await asyncio.sleep(sleepTime)
else:
log.warning("Skipped an animation frame")
def setText(self, text: str | None) -> None:
# OPTI Don't redraw nor reset animation if setting the same text
if self.desiredText == text:
return
self.desiredText = text
if text is None:
self.text = ""
self.targetSize = -1
else:
self.text = f" {text} "
self.targetSize = len(self.text)
if self.animationTask:
self.animationTask.cancel()
# OPTI Skip the whole animation task if not required
if self.size == self.targetSize:
self.updateMarkup()
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate())
def setAction(self, button: Button, callback: typing.Callable | None) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.removeAction(command)
del self.actions[button]
if callback:
command = self.bar.addAction(callback)
self.actions[button] = command
def generateMarkup(self) -> str:
assert not self.isHidden()
pad = max(0, self.size - len(self.text))
text = self.text[: self.size] + " " * pad
for button, command in self.actions.items():
text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
return text
class Module(ComposableText):
"""
Sections handled by a same updater
"""
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
self.parent: "Side"
self.children: typing.MutableSequence[Section]
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
def mirror(self, module: "Module") -> None:
self.mirroring = module
module.mirrors.append(self)
def getSections(self) -> typing.Sequence[Section]:
if self.mirroring:
return self.mirroring.children
else:
return self.children
def updateMarkup(self) -> None:
super().updateMarkup()
for mirror in self.mirrors:
mirror.updateMarkup()
class Alignment(enum.Enum):
LEFT = "l"
RIGHT = "r"
CENTER = "c"
class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent)
self.parent: Screen
self.children: typing.MutableSequence[Module] = []
self.alignment = alignment
def generateMarkup(self) -> str:
if not self.children:
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
for module in self.children:
for section in module.getSections():
if section.isHidden():
continue
if lastSection is None:
if self.alignment == Alignment.LEFT:
text += "%{B" + section.color + "}%{F-}"
else:
text += "%{B-}%{F" + section.color + "}%{R}%{F-}"
else:
if self.alignment == Alignment.RIGHT:
if lastSection.color == section.color:
text += ""
else:
text += "%{F" + section.color + "}%{R}"
else:
if lastSection.color == section.color:
text += ""
else:
text += "%{R}%{B" + section.color + "}"
text += "%{F-}"
text += section.getMarkup()
lastSection = section
if self.alignment != Alignment.RIGHT:
text += "%{R}%{B-}"
return text
class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent)
self.parent: "Bar"
self.children: typing.MutableSequence[Side]
self.output = output
for alignment in Alignment:
Side(parent=self, alignment=alignment)
def generateMarkup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join(
side.getMarkup() for side in self.children
)
class Bar(ComposableText):
"""
Top-level
"""
def __init__(self) -> None:
super().__init__()
self.parent: None
self.children: typing.MutableSequence[Screen]
self.longRunningTasks: list[asyncio.Task] = list()
self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list()
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
self.periodicProviderTask: typing.Coroutine | None = None
i3 = i3ipc.Connection()
for output in i3.get_outputs():
if not output.active:
continue
Screen(parent=self, output=output.name)
def addLongRunningTask(self, coro: typing.Coroutine) -> None:
task = self.taskGroup.create_task(coro)
self.longRunningTasks.append(task)
async def run(self) -> None:
cmd = [
"lemonbar",
"-b",
"-a",
"64",
"-f",
"DejaVuSansM Nerd Font:size=10",
]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
)
async def refresher() -> None:
assert proc.stdin
while True:
await self.refresh.wait()
self.refresh.clear()
markup = self.getMarkup()
proc.stdin.write(markup.encode())
async def actionHandler() -> None:
assert proc.stdout
while True:
line = await proc.stdout.readline()
command = line.decode().strip()
callback = self.actions[command]
callback()
async with self.taskGroup:
self.addLongRunningTask(refresher())
self.addLongRunningTask(actionHandler())
for provider in self.providers:
self.addLongRunningTask(provider.run())
def exit() -> None:
log.info("Terminating")
for task in self.longRunningTasks:
task.cancel()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, exit)
def generateMarkup(self) -> str:
return "".join(screen.getMarkup() for screen in self.children) + "\n"
def addProvider(
self,
provider: "Provider",
alignment: Alignment = Alignment.LEFT,
screenNum: int | None = None,
) -> None:
"""
screenNum: the provider will be added on this screen if set, all otherwise
"""
modules = list()
for s, screen in enumerate(self.children):
if screenNum is None or s == screenNum:
side = next(filter(lambda s: s.alignment == alignment, screen.children))
module = Module(parent=side)
modules.append(module)
provider.modules = modules
if modules:
self.providers.append(provider)
def addAction(self, callback: typing.Callable) -> str:
command = f"{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def removeAction(self, command: str) -> None:
del self.actions[command]
class Provider:
def __init__(self) -> None:
self.modules: list[Module] = list()
async def run(self) -> None:
# Not a NotImplementedError, otherwise can't combine all classes
pass
class MirrorProvider(Provider):
def __init__(self) -> None:
super().__init__()
self.module: Module
async def run(self) -> None:
await super().run()
self.module = self.modules[0]
for module in self.modules[1:]:
module.mirror(self.module)
class SingleSectionProvider(MirrorProvider):
async def run(self) -> None:
await super().run()
self.section = Section(parent=self.module)
class StaticProvider(SingleSectionProvider):
def __init__(self, text: str) -> None:
self.text = text
async def run(self) -> None:
await super().run()
self.section.setText(self.text)
class StatefulSection(Section):
def __init__(self, parent: Module, sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
self.state = 0
self.numberStates: int
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
def incrementState(self) -> None:
self.state += 1
self.changeState()
def decrementState(self) -> None:
self.state -= 1
self.changeState()
def setChangedState(self, callback: typing.Callable) -> None:
self.callback = callback
def changeState(self) -> None:
self.state %= self.numberStates
self.bar.taskGroup.create_task(self.callback())
class SingleStatefulSectionProvider(MirrorProvider):
async def run(self) -> None:
await super().run()
self.section = StatefulSection(parent=self.module)
class PeriodicProvider(Provider):
async def init(self) -> None:
pass
async def loop(self) -> None:
raise NotImplementedError()
@classmethod
async def task(cls, bar: Bar) -> None:
providers = list()
for provider in bar.providers:
if isinstance(provider, PeriodicProvider):
providers.append(provider)
await provider.init()
while True:
# TODO Block bar update during the periodic update of the loops
loops = [provider.loop() for provider in providers]
asyncio.gather(*loops)
now = datetime.datetime.now()
# Hardcoded to 1 second... not sure if we want some more than that,
# and if the logic to check if a task should run would be a win
# compared to the task itself
remaining = 1 - now.microsecond / 1000000
await asyncio.sleep(remaining)
async def run(self) -> None:
await super().run()
for module in self.modules:
bar = module.getFirstParentOfType(Bar)
assert bar
if not bar.periodicProviderTask:
bar.periodicProviderTask = PeriodicProvider.task(bar)
bar.addLongRunningTask(bar.periodicProviderTask)
class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
async def run(self) -> None:
await super().run()
self.section.setChangedState(self.loop)
# Providers
class I3ModeProvider(SingleSectionProvider):
def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(None if e.change == "default" else e.change)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.MODE, self.on_mode)
await i3.main()
class I3WindowTitleProvider(SingleSectionProvider):
# TODO FEAT To make this available from start, we need to find the
# `focused=True` element following the `focus` array
def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(e.container.name)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WINDOW, self.on_window)
await i3.main()
class I3WorkspacesProvider(Provider):
# TODO Custom names
# TODO Colors
async def updateWorkspaces(self, i3: i3ipc.Connection) -> None:
"""
Since the i3 IPC interface cannot really tell you by events
when workspaces get invisible or not urgent anymore.
Relying on those exclusively would require reimplementing some of i3 logic.
Fetching all the workspaces on event looks ugly but is the most maintainable.
Times I tried to challenge this and failed: 2.
"""
workspaces = await i3.get_workspaces()
for workspace in workspaces:
module = self.modulesFromOutput[workspace.output]
if workspace.num in self.sections:
section = self.sections[workspace.num]
if section.parent != module:
section.unsetParent()
section.setParent(module)
else:
section = Section(parent=module, sortKey=workspace.num)
self.sections[workspace.num] = section
def generate_switch_workspace(num: int) -> typing.Callable:
def switch_workspace() -> None:
self.bar.taskGroup.create_task(
i3.command(f"workspace number {num}")
)
return switch_workspace
section.setAction(
Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
)
name = workspace.name
if workspace.urgent:
name = f"{name} !"
elif workspace.focused:
name = f"{name} +"
elif workspace.visible:
name = f"{name} *"
section.setText(name)
workspacesNums = set(workspace.num for workspace in workspaces)
for num, section in self.sections.items():
if num not in workspacesNums:
# This should delete the Section but it turned out to be hard
section.setText(None)
def onWorkspaceChange(
self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
) -> None:
# Cancelling the task doesn't seem to prevent performance double-events
self.bar.taskGroup.create_task(self.updateWorkspaces(i3))
def __init__(
self,
) -> None:
super().__init__()
self.sections: dict[int, Section] = dict()
self.modulesFromOutput: dict[str, Module] = dict()
self.bar: Bar
async def run(self) -> None:
for module in self.modules:
screen = module.getFirstParentOfType(Screen)
output = screen.output
self.modulesFromOutput[output] = module
self.bar = module.bar
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
self.onWorkspaceChange(i3)
await i3.main()
class NetworkProviderSection(StatefulSection):
def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None:
super().__init__(parent=parent, sortKey=iface)
self.iface = iface
self.provider = provider
self.ignore = False
self.icon = "?"
self.wifi = False
if iface == "lo":
self.ignore = True
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
self.icon = ""
else:
self.icon = ""
elif iface.startswith("wlan") or iface.startswith("wl"):
self.icon = ""
self.wifi = True
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
):
self.icon = ""
elif iface.startswith("docker"):
self.icon = ""
elif iface.startswith("veth"):
self.icon = ""
elif iface.startswith("vboxnet"):
self.icon = ""
self.numberStates = 5 if self.wifi else 4
self.state = 1 if self.wifi else 0
self.setChangedState(self.update)
async def update(self) -> None:
if self.ignore or not self.provider.if_stats[self.iface].isup:
self.setText(None)
return
text = self.icon
state = self.state + (0 if self.wifi else 1) # SSID
if self.wifi and state >= 1:
cmd = ["iwgetid", self.iface, "--raw"]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
text += f" {stdout.decode().strip()}"
if state >= 2: # Address
for address in self.provider.if_addrs[self.iface]:
if address.family == socket.AF_INET:
net = ipaddress.IPv4Network(
(address.address, address.netmask), strict=False
)
text += f" {net.with_prefixlen}"
break
if state >= 3: # Speed
prevRecv = self.provider.prev_io_counters[self.iface].bytes_recv
recv = self.provider.io_counters[self.iface].bytes_recv
prevSent = self.provider.prev_io_counters[self.iface].bytes_sent
sent = self.provider.io_counters[self.iface].bytes_sent
dt = self.provider.time - self.provider.prev_time
recvDiff = (recv - prevRecv) / dt
sentDiff = (sent - prevSent) / dt
text += f"{humanSize(recvDiff)}{humanSize(sentDiff)}"
if state >= 4: # Counter
text += f"{humanSize(recv)}{humanSize(sent)}"
self.setText(text)
class NetworkProvider(MirrorProvider, PeriodicProvider):
def __init__(self) -> None:
super().__init__()
self.sections: dict[str, NetworkProviderSection] = dict()
async def init(self) -> None:
loop = asyncio.get_running_loop()
self.time = loop.time()
self.io_counters = psutil.net_io_counters(pernic=True)
async def loop(self) -> None:
loop = asyncio.get_running_loop()
async with asyncio.TaskGroup() as tg:
self.prev_io_counters = self.io_counters
self.prev_time = self.time
# On-demand would only benefit if_addrs:
# stats are used to determine display,
# and we want to keep previous io_counters
# so displaying stats is ~instant.
self.time = loop.time()
self.if_stats = psutil.net_if_stats()
self.if_addrs = psutil.net_if_addrs()
self.io_counters = psutil.net_io_counters(pernic=True)
for iface in self.if_stats:
section = self.sections.get(iface)
if not section:
section = NetworkProviderSection(
parent=self.module, iface=iface, provider=self
)
self.sections[iface] = section
tg.create_task(section.update())
for iface, section in self.sections.items():
if iface not in self.if_stats:
section.setText(None)
async def onStateChange(self, section: StatefulSection) -> None:
assert isinstance(section, NetworkProviderSection)
await section.update()
class TimeProvider(PeriodicStatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
async def init(self) -> None:
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
async def loop(self) -> None:
now = datetime.datetime.now()
format = self.FORMATS[self.section.state]
self.section.setText(now.strftime(format))
async def main() -> None:
bar = Bar()
dualScreen = len(bar.children) > 1
bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT)
bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
if dualScreen:
bar.addProvider(
I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER
)
bar.addProvider(
StaticProvider(text="mpris"),
screenNum=1 if dualScreen else None,
alignment=Alignment.CENTER,
)
bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT)
bar.addProvider(
StaticProvider("pulse"),
screenNum=1 if dualScreen else None,
alignment=Alignment.RIGHT,
)
bar.addProvider(
NetworkProvider(),
screenNum=0 if dualScreen else None,
alignment=Alignment.RIGHT,
)
bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
await bar.run()
asyncio.run(main())

View file

@ -1,199 +0,0 @@
#!/usr/bin/env python3
"""
Debugging script
"""
import i3ipc
import os
import psutil
# import alsaaudio
from time import time
import subprocess
i3 = i3ipc.Connection()
lemonbar = subprocess.Popen(["lemonbar", "-b"], stdin=subprocess.PIPE)
# Utils
def upChart(p):
block = " ▁▂▃▄▅▆▇█"
return block[round(p * (len(block) - 1))]
def humanSizeOf(num, suffix="B"): # TODO Credit
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return "%3.0f%2s%s" % (num, unit, suffix)
num /= 1024.0
return "%.0f%2s%s" % (num, "Yi", suffix)
# Values
mode = ""
container = i3.get_tree().find_focused()
workspaces = i3.get_workspaces()
outputs = i3.get_outputs()
username = os.environ["USER"]
hostname = os.environ["HOSTNAME"]
if "-" in hostname:
hostname = hostname.split("-")[-1]
oldNetIO = dict()
oldTime = time()
def update():
activeOutputs = sorted(
sorted(list(filter(lambda o: o.active, outputs)), key=lambda o: o.rect.y),
key=lambda o: o.rect.x,
)
z = ""
for aOutput in range(len(activeOutputs)):
output = activeOutputs[aOutput]
# Mode || Workspaces
t = []
if mode != "":
t.append(mode)
else:
t.append(
" ".join(
[
(w.name.upper() if w.focused else w.name)
for w in workspaces
if w.output == output.name
]
)
)
# Windows Title
# if container:
# t.append(container.name)
# CPU
t.append(
"C" + "".join([upChart(p / 100) for p in psutil.cpu_percent(percpu=True)])
)
# Memory
t.append(
"M"
+ str(round(psutil.virtual_memory().percent))
+ "% "
+ "S"
+ str(round(psutil.swap_memory().percent))
+ "%"
)
# Disks
d = []
for disk in psutil.disk_partitions():
e = ""
if disk.device.startswith("/dev/sd"):
e += "S" + disk.device[-2:].upper()
elif disk.device.startswith("/dev/mmcblk"):
e += "M" + disk.device[-3] + disk.device[-1]
else:
e += "?"
e += " "
e += str(round(psutil.disk_usage(disk.mountpoint).percent)) + "%"
d.append(e)
t.append(" ".join(d))
# Network
netStats = psutil.net_if_stats()
netIO = psutil.net_io_counters(pernic=True)
net = []
for iface in filter(lambda i: i != "lo" and netStats[i].isup, netStats.keys()):
s = ""
if iface.startswith("eth"):
s += "E"
elif iface.startswith("wlan"):
s += "W"
else:
s += "?"
s += " "
now = time()
global oldNetIO, oldTime
sent = (
(oldNetIO[iface].bytes_sent if iface in oldNetIO else 0)
- (netIO[iface].bytes_sent if iface in netIO else 0)
) / (oldTime - now)
recv = (
(oldNetIO[iface].bytes_recv if iface in oldNetIO else 0)
- (netIO[iface].bytes_recv if iface in netIO else 0)
) / (oldTime - now)
s += (
""
+ humanSizeOf(abs(recv), "B/s")
+ ""
+ humanSizeOf(abs(sent), "B/s")
)
oldNetIO = netIO
oldTime = now
net.append(s)
t.append(" ".join(net))
# Battery
if os.path.isdir("/sys/class/power_supply/BAT0"):
with open("/sys/class/power_supply/BAT0/charge_now") as f:
charge_now = int(f.read())
with open("/sys/class/power_supply/BAT0/charge_full_design") as f:
charge_full = int(f.read())
t.append("B" + str(round(100 * charge_now / charge_full)) + "%")
# Volume
# t.append('V ' + str(alsaaudio.Mixer('Master').getvolume()[0]) + '%')
t.append(username + "@" + hostname)
# print(' - '.join(t))
# t = [output.name]
z += " - ".join(t) + "%{S" + str(aOutput + 1) + "}"
# lemonbar.stdin.write(bytes(' - '.join(t), 'utf-8'))
# lemonbar.stdin.write(bytes('%{S' + str(aOutput + 1) + '}', 'utf-8'))
lemonbar.stdin.write(bytes(z + "\n", "utf-8"))
lemonbar.stdin.flush()
# Event listeners
def on_mode(i3, e):
global mode
if e.change == "default":
mode = ""
else:
mode = e.change
update()
i3.on("mode", on_mode)
# def on_window_focus(i3, e):
# global container
# container = e.container
# update()
#
# i3.on("window::focus", on_window_focus)
def on_workspace_focus(i3, e):
global workspaces
workspaces = i3.get_workspaces()
update()
i3.on("workspace::focus", on_workspace_focus)
# Starting
update()
i3.main()

View file

@ -1,327 +0,0 @@
#!/usr/bin/env python3
"""
Beautiful script
"""
import subprocess
import time
import datetime
import os
import multiprocessing
import i3ipc
import difflib
# Constants
FONT = "DejaVuSansMono Nerd Font Mono"
# TODO Update to be in sync with base16
thm = [
"#002b36",
"#dc322f",
"#859900",
"#b58900",
"#268bd2",
"#6c71c4",
"#2aa198",
"#93a1a1",
"#657b83",
"#dc322f",
"#859900",
"#b58900",
"#268bd2",
"#6c71c4",
"#2aa198",
"#fdf6e3",
]
fg = "#93a1a1"
bg = "#002b36"
THEMES = {
"CENTER": (fg, bg),
"DEFAULT": (thm[0], thm[8]),
"1": (thm[0], thm[9]),
"2": (thm[0], thm[10]),
"3": (thm[0], thm[11]),
"4": (thm[0], thm[12]),
"5": (thm[0], thm[13]),
"6": (thm[0], thm[14]),
"7": (thm[0], thm[15]),
}
# Utils
def fitText(text, size):
"""
Add spaces or cut a string to be `size` characters long
"""
if size > 0:
t = len(text)
if t >= size:
return text[:size]
else:
diff = size - t
return text + " " * diff
else:
return ""
def fgColor(theme):
global THEMES
return THEMES[theme][0]
def bgColor(theme):
global THEMES
return THEMES[theme][1]
class Section:
def __init__(self, theme="DEFAULT"):
self.text = ""
self.size = 0
self.toSize = 0
self.theme = theme
self.visible = False
self.name = ""
def update(self, text):
if text == "":
self.toSize = 0
else:
if len(text) < len(self.text):
self.text = text + self.text[len(text) :]
else:
self.text = text
self.toSize = len(text) + 3
def updateSize(self):
"""
Set the size for the next frame of animation
Return if another frame is needed
"""
if self.toSize > self.size:
self.size += 1
elif self.toSize < self.size:
self.size -= 1
self.visible = self.size
return self.toSize == self.size
def draw(self, left=True, nextTheme="DEFAULT"):
s = ""
if self.visible:
if not left:
if self.theme == nextTheme:
s += ""
else:
s += "%{F" + bgColor(self.theme) + "}"
s += "%{B" + bgColor(nextTheme) + "}"
s += ""
s += "%{F" + fgColor(self.theme) + "}"
s += "%{B" + bgColor(self.theme) + "}"
s += " " if self.size > 1 else ""
s += fitText(self.text, self.size - 3)
s += " " if self.size > 2 else ""
if left:
if self.theme == nextTheme:
s += ""
else:
s += "%{F" + bgColor(self.theme) + "}"
s += "%{B" + bgColor(nextTheme) + "}"
s += ""
return s
# Section definition
sTime = Section("3")
hostname = os.environ["HOSTNAME"].split(".")[0]
sHost = Section("2")
sHost.update(
os.environ["USER"] + "@" + hostname.split("-")[-1] if "-" in hostname else hostname
)
# Groups definition
gLeft = []
gRight = [sTime, sHost]
# Bar handling
bar = subprocess.Popen(["lemonbar", "-f", FONT, "-b"], stdin=subprocess.PIPE)
def updateBar():
global timeLastUpdate, timeUpdate
global gLeft, gRight
global outputs
text = ""
for oi in range(len(outputs)):
output = outputs[oi]
gLeftFiltered = list(
filter(
lambda s: s.visible and (not s.output or s.output == output.name), gLeft
)
)
tLeft = ""
l = len(gLeftFiltered)
for gi in range(l):
g = gLeftFiltered[gi]
# Next visible section for transition
nextTheme = gLeftFiltered[gi + 1].theme if gi + 1 < l else "CENTER"
tLeft = tLeft + g.draw(True, nextTheme)
tRight = ""
for gi in range(len(gRight)):
g = gRight[gi]
nextTheme = "CENTER"
for gn in gRight[gi + 1 :]:
if gn.visible:
nextTheme = gn.theme
break
tRight = g.draw(False, nextTheme) + tRight
text += (
"%{l}"
+ tLeft
+ "%{r}"
+ tRight
+ "%{B"
+ bgColor("CENTER")
+ "}"
+ "%{S"
+ str(oi + 1)
+ "}"
)
bar.stdin.write(bytes(text + "\n", "utf-8"))
bar.stdin.flush()
# Values
i3 = i3ipc.Connection()
outputs = []
def on_output():
global outputs
outputs = sorted(
sorted(
list(filter(lambda o: o.active, i3.get_outputs())), key=lambda o: o.rect.y
),
key=lambda o: o.rect.x,
)
on_output()
def on_workspace_focus():
global i3
global gLeft
workspaces = i3.get_workspaces()
wNames = [w.name for w in workspaces]
sNames = [s.name for s in gLeft]
newGLeft = []
def actuate(section, workspace):
if workspace:
section.name = workspace.name
section.output = workspace.output
if workspace.visible:
section.update(workspace.name)
else:
section.update(workspace.name.split(" ")[0])
if workspace.focused:
section.theme = "4"
elif workspace.urgent:
section.theme = "1"
else:
section.theme = "6"
else:
section.update("")
section.theme = "6"
for tag, i, j, k, l in difflib.SequenceMatcher(None, sNames, wNames).get_opcodes():
if tag == "equal": # If the workspaces didn't changed
for a in range(j - i):
workspace = workspaces[k + a]
section = gLeft[i + a]
actuate(section, workspace)
newGLeft.append(section)
if tag in ("delete", "replace"): # If the workspaces were removed
for section in gLeft[i:j]:
if section.visible:
actuate(section, None)
newGLeft.append(section)
else:
del section
if tag in ("insert", "replace"): # If the workspaces were removed
for workspace in workspaces[k:l]:
section = Section()
actuate(section, workspace)
newGLeft.append(section)
gLeft = newGLeft
updateBar()
on_workspace_focus()
def i3events(i3childPipe):
global i3
# Proxy functions
def on_workspace_focus(i3, e):
global i3childPipe
i3childPipe.send("on_workspace_focus")
i3.on("workspace::focus", on_workspace_focus)
def on_output(i3, e):
global i3childPipe
i3childPipe.send("on_output")
i3.on("output", on_output)
i3.main()
i3parentPipe, i3childPipe = multiprocessing.Pipe()
i3process = multiprocessing.Process(target=i3events, args=(i3childPipe,))
i3process.start()
def updateValues():
# Time
now = datetime.datetime.now()
sTime.update(now.strftime("%x %X"))
def updateAnimation():
for s in set(gLeft + gRight):
s.updateSize()
updateBar()
lastUpdate = 0
while True:
now = time.time()
if i3parentPipe.poll():
msg = i3parentPipe.recv()
if msg == "on_workspace_focus":
on_workspace_focus()
elif msg == "on_output":
on_output()
# TODO Restart lemonbar
else:
print(msg)
updateAnimation()
if now >= lastUpdate + 1:
updateValues()
lastUpdate = now
time.sleep(0.05)

View file

@ -1,10 +0,0 @@
#!/usr/bin/env python3
import Xlib.display
dis = Xlib.display.Display()
nb = dis.screen_count()
for s in range(nb):
print(s)

View file

@ -12,7 +12,7 @@ let
in
# Tried using pyproject.nix but mpd2 dependency wouldn't resolve,
# is called pyton-mpd2 on PyPi but mpd2 in nixpkgs.
pkgs.python3Packages.buildPythonApplication {
pkgs.python3Packages.buildPythonApplication rec {
pname = "frobar";
version = "2.0";
@ -25,7 +25,8 @@ pkgs.python3Packages.buildPythonApplication {
pulsectl
pyinotify
];
makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath ([ lemonbar ] ++ (with pkgs; [ wirelesstools ]))}" ];
nativeBuildInputs = [ lemonbar ] ++ (with pkgs; [ wirelesstools playerctl ]);
makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}" ];
src = ./.;
}

View file

@ -37,7 +37,8 @@ def run() -> None:
)
# TODO Middle
Bar.addSectionAll(fp.MpdProvider(theme=9), BarGroupType.LEFT)
Bar.addSectionAll(fp.MprisProvider(theme=9), BarGroupType.LEFT)
# Bar.addSectionAll(fp.MpdProvider(theme=9), BarGroupType.LEFT)
# Bar.addSectionAll(I3WindowTitleProvider(), BarGroupType.LEFT)
# TODO Computer modes

View file

@ -274,11 +274,17 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater):
sinks = []
with pulsectl.Pulse("list-sinks") as pulse:
for sink in pulse.sink_list():
if sink.port_active.name == "analog-output-headphones":
if (
sink.port_active.name == "analog-output-headphones"
or sink.port_active.description == "Headphones"
):
icon = ""
elif sink.port_active.name == "analog-output-speaker":
elif (
sink.port_active.name == "analog-output-speaker"
or sink.port_active.description == "Speaker"
):
icon = "" if sink.mute else ""
elif sink.port_active.name == "headset-output":
elif sink.port_active.name in ("headset-output", "headphone-output"):
icon = ""
else:
icon = "?"
@ -849,3 +855,100 @@ class MpdProvider(Section, ThreadedUpdater):
self.connect()
except BaseException as e:
log.error(e, exc_info=True)
class MprisProviderSection(Section, Updater):
def __init__(self, parent: "MprisProvider"):
Updater.__init__(self)
Section.__init__(self, theme=parent.theme)
self.parent = parent
class MprisProvider(Section, ThreadedUpdater):
# TODO Controls (select player at least)
# TODO Use the Python native thing for it:
# https://github.com/altdesktop/playerctl?tab=readme-ov-file#using-the-library
# TODO Make it less sucky
SECTIONS = [
"{{ playerName }} {{ status }}",
"{{ album }}",
"{{ artist }}",
"{{ duration(position) }}|{{ duration(mpris:length) }}" " {{ title }}",
]
# nf-fd icons don't work (UTF-16?)
SUBSTITUTIONS = {
"Playing": "",
"Paused": "",
"Stopped": "",
"mpd": "",
"firefox": "",
"chromium": "",
"mpv": "",
}
ICONS = {
1: "",
2: "",
3: "",
}
def __init__(self, theme: int | None = None):
ThreadedUpdater.__init__(self)
Section.__init__(self, theme)
self.line = ""
self.start()
self.sections: list[Section] = []
def fetcher(self) -> Element:
create = not len(self.sections)
populate = self.line
split = self.line.split("\t")
lastSection: Section = self
for i in range(len(self.SECTIONS)):
if create:
section = Section(theme=self.theme)
lastSection.appendAfter(section)
lastSection = section
self.sections.append(section)
else:
section = self.sections[i]
if populate:
text = split[i]
if i == 0:
for key, val in self.SUBSTITUTIONS.items():
text = text.replace(key, val)
if text:
if i in self.ICONS:
text = f"{self.ICONS[i]} {text}"
section.updateText(text)
else:
section.updateText(None)
else:
section.updateText(None)
return None
def loop(self) -> None:
cmd = [
"playerctl",
"metadata",
"--format",
"\t".join(self.SECTIONS),
"--follow",
]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
assert p.stdout
while p.poll() is None:
self.line = p.stdout.readline().decode().strip()
self.refreshData()
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
assert p.stdout
while p.poll() is None:
self.line = p.stdout.readline().decode().strip()
self.refreshData()

View file

@ -4,6 +4,7 @@ import functools
import logging
import math
import os
import subprocess
import threading
import time
@ -11,8 +12,8 @@ import coloredlogs
import i3ipc
import pyinotify
from frobar.display import Element
from frobar.common import notBusy
from frobar.display import Element
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()

View file

@ -6,12 +6,14 @@
ashuffle
mpc-cli
vimpc
playerctl
];
sessionVariables = {
MPD_PORT = "${toString config.services.mpd.network.port}";
};
};
services.mpd = {
services = {
mpd = {
enable = true;
network = {
listenAddress = "0.0.0.0"; # Can be controlled remotely, determined with firewall
@ -19,9 +21,22 @@
};
extraConfig = ''
restore_paused "yes"
audio_output {
type "pipewire"
name "PipeWire Sound Server"
}
'';
# UPST auto audio_output ?
musicDirectory = "${config.home.homeDirectory}/Musiques";
};
# Expose mpd to mpris
# mpd-mpris also exists but is MIT and make playerctld not pick up on play/pause events
mpdris2.enable = true;
# Allow control from headset
mpris-proxy.enable = true;
# Remember the last player
playerctld.enable = true;
};
xdg = {
configFile = {
"vimpc/vimpcrc" = {
@ -40,9 +55,9 @@
};
xsession.windowManager.i3.config.keybindings =
{
"XF86AudioPrev" = "exec ${pkgs.mpc-cli}/bin/mpc prev";
"XF86AudioPlay" = "exec ${pkgs.mpc-cli}/bin/mpc toggle";
"XF86AudioNext" = "exec ${pkgs.mpc-cli}/bin/mpc next";
"XF86AudioPrev" = "exec ${lib.getExe pkgs.playerctl} previous";
"XF86AudioPlay" = "exec ${lib.getExe pkgs.playerctl} play-pause";
"XF86AudioNext" = "exec ${lib.getExe pkgs.playerctl} next";
};
};
}

View file

@ -20,6 +20,8 @@
cmake
ddd
gdb
gnumake
valgrind
];
sessionVariables = {
CCACHE_CONFIGPATH = "${config.xdg.configHome}/ccache.conf";
@ -28,7 +30,10 @@
programs.bash.shellAliases = {
gdb = "gdb -x ${config.xdg.configHome}/gdbinit";
};
programs.nixvim.plugins.dap.enable = true; # Debug Adapter Protocol client
programs.nixvim.plugins = {
dap.enable = true; # Debug Adapter Protocol client
lsp.servers.clangd.enable = true;
};
xdg.configFile = {
"ccache.conf" = {
text = "ccache_dir = ${config.xdg.cacheHome}/ccache";

View file

@ -22,6 +22,7 @@
# nix
lix
nixpkgs-fmt
# Always on (graphical)
] ++ lib.optionals config.frogeye.desktop.xorg [
@ -53,6 +54,10 @@
] ++ lib.optionals (config.frogeye.desktop.xorg && config.frogeye.dev.fpga) [
yosys
gtkwave
# VM (graphical)
] ++ lib.optionals (config.frogeye.desktop.xorg && config.frogeye.dev.vm) [
virt-manager
];
programs.nixvim.plugins.lsp.servers = {
@ -62,9 +67,16 @@
lua-ls.enable = true; # Lua (for Neovim debugging)
perlpls.enable = config.frogeye.dev.perl; # Perl
phpactor.enable = config.frogeye.dev.php; # PHP
nixd = {
# Nix
nil-ls = {
enable = true;
settings.formatting.command = [ "nixpkgs-fmt" ];
settings = {
formatting.command = [ "nixpkgs-fmt" ];
nix.flake = {
autoArchive = true;
autoEvalInputs = true;
};
};
};
# TODO Something for SQL. sqls is deprecated, sqlls is not in Nixpkgs. Probably needs a DB connection configured anyways?
yamlls.enable = true; # YAML

View file

@ -34,9 +34,11 @@
# TODO Convert existing LaTeX documents into using Nix build system
# texlive is big and not that much used, sooo
pdftk
pdfgrep
# Misc
haskellPackages.dice
rustdesk-flutter
] ++ lib.optionals config.frogeye.desktop.xorg [

View file

@ -1,6 +1,9 @@
{ pkgs, lib, config, ... }:
let
cfg = config.programs.git;
in
{
config = lib.mkIf config.programs.git.enable {
config = lib.mkIf cfg.enable {
home.packages = [
(pkgs.writeShellApplication {
name = "git-sync";
@ -14,10 +17,18 @@
else
(
cd "${r.path}"
if [ -d .jj ]
then
${lib.getExe pkgs.jujutsu} git fetch
${lib.getExe pkgs.jujutsu} rebase -d main@origin
${lib.getExe pkgs.jujutsu} branch set main -r @-
${lib.getExe pkgs.jujutsu} git push
else
${pkgs.git}/bin/git --no-optional-locks diff --quiet || echo "Repository is dirty!"
${pkgs.git}/bin/git pull || true
# Only push if there's something to push. Also prevents from trying to push on repos where we don't have rights.
(${pkgs.git}/bin/git --no-optional-locks status --porcelain -b --ignore-submodules | grep ' \[ahead [0-9]\+\]' && ${pkgs.git}/bin/git push) || true
fi
)
fi
'')
@ -26,7 +37,8 @@
);
})
];
programs.git = {
programs = {
git = {
package = pkgs.gitFull;
aliases = {
"git" = "!exec git"; # In case I write one too many git
@ -67,6 +79,32 @@
# This escapes quotes, which isn't the case in the original, hoping this isn't an issue.
};
};
jujutsu = {
enable = true;
settings = {
git = {
auto-local-bookmark = true;
auto-local-branch = true;
};
user = {
email = cfg.userEmail;
name = cfg.userName;
};
ui = {
pager = "delta";
diff.format = "git";
diff-editor = "meld-3";
merge-editor = "meld";
};
signing = {
sign-all = true;
backend = "gpg";
inherit (cfg.signing) key;
backends.gpg.allow-expired-keys = false;
};
};
};
};
services = {
git-sync = {
enable = false; # The real thing syncs too quickly and asks for passphrase, which is annoying

View file

@ -1,9 +1,8 @@
{ pkgs, lib, config, ... }:
{ config, ... }:
{
config = {
home = {
sessionVariables = {
PAGER = "less";
LESSHISTFILE = "${config.xdg.stateHome}/lesshst";
LESS = "-R";
LESS_TERMCAP_mb = "$(echo $'\\E[1;31m')"; # begin blink

View file

@ -62,7 +62,7 @@ in
description = "Path to the password store entry";
};
selector = lib.mkOption {
type = lib.types.str;
type = lib.types.nullOr lib.types.str;
default = null;
description = "If set, will parse the password metadata as YML and use selector (yq) instead of the password.";
};

View file

@ -1,7 +1,9 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -i bash
#! nix-shell -p bash pdftk inkscape gnused coreutils file
set -euxo pipefail
# Utility to write over a PDF file pages
# TODO Inkscape vodoo: Put the original in its own layer and skip when merging

View file

@ -1,6 +1,6 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.configargparse python3Packages.filelock python3Packages.filelock python3Packages.requests python3Packages.yt-dlp ffmpeg
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.configargparse python3Packages.filelock python3Packages.requests python3Packages.yt-dlp ffmpeg
# Also needs mpv but if I put it there it's not using the configured one
@ -189,7 +189,13 @@ class RVElement:
def ytdl_infos(self) -> typing.Optional[dict]:
try:
return self.metafile_read("ytdl")
# If the metafile doesn't exist or is corrupted
except (FileNotFoundError, TypeError, AttributeError, EOFError):
# Delete file otherwise __str__ won't be happy
metafile = self.metafile("ytdl")
if os.path.isfile(metafile):
os.unlink(metafile)
infos = self._ytdl_infos()
self.metafile_write("ytdl", infos)
return infos

View file

@ -50,6 +50,7 @@ in
php = lib.mkEnableOption "PHP dev stuff";
prose = lib.mkEnableOption "Writing stuff";
python = lib.mkEnableOption "Python dev stuff";
vm = lib.mkEnableOption "Virtual machines";
};
storageSize = lib.mkOption {
default = "small";

View file

@ -76,6 +76,11 @@
# on others I sometimes turn it off when unsuspending.
logind.extraConfig = "HandlePowerKey=ignore";
udev.extraRules = ''
# DYMO LabelPOINT 350
ACTION=="add", SUBSYSTEMS=="usb", ATTRS{idVendor}=="0922", ATTRS{idProduct}=="0015", MODE="0666"
'';
};
# TODO Hibernation?

View file

@ -23,7 +23,7 @@
actions.update-props = {
"session.suspend-timeout-seconds" = 0;
"dither.method" = "wannamaker3";
"dither.noise" = 2;
"dither.noise" = 4;
};
}
];
@ -41,9 +41,11 @@
languages = [ "fr" ];
symbolsFile = "${pkgs.stdenv.mkDerivation {
name = "qwerty-fr-keypad";
src = builtins.fetchGit {
url = "https://github.com/qwerty-fr/qwerty-fr.git";
src = pkgs.fetchFromGitHub {
owner = "qwerty-fr";
repo = "qwerty-fr";
rev = "3a4d13089e8ef016aa20baf6b2bf3ea53de674b8";
sha256 = "sha256-wn5n6jJVDrQWJze8xYF2nEY8a7mHI3hVO4xsT4LMo9c=";
};
patches = [ ./qwerty-fr-keypad.diff ];
# TODO This doesn't seem to be applied... it's the whole point of the derivation :(

View file

@ -1,7 +1,25 @@
{ pkgs, lib, config, ... }:
{
config = lib.mkIf config.frogeye.dev.docker {
config = lib.mkMerge [
(lib.mkIf config.frogeye.dev.docker {
virtualisation.docker.enable = true;
users.users.geoffrey.extraGroups = [ "docker" ];
})
(lib.mkIf config.frogeye.dev.vm {
# Required to mount filesystems. Also need to add
# <binary path="/run/current-system/sw/bin/virtiofsd"/>
# to the VM's filesystem XML config
environment.systemPackages = [ pkgs.virtiofsd ];
virtualisation = {
libvirtd.enable = true;
spiceUSBRedirection.enable = true;
};
users.extraGroups.libvirtd.members = [ "geoffrey" ];
})
{
users.extraGroups.dialout.members = [ "geoffrey" ];
# Always comes in handy, not enabled by default anyways
programs.nix-ld.enable = true;
}
];
}