Compare commits

..

1 commit
main ... mopidy

Author SHA1 Message Date
Geoffrey Frogeye d58e5fc00c
mopidy attempt 2024-07-28 23:54:22 +02:00
33 changed files with 888 additions and 1183 deletions

View file

@ -1,12 +1,8 @@
{ ... }:
{ pkgs, lib, config, ... }:
{
config = {
frogeye = {
desktop.xorg = true;
dev = {
c = true;
vm = true;
};
extra = true;
};
};

View file

@ -1,11 +1,10 @@
{ pkgs, lib, config, ... }:
let
zytemp_mqtt_src = pkgs.fetchFromGitHub {
# owner = "patrislav1";
owner = "GeoffreyFrogeye";
owner = "patrislav1";
repo = "zytemp_mqtt";
rev = "push-nurpouorqoyr"; # Humidity + availability support
sha256 = "sha256-nOhyBAgvjeQh9ys3cBJOVR67SDs96zBzxIRGpaq4yoA=";
rev = "a6be5e3082e1e10dee435cfb9643fb13e9a71c34"; # PR that adds humidity
sha256 = "sha256-cMWDi20isnbB6jlMzut7YyYB4te4bVFYXSgCEQWQnts=";
};
zytemp_mqtt = pkgs.python3Packages.buildPythonPackage
rec {

View file

@ -145,9 +145,6 @@ class Desk:
delta_s = now - self.last_est
if delta_s > self.MAX_EST_INTERVAL:
# Attempt at fixing the issue of
# the service not working after the night
self._initialize()
self.log.warning(
"Too long without getting a report, "
"assuming the desk might be anywhere now."
@ -265,7 +262,7 @@ class Desk:
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
desk = Desk()

View file

@ -5,12 +5,17 @@
xorg = true;
};
dev = {
c = true;
docker = true;
vm = true;
};
extra = true;
gaming = true;
storageSize = "big";
};
# Shenanigans
# nixpkgs.config.allowUnfree = true;
# virtualisation.virtualbox.host.enable = true;
# virtualisation.virtualbox.host.enableExtensionPack = true;
# users.extraGroups.vboxusers.members = [ "geoffrey" ];
# TODO Convert Windows VM from virtualbox to libvirt
}

View file

@ -1,4 +1,4 @@
{ pkgs, lib, nixos-hardware, unixpkgs, ... }:
{ pkgs, lib, config, nixos-hardware, displaylinknixpkgs, ... }:
let
displays = {
embedded = {
@ -65,10 +65,7 @@ in
};
nixpkgs.overlays = [
(self: super: {
displaylink = (import unixpkgs {
inherit (super) system;
config.allowUnfree = true;
}).displaylink;
displaylink = (import displaylinknixpkgs { inherit (super) system; config.allowUnfree = true; }).displaylink;
})
];
services = {

View file

@ -9,6 +9,5 @@
'';
interfaces.enp3s0.wakeOnLan.enable = true;
};
services.tlp.settings.WOL_DISABLE = false;
};
}

View file

@ -116,17 +116,18 @@
},
"devshell": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": [
"nixvim",
"nixpkgs"
]
},
"locked": {
"lastModified": 1722113426,
"narHash": "sha256-Yo/3loq572A8Su6aY5GP56knpuKYRvM2a1meP9oJZCw=",
"lastModified": 1717408969,
"narHash": "sha256-Q0OEFqe35fZbbRPPRdrjTUUChKVhhWXz3T9ZSKmaoVY=",
"owner": "numtide",
"repo": "devshell",
"rev": "67cce7359e4cd3c45296fb4aaf6a19e2a9c757ae",
"rev": "1ebbe68d57457c8cae98145410b164b5477761f4",
"type": "github"
},
"original": {
@ -142,11 +143,11 @@
]
},
"locked": {
"lastModified": 1727531434,
"narHash": "sha256-b+GBgCWd2N6pkiTkRZaMFOPztPO4IVTaclYPrQl2uLk=",
"lastModified": 1719864345,
"narHash": "sha256-e4Pw+30vFAxuvkSTaTypd9zYemB/QlWcH186dsGT+Ms=",
"owner": "nix-community",
"repo": "disko",
"rev": "b709e1cc33fcde71c7db43850a55ebe6449d0959",
"rev": "544a80a69d6e2da04e4df7ec8210a858de8c7533",
"type": "github"
},
"original": {
@ -154,6 +155,22 @@
"type": "indirect"
}
},
"displaylinknixpkgs": {
"locked": {
"lastModified": 1717533296,
"narHash": "sha256-TOxOpOYy/tQB+eYQOTPQXNeUmkMghLVDBO0Gc2nj/vs=",
"owner": "GeoffreyFrogeye",
"repo": "nixpkgs",
"rev": "99006b6f4cd24796b1ff6b6981b8f44c9cebd301",
"type": "github"
},
"original": {
"owner": "GeoffreyFrogeye",
"ref": "displaylink-600",
"repo": "nixpkgs",
"type": "github"
}
},
"flake-compat": {
"locked": {
"lastModified": 1696426674,
@ -208,11 +225,11 @@
]
},
"locked": {
"lastModified": 1725234343,
"narHash": "sha256-+ebgonl3NbiKD2UD0x4BszCZQ6sTfL4xioaM49o5B3Y=",
"lastModified": 1719877454,
"narHash": "sha256-g5N1yyOSsPNiOlFfkuI/wcUjmtah+nxdImJqrSATjOU=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "567b938d64d4b4112ee253b9274472dc3a346eb6",
"rev": "4e3583423212f9303aa1a6337f8dffb415920e4f",
"type": "github"
},
"original": {
@ -226,11 +243,29 @@
"systems": "systems"
},
"locked": {
"lastModified": 1726560853,
"narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=",
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"type": "github"
},
"original": {
@ -269,11 +304,11 @@
]
},
"locked": {
"lastModified": 1724857454,
"narHash": "sha256-Qyl9Q4QMTLZnnBb/8OuQ9LSkzWjBU1T5l5zIzTxkkhk=",
"lastModified": 1719259945,
"narHash": "sha256-F1h+XIsGKT9TkGO3omxDLEb/9jOOsI6NnzsXFsZhry4=",
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "4509ca64f1084e73bc7a721b20c669a8d4c5ebe6",
"rev": "0ff4381bbb8f7a52ca4a851660fc7a437a4c6e07",
"type": "github"
},
"original": {
@ -328,11 +363,11 @@
]
},
"locked": {
"lastModified": 1726989464,
"narHash": "sha256-Vl+WVTJwutXkimwGprnEtXc/s/s8sMuXzqXaspIGlwM=",
"lastModified": 1719827385,
"narHash": "sha256-qs+nU20Sm8czHg3bhGCqiH+8e13BJyRrKONW34g3i50=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "2f23fa308a7c067e52dfcc30a0758f47043ec176",
"rev": "391ca6e950c2525b4f853cbe29922452c14eda82",
"type": "github"
},
"original": {
@ -349,11 +384,11 @@
]
},
"locked": {
"lastModified": 1720042825,
"narHash": "sha256-A0vrUB6x82/jvf17qPCpxaM+ulJnD8YZwH9Ci0BsAzE=",
"lastModified": 1719827385,
"narHash": "sha256-qs+nU20Sm8czHg3bhGCqiH+8e13BJyRrKONW34g3i50=",
"owner": "nix-community",
"repo": "home-manager",
"rev": "e1391fb22e18a36f57e6999c7a9f966dc80ac073",
"rev": "391ca6e950c2525b4f853cbe29922452c14eda82",
"type": "github"
},
"original": {
@ -392,11 +427,11 @@
]
},
"locked": {
"lastModified": 1725189302,
"narHash": "sha256-IhXok/kwQqtusPsoguQLCHA+h6gKvgdCrkhIaN+kByA=",
"lastModified": 1719845423,
"narHash": "sha256-ZLHDmWAsHQQKnmfyhYSHJDlt8Wfjv6SQhl2qek42O7A=",
"owner": "lnl7",
"repo": "nix-darwin",
"rev": "7c4b53a7d9f3a3df902b3fddf2ae245ef20ebcda",
"rev": "ec12b88104d6c117871fad55e931addac4626756",
"type": "github"
},
"original": {
@ -411,10 +446,7 @@
"nix-on-droid",
"nixpkgs"
],
"nmd": [
"nix-on-droid",
"nmd"
],
"nmd": "nmd",
"nmt": "nmt"
},
"locked": {
@ -442,14 +474,14 @@
],
"nixpkgs-docs": "nixpkgs-docs",
"nixpkgs-for-bootstrap": "nixpkgs-for-bootstrap",
"nmd": "nmd"
"nmd": "nmd_2"
},
"locked": {
"lastModified": 1725658585,
"narHash": "sha256-P29z4Gt89n5ps1U7+qmIrj0BuRXGZQSIaOe2+tsPgfw=",
"lastModified": 1719772177,
"narHash": "sha256-XJOxCLWQUn+3VQTQGRwMoz6nDjT+GoL0f4mVrUQdzfk=",
"owner": "nix-community",
"repo": "nix-on-droid",
"rev": "5d88ff2519e4952f8d22472b52c531bb5f1635fc",
"rev": "45fcd2da39a70a96752e17f85d7a843b3c4f67f4",
"type": "github"
},
"original": {
@ -460,11 +492,11 @@
},
"nixos-hardware": {
"locked": {
"lastModified": 1727665282,
"narHash": "sha256-oKtfbQB1MBypqIyzkC8QCQcVGOa1soaXaGgcBIoh14o=",
"lastModified": 1719895800,
"narHash": "sha256-xNbjISJTFailxass4LmdWeV4jNhAlmJPwj46a/GxE6M=",
"owner": "NixOS",
"repo": "nixos-hardware",
"rev": "11c43c830e533dad1be527ecce379fcf994fbbb5",
"rev": "6e253f12b1009053eff5344be5e835f604bb64cd",
"type": "github"
},
"original": {
@ -474,11 +506,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1727672256,
"narHash": "sha256-9/79hjQc9+xyH+QxeMcRsA6hDyw6Z9Eo1/oxjvwirLk=",
"lastModified": 1719838683,
"narHash": "sha256-Zw9rQjHz1ilNIimEXFeVa1ERNRBF8DoXDhLAZq5B4pE=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "1719f27dd95fd4206afb9cec9f415b539978827e",
"rev": "d032c1a6dfad4eedec7e35e91986becc699d7d69",
"type": "github"
},
"original": {
@ -505,17 +537,17 @@
},
"nixpkgs-for-bootstrap": {
"locked": {
"lastModified": 1720244366,
"narHash": "sha256-WrDV0FPMVd2Sq9hkR5LNHudS3OSMmUrs90JUTN+MXpA=",
"lastModified": 1708105575,
"narHash": "sha256-sS4AItZeUnAei6v8FqxNlm+/27MPlfoGym/TZP0rmH0=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "49ee0e94463abada1de470c9c07bfc12b36dcf40",
"rev": "1d1817869c47682a6bee85b5b0a6537b6c0fba26",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "49ee0e94463abada1de470c9c07bfc12b36dcf40",
"rev": "1d1817869c47682a6bee85b5b0a6537b6c0fba26",
"type": "github"
}
},
@ -533,11 +565,11 @@
"treefmt-nix": "treefmt-nix"
},
"locked": {
"lastModified": 1725350106,
"narHash": "sha256-TaMMlI2KPJ3wCyxJk6AShOLhNuTeabHCnvYRkLBlEFs=",
"lastModified": 1719923896,
"narHash": "sha256-/hfE2x9NbT13d53o9uq6MuMipV19pJUQzpsZIhlvsiM=",
"owner": "nix-community",
"repo": "nixvim",
"rev": "0f2c31e6a57a83ed4e6fa3adc76749620231055d",
"rev": "d384cf656cb1b21d90eee1b007a6ade6f90768f5",
"type": "github"
},
"original": {
@ -548,6 +580,22 @@
}
},
"nmd": {
"flake": false,
"locked": {
"lastModified": 1666190571,
"narHash": "sha256-Z1hc7M9X6L+H83o9vOprijpzhTfOBjd0KmUTnpHAVjA=",
"owner": "rycee",
"repo": "nmd",
"rev": "b75d312b4f33bd3294cd8ae5c2ca8c6da2afc169",
"type": "gitlab"
},
"original": {
"owner": "rycee",
"repo": "nmd",
"type": "gitlab"
}
},
"nmd_2": {
"inputs": {
"nixpkgs": [
"nix-on-droid",
@ -587,11 +635,11 @@
},
"nur": {
"locked": {
"lastModified": 1727779756,
"narHash": "sha256-KLeROOi6VYct8lP1TIAPABlKOsisecKZLOozD5W54PE=",
"lastModified": 1719925604,
"narHash": "sha256-cLmqi+P1sn+7497GS4fNWayoTDa0ZiJecjmEM4iQN9U=",
"owner": "nix-community",
"repo": "NUR",
"rev": "01c22fb0d5c07a4a2f9ffc4b132cfa4ee4e1cce2",
"rev": "574405811547dcec59e912c5e82bfd224648bd5e",
"type": "github"
},
"original": {
@ -603,6 +651,7 @@
"root": {
"inputs": {
"disko": "disko",
"displaylinknixpkgs": "displaylinknixpkgs",
"flake-utils": "flake-utils",
"home-manager": "home-manager",
"nix-on-droid": "nix-on-droid",
@ -610,8 +659,7 @@
"nixpkgs": "nixpkgs",
"nixvim": "nixvim",
"nur": "nur",
"stylix": "stylix",
"unixpkgs": "unixpkgs"
"stylix": "stylix"
}
},
"scss-reset": {
@ -676,6 +724,21 @@
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
@ -684,11 +747,11 @@
]
},
"locked": {
"lastModified": 1724833132,
"narHash": "sha256-F4djBvyNRAXGusJiNYInqR6zIMI3rvlp6WiKwsRISos=",
"lastModified": 1719887753,
"narHash": "sha256-p0B2r98UtZzRDM5miGRafL4h7TwGRC4DII+XXHDHqek=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "3ffd842a5f50f435d3e603312eefa4790db46af5",
"rev": "bdb6355009562d8f9313d9460c0d3860f525bc6c",
"type": "github"
},
"original": {
@ -696,21 +759,6 @@
"repo": "treefmt-nix",
"type": "github"
}
},
"unixpkgs": {
"locked": {
"lastModified": 1727785308,
"narHash": "sha256-t2PqANZPqbtiqOjiQFaHEXuMeG1laa1g+4OcQuo+MjE=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "1c9c0eabb80d35c24eb4e7968c9ee15641a3e0fd",
"type": "github"
},
"original": {
"id": "nixpkgs",
"ref": "master",
"type": "indirect"
}
}
},
"root": "root",

View file

@ -4,7 +4,7 @@
inputs = {
# Packages
nixpkgs.url = "nixpkgs/nixos-24.05";
unixpkgs.url = "nixpkgs/master";
displaylinknixpkgs.url = "github:GeoffreyFrogeye/nixpkgs/displaylink-600";
# OS
disko = {
url = "disko";

View file

@ -190,7 +190,6 @@
usbutils
dmidecode
lshw
labelle # Label printer
# Locker
(pkgs.writeShellApplication {

View file

@ -8,7 +8,7 @@
./frobar/module.nix
./i3.nix
./lock
./mpd
./mopidy
./presentation
./redness
./screenshots
@ -52,7 +52,7 @@
hwdec = "auto-safe";
profile = "gpu-hq";
};
scripts = with pkgs.mpvScripts; [ thumbnail mpris ];
scripts = with pkgs.mpvScripts; [ thumbnail ];
scriptOpts = {
mpv_thumbnail_script = {
autogenerate = false; # TODO It creates too many processes at once, crashing the system

94
hm/desktop/frobar/.dev/barng.py Executable file
View file

@ -0,0 +1,94 @@
#!/usr/bin/env python3
import typing
import subprocess
import time
# CORE
class Notifier:
pass
class Section:
def __init__(self) -> None:
self.text = b"(Loading)"
class Module:
def __init__(self) -> None:
self.bar: "Bar"
self.section = Section()
self.sections = [self.section]
class Alignment:
def __init__(self, *modules: Module) -> None:
self.bar: "Bar"
self.modules = modules
for module in modules:
module.bar = self.bar
class Screen:
def __init__(self, left: Alignment = Alignment(), right: Alignment = Alignment()) -> None:
self.bar: "Bar"
self.left = left
self.left.bar = self.bar
self.right = right or Alignment()
self.right.bar = self.bar
class Bar:
def __init__(self, *screens: Screen) -> None:
self.screens = screens
for screen in screens:
screen.bar = self
self.process = subprocess.Popen(["lemonbar"], stdin=subprocess.PIPE)
def display(self) -> None:
string = b""
for s, screen in enumerate(self.screens):
string += b"%%{S%d}" % s
for control, alignment in [(b'%{l}', screen.left), (b'%{r}', screen.right)]:
string += control
for module in alignment.modules:
for section in module.sections:
string += b"<%b> |" % section.text
string += b"\n"
print(string)
assert self.process.stdin
self.process.stdin.write(string)
self.process.stdin.flush()
def run(self) -> None:
while True:
self.display()
time.sleep(1)
# REUSABLE
class ClockNotifier(Notifier):
def run(self) -> None:
while True:
def __init__(self, text: bytes):
super().__init__()
self.section.text = text
class StaticModule(Module):
def __init__(self, text: bytes):
super().__init__()
self.section.text = text
# USER
if __name__ == "__main__":
bar = Bar(
Screen(Alignment(StaticModule(b"A"))),
Screen(Alignment(StaticModule(b"B"))),
)
bar.run()

View file

@ -1,759 +0,0 @@
#!/usr/bin/env python3
import asyncio
import datetime
import enum
import ipaddress
import logging
import random
import signal
import socket
import typing
import coloredlogs
import i3ipc
import i3ipc.aio
import psutil
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int
def humanSize(numi: int) -> str:
"""
Returns a string of width 3+3
"""
num = float(numi)
for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
if abs(num) < 1000:
if num >= 10:
return "{:3d}{}".format(int(num), unit)
else:
return "{:.1f}{}".format(num, unit)
num /= 1024
return "{:d}YiB".format(numi)
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
def setParent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent
self.parent.updateMarkup()
def unsetParent(self) -> None:
assert self.parent
self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent = None
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
parent = self
while not isinstance(parent, typ):
assert parent.parent, f"{self} doesn't have a parent of {typ}"
parent = parent.parent
return parent
def updateMarkup(self) -> None:
self.bar.refresh.set()
# TODO OPTI See if worth caching the output
def generateMarkup(self) -> str:
raise NotImplementedError(f"{self} cannot generate markup")
def getMarkup(self) -> str:
return self.generateMarkup()
def randomColor(seed: int | bytes | None = None) -> str:
if seed is not None:
random.seed(seed)
return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3))
class Button(enum.Enum):
CLICK_LEFT = "1"
CLICK_MIDDLE = "2"
CLICK_RIGHT = "3"
SCROLL_UP = "4"
SCROLL_DOWN = "5"
class Section(ComposableText):
"""
Colorable block separated by chevrons
"""
def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
self.parent: "Module"
self.color = randomColor()
self.desiredText: str | None = None
self.text = ""
self.targetSize = -1
self.size = -1
self.animationTask: asyncio.Task | None = None
self.actions: dict[Button, str] = dict()
def isHidden(self) -> bool:
return self.size < 0
# Geometric series, with a cap
ANIM_A = 0.025
ANIM_R = 0.9
ANIM_MIN = 0.001
async def animate(self) -> None:
increment = 1 if self.size < self.targetSize else -1
loop = asyncio.get_running_loop()
frameTime = loop.time()
animTime = self.ANIM_A
while self.size != self.targetSize:
self.size += increment
self.updateMarkup()
animTime *= self.ANIM_R
animTime = max(self.ANIM_MIN, animTime)
frameTime += animTime
sleepTime = frameTime - loop.time()
# In case of stress, skip refreshing by not awaiting
if sleepTime > 0:
await asyncio.sleep(sleepTime)
else:
log.warning("Skipped an animation frame")
def setText(self, text: str | None) -> None:
# OPTI Don't redraw nor reset animation if setting the same text
if self.desiredText == text:
return
self.desiredText = text
if text is None:
self.text = ""
self.targetSize = -1
else:
self.text = f" {text} "
self.targetSize = len(self.text)
if self.animationTask:
self.animationTask.cancel()
# OPTI Skip the whole animation task if not required
if self.size == self.targetSize:
self.updateMarkup()
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate())
def setAction(self, button: Button, callback: typing.Callable | None) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.removeAction(command)
del self.actions[button]
if callback:
command = self.bar.addAction(callback)
self.actions[button] = command
def generateMarkup(self) -> str:
assert not self.isHidden()
pad = max(0, self.size - len(self.text))
text = self.text[: self.size] + " " * pad
for button, command in self.actions.items():
text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
return text
class Module(ComposableText):
"""
Sections handled by a same updater
"""
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
self.parent: "Side"
self.children: typing.MutableSequence[Section]
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
def mirror(self, module: "Module") -> None:
self.mirroring = module
module.mirrors.append(self)
def getSections(self) -> typing.Sequence[Section]:
if self.mirroring:
return self.mirroring.children
else:
return self.children
def updateMarkup(self) -> None:
super().updateMarkup()
for mirror in self.mirrors:
mirror.updateMarkup()
class Alignment(enum.Enum):
LEFT = "l"
RIGHT = "r"
CENTER = "c"
class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent)
self.parent: Screen
self.children: typing.MutableSequence[Module] = []
self.alignment = alignment
def generateMarkup(self) -> str:
if not self.children:
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
for module in self.children:
for section in module.getSections():
if section.isHidden():
continue
if lastSection is None:
if self.alignment == Alignment.LEFT:
text += "%{B" + section.color + "}%{F-}"
else:
text += "%{B-}%{F" + section.color + "}%{R}%{F-}"
else:
if self.alignment == Alignment.RIGHT:
if lastSection.color == section.color:
text += ""
else:
text += "%{F" + section.color + "}%{R}"
else:
if lastSection.color == section.color:
text += ""
else:
text += "%{R}%{B" + section.color + "}"
text += "%{F-}"
text += section.getMarkup()
lastSection = section
if self.alignment != Alignment.RIGHT:
text += "%{R}%{B-}"
return text
class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent)
self.parent: "Bar"
self.children: typing.MutableSequence[Side]
self.output = output
for alignment in Alignment:
Side(parent=self, alignment=alignment)
def generateMarkup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join(
side.getMarkup() for side in self.children
)
class Bar(ComposableText):
"""
Top-level
"""
def __init__(self) -> None:
super().__init__()
self.parent: None
self.children: typing.MutableSequence[Screen]
self.longRunningTasks: list[asyncio.Task] = list()
self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list()
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
self.periodicProviderTask: typing.Coroutine | None = None
i3 = i3ipc.Connection()
for output in i3.get_outputs():
if not output.active:
continue
Screen(parent=self, output=output.name)
def addLongRunningTask(self, coro: typing.Coroutine) -> None:
task = self.taskGroup.create_task(coro)
self.longRunningTasks.append(task)
async def run(self) -> None:
cmd = [
"lemonbar",
"-b",
"-a",
"64",
"-f",
"DejaVuSansM Nerd Font:size=10",
]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
)
async def refresher() -> None:
assert proc.stdin
while True:
await self.refresh.wait()
self.refresh.clear()
markup = self.getMarkup()
proc.stdin.write(markup.encode())
async def actionHandler() -> None:
assert proc.stdout
while True:
line = await proc.stdout.readline()
command = line.decode().strip()
callback = self.actions[command]
callback()
async with self.taskGroup:
self.addLongRunningTask(refresher())
self.addLongRunningTask(actionHandler())
for provider in self.providers:
self.addLongRunningTask(provider.run())
def exit() -> None:
log.info("Terminating")
for task in self.longRunningTasks:
task.cancel()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, exit)
def generateMarkup(self) -> str:
return "".join(screen.getMarkup() for screen in self.children) + "\n"
def addProvider(
self,
provider: "Provider",
alignment: Alignment = Alignment.LEFT,
screenNum: int | None = None,
) -> None:
"""
screenNum: the provider will be added on this screen if set, all otherwise
"""
modules = list()
for s, screen in enumerate(self.children):
if screenNum is None or s == screenNum:
side = next(filter(lambda s: s.alignment == alignment, screen.children))
module = Module(parent=side)
modules.append(module)
provider.modules = modules
if modules:
self.providers.append(provider)
def addAction(self, callback: typing.Callable) -> str:
command = f"{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def removeAction(self, command: str) -> None:
del self.actions[command]
class Provider:
def __init__(self) -> None:
self.modules: list[Module] = list()
async def run(self) -> None:
# Not a NotImplementedError, otherwise can't combine all classes
pass
class MirrorProvider(Provider):
def __init__(self) -> None:
super().__init__()
self.module: Module
async def run(self) -> None:
await super().run()
self.module = self.modules[0]
for module in self.modules[1:]:
module.mirror(self.module)
class SingleSectionProvider(MirrorProvider):
async def run(self) -> None:
await super().run()
self.section = Section(parent=self.module)
class StaticProvider(SingleSectionProvider):
def __init__(self, text: str) -> None:
self.text = text
async def run(self) -> None:
await super().run()
self.section.setText(self.text)
class StatefulSection(Section):
def __init__(self, parent: Module, sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
self.state = 0
self.numberStates: int
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
def incrementState(self) -> None:
self.state += 1
self.changeState()
def decrementState(self) -> None:
self.state -= 1
self.changeState()
def setChangedState(self, callback: typing.Callable) -> None:
self.callback = callback
def changeState(self) -> None:
self.state %= self.numberStates
self.bar.taskGroup.create_task(self.callback())
class SingleStatefulSectionProvider(MirrorProvider):
async def run(self) -> None:
await super().run()
self.section = StatefulSection(parent=self.module)
class PeriodicProvider(Provider):
async def init(self) -> None:
pass
async def loop(self) -> None:
raise NotImplementedError()
@classmethod
async def task(cls, bar: Bar) -> None:
providers = list()
for provider in bar.providers:
if isinstance(provider, PeriodicProvider):
providers.append(provider)
await provider.init()
while True:
# TODO Block bar update during the periodic update of the loops
loops = [provider.loop() for provider in providers]
asyncio.gather(*loops)
now = datetime.datetime.now()
# Hardcoded to 1 second... not sure if we want some more than that,
# and if the logic to check if a task should run would be a win
# compared to the task itself
remaining = 1 - now.microsecond / 1000000
await asyncio.sleep(remaining)
async def run(self) -> None:
await super().run()
for module in self.modules:
bar = module.getFirstParentOfType(Bar)
assert bar
if not bar.periodicProviderTask:
bar.periodicProviderTask = PeriodicProvider.task(bar)
bar.addLongRunningTask(bar.periodicProviderTask)
class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
async def run(self) -> None:
await super().run()
self.section.setChangedState(self.loop)
# Providers
class I3ModeProvider(SingleSectionProvider):
def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(None if e.change == "default" else e.change)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.MODE, self.on_mode)
await i3.main()
class I3WindowTitleProvider(SingleSectionProvider):
# TODO FEAT To make this available from start, we need to find the
# `focused=True` element following the `focus` array
def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(e.container.name)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WINDOW, self.on_window)
await i3.main()
class I3WorkspacesProvider(Provider):
# TODO Custom names
# TODO Colors
async def updateWorkspaces(self, i3: i3ipc.Connection) -> None:
"""
Since the i3 IPC interface cannot really tell you by events
when workspaces get invisible or not urgent anymore.
Relying on those exclusively would require reimplementing some of i3 logic.
Fetching all the workspaces on event looks ugly but is the most maintainable.
Times I tried to challenge this and failed: 2.
"""
workspaces = await i3.get_workspaces()
for workspace in workspaces:
module = self.modulesFromOutput[workspace.output]
if workspace.num in self.sections:
section = self.sections[workspace.num]
if section.parent != module:
section.unsetParent()
section.setParent(module)
else:
section = Section(parent=module, sortKey=workspace.num)
self.sections[workspace.num] = section
def generate_switch_workspace(num: int) -> typing.Callable:
def switch_workspace() -> None:
self.bar.taskGroup.create_task(
i3.command(f"workspace number {num}")
)
return switch_workspace
section.setAction(
Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
)
name = workspace.name
if workspace.urgent:
name = f"{name} !"
elif workspace.focused:
name = f"{name} +"
elif workspace.visible:
name = f"{name} *"
section.setText(name)
workspacesNums = set(workspace.num for workspace in workspaces)
for num, section in self.sections.items():
if num not in workspacesNums:
# This should delete the Section but it turned out to be hard
section.setText(None)
def onWorkspaceChange(
self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
) -> None:
# Cancelling the task doesn't seem to prevent performance double-events
self.bar.taskGroup.create_task(self.updateWorkspaces(i3))
def __init__(
self,
) -> None:
super().__init__()
self.sections: dict[int, Section] = dict()
self.modulesFromOutput: dict[str, Module] = dict()
self.bar: Bar
async def run(self) -> None:
for module in self.modules:
screen = module.getFirstParentOfType(Screen)
output = screen.output
self.modulesFromOutput[output] = module
self.bar = module.bar
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
self.onWorkspaceChange(i3)
await i3.main()
class NetworkProviderSection(StatefulSection):
def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None:
super().__init__(parent=parent, sortKey=iface)
self.iface = iface
self.provider = provider
self.ignore = False
self.icon = "?"
self.wifi = False
if iface == "lo":
self.ignore = True
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
self.icon = ""
else:
self.icon = ""
elif iface.startswith("wlan") or iface.startswith("wl"):
self.icon = ""
self.wifi = True
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
):
self.icon = ""
elif iface.startswith("docker"):
self.icon = ""
elif iface.startswith("veth"):
self.icon = ""
elif iface.startswith("vboxnet"):
self.icon = ""
self.numberStates = 5 if self.wifi else 4
self.state = 1 if self.wifi else 0
self.setChangedState(self.update)
async def update(self) -> None:
if self.ignore or not self.provider.if_stats[self.iface].isup:
self.setText(None)
return
text = self.icon
state = self.state + (0 if self.wifi else 1) # SSID
if self.wifi and state >= 1:
cmd = ["iwgetid", self.iface, "--raw"]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
text += f" {stdout.decode().strip()}"
if state >= 2: # Address
for address in self.provider.if_addrs[self.iface]:
if address.family == socket.AF_INET:
net = ipaddress.IPv4Network(
(address.address, address.netmask), strict=False
)
text += f" {net.with_prefixlen}"
break
if state >= 3: # Speed
prevRecv = self.provider.prev_io_counters[self.iface].bytes_recv
recv = self.provider.io_counters[self.iface].bytes_recv
prevSent = self.provider.prev_io_counters[self.iface].bytes_sent
sent = self.provider.io_counters[self.iface].bytes_sent
dt = self.provider.time - self.provider.prev_time
recvDiff = (recv - prevRecv) / dt
sentDiff = (sent - prevSent) / dt
text += f"{humanSize(recvDiff)}{humanSize(sentDiff)}"
if state >= 4: # Counter
text += f"{humanSize(recv)}{humanSize(sent)}"
self.setText(text)
class NetworkProvider(MirrorProvider, PeriodicProvider):
def __init__(self) -> None:
super().__init__()
self.sections: dict[str, NetworkProviderSection] = dict()
async def init(self) -> None:
loop = asyncio.get_running_loop()
self.time = loop.time()
self.io_counters = psutil.net_io_counters(pernic=True)
async def loop(self) -> None:
loop = asyncio.get_running_loop()
async with asyncio.TaskGroup() as tg:
self.prev_io_counters = self.io_counters
self.prev_time = self.time
# On-demand would only benefit if_addrs:
# stats are used to determine display,
# and we want to keep previous io_counters
# so displaying stats is ~instant.
self.time = loop.time()
self.if_stats = psutil.net_if_stats()
self.if_addrs = psutil.net_if_addrs()
self.io_counters = psutil.net_io_counters(pernic=True)
for iface in self.if_stats:
section = self.sections.get(iface)
if not section:
section = NetworkProviderSection(
parent=self.module, iface=iface, provider=self
)
self.sections[iface] = section
tg.create_task(section.update())
for iface, section in self.sections.items():
if iface not in self.if_stats:
section.setText(None)
async def onStateChange(self, section: StatefulSection) -> None:
assert isinstance(section, NetworkProviderSection)
await section.update()
class TimeProvider(PeriodicStatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
async def init(self) -> None:
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
async def loop(self) -> None:
now = datetime.datetime.now()
format = self.FORMATS[self.section.state]
self.section.setText(now.strftime(format))
async def main() -> None:
bar = Bar()
dualScreen = len(bar.children) > 1
bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT)
bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
if dualScreen:
bar.addProvider(
I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER
)
bar.addProvider(
StaticProvider(text="mpris"),
screenNum=1 if dualScreen else None,
alignment=Alignment.CENTER,
)
bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT)
bar.addProvider(
StaticProvider("pulse"),
screenNum=1 if dualScreen else None,
alignment=Alignment.RIGHT,
)
bar.addProvider(
NetworkProvider(),
screenNum=0 if dualScreen else None,
alignment=Alignment.RIGHT,
)
bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
await bar.run()
asyncio.run(main())

199
hm/desktop/frobar/.dev/oldbar.py Executable file
View file

@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
Debugging script
"""
import i3ipc
import os
import psutil
# import alsaaudio
from time import time
import subprocess
i3 = i3ipc.Connection()
lemonbar = subprocess.Popen(["lemonbar", "-b"], stdin=subprocess.PIPE)
# Utils
def upChart(p):
block = " ▁▂▃▄▅▆▇█"
return block[round(p * (len(block) - 1))]
def humanSizeOf(num, suffix="B"): # TODO Credit
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return "%3.0f%2s%s" % (num, unit, suffix)
num /= 1024.0
return "%.0f%2s%s" % (num, "Yi", suffix)
# Values
mode = ""
container = i3.get_tree().find_focused()
workspaces = i3.get_workspaces()
outputs = i3.get_outputs()
username = os.environ["USER"]
hostname = os.environ["HOSTNAME"]
if "-" in hostname:
hostname = hostname.split("-")[-1]
oldNetIO = dict()
oldTime = time()
def update():
activeOutputs = sorted(
sorted(list(filter(lambda o: o.active, outputs)), key=lambda o: o.rect.y),
key=lambda o: o.rect.x,
)
z = ""
for aOutput in range(len(activeOutputs)):
output = activeOutputs[aOutput]
# Mode || Workspaces
t = []
if mode != "":
t.append(mode)
else:
t.append(
" ".join(
[
(w.name.upper() if w.focused else w.name)
for w in workspaces
if w.output == output.name
]
)
)
# Windows Title
# if container:
# t.append(container.name)
# CPU
t.append(
"C" + "".join([upChart(p / 100) for p in psutil.cpu_percent(percpu=True)])
)
# Memory
t.append(
"M"
+ str(round(psutil.virtual_memory().percent))
+ "% "
+ "S"
+ str(round(psutil.swap_memory().percent))
+ "%"
)
# Disks
d = []
for disk in psutil.disk_partitions():
e = ""
if disk.device.startswith("/dev/sd"):
e += "S" + disk.device[-2:].upper()
elif disk.device.startswith("/dev/mmcblk"):
e += "M" + disk.device[-3] + disk.device[-1]
else:
e += "?"
e += " "
e += str(round(psutil.disk_usage(disk.mountpoint).percent)) + "%"
d.append(e)
t.append(" ".join(d))
# Network
netStats = psutil.net_if_stats()
netIO = psutil.net_io_counters(pernic=True)
net = []
for iface in filter(lambda i: i != "lo" and netStats[i].isup, netStats.keys()):
s = ""
if iface.startswith("eth"):
s += "E"
elif iface.startswith("wlan"):
s += "W"
else:
s += "?"
s += " "
now = time()
global oldNetIO, oldTime
sent = (
(oldNetIO[iface].bytes_sent if iface in oldNetIO else 0)
- (netIO[iface].bytes_sent if iface in netIO else 0)
) / (oldTime - now)
recv = (
(oldNetIO[iface].bytes_recv if iface in oldNetIO else 0)
- (netIO[iface].bytes_recv if iface in netIO else 0)
) / (oldTime - now)
s += (
""
+ humanSizeOf(abs(recv), "B/s")
+ ""
+ humanSizeOf(abs(sent), "B/s")
)
oldNetIO = netIO
oldTime = now
net.append(s)
t.append(" ".join(net))
# Battery
if os.path.isdir("/sys/class/power_supply/BAT0"):
with open("/sys/class/power_supply/BAT0/charge_now") as f:
charge_now = int(f.read())
with open("/sys/class/power_supply/BAT0/charge_full_design") as f:
charge_full = int(f.read())
t.append("B" + str(round(100 * charge_now / charge_full)) + "%")
# Volume
# t.append('V ' + str(alsaaudio.Mixer('Master').getvolume()[0]) + '%')
t.append(username + "@" + hostname)
# print(' - '.join(t))
# t = [output.name]
z += " - ".join(t) + "%{S" + str(aOutput + 1) + "}"
# lemonbar.stdin.write(bytes(' - '.join(t), 'utf-8'))
# lemonbar.stdin.write(bytes('%{S' + str(aOutput + 1) + '}', 'utf-8'))
lemonbar.stdin.write(bytes(z + "\n", "utf-8"))
lemonbar.stdin.flush()
# Event listeners
def on_mode(i3, e):
global mode
if e.change == "default":
mode = ""
else:
mode = e.change
update()
i3.on("mode", on_mode)
# def on_window_focus(i3, e):
# global container
# container = e.container
# update()
#
# i3.on("window::focus", on_window_focus)
def on_workspace_focus(i3, e):
global workspaces
workspaces = i3.get_workspaces()
update()
i3.on("workspace::focus", on_workspace_focus)
# Starting
update()
i3.main()

327
hm/desktop/frobar/.dev/pip.py Executable file
View file

@ -0,0 +1,327 @@
#!/usr/bin/env python3
"""
Beautiful script
"""
import subprocess
import time
import datetime
import os
import multiprocessing
import i3ipc
import difflib
# Constants
FONT = "DejaVuSansMono Nerd Font Mono"
# TODO Update to be in sync with base16
thm = [
"#002b36",
"#dc322f",
"#859900",
"#b58900",
"#268bd2",
"#6c71c4",
"#2aa198",
"#93a1a1",
"#657b83",
"#dc322f",
"#859900",
"#b58900",
"#268bd2",
"#6c71c4",
"#2aa198",
"#fdf6e3",
]
fg = "#93a1a1"
bg = "#002b36"
THEMES = {
"CENTER": (fg, bg),
"DEFAULT": (thm[0], thm[8]),
"1": (thm[0], thm[9]),
"2": (thm[0], thm[10]),
"3": (thm[0], thm[11]),
"4": (thm[0], thm[12]),
"5": (thm[0], thm[13]),
"6": (thm[0], thm[14]),
"7": (thm[0], thm[15]),
}
# Utils
def fitText(text, size):
"""
Add spaces or cut a string to be `size` characters long
"""
if size > 0:
t = len(text)
if t >= size:
return text[:size]
else:
diff = size - t
return text + " " * diff
else:
return ""
def fgColor(theme):
global THEMES
return THEMES[theme][0]
def bgColor(theme):
global THEMES
return THEMES[theme][1]
class Section:
def __init__(self, theme="DEFAULT"):
self.text = ""
self.size = 0
self.toSize = 0
self.theme = theme
self.visible = False
self.name = ""
def update(self, text):
if text == "":
self.toSize = 0
else:
if len(text) < len(self.text):
self.text = text + self.text[len(text) :]
else:
self.text = text
self.toSize = len(text) + 3
def updateSize(self):
"""
Set the size for the next frame of animation
Return if another frame is needed
"""
if self.toSize > self.size:
self.size += 1
elif self.toSize < self.size:
self.size -= 1
self.visible = self.size
return self.toSize == self.size
def draw(self, left=True, nextTheme="DEFAULT"):
s = ""
if self.visible:
if not left:
if self.theme == nextTheme:
s += ""
else:
s += "%{F" + bgColor(self.theme) + "}"
s += "%{B" + bgColor(nextTheme) + "}"
s += ""
s += "%{F" + fgColor(self.theme) + "}"
s += "%{B" + bgColor(self.theme) + "}"
s += " " if self.size > 1 else ""
s += fitText(self.text, self.size - 3)
s += " " if self.size > 2 else ""
if left:
if self.theme == nextTheme:
s += ""
else:
s += "%{F" + bgColor(self.theme) + "}"
s += "%{B" + bgColor(nextTheme) + "}"
s += ""
return s
# Section definition
sTime = Section("3")
hostname = os.environ["HOSTNAME"].split(".")[0]
sHost = Section("2")
sHost.update(
os.environ["USER"] + "@" + hostname.split("-")[-1] if "-" in hostname else hostname
)
# Groups definition
gLeft = []
gRight = [sTime, sHost]
# Bar handling
bar = subprocess.Popen(["lemonbar", "-f", FONT, "-b"], stdin=subprocess.PIPE)
def updateBar():
global timeLastUpdate, timeUpdate
global gLeft, gRight
global outputs
text = ""
for oi in range(len(outputs)):
output = outputs[oi]
gLeftFiltered = list(
filter(
lambda s: s.visible and (not s.output or s.output == output.name), gLeft
)
)
tLeft = ""
l = len(gLeftFiltered)
for gi in range(l):
g = gLeftFiltered[gi]
# Next visible section for transition
nextTheme = gLeftFiltered[gi + 1].theme if gi + 1 < l else "CENTER"
tLeft = tLeft + g.draw(True, nextTheme)
tRight = ""
for gi in range(len(gRight)):
g = gRight[gi]
nextTheme = "CENTER"
for gn in gRight[gi + 1 :]:
if gn.visible:
nextTheme = gn.theme
break
tRight = g.draw(False, nextTheme) + tRight
text += (
"%{l}"
+ tLeft
+ "%{r}"
+ tRight
+ "%{B"
+ bgColor("CENTER")
+ "}"
+ "%{S"
+ str(oi + 1)
+ "}"
)
bar.stdin.write(bytes(text + "\n", "utf-8"))
bar.stdin.flush()
# Values
i3 = i3ipc.Connection()
outputs = []
def on_output():
global outputs
outputs = sorted(
sorted(
list(filter(lambda o: o.active, i3.get_outputs())), key=lambda o: o.rect.y
),
key=lambda o: o.rect.x,
)
on_output()
def on_workspace_focus():
global i3
global gLeft
workspaces = i3.get_workspaces()
wNames = [w.name for w in workspaces]
sNames = [s.name for s in gLeft]
newGLeft = []
def actuate(section, workspace):
if workspace:
section.name = workspace.name
section.output = workspace.output
if workspace.visible:
section.update(workspace.name)
else:
section.update(workspace.name.split(" ")[0])
if workspace.focused:
section.theme = "4"
elif workspace.urgent:
section.theme = "1"
else:
section.theme = "6"
else:
section.update("")
section.theme = "6"
for tag, i, j, k, l in difflib.SequenceMatcher(None, sNames, wNames).get_opcodes():
if tag == "equal": # If the workspaces didn't changed
for a in range(j - i):
workspace = workspaces[k + a]
section = gLeft[i + a]
actuate(section, workspace)
newGLeft.append(section)
if tag in ("delete", "replace"): # If the workspaces were removed
for section in gLeft[i:j]:
if section.visible:
actuate(section, None)
newGLeft.append(section)
else:
del section
if tag in ("insert", "replace"): # If the workspaces were removed
for workspace in workspaces[k:l]:
section = Section()
actuate(section, workspace)
newGLeft.append(section)
gLeft = newGLeft
updateBar()
on_workspace_focus()
def i3events(i3childPipe):
global i3
# Proxy functions
def on_workspace_focus(i3, e):
global i3childPipe
i3childPipe.send("on_workspace_focus")
i3.on("workspace::focus", on_workspace_focus)
def on_output(i3, e):
global i3childPipe
i3childPipe.send("on_output")
i3.on("output", on_output)
i3.main()
i3parentPipe, i3childPipe = multiprocessing.Pipe()
i3process = multiprocessing.Process(target=i3events, args=(i3childPipe,))
i3process.start()
def updateValues():
# Time
now = datetime.datetime.now()
sTime.update(now.strftime("%x %X"))
def updateAnimation():
for s in set(gLeft + gRight):
s.updateSize()
updateBar()
lastUpdate = 0
while True:
now = time.time()
if i3parentPipe.poll():
msg = i3parentPipe.recv()
if msg == "on_workspace_focus":
on_workspace_focus()
elif msg == "on_output":
on_output()
# TODO Restart lemonbar
else:
print(msg)
updateAnimation()
if now >= lastUpdate + 1:
updateValues()
lastUpdate = now
time.sleep(0.05)

10
hm/desktop/frobar/.dev/x.py Executable file
View file

@ -0,0 +1,10 @@
#!/usr/bin/env python3
import Xlib.display
dis = Xlib.display.Display()
nb = dis.screen_count()
for s in range(nb):
print(s)

View file

@ -12,7 +12,7 @@ let
in
# Tried using pyproject.nix but mpd2 dependency wouldn't resolve,
# is called pyton-mpd2 on PyPi but mpd2 in nixpkgs.
pkgs.python3Packages.buildPythonApplication rec {
pkgs.python3Packages.buildPythonApplication {
pname = "frobar";
version = "2.0";
@ -25,8 +25,7 @@ pkgs.python3Packages.buildPythonApplication rec {
pulsectl
pyinotify
];
nativeBuildInputs = [ lemonbar ] ++ (with pkgs; [ wirelesstools playerctl ]);
makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}" ];
makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath ([ lemonbar ] ++ (with pkgs; [ wirelesstools ]))}" ];
src = ./.;
}

View file

@ -37,8 +37,7 @@ def run() -> None:
)
# TODO Middle
Bar.addSectionAll(fp.MprisProvider(theme=9), BarGroupType.LEFT)
# Bar.addSectionAll(fp.MpdProvider(theme=9), BarGroupType.LEFT)
Bar.addSectionAll(fp.MpdProvider(theme=9), BarGroupType.LEFT)
# Bar.addSectionAll(I3WindowTitleProvider(), BarGroupType.LEFT)
# TODO Computer modes

View file

@ -274,17 +274,11 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater):
sinks = []
with pulsectl.Pulse("list-sinks") as pulse:
for sink in pulse.sink_list():
if (
sink.port_active.name == "analog-output-headphones"
or sink.port_active.description == "Headphones"
):
if sink.port_active.name == "analog-output-headphones":
icon = ""
elif (
sink.port_active.name == "analog-output-speaker"
or sink.port_active.description == "Speaker"
):
elif sink.port_active.name == "analog-output-speaker":
icon = "" if sink.mute else ""
elif sink.port_active.name in ("headset-output", "headphone-output"):
elif sink.port_active.name == "headset-output":
icon = ""
else:
icon = "?"
@ -855,100 +849,3 @@ class MpdProvider(Section, ThreadedUpdater):
self.connect()
except BaseException as e:
log.error(e, exc_info=True)
class MprisProviderSection(Section, Updater):
def __init__(self, parent: "MprisProvider"):
Updater.__init__(self)
Section.__init__(self, theme=parent.theme)
self.parent = parent
class MprisProvider(Section, ThreadedUpdater):
# TODO Controls (select player at least)
# TODO Use the Python native thing for it:
# https://github.com/altdesktop/playerctl?tab=readme-ov-file#using-the-library
# TODO Make it less sucky
SECTIONS = [
"{{ playerName }} {{ status }}",
"{{ album }}",
"{{ artist }}",
"{{ duration(position) }}|{{ duration(mpris:length) }}" " {{ title }}",
]
# nf-fd icons don't work (UTF-16?)
SUBSTITUTIONS = {
"Playing": "",
"Paused": "",
"Stopped": "",
"mpd": "",
"firefox": "",
"chromium": "",
"mpv": "",
}
ICONS = {
1: "",
2: "",
3: "",
}
def __init__(self, theme: int | None = None):
ThreadedUpdater.__init__(self)
Section.__init__(self, theme)
self.line = ""
self.start()
self.sections: list[Section] = []
def fetcher(self) -> Element:
create = not len(self.sections)
populate = self.line
split = self.line.split("\t")
lastSection: Section = self
for i in range(len(self.SECTIONS)):
if create:
section = Section(theme=self.theme)
lastSection.appendAfter(section)
lastSection = section
self.sections.append(section)
else:
section = self.sections[i]
if populate:
text = split[i]
if i == 0:
for key, val in self.SUBSTITUTIONS.items():
text = text.replace(key, val)
if text:
if i in self.ICONS:
text = f"{self.ICONS[i]} {text}"
section.updateText(text)
else:
section.updateText(None)
else:
section.updateText(None)
return None
def loop(self) -> None:
cmd = [
"playerctl",
"metadata",
"--format",
"\t".join(self.SECTIONS),
"--follow",
]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
assert p.stdout
while p.poll() is None:
self.line = p.stdout.readline().decode().strip()
self.refreshData()
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
assert p.stdout
while p.poll() is None:
self.line = p.stdout.readline().decode().strip()
self.refreshData()

View file

@ -4,7 +4,6 @@ import functools
import logging
import math
import os
import subprocess
import threading
import time
@ -12,8 +11,8 @@ import coloredlogs
import i3ipc
import pyinotify
from frobar.common import notBusy
from frobar.display import Element
from frobar.common import notBusy
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()

View file

@ -0,0 +1,53 @@
{ pkgs, lib, config, ... }:
{
config = lib.mkIf config.frogeye.desktop.xorg {
home = {
packages = with pkgs; [
ashuffle # Probs don't work
mpc-cli # mpc add completion only works partially
vimpc # library straight up doesn't work
ncmpcpp # This one is fine but I don't know the shortcuts :(
];
};
services.mopidy = {
enable = true;
extensionPackages = with pkgs; [
mopidy-mpd # Compat
mopidy-iris # Web UI
mopidy-local
mopidy-jellyfin
# Break in case of emergency: (I one of those cause issues, but not sure)
mopidy-youtube
mopidy-bandcamp
mopidy-soundcloud
];
settings = {
local = {
media_dir = [ "${config.home.homeDirectory}/Musiques" ];
};
};
};
xdg = {
configFile = {
"vimpc/vimpcrc" = {
text = ''
map FF :browse<C-M>gg/
map à :set add next<C-M>a:set add end<C-M>
map @ :set add next<C-M>a:set add end<C-M>:next<C-M>
map ° D:browse<C-M>A:shuffle<C-M>:play<C-M>:playlist<C-M>
set songformat {%a - %b: %t}|{%f}$E$R $H[$H%l$H]$H
set libraryformat %n \| {%t}|{%f}$E$R $H[$H%l$H]$H
set ignorecase
set sort library
'';
};
};
};
xsession.windowManager.i3.config.keybindings =
{
"XF86AudioPrev" = "exec ${pkgs.mpc-cli}/bin/mpc prev";
"XF86AudioPlay" = "exec ${pkgs.mpc-cli}/bin/mpc toggle";
"XF86AudioNext" = "exec ${pkgs.mpc-cli}/bin/mpc next";
};
};
}

View file

@ -1,63 +0,0 @@
{ pkgs, lib, config, ... }:
{
config = lib.mkIf config.frogeye.desktop.xorg {
home = {
packages = with pkgs; [
ashuffle
mpc-cli
vimpc
playerctl
];
sessionVariables = {
MPD_PORT = "${toString config.services.mpd.network.port}";
};
};
services = {
mpd = {
enable = true;
network = {
listenAddress = "0.0.0.0"; # Can be controlled remotely, determined with firewall
startWhenNeeded = true;
};
extraConfig = ''
restore_paused "yes"
audio_output {
type "pipewire"
name "PipeWire Sound Server"
}
'';
# UPST auto audio_output ?
musicDirectory = "${config.home.homeDirectory}/Musiques";
};
# Expose mpd to mpris
# mpd-mpris also exists but is MIT and make playerctld not pick up on play/pause events
mpdris2.enable = true;
# Allow control from headset
mpris-proxy.enable = true;
# Remember the last player
playerctld.enable = true;
};
xdg = {
configFile = {
"vimpc/vimpcrc" = {
text = ''
map FF :browse<C-M>gg/
map à :set add next<C-M>a:set add end<C-M>
map @ :set add next<C-M>a:set add end<C-M>:next<C-M>
map ° D:browse<C-M>A:shuffle<C-M>:play<C-M>:playlist<C-M>
set songformat {%a - %b: %t}|{%f}$E$R $H[$H%l$H]$H
set libraryformat %n \| {%t}|{%f}$E$R $H[$H%l$H]$H
set ignorecase
set sort library
'';
};
};
};
xsession.windowManager.i3.config.keybindings =
{
"XF86AudioPrev" = "exec ${lib.getExe pkgs.playerctl} previous";
"XF86AudioPlay" = "exec ${lib.getExe pkgs.playerctl} play-pause";
"XF86AudioNext" = "exec ${lib.getExe pkgs.playerctl} next";
};
};
}

View file

@ -20,8 +20,6 @@
cmake
ddd
gdb
gnumake
valgrind
];
sessionVariables = {
CCACHE_CONFIGPATH = "${config.xdg.configHome}/ccache.conf";
@ -30,10 +28,7 @@
programs.bash.shellAliases = {
gdb = "gdb -x ${config.xdg.configHome}/gdbinit";
};
programs.nixvim.plugins = {
dap.enable = true; # Debug Adapter Protocol client
lsp.servers.clangd.enable = true;
};
programs.nixvim.plugins.dap.enable = true; # Debug Adapter Protocol client
xdg.configFile = {
"ccache.conf" = {
text = "ccache_dir = ${config.xdg.cacheHome}/ccache";

View file

@ -22,7 +22,6 @@
# nix
lix
nixpkgs-fmt
# Always on (graphical)
] ++ lib.optionals config.frogeye.desktop.xorg [
@ -54,10 +53,6 @@
] ++ lib.optionals (config.frogeye.desktop.xorg && config.frogeye.dev.fpga) [
yosys
gtkwave
# VM (graphical)
] ++ lib.optionals (config.frogeye.desktop.xorg && config.frogeye.dev.vm) [
virt-manager
];
programs.nixvim.plugins.lsp.servers = {
@ -67,16 +62,9 @@
lua-ls.enable = true; # Lua (for Neovim debugging)
perlpls.enable = config.frogeye.dev.perl; # Perl
phpactor.enable = config.frogeye.dev.php; # PHP
# Nix
nil-ls = {
nixd = {
enable = true;
settings = {
formatting.command = [ "nixpkgs-fmt" ];
nix.flake = {
autoArchive = true;
autoEvalInputs = true;
};
};
settings.formatting.command = [ "nixpkgs-fmt" ];
};
# TODO Something for SQL. sqls is deprecated, sqlls is not in Nixpkgs. Probably needs a DB connection configured anyways?
yamlls.enable = true; # YAML

View file

@ -34,11 +34,9 @@
# TODO Convert existing LaTeX documents into using Nix build system
# texlive is big and not that much used, sooo
pdftk
pdfgrep
# Misc
haskellPackages.dice
rustdesk-flutter
] ++ lib.optionals config.frogeye.desktop.xorg [

View file

@ -1,9 +1,6 @@
{ pkgs, lib, config, ... }:
let
cfg = config.programs.git;
in
{
config = lib.mkIf cfg.enable {
config = lib.mkIf config.programs.git.enable {
home.packages = [
(pkgs.writeShellApplication {
name = "git-sync";
@ -17,18 +14,10 @@ in
else
(
cd "${r.path}"
if [ -d .jj ]
then
${lib.getExe pkgs.jujutsu} git fetch
${lib.getExe pkgs.jujutsu} rebase -d main@origin
${lib.getExe pkgs.jujutsu} branch set main -r @-
${lib.getExe pkgs.jujutsu} git push
else
${pkgs.git}/bin/git --no-optional-locks diff --quiet || echo "Repository is dirty!"
${pkgs.git}/bin/git pull || true
# Only push if there's something to push. Also prevents from trying to push on repos where we don't have rights.
(${pkgs.git}/bin/git --no-optional-locks status --porcelain -b --ignore-submodules | grep ' \[ahead [0-9]\+\]' && ${pkgs.git}/bin/git push) || true
fi
)
fi
'')
@ -37,8 +26,7 @@ in
);
})
];
programs = {
git = {
programs.git = {
package = pkgs.gitFull;
aliases = {
"git" = "!exec git"; # In case I write one too many git
@ -79,32 +67,6 @@ in
# This escapes quotes, which isn't the case in the original, hoping this isn't an issue.
};
};
jujutsu = {
enable = true;
settings = {
git = {
auto-local-bookmark = true;
auto-local-branch = true;
};
user = {
email = cfg.userEmail;
name = cfg.userName;
};
ui = {
pager = "delta";
diff.format = "git";
diff-editor = "meld-3";
merge-editor = "meld";
};
signing = {
sign-all = true;
backend = "gpg";
inherit (cfg.signing) key;
backends.gpg.allow-expired-keys = false;
};
};
};
};
services = {
git-sync = {
enable = false; # The real thing syncs too quickly and asks for passphrase, which is annoying

View file

@ -1,8 +1,9 @@
{ config, ... }:
{ pkgs, lib, config, ... }:
{
config = {
home = {
sessionVariables = {
PAGER = "less";
LESSHISTFILE = "${config.xdg.stateHome}/lesshst";
LESS = "-R";
LESS_TERMCAP_mb = "$(echo $'\\E[1;31m')"; # begin blink

View file

@ -62,7 +62,7 @@ in
description = "Path to the password store entry";
};
selector = lib.mkOption {
type = lib.types.nullOr lib.types.str;
type = lib.types.str;
default = null;
description = "If set, will parse the password metadata as YML and use selector (yq) instead of the password.";
};

View file

@ -1,9 +1,7 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash
#! nix-shell -i bash --pure
#! nix-shell -p bash pdftk inkscape gnused coreutils file
set -euxo pipefail
# Utility to write over a PDF file pages
# TODO Inkscape vodoo: Put the original in its own layer and skip when merging

View file

@ -1,6 +1,6 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.configargparse python3Packages.filelock python3Packages.requests python3Packages.yt-dlp ffmpeg
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.configargparse python3Packages.filelock python3Packages.filelock python3Packages.requests python3Packages.yt-dlp ffmpeg
# Also needs mpv but if I put it there it's not using the configured one
@ -189,13 +189,7 @@ class RVElement:
def ytdl_infos(self) -> typing.Optional[dict]:
try:
return self.metafile_read("ytdl")
# If the metafile doesn't exist or is corrupted
except (FileNotFoundError, TypeError, AttributeError, EOFError):
# Delete file otherwise __str__ won't be happy
metafile = self.metafile("ytdl")
if os.path.isfile(metafile):
os.unlink(metafile)
infos = self._ytdl_infos()
self.metafile_write("ytdl", infos)
return infos

View file

@ -50,7 +50,6 @@ in
php = lib.mkEnableOption "PHP dev stuff";
prose = lib.mkEnableOption "Writing stuff";
python = lib.mkEnableOption "Python dev stuff";
vm = lib.mkEnableOption "Virtual machines";
};
storageSize = lib.mkOption {
default = "small";

View file

@ -76,11 +76,6 @@
# on others I sometimes turn it off when unsuspending.
logind.extraConfig = "HandlePowerKey=ignore";
udev.extraRules = ''
# DYMO LabelPOINT 350
ACTION=="add", SUBSYSTEMS=="usb", ATTRS{idVendor}=="0922", ATTRS{idProduct}=="0015", MODE="0666"
'';
};
# TODO Hibernation?

View file

@ -23,7 +23,7 @@
actions.update-props = {
"session.suspend-timeout-seconds" = 0;
"dither.method" = "wannamaker3";
"dither.noise" = 4;
"dither.noise" = 2;
};
}
];
@ -41,11 +41,9 @@
languages = [ "fr" ];
symbolsFile = "${pkgs.stdenv.mkDerivation {
name = "qwerty-fr-keypad";
src = pkgs.fetchFromGitHub {
owner = "qwerty-fr";
repo = "qwerty-fr";
src = builtins.fetchGit {
url = "https://github.com/qwerty-fr/qwerty-fr.git";
rev = "3a4d13089e8ef016aa20baf6b2bf3ea53de674b8";
sha256 = "sha256-wn5n6jJVDrQWJze8xYF2nEY8a7mHI3hVO4xsT4LMo9c=";
};
patches = [ ./qwerty-fr-keypad.diff ];
# TODO This doesn't seem to be applied... it's the whole point of the derivation :(

View file

@ -1,25 +1,7 @@
{ pkgs, lib, config, ... }:
{
config = lib.mkMerge [
(lib.mkIf config.frogeye.dev.docker {
config = lib.mkIf config.frogeye.dev.docker {
virtualisation.docker.enable = true;
users.users.geoffrey.extraGroups = [ "docker" ];
})
(lib.mkIf config.frogeye.dev.vm {
# Required to mount filesystems. Also need to add
# <binary path="/run/current-system/sw/bin/virtiofsd"/>
# to the VM's filesystem XML config
environment.systemPackages = [ pkgs.virtiofsd ];
virtualisation = {
libvirtd.enable = true;
spiceUSBRedirection.enable = true;
};
users.extraGroups.libvirtd.members = [ "geoffrey" ];
})
{
users.extraGroups.dialout.members = [ "geoffrey" ];
# Always comes in handy, not enabled by default anyways
programs.nix-ld.enable = true;
}
];
}