X-Git-Url: https://git.josue.xyz/?a=blobdiff_plain;ds=sidebyside;f=.local%2Fbin%2Frpyc_registry.py;fp=.local%2Fbin%2Frpyc_registry.py;h=ec61e04a9181fb98817bdbd26d3ea6da7e3b3db5;hb=4d07c77cf4d78cab8639e13ddc3c22495e585b0b;hp=0000000000000000000000000000000000000000;hpb=b3950616b54221c40a7dab9099bda675007e5b6e;p=dotfiles%2F.git diff --git a/.local/bin/rpyc_registry.py b/.local/bin/rpyc_registry.py new file mode 100755 index 00000000..ec61e04a --- /dev/null +++ b/.local/bin/rpyc_registry.py @@ -0,0 +1,42 @@ +#!/usr/bin/python +""" +The registry server listens to broadcasts on UDP port 18812, answering to +discovery queries by clients and registering keepalives from all running +servers. In order for clients to use discovery, a registry service must +be running somewhere on their local network. +""" +from plumbum import cli +from rpyc.utils.registry import REGISTRY_PORT, DEFAULT_PRUNING_TIMEOUT +from rpyc.utils.registry import UDPRegistryServer, TCPRegistryServer +from rpyc.lib import setup_logger + + +class RegistryServer(cli.Application): + mode = cli.SwitchAttr(["-m", "--mode"], cli.Set("UDP", "TCP"), default="UDP", + help="Serving mode") + + ipv6 = cli.Flag(["-6", "--ipv6"], help="use ipv6 instead of ipv4") + + port = cli.SwitchAttr(["-p", "--port"], cli.Range(0, 65535), default=REGISTRY_PORT, + help="The UDP/TCP listener port") + + logfile = cli.SwitchAttr(["--logfile"], str, default=None, + help="The log file to use; the default is stderr") + + quiet = cli.SwitchAttr(["-q", "--quiet"], help="Quiet mode (only errors are logged)") + + pruning_timeout = cli.SwitchAttr(["-t", "--timeout"], int, + default=DEFAULT_PRUNING_TIMEOUT, help="Set a custom pruning timeout (in seconds)") + + def main(self): + if self.mode == "UDP": + server = UDPRegistryServer(host='::' if self.ipv6 else '0.0.0.0', port=self.port, + pruning_timeout=self.pruning_timeout) + elif self.mode == "TCP": + server = TCPRegistryServer(port=self.port, pruning_timeout=self.pruning_timeout) + setup_logger(self.quiet, self.logfile) + server.start() + + +if __name__ == "__main__": + RegistryServer.run()