nixos/hadoop: refactor HDFS configs

Author: illustris
Date:   2022-01-08 16:02:32 +05:30
Parent: dd5f004b06
Commit: f6cf1ced33

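The refactor collapses five hand-written systemd service definitions into a single `hadoopServiceConfig` generator and moves the per-service option boilerplate into `hadoopServiceOption`, using `optionalAttrs` instead of an if/else for the optional `openFirewall` option. The consumer-facing options are unchanged; the sketch below shows how a single-node setup might exercise them (the option values are illustrative assumptions, not defaults introduced by this commit, and the rest of the Hadoop module is assumed to supply `cfg.package` and the site configs):

    # Hypothetical single-node HDFS configuration against this module
    { ... }: {
      services.hadoop.hdfs = {
        namenode = {
          enable = true;
          formatOnInit = true;  # format the namenode on first start (default: false)
          openFirewall = true;  # default: true; opens 9870, 8020 and 8022
        };
        datanode.enable = true; # with openFirewall left at its default,
                                # this also opens 9864, 9866 and 9867
      };
    }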

@@ -1,47 +1,89 @@
-{ config, lib, pkgs, ...}:
+{ config, lib, pkgs, ... }:
 with lib;
 let
   cfg = config.services.hadoop;
+
+  # Config files for hadoop services
   hadoopConf = "${import ./conf.nix { inherit cfg pkgs lib; }}/";
-  restartIfChanged = mkOption {
-    type = types.bool;
-    description = ''
-      Automatically restart the service on config change.
-      This can be set to false to defer restarts on clusters running critical applications.
-      Please consider the security implications of inadvertently running an older version,
-      and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
-    '';
-    default = false;
-  };
-  openFirewall = serviceName: mkOption {
-    type = types.bool;
-    default = true;
-    description = "Open firewall ports for ${serviceName}.";
-  };
+
+  # Generator for HDFS service options
   hadoopServiceOption = { serviceName, firewallOption ? true }: {
     enable = mkEnableOption serviceName;
-    inherit restartIfChanged;
-  } // (if firewallOption then {openFirewall = openFirewall serviceName;} else {});
+    restartIfChanged = mkOption {
+      type = types.bool;
+      description = ''
+        Automatically restart the service on config change.
+        This can be set to false to defer restarts on clusters running critical applications.
+        Please consider the security implications of inadvertently running an older version,
+        and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
+      '';
+      default = false;
+    };
+  } // (optionalAttrs firewallOption {
+    openFirewall = mkOption {
+      type = types.bool;
+      default = true;
+      description = "Open firewall ports for ${serviceName}.";
+    };
+  });
+
+  # Generator for HDFS service configs
+  hadoopServiceConfig =
+    { name
+    , serviceOptions ? cfg.hdfs."${toLower name}"
+    , description ? "Hadoop HDFS ${name}"
+    , User ? "hdfs"
+    , allowedTCPPorts ? [ ]
+    , preStart ? ""
+    , environment ? { }
+    }: (
+
+      mkIf serviceOptions.enable {
+        systemd.services."hdfs-${toLower name}" = {
+          inherit description preStart environment;
+          wantedBy = [ "multi-user.target" ];
+          inherit (serviceOptions) restartIfChanged;
+          serviceConfig = {
+            inherit User;
+            SyslogIdentifier = "hdfs-${toLower name}";
+            ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} ${toLower name}";
+            Restart = "always";
+          };
+        };
+
+        networking.firewall.allowedTCPPorts = mkIf
+          ((builtins.hasAttr "openFirewall" serviceOptions) && serviceOptions.openFirewall)
+          allowedTCPPorts;
+      }
+    );
+
 in
 {
   options.services.hadoop.hdfs = {
     namenode = hadoopServiceOption { serviceName = "HDFS NameNode"; } // {
       formatOnInit = mkOption {
         type = types.bool;
         default = false;
         description = ''
-          Format HDFS namenode on first start. This is useful for quickly spinning up ephemeral HDFS clusters with a single namenode.
-          For HA clusters, initialization involves multiple steps across multiple nodes. Follow this guide to initialize an HA cluster manually:
+          Format HDFS namenode on first start. This is useful for quickly spinning up
+          ephemeral HDFS clusters with a single namenode.
+          For HA clusters, initialization involves multiple steps across multiple nodes.
+          Follow this guide to initialize an HA cluster manually:
           <link xlink:href="https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html"/>
         '';
       };
     };
     datanode = hadoopServiceOption { serviceName = "HDFS DataNode"; };
     journalnode = hadoopServiceOption { serviceName = "HDFS JournalNode"; };
     zkfc = hadoopServiceOption {
       serviceName = "HDFS ZooKeeper failover controller";
       firewallOption = false;
     };
     httpfs = hadoopServiceOption { serviceName = "HDFS HTTPFS"; } // {
       tempPath = mkOption {
         type = types.path;
@@ -49,118 +91,65 @@ in
         description = "HTTPFS_TEMP path used by HTTPFS";
       };
     };
   };
 
   config = mkMerge [
-    (mkIf cfg.hdfs.namenode.enable {
-      systemd.services.hdfs-namenode = {
-        description = "Hadoop HDFS NameNode";
-        wantedBy = [ "multi-user.target" ];
-        inherit (cfg.hdfs.namenode) restartIfChanged;
-        preStart = (mkIf cfg.hdfs.namenode.formatOnInit ''
-          ${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true
-        '');
-        serviceConfig = {
-          User = "hdfs";
-          SyslogIdentifier = "hdfs-namenode";
-          ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} namenode";
-          Restart = "always";
-        };
-      };
-      networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.namenode.openFirewall [
+    (hadoopServiceConfig {
+      name = "NameNode";
+      allowedTCPPorts = [
         9870 # namenode.http-address
         8020 # namenode.rpc-address
         8022 # namenode.servicerpc-address
-      ]);
+      ];
+      preStart = (mkIf cfg.hdfs.namenode.formatOnInit
+        "${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true"
+      );
     })
-    (mkIf cfg.hdfs.datanode.enable {
-      systemd.services.hdfs-datanode = {
-        description = "Hadoop HDFS DataNode";
-        wantedBy = [ "multi-user.target" ];
-        inherit (cfg.hdfs.datanode) restartIfChanged;
-        serviceConfig = {
-          User = "hdfs";
-          SyslogIdentifier = "hdfs-datanode";
-          ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} datanode";
-          Restart = "always";
-        };
-      };
-      networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.datanode.openFirewall [
+
+    (hadoopServiceConfig {
+      name = "DataNode";
+      allowedTCPPorts = [
         9864 # datanode.http.address
         9866 # datanode.address
         9867 # datanode.ipc.address
-      ]);
+      ];
     })
-    (mkIf cfg.hdfs.journalnode.enable {
-      systemd.services.hdfs-journalnode = {
-        description = "Hadoop HDFS JournalNode";
-        wantedBy = [ "multi-user.target" ];
-        inherit (cfg.hdfs.journalnode) restartIfChanged;
-        serviceConfig = {
-          User = "hdfs";
-          SyslogIdentifier = "hdfs-journalnode";
-          ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} journalnode";
-          Restart = "always";
-        };
-      };
-      networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.journalnode.openFirewall [
+
+    (hadoopServiceConfig {
+      name = "JournalNode";
+      allowedTCPPorts = [
         8480 # dfs.journalnode.http-address
         8485 # dfs.journalnode.rpc-address
-      ]);
+      ];
     })
-    (mkIf cfg.hdfs.zkfc.enable {
-      systemd.services.hdfs-zkfc = {
-        description = "Hadoop HDFS ZooKeeper failover controller";
-        wantedBy = [ "multi-user.target" ];
-        inherit (cfg.hdfs.zkfc) restartIfChanged;
-        serviceConfig = {
-          User = "hdfs";
-          SyslogIdentifier = "hdfs-zkfc";
-          ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} zkfc";
-          Restart = "always";
-        };
-      };
+
+    (hadoopServiceConfig {
+      name = "zkfc";
+      description = "Hadoop HDFS ZooKeeper failover controller";
     })
-    (mkIf cfg.hdfs.httpfs.enable {
-      systemd.services.hdfs-httpfs = {
-        description = "Hadoop httpfs";
-        wantedBy = [ "multi-user.target" ];
-        inherit (cfg.hdfs.httpfs) restartIfChanged;
-        environment.HTTPFS_TEMP = cfg.hdfs.httpfs.tempPath;
-        preStart = ''
-          mkdir -p $HTTPFS_TEMP
-        '';
-        serviceConfig = {
-          User = "httpfs";
-          SyslogIdentifier = "hdfs-httpfs";
-          ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} httpfs";
-          Restart = "always";
-        };
-      };
-      networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.httpfs.openFirewall [
+
+    (hadoopServiceConfig {
+      name = "HTTPFS";
+      environment.HTTPFS_TEMP = cfg.hdfs.httpfs.tempPath;
+      preStart = "mkdir -p $HTTPFS_TEMP";
+      User = "httpfs";
+      allowedTCPPorts = [
         14000 # httpfs.http.port
-      ]);
+      ];
     })
-    (mkIf (
+
+    (mkIf
+      (
         cfg.hdfs.namenode.enable || cfg.hdfs.datanode.enable || cfg.hdfs.journalnode.enable || cfg.hdfs.zkfc.enable
-    ) {
-      users.users.hdfs = {
-        description = "Hadoop HDFS user";
-        group = "hadoop";
-        uid = config.ids.uids.hdfs;
-      };
-    })
+      )
+      {
+        users.users.hdfs = {
+          description = "Hadoop HDFS user";
+          group = "hadoop";
+          uid = config.ids.uids.hdfs;
+        };
+      })
+
     (mkIf cfg.hdfs.httpfs.enable {
       users.users.httpfs = {
         description = "Hadoop HTTPFS user";