#285 総流入量および純流入量を補正するコードを改善する

投稿#276から、都道府県のマテリアルフローを推計するコードの改善に取り組んでいます。

今回の投稿では、対象都道府県の産業構造や取引状況が考慮に入れるため、投稿#284で算出した比率を平成28年経済センサスの製造品出荷額を参照し、重みづけによる補正を施します。

その後、投稿#283で算出した総流入量および純流入量を補正後の比率で各産業に配分します。

マテリアルフロー推計手順の概要

2015年大阪府のマテリアルフローの作成にあたり、島崎(2008)、天野ら(2001)の手法を参照しました。以下に推計手順の概要を示します。

  1. 「都道府県間年間流動量調査」のデータから、年間の品類別の純移出量、純移入量、内部流動量を求めます。
  2. 「都道府県間流動量3日間調査」のデータから、3日間の品類・品目別の純移出量、純移入量、内部流動量を求めます。
  3. 上記1.の結果に、2.で求めた3日間の品類と品目の比率を配分し、年間の品類・品目別の純移出量、純移入量、内部流動量を算出します。
  4. 上記3.の結果について、物流センサスでの調査品目と港湾統計での調査品目を統合して67品目に集約します。
  5. 「港湾統計(年報)2014」および「港湾統計(年報)2015」の「第3表 海上出入貨物表 (2)品種別都道府県別表(輸移出入) 」のデータから、2014年度(2014年4月〜2015年3月)の輸出入量を求めます。
  6. 上記5.の結果について、物流センサスでの調査品目と港湾統計での調査品目を統合して67品目に集約します。
  7. 上記4.と6.の結果を合計して、純流入量・純流出および総流入量・総流出量を算出します。
  8. 「着産業業種・品類品目別流動量3日間調査」のデータから着産業業種別比率を算出します。
  9. 対象都道府県の産業構造や取引状況が考慮に入れるため、上記8.で算出した比率を平成28年経済センサスの製造品出荷額を参照し、重みづけによる補正を施します。その後、上記7.で算出した総流入量および純流入量を補正後の比率で各産業に配分します。
  10. 「発産業業種別・品類品目別流動量3日間調査」のデータから発産業業種別比率を算出します。
  11. 対象都道府県の産業構造や取引状況を考慮に入れるため、上記10.で算出比率を平成28年経済センサスの原材料・燃料・電気使用額を参照し、重みづけによる補正を施します。その後、その後、上記7.で算出した総流出量および純流出量を補正後の比率で各産業に配分します。
  12. 都道府県品類別内部流動量を算出します。

今回の投稿は、上記手順の9.に該当します。
改善したコードは以下のようになります。

import requests
import json
import urllib.parse # requestsを使うため直接は不要ですが、念のため
import pandas as pd

# -----------------------------------------------------------
# 1. APIエンドポイントとパラメータの定義 (辞書形式に修正)
# -----------------------------------------------------------

APP_ID = "ここにAPIのIDを入力"

base_url = "http://api.e-stat.go.jp/rest/3.0/app/json/getStatsData"

# パラメータを辞書形式で定義。カンマ区切りの値もそのまま記述できます。
API_PARAMS = {
    "cdCat01": "01500,02040,02280,03050,03310,03480,03730,03880,04370,04510,04860,05070,05300,05870,06210,06490,06950,07220,07600,07930,08180,08530,08750,09010",
    #"cdArea": "00000,27000",
    "cdArea": "00000," + str(pref_code + 1) + "000",   # cdAreaをpref_codeを利用して指定。
    "cdTab": "9700",
    "lang": "J",
    "statsDataId": "0003389393",
    "metaGetFlg": "Y",
    "cntGetFlg": "N",
    "explanationGetFlg": "Y",
    "annotationGetFlg": "Y",
    "sectionHeaderFlg": "1",
    "replaceSpChars": "0"
}

# 辞書にAPP_IDを追加
API_PARAMS["appId"] = APP_ID

# -----------------------------------------------------------
# 2. APIコール関数
# -----------------------------------------------------------
def fetch_estat_data(base_url, params):
    """
    指定されたe-Stat APIのURLとパラメータ辞書からJSONデータを取得する。
    成功すればJSONオブジェクトを返し、失敗すればNoneを返す。
    """
    # 実行前にApp IDの確認を改めて行う
    if params.get("appId") == "YOUR_APP_ID":
        print("**************************************************************")
        print("!!! ATTENTION: アプリケーションIDが設定されていません。!!!")
        print("!!! コード内の APP_ID 変数を有効なIDに書き換えてください。")
        print("**************************************************************")
        return None

    # requestsライブラリに辞書形式のparamsを渡す
    try:
        response = requests.get(base_url, params=params)
        response.raise_for_status()  # 200以外のステータスコードで例外を発生

        # requestsが実際に使用した完全なURLを表示(デバッグ用)
        print(f"APIリクエストURL: {response.url}")

        data = response.json()

        # API処理の全体ステータスは 'RESULT' の下にあるため、パスを修正
        result_info = data.get('GET_STATS_DATA', {}).get('RESULT', {})
        result_status = result_info.get('STATUS')
        result_msg = result_info.get('ERROR_MSG')

        print(f"--- API処理結果 ---")
        print(f"ステータス (STATUS): {result_status}, メッセージ: {result_msg}\n")

        if result_status is None or result_status != 0:
            # STATUSが取得できなかった、またはエラーコードが返された場合
            print("❌ データ取得に失敗しました。詳細をデバッグ情報で確認してください。")
            print("--- デバッグ情報:APIレスポンスの生データ(JSON)---")
            print(json.dumps(data, indent=2, ensure_ascii=False)) 
            print("-----------------------------------------------------")
            return None

        print("✅ データ取得に成功しました。")
        return data

    except requests.exceptions.RequestException as e:
        # HTTPエラー(4xx, 5xx)の場合
        print(f"!!! HTTPリクエストエラーが発生しました: {e}")
        if response.status_code == 403:
             print("\n!!! 403 Forbidden: アプリケーションIDが無効または未設定の可能性が高いです。")
        return None
    except json.JSONDecodeError:
        print("!!! レスポンスがJSON形式ではありませんでした。")
        return None
    except Exception as e:
        print(f"!!! データの処理中に予期せぬエラーが発生しました: {e}")
        return None

# -----------------------------------------------------------
# 3. DataFrame変換関数
# -----------------------------------------------------------
def convert_to_dataframe(data):
    """
    e-StatのJSONデータから統計値(VALUE)を抽出し、pandas DataFrameに変換する。
    """
    if data is None:
        return pd.DataFrame() # 空のDataFrameを返す

    try:
        stat_data = data.get('GET_STATS_DATA', {}).get('STATISTICAL_DATA', {})

        # 統計データ本体
        value_data = stat_data.get('DATA_INF', {}).get('VALUE', [])

        if not value_data:
            print("統計データ(VALUE)が見つかりませんでした。")
            return pd.DataFrame()

        df = pd.DataFrame(value_data)

        # 不要なメタ情報列を削除
        cols_to_drop = ['@tab_code', '@span_code', '@sex_code', '@class_code']
        df = df.drop(columns=[col for col in cols_to_drop if col in df.columns], errors='ignore')

        # '$'列を 'Value' にリネームし、'@'プレフィックスを削除
        df = df.rename(columns={'$': 'Value'})
        df.columns = [col.lstrip('@') if col.startswith('@') else col for col in df.columns]

        # 'Value'列を数値型に変換(変換できないデータはNaNにする)
        df['Value'] = pd.to_numeric(df['Value'], errors='coerce')

        # DataFrameの確認と表示を容易にするために、列名を日本語メタデータに変換する処理を追加することを推奨
        # 例:df['area']のコード(00000, 27000)を、'全国', '大阪府' に変換するなど

        print(f"DataFrameに変換されました。行数: {len(df)}")
        return df

    except Exception as e:
        print(f"!!! DataFrame変換中にエラーが発生しました: {e}")
        return pd.DataFrame()

# -----------------------------------------------------------
# 4. メイン処理 (一括実行)
# -----------------------------------------------------------
if __name__ == "__main__":
    # 1. データ取得
    # base_urlとAPI_PARAMSを引数として渡すように変更
    raw_data = fetch_estat_data(base_url, API_PARAMS)

    # 2. DataFrameへの変換と表示
    if raw_data:
        df = convert_to_dataframe(raw_data)

        print("\n--- 統計データのDataFrame(最初の5行)---")
        print(df.head())
        print("\n--- 全体のDataFrameの表示(データ分析用)---")
    else:
        print("\nDataFrameは作成されませんでした。")

# 経済センサスにおける製造業のコードと業種名の一覧を格納したDataFrameを取得する

data = raw_data['GET_STATS_DATA']['STATISTICAL_DATA']['CLASS_INF']['CLASS_OBJ']

def extract_level_2(structure):
    """
    再帰的に辞書やリストを探索し、キーが'@level'で値が'2'であるものをすべて抽出する。
    ここでは、'@level': '2' が見つかったオブジェクト(辞書)全体を返します。
    """
    results = []

    if isinstance(structure, dict):
        # まず、現在の辞書自体が '@level': '2' を含むかチェック
        if (structure.get('@level') == '2') & (structure.get('@parentCode') != '00000'):
            # 条件に一致する辞書全体を結果に追加
            results.append(structure)

        # 次に、ネストされた要素を再帰的に探索
        for value in structure.values():
            if isinstance(value, (dict, list)):
                results.extend(extract_level_2(value))

    elif isinstance(structure, list):
        # リストの各要素に対して再帰的に探索
        for item in structure:
            results.extend(extract_level_2(item))

    return results
# 経済センサスの製造業のコードと業種名の一覧をDataFrameに格納
level_2_items = extract_level_2(data)
df_manu_industry = pd.DataFrame(level_2_items)
df_manu_industry = df_manu_industry.drop("@level", axis=1)
df_manu_industry['@name'] = df_manu_industry['@name'].str.replace(r'^\d{2}', '', regex=True)
df_manu_industry = df_manu_industry.rename(columns={'@code': 'コード', '@name': '業種'})
df_manu_industry = df_manu_industry.set_index("コード")

# 製造品出荷額を参照し、重みづけによる補正を行う

df_manu_value = df.copy()
# 不要な列を削除
cols_to_drop = ['tab', 'time', 'unit']
df_manu_value = df_manu_value.drop(columns=[col for col in cols_to_drop if col in df.columns], errors='ignore')
df_manu_value = df_manu_value.rename(columns={'cat01': 'コード', 'area': '地域', 'Value': '製造品出荷額'})
df_manu_value = df_manu_value.set_index('コード')

# 都道府県の製造業製造品出荷額のDataFrameを作成
df_manu_pref = df_manu_value.copy()

# '地域'列の値が '00000' ではない行だけを選択し、DataFrameを更新する
df_manu_pref = df_manu_pref[df_manu_pref['地域'] != '00000']
df_manu_pref = df_manu_pref.drop('地域', axis=1)
df_manu_pref = df_manu_pref.rename(columns={'製造品出荷額': 'pref'})

# 全国の製造品出荷額のDataFrameを作成
df_manu_japan = df_manu_value.copy()

# '地域'列の値が '00000' の行だけを選択し、DataFrameを更新する
df_manu_japan = df_manu_japan[df_manu_japan['地域'] == '00000']
df_manu_japan = df_manu_japan.drop('地域', axis=1)
df_manu_japan = df_manu_japan.rename(columns={'製造品出荷額': 'japan'})

# df_manu_prefとdf_manu_japanを横にconcateする
df_manu_value = pd.concat([df_manu_pref, df_manu_japan], axis=1)

df_manu_value = pd.concat([df_manu_industry, df_manu_value], axis=1)
df_manu_value = df_manu_value.reset_index()
df_manu_value = df_manu_value.drop('コード', axis=1)
df_manu_value = df_manu_value.set_index('業種')

# 新規列「補正前総流入量」、「全国平均」、「重み」、「補正後総流入量」を追加、算出する
df_agg_chaku_industry['合計'] = df_agg_chaku_industry.sum(axis=1)
df_manu_value['補正後総流入量'] = 0.0
for manu_ind in df_manu_value.index:
    df_manu_value.loc[manu_ind, '補正前総流入量'] = df_agg_chaku_industry.loc[manu_ind, '合計']
df_manu_value['全国平均'] = round(df_manu_value['japan'] / 47, 1)
df_manu_value['重み'] = df_manu_value['pref'] / df_manu_value['全国平均']
df_manu_value['補正後総流入量'] = round(df_manu_value['補正前総流入量'] * df_manu_value['重み'], 1)

# 製造業において、補正後総発量を各品目に配分する
df_after_chaku_industry = df_agg_chaku_industry.copy()
for manu_ind in df_manu_value.index:
    for item in df_after_chaku_industry.columns:
        if item == '合計':
            df_after_chaku_industry.loc[manu_ind, item] = df_manu_value.loc[manu_ind, '補正後総流入量']
        else:
            df_after_chaku_industry.loc[manu_ind, item] = df_manu_value.loc[manu_ind, '補正後総流入量'] * \
                                                        df_agg_chaku_industry.loc[manu_ind, item] / \
                                                        df_agg_chaku_industry.loc[manu_ind, '合計']

# 新規行「合計」を作成し、列和を格納したDataFrameを返す関数を定義
def append_sum_row_label(df):
    df.loc['合計'] = df.sum(numeric_only=True)
    return df

# 補正後の着産業業種・品類別品目別流動量における業種別比率を算出する
df_chaku_item_to_industry = df_after_chaku_industry.copy()
# 列「合計」を削除する
df_chaku_item_to_industry = df_chaku_item_to_industry.drop('合計', axis=1)
# 行「合計」を追加する
df_chaku_item_to_industry = append_sum_row_label(df_chaku_item_to_industry)

# 各品目における業種別比率を算出する
for item in df_chaku_item_to_industry.columns:
    for industry in df_chaku_item_to_industry.index:
        if industry == '合計':
            continue
        else:
            df_chaku_item_to_industry.loc[industry, item] = df_chaku_item_to_industry.loc[industry, item] / \
                                                        df_chaku_item_to_industry.loc['合計', item]
df_chaku_item_to_industry = df_chaku_item_to_industry.drop('合計')

# 年間集約品目別総流入量を業種別比率で各産業に配分する
for item in df_chaku_item_to_industry.columns:
    for industry in df_chaku_item_to_industry.index:
        df_chaku_item_to_industry.loc[industry, item] = df_chaku_item_to_industry.loc[industry, item] \
                                                * df_agg_year_pref_item.loc[item, "総流入量"] 

# 着産業業種一覧.csvを読み込む
df_chaku_sector_view = pd.read_csv('着産業業種一覧.csv', index_col=0)

# 合計列を追加する
df_chaku_item_to_industry['合計'] = df_chaku_item_to_industry.sum(axis=1)

# df_item_to_industryに新規に列「sector」を追加する
df_chaku_item_to_industry['sector'] = df_chaku_sector_view['sector']

# 産業大分類別総流入量のDataFrameを作成する
df_sector_chaku = pd.DataFrame([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
                               index=['農林漁業','鉱業','建設業','製造業','卸売業','小売業',
                                      '金融・公務他','運輸・通信業','電気・ガス・水道業','輸出'],
                               columns=['総流入量']
                              )
for sector, group in df_chaku_item_to_industry.groupby('sector'):
    df_sector_chaku.loc[sector, '総流入量'] = round(group['合計'].sum(), 1)

# 都道府県品類別純流入量を算出する
# 集約品目と品類名との対応表を読み込む
df_item_to_goods = pd.read_csv('集約品目と品類名との対応表.csv', index_col=0)
# 対応表のDataFrameに新規列「純流入量」を追加する
df_item_to_goods['純流入量'] = df_agg_year_pref_item['純移入'] + df_agg_year_pref_item['輸入']

# 都道府県品類別純流入量のデータフレームを作成する
df_goods_netinflow = pd.DataFrame([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,],
                               index=['農水産品','林産品','鉱産品','金属機械工業品','化学工業品','軽工業品',
                                      '雑工業品','排出物','特殊品'],
                               columns=['純流入量']
                              )
for goods, group in df_item_to_goods.groupby("goods"):
    df_goods_netinflow.loc[goods, "純流入量"] = round(group["純流入量"].sum(), 1)

引用文献・参考文献

Follow me!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です