dsjob.pjb.xml抽出

 import sys

#from lxml import etree

from collections import Counter

import glob

import os

import xml.etree.ElementTree as ET

import datetime

import re

from collections import defaultdict


# Module-level tables describing the job currently being analysed.
# initialize_global_list() rebinds all of them before each XML file is parsed.

# Name of the job (or shared container) being analysed. The detect* functions
# and main() read it, so it is defined here too; otherwise they would raise
# NameError when run before any job has been parsed.
job_name = ""

# Where flows start: stages without input pins, plus the "Container:Start"
# pseudo-stage of a container design view.
root_stages = []

# stage name -> {"stage_type": value of the stageType attribute, or None}
stages = {}

# pin listed in a stage's "outputPins" attribute -> owning stage name
stage_outpins = {}

# pin listed in a stage's "inputPins" attribute -> owning stage name
stage_inpins = {}

# link name -> value of the link's to_InputPin attribute
link_to_inpins = {}

# value of a link's from_OutputPin attribute -> link name
link_from_outpins = {}

# pin internalID -> pin xmi:id
pin_names = {}

# pin xmi:id -> pin internalID
pin_ids = {}

# Paths discovered by getFlow(); each path is a list of stage and link names.
flows = []

# "<start>:<end>" endpoint keys collected by the detect* functions.
# The misspelled name is kept because other functions refer to it.
terminlal_flows = []

def initialize_global_list():
    """Reset every per-job module-level table to a fresh, empty object.

    main() calls this before parsing each XML file, so that stages, pins,
    links and flows collected for one job do not carry over into the next.
    Each table is rebound to a new object rather than cleared in place.
    """
    global job_name, root_stages, flows, terminlal_flows
    global stages, stage_inpins, stage_outpins
    global link_to_inpins, link_from_outpins, pin_names, pin_ids

    job_name = ""

    # List-valued tables.
    root_stages, flows, terminlal_flows = [], [], []

    # Dict-valued tables.
    stages, stage_inpins, stage_outpins = {}, {}, {}
    link_to_inpins, link_from_outpins = {}, {}
    pin_names, pin_ids = {}, {}


def _terminal_flow_key(flow):
    """Return the "<start>:<end>" key that identifies *flow* by its endpoints.

    A flow that starts at 'Container:Start' has its next element appended to
    the start. A flow that ends at 'Container:End' has its preceding element
    prepended to the end. This keeps different container entry/exit points
    apart. A single-element flow yields "<x>:<x>".
    """
    start = flow[0]
    if start == 'Container:Start' and len(flow) > 1:
        start = start + ":" + flow[1]
    end = flow[-1]
    if end == 'Container:End' and len(flow) > 1:
        end = flow[-2] + ":" + end
    return start + ":" + end


def _count_terminal_flows():
    """Record the endpoint key of every flow and print the tally per key.

    For each non-empty path in the module-level ``flows``, its key is
    appended to ``terminlal_flows``. Keys accumulate there until
    initialize_global_list() resets the list. Then one
    ``<job_name>,<key>,<count>`` line is printed per distinct key, in
    first-seen order. Empty paths have no endpoints and are skipped.
    """
    for flow in flows:
        if flow:
            terminlal_flows.append(_terminal_flow_key(flow))
    # Count once, after all keys are collected. Building the Counter here also
    # means an empty input simply prints nothing.
    for key, count in Counter(terminlal_flows).items():
        print(f'{job_name},{key},{count}')


def detectDb2WithSort():
    """Print the start/end endpoint tally of the current job's flows.

    This currently performs exactly the same summary as detectForkJoin();
    no Db2- or sort-specific detection is implemented.
    """
    _count_terminal_flows()


def detectForkJoin():
    """Print how many flows share each start/end endpoint pair.

    Output lines have the form ``<job_name>,<start>:<end>,<count>``.
    """
    _count_terminal_flows()




def appendNextLink(pre_stage, flow, is_container):
    """Depth-first extend *flow* along every outgoing link of *pre_stage*.

    Every completed path is printed and appended to the module-level
    ``flows``. A path is complete when it reaches a stage with no output pins,
    or when it reaches a stage whose stage_type is None. getFlow() starts a
    separate walk from each such stage.

    Args:
        pre_stage: name of the stage whose output links are followed.
        flow: path walked so far (list of stage/link names). It is never
            modified; extended copies are built instead.
        is_container: when True, the first hop is recorded as
            "<pre_stage>:<link>" instead of just "<link>".

    NOTE(review): there is no visited set, so this assumes the stage graph is
    acyclic; a cycle would recurse until RecursionError.
    """
    # stage_outpins maps pin -> owning stage; collect the pins of pre_stage.
    output_pins = [pin for pin, stage in stage_outpins.items() if stage == pre_stage]

    if not output_pins:
        # Nothing leaves pre_stage: the path ends here.
        print(flow)
        flows.append(flow)
        return

    for outpin_id in output_pins:
        # Resolve pin -> link -> target input pin -> target stage.
        link_name = link_from_outpins[pin_names[outpin_id]]
        inpin_name = pin_ids[link_to_inpins[link_name]]
        nest_stage = stage_inpins[inpin_name]

        link_label = pre_stage + ":" + link_name if is_container else link_name
        l_flow = flow + [link_label, nest_stage]

        if stages[nest_stage]["stage_type"] is not None:
            appendNextLink(nest_stage, l_flow, False)
        else:
            # Record the path *including* the link and the untyped target
            # stage. Previously the stale pre-link path was recorded (or,
            # for container walks, nothing at all).
            print(l_flow)
            flows.append(l_flow)


def getFlow():
    """Populate ``flows`` by walking the stage graph from every starting point.

    Two kinds of walk are launched through appendNextLink():
      * one per entry of ``root_stages``, seeded with the one-element path
        [root] and is_container=False;
      * one per stage whose stage_type is None, seeded with an empty path and
        is_container=True (so the first hop is labelled "<stage>:<link>").

    A "Container_End" entry is registered in ``stages`` before walking.
    NOTE(review): this key uses an underscore, while getchild() and the
    detect* functions use "Container:End"; confirm which one is intended.
    """
    stages["Container_End"] = {"stage_type": "Container_End"}

    for start in root_stages:
        appendNextLink(start, [start], False)

    for name, props in stages.items():
        if props["stage_type"] is None:
            appendNextLink(name, [], True)




def getchild(f, element):
    """Collect job metadata from *element* and every node beneath it.

    Nodes are visited in document order (the element itself first, then its
    descendants depth-first). Each node is matched against the DataStage
    export shapes below, and matches are recorded in the module-level tables:

      * DSJobDefSDO / DSSharedContainerDefSDO tag  -> job_name
      * contains_JobObject typed DSStageSDO        -> stages, root_stages,
                                                      stage_inpins, stage_outpins
      * has_DSDesignView named "コンテナー"          -> "Container:Start" /
                                                      "Container:End" pseudo-stages
      * has_OutputPin / has_InputPin (typed pins)  -> pin_names, pin_ids
      * contains_JobObject typed DSLinkSDO         -> link_from_outpins,
                                                      link_to_inpins

    Args:
        f: not used; kept for call compatibility.
        element: root xml.etree.ElementTree element to scan.

    Returns:
        Always 0.
    """
    global job_name

    job_tags = (
        '{http:///com/ibm/datastage/ai/dtm/ds.ecore}DSJobDefSDO',
        '{http:///com/ibm/datastage/ai/dtm/ds.ecore}DSSharedContainerDefSDO',
    )
    xsi_type = '{http://www.w3.org/2001/XMLSchema-instance}type'
    xmi_id = '{http://www.omg.org/XMI}id'
    stage_kind = 'com.ibm.datastage.ai.dtm.ds:DSStageSDO'
    link_kind = 'com.ibm.datastage.ai.dtm.ds:DSLinkSDO'
    pin_kinds = {
        ('has_OutputPin', 'com.ibm.datastage.ai.dtm.ds:DSOutputPinSDO'),
        ('has_InputPin', 'com.ibm.datastage.ai.dtm.ds:DSInputPinSDO'),
    }

    def assign_pins(pin_list, table, owner):
        # pin_list is a "|"-separated attribute value, or None when absent.
        if pin_list is not None:
            for pin in pin_list.split("|"):
                table[pin] = owner

    for node in element.iter():
        tag = node.tag
        attrs = node.attrib
        kind = attrs.get(xsi_type)

        if tag in job_tags:
            job_name = attrs.get('name')

        if tag == 'contains_JobObject' and kind == stage_kind:
            stage_name = attrs.get('name')
            in_pins = attrs.get('inputPins')
            stages[stage_name] = {"stage_type": attrs.get('stageType')}
            # A stage with no inputs is where a flow begins.
            if in_pins is None:
                root_stages.append(stage_name)
            assign_pins(in_pins, stage_inpins, stage_name)
            assign_pins(attrs.get('outputPins'), stage_outpins, stage_name)

        if tag == 'has_DSDesignView' and attrs.get('name') == "コンテナー":
            # The container's own inputs/outputs become two pseudo-stages:
            # its output pins start flows, its input pins end them.
            root_stages.append("Container:Start")
            stages["Container:Start"] = {"stage_type": "ContainerStart"}
            stages["Container:End"] = {"stage_type": "ContainerEnd"}
            assign_pins(attrs.get('inputPins'), stage_inpins, "Container:End")
            assign_pins(attrs.get('outputPins'), stage_outpins, "Container:Start")

        if (tag, kind) in pin_kinds:
            pin_id = attrs.get(xmi_id)
            internal_id = attrs.get('internalID')
            pin_names[internal_id] = pin_id
            pin_ids[pin_id] = internal_id

        if tag == 'contains_JobObject' and kind == link_kind:
            link = attrs.get('name')
            link_from_outpins[attrs.get('from_OutputPin')] = link
            link_to_inpins[link] = attrs.get('to_InputPin')

    return 0


def main(args=sys.argv):
    """Extract stage flows from every DataStage XML export in a directory.

    Options are read from *args* (args[0] is the program name; unknown
    arguments are ignored):
      --indir DIR   directory holding the *.xml exports (default: current dir)
      --outdir DIR  directory that receives ds_job_analyze.csv (default: current dir)

    For each XML file (in sorted order) the module-level tables are reset,
    filled by getchild(), expanded into paths by getFlow(), and summarised on
    stdout by detectForkJoin(). ds_job_analyze.csv is recreated on every run
    with one row per discovered path:
        file_name,job_name,root_stage,flow_path
    where root_stage is the path's first element and flow_path joins the
    path's elements with "->".
    """
    # Print the start time.
    print(f'start : {datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}')

    # Defaults avoid an UnboundLocalError when an option is omitted.
    indir = ""
    outdir = ""
    len_args = len(args)
    i = 1
    while i < len_args:
        if args[i] == "--indir" and i + 1 < len_args:
            indir = args[i + 1]
            i += 1
        elif args[i] == "--outdir" and i + 1 < len_args:
            outdir = args[i + 1]
            i += 1
        i += 1

    # os.path.join works whether or not the directory ends with a separator.
    filelist = sorted(glob.glob(os.path.join(indir, '*.xml')))
    total = len(filelist)
    # Report progress roughly every 10% of the files (at least every file).
    step = max(1, round(total / 10))

    # 'w' truncates any previous result file.
    with open(os.path.join(outdir, 'ds_job_analyze.csv'), 'w') as f:
        f.write('file_name,job_name,root_stage,flow_path\n')

        for index, xmlfile in enumerate(filelist, start=1):
            root = ET.parse(xmlfile).getroot()

            # Reset the per-job tables so nothing leaks between files.
            initialize_global_list()

            # Read the XML and store its metadata in the module-level tables.
            getchild(f, root)

            getFlow()

            detectForkJoin()

            # One row per path; the first element is where the path starts.
            for flow in flows:
                root_stage = flow[0] if flow else ""
                flow_path = "->".join(flow)
                f.write(f'{xmlfile},{job_name},{root_stage},{flow_path}\n')

            if index % step == 0 or index == total:
                print(f'{index}/{total}')

    # Print the end time.
    print(f'end   : {datetime.datetime.now().strftime("%Y%m%d_%H%M%S")} ')


if __name__ == "__main__":
    sys.exit(main())


コメント

人気の投稿