CREATE OR REPLACE FUNCTION netmax.sp_import_l(p_load_shell text, p_file_path text, p_tablename text, p_csv_file_suffix text, p_task_id text, p_file_names
text)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_ext_tablename text;
v_column_list text;
v_execute_express text;
v_sql text;
v_file_path text;
v_ext_column_list text;
v_all_column_list text;
v_errCnt INTEGER;
v_errSql text;
v_errMsg text;
v_err_data_file text;
v_err_ext_tablename text;
v_err_process_shell text;
BEGIN
PERFORM Sp_performance('Sp_import_L', p_task_id||' '||p_tablename);
v_ext_tablename := 'ext_'||p_tablename;
v_file_path := p_file_path ||'/'|| p_task_id;
v_column_list := array_to_string(array(
SELECT attname FROM pg_catalog.pg_attribute WHERE attrelid IN
(SELECT oid FROM pg_catalog.pg_class WHERE UPPER(relname) = UPPER(p_tablename))
AND attname NOT IN('ctid','xmin','cmin','xmax','cmax','tableoid','gp_segment_id','taskid')
AND attname NOT LIKE '%pg.drop%' AND attname NOT LIKE '%..%'
ORDER BY attnum), ',');
IF position('L_T' in upper(p_tablename)) > 0 or position('N_T' in upper(p_tablename)) > 0 or position('NR_' in upper(p_tablename)) > 0 THEN
v_execute_express := p_load_shell||' '||p_file_path||' '||p_csv_file_suffix||' SOPinU_2020 '||p_file_names;
ELSE
v_execute_express := p_load_shell||' '||v_file_path||' '||p_csv_file_suffix||' SOPinU_2020 '||p_file_names;
END IF;
v_sql := 'DROP EXTERNAL TABLE IF EXISTS '||v_ext_tablename;
EXECUTE v_sql;
IF upper(p_tablename) in ('L_T051', 'L_T052', 'L_T053', 'L_T054', 'L_T305', 'L_T306', 'L_T051_SS', 'L_T052_SS', 'L_T053_SS', 'L_T054_SS', 'L_T305_
SS', 'L_T306_SS','NR_T054')
THEN
v_sql :=
'CREATE EXTERNAL WEB TABLE '||v_ext_tablename||'(LIKE '||p_tablename||'_NDS)
EXECUTE '''||v_execute_express||'''
FORMAT ''CSV''(DELIMITER AS '','' NULL AS '''' QUOTE AS ''"'' FILL MISSING FIELDS) LOG ERRORS SEGMENT REJECT LIMIT 99 PERCENT';
ELSIF upper(p_tablename) in ('L_T365', 'L_T366', 'L_T367')
THEN
v_ext_column_list := Sp_import_get_column_list(p_tablename || '_NDS');
v_sql :=
'CREATE EXTERNAL WEB TABLE '||v_ext_tablename||'(LIKE '||p_tablename||'_NDS)
EXECUTE '''||v_execute_express||'''
FORMAT ''CSV''(DELIMITER AS '','' NULL AS '''' QUOTE AS ''"'' FILL MISSING FIELDS) LOG ERRORS SEGMENT REJECT LIMIT 99 PERCENT';
ELSE
v_sql :=
'CREATE EXTERNAL WEB TABLE '||v_ext_tablename||'(LIKE '||p_tablename||')
EXECUTE '''||v_execute_express||'''
FORMAT ''CSV''(DELIMITER AS '','' NULL AS '''' QUOTE AS ''"'' FILL MISSING FIELDS) LOG ERRORS SEGMENT REJECT LIMIT 99 PERCENT';
END IF;
EXECUTE v_sql;
v_sql := 'ALTER EXTERNAL TABLE '||v_ext_tablename||' ADD c9999 varchar(1)';
EXECUTE v_sql;
IF upper(p_tablename) in ('L_T051', 'L_T052', 'L_T053', 'L_T054', 'L_T305', 'L_T306', 'L_T051_SS', 'L_T052_SS', 'L_T053_SS', 'L_T054_SS', 'L_T305_
SS', 'L_T306_SS','NR_T054')
THEN
v_sql := 'INSERT INTO '||p_tablename||'(TaskID,'||v_column_list||') SELECT ''' ||p_task_id|| ''','||v_column_list||' FROM '||v_ext_tablename;
ELSIF upper(p_tablename) in ('L_T365', 'L_T366', 'L_T367')
THEN
v_all_column_list := Sp_import_get_column_list(p_tablename);
v_sql := 'INSERT INTO '||p_tablename||'('||v_all_column_list||') SELECT ''' ||p_task_id|| ''',null,null,null,null,null,null,null,null,null
,'||v_ext_column_list||'
FROM '||v_ext_tablename;
ELSE
v_sql := 'INSERT INTO '||p_tablename||'('||v_column_list||') SELECT '||v_column_list||' FROM '||v_ext_tablename;
END IF;
EXECUTE v_sql;
v_sql := 'DROP TABLE IF EXISTS '||v_ext_tablename||'_ErrorRecord;';
EXECUTE v_sql;
SELECT count(*) INTO v_errCnt FROM gp_read_error_log(v_ext_tablename);
--Only NGI DM do the following
IF v_errCnt > 0 AND position(':' in p_file_names)>0
THEN
v_err_data_file := substring(p_file_names, position(':' in p_file_names)+1)||'_errorRecord';
v_sql := 'CREATE TABLE '||v_ext_tablename||'_ErrorRecord AS SELECT errmsg,rawdata FROM gp_read_error_log('''||v_ext_tablename||''');';
EXECUTE v_sql;
v_sql := 'COPY
(
SELECT errmsg,rawdata FROM '||v_ext_tablename||'_ErrorRecord WHERE rawdata IN
(
SELECT min(rawdata) FROM '||v_ext_tablename||'_ErrorRecord GROUP BY substring(errmsg,position(''column'' in errmsg)) limit 100
)
) TO '''||v_err_data_file||''' WITH DELIMITER ''|'' CSV';
EXECUTE v_sql;
v_err_ext_tablename := 'err_'||v_ext_tablename;
v_sql := 'DROP EXTERNAL TABLE IF EXISTS '||v_err_ext_tablename;
EXECUTE v_sql;
v_err_process_shell := '/home/gpadmin/.netmax_shell/errorRecordProcess.sh';
v_execute_express := v_err_process_shell||' '||p_task_id||' '||p_csv_file_suffix||' SOPinU_2020 '||p_file_names;
v_sql :=
'CREATE EXTERNAL WEB TABLE '||v_err_ext_tablename||'(file_name TEXT,line_num INTEGER,err_data TEXT,err_reason TEXT)
EXECUTE '''||v_execute_express||'''
FORMAT ''CSV''(DELIMITER AS '':'' NULL AS '''' QUOTE AS ''"'' FILL MISSING FIELDS)';
EXECUTE v_sql;
FOR v_errSql,v_errMsg IN EXECUTE 'SELECT file_name||''|''||line_num||''|''||err_data,err_reason FROM '||v_err_ext_tablename LOOP
PERFORM Sp_etl_log('Sp_import_L', v_errSql, 'Import', v_errMsg);
END LOOP;
END IF;
PERFORM Sp_performance('Sp_import_L','end');
EXCEPTION WHEN OTHERS THEN
PERFORM Sp_etl_log('Sp_import_L', v_sql ,SQLSTATE, SQLERRM);
RAISE Warning 'Exception: %', SQLERRM;
END;