nixos/hadoop: add gateway role

commit 716b0dfaaf
parent d39056d165
Author: illustris
Date:   2022-02-27 11:52:18 +05:30
4 changed files with 57 additions and 52 deletions
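
The gateway role installs the Hadoop client tools and deploys the rendered
config to /etc/hadoop-conf (with HADOOP_CONF_DIR pointing at it) on any
machine that enables it. The HDFS and YARN service modules now enable the
role implicitly, replacing the old heuristics that keyed off the presence of
the yarn/hdfs/httpfs users. A minimal sketch of a client-only node, assuming
the site settings are shared with the cluster (the hostname and nameservice
values below are illustrative, not part of this commit):

    # Hypothetical client machine: no daemons, just configs and CLI tools.
    services.hadoop = {
      gatewayRole.enable = true;
      coreSite."fs.defaultFS" = "hdfs://ns1"; # assumed HA nameservice name
    };

After a rebuild, `hdfs dfs` and `yarn` on such a machine pick up the
deployed configuration via HADOOP_CONF_DIR.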

File 1 of 4:

@@ -149,6 +149,8 @@ with lib;
       description = "Directories containing additional config files to be added to HADOOP_CONF_DIR";
     };
 
+    gatewayRole.enable = mkEnableOption "gateway role for deploying hadoop configs";
+
     package = mkOption {
       type = types.package;
       default = pkgs.hadoop;
@@ -158,21 +160,16 @@ with lib;
     };
   };
 
-  config = mkMerge [
-    (mkIf (builtins.hasAttr "yarn" config.users.users ||
-           builtins.hasAttr "hdfs" config.users.users ||
-           builtins.hasAttr "httpfs" config.users.users) {
-      users.groups.hadoop = {
-        gid = config.ids.gids.hadoop;
-      };
-      environment = {
-        systemPackages = [ cfg.package ];
-        etc."hadoop-conf".source = let
-          hadoopConf = "${import ./conf.nix { inherit cfg pkgs lib; }}/";
-        in "${hadoopConf}";
-        variables.HADOOP_CONF_DIR = "/etc/hadoop-conf/";
-      };
-    })
-  ];
+  config = mkIf cfg.gatewayRole.enable {
+    users.groups.hadoop = {
+      gid = config.ids.gids.hadoop;
+    };
+    environment = {
+      systemPackages = [ cfg.package ];
+      etc."hadoop-conf".source = let
+        hadoopConf = "${import ./conf.nix { inherit cfg pkgs lib; }}/";
+      in "${hadoopConf}";
+      variables.HADOOP_CONF_DIR = "/etc/hadoop-conf/";
+    };
+  };
 }

File 2 of 4:

@@ -51,6 +51,8 @@ let
       };
     };
 
+    services.hadoop.gatewayRole.enable = true;
+
     networking.firewall.allowedTCPPorts = mkIf
       ((builtins.hasAttr "openFirewall" serviceOptions) && serviceOptions.openFirewall)
       allowedTCPPorts;
@@ -145,17 +147,13 @@ in
       ];
     })
 
-    (mkIf
-      (
-        cfg.hdfs.namenode.enable || cfg.hdfs.datanode.enable || cfg.hdfs.journalnode.enable || cfg.hdfs.zkfc.enable
-      )
-      {
-        users.users.hdfs = {
-          description = "Hadoop HDFS user";
-          group = "hadoop";
-          uid = config.ids.uids.hdfs;
-        };
-      })
+    (mkIf cfg.gatewayRole.enable {
+      users.users.hdfs = {
+        description = "Hadoop HDFS user";
+        group = "hadoop";
+        uid = config.ids.uids.hdfs;
+      };
+    })
     (mkIf cfg.hdfs.httpfs.enable {
       users.users.httpfs = {
         description = "Hadoop HTTPFS user";
@@ -163,5 +161,6 @@ in
         isSystemUser = true;
       };
     })
+
   ];
 }

File 3 of 4:

@@ -49,10 +49,7 @@ in
   };
 
   config = mkMerge [
-    (mkIf (
-      cfg.yarn.resourcemanager.enable || cfg.yarn.nodemanager.enable
-    ) {
-
+    (mkIf cfg.gatewayRole.enable {
       users.users.yarn = {
         description = "Hadoop YARN user";
         group = "hadoop";
@@ -74,6 +71,9 @@ in
         Restart = "always";
       };
     };
+
+    services.hadoop.gatewayRole.enable = true;
+
     networking.firewall.allowedTCPPorts = (mkIf cfg.yarn.resourcemanager.openFirewall [
       8088 # resourcemanager.webapp.address
       8030 # resourcemanager.scheduler.address
@@ -119,6 +119,8 @@
       };
     };
 
+    services.hadoop.gatewayRole.enable = true;
+
     networking.firewall.allowedTCPPortRanges = [
       (mkIf (cfg.yarn.nodemanager.openFirewall) {from = 1024; to = 65535;})
     ];

File 4 of 4:

@@ -145,7 +145,14 @@ import ../make-test-python.nix ({ package, ... }: {
           };
         };
       };
-    };
+      client = { options, ... }: {
+        services.hadoop = {
+          gatewayRole.enable = true;
+          inherit package coreSite hdfsSite;
+          yarnSite = options.services.hadoop.yarnSite.default // yarnSiteHA;
+        };
+      };
+    };
 
   testScript = ''
     start_all()
@@ -202,26 +209,26 @@ import ../make-test-python.nix ({ package, ... }: {
       # DN should have started by now, but confirm anyway
       dn1.wait_for_unit("hdfs-datanode")
       # Print states of namenodes
-      dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+      client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
 
       # Wait for cluster to exit safemode
-      dn1.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
-      dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+      client.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
+      client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
 
       # test R/W
-      dn1.succeed("echo testfilecontents | sudo -u hdfs hdfs dfs -put - /testfile")
-      assert "testfilecontents" in dn1.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
+      client.succeed("echo testfilecontents | sudo -u hdfs hdfs dfs -put - /testfile")
+      assert "testfilecontents" in client.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
 
       # Test NN failover
       nn1.succeed("systemctl stop hdfs-namenode")
-      assert "active" in dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
-      dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
-      assert "testfilecontents" in dn1.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
+      assert "active" in client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
+      client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+      assert "testfilecontents" in client.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
       nn1.succeed("systemctl start hdfs-namenode")
       nn1.wait_for_open_port(9870)
       nn1.wait_for_open_port(8022)
       nn1.wait_for_open_port(8020)
-      assert "standby" in dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
-      dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+      assert "standby" in client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
+      client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
 
       #### YARN tests ####
@@ -237,21 +244,21 @@ import ../make-test-python.nix ({ package, ... }: {
       nm1.wait_for_unit("yarn-nodemanager")
       nm1.wait_for_open_port(8042)
       nm1.wait_for_open_port(8040)
-      nm1.wait_until_succeeds("yarn node -list | grep Nodes:1")
-      nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
-      nm1.succeed("sudo -u yarn yarn node -list | systemd-cat")
+      client.wait_until_succeeds("yarn node -list | grep Nodes:1")
+      client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+      client.succeed("sudo -u yarn yarn node -list | systemd-cat")
 
       # Test RM failover
       rm1.succeed("systemctl stop yarn-resourcemanager")
-      assert "standby" not in nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
-      nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+      assert "standby" not in client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
+      client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
       rm1.succeed("systemctl start yarn-resourcemanager")
       rm1.wait_for_unit("yarn-resourcemanager")
       rm1.wait_for_open_port(8088)
-      assert "standby" in nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
-      nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+      assert "standby" in client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
+      client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
 
-      assert "Estimated value of Pi is" in nm1.succeed("HADOOP_USER_NAME=hdfs yarn jar $(readlink $(which yarn) | sed -r 's~bin/yarn~lib/hadoop-*/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar~g') pi 2 10")
-      assert "SUCCEEDED" in nm1.succeed("yarn application -list -appStates FINISHED")
+      assert "Estimated value of Pi is" in client.succeed("HADOOP_USER_NAME=hdfs yarn jar $(readlink $(which yarn) | sed -r 's~bin/yarn~lib/hadoop-*/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar~g') pi 2 10")
+      assert "SUCCEEDED" in client.succeed("yarn application -list -appStates FINISHED")
     '';
 })
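
Note on the final assertions: the `yarn jar` invocation locates the bundled
MapReduce examples by resolving the `yarn` wrapper to its store path
(`readlink $(which yarn)`) and rewriting `bin/yarn` into the glob for
`hadoop-mapreduce-examples-*.jar` within the same Hadoop derivation, then
runs the `pi` estimator as the `hdfs` user. Having this pass from the new
`client` node shows that a pure gateway machine, with no daemons of its own,
can submit jobs to the cluster end to end.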