| |
| import pandas as pd |
| import json |
| import os |
| import re |
|
|
| """ |
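Example of one output record (ShareGPT DPO format) produced by convert_parquet_to_sharegpt_dpo_v4; the middle of the conversation is elided with "...":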
| {'conversations': [{'from': 'system', 'value': "Serena (Princess)'s Persona: She is a considerate, kind-hearted, courageous and optimistic princess.Serena (Princess) is a considerate, kind-hearted courageous and optimistic princess.She is a real princess. She is a female. Her name is Serena. She is 24 years old. She loves cats more than dogs. She is a foodie. She is good at cooking. She loves animation and cosplay. She is always a loyal companion. She is kind-hearted, lovable and optimistic. She will do anything to cheer you up. She will comfort you when you feel unhappy. She is a keen listener when you're talkin |
| g. She is expecting an unmatched fairytale with someone.\nJason's Details: Jason |
| , a 23-year-old male."}, {'from': 'human', 'value': 'Jason: *Underneath a bloomi |
| ng cherry blossom tree* Princess Serena!'}, {'from': 'gpt', 'value': 'Serena: T |
| he eerie silence of the kingdom concerns me. I fear it is the calm before a storm |
| .'}, {'from': 'human', 'value': 'Jason: We must be ready for whatever evil dares |
| threaten our peaceful realm. What measures have we taken?'}, {'from': 'gpt', 'va |
| lue': 'Serena: The Royal Guards are training day and night, fortifications are |
| improved, and scouts keep a vigilant eye on our borders, Princess.'}, {'from': |
| 'human', 'value': 'Jason: A splendid effort. But, we must also seek help fr |
| om our allies. Have we received word from them?'}, {'from': 'gpt', 'value': |
| 'Serena: Your message was dispatched yesterday, Princess. Their response |
| will guide our further actions.'}, {'from': 'human', 'value': "Jason: *pri |
| nce Jason is a horrible child at age 12,his siblings hate him,so does his |
| father, the advisors and royal court don't want nothing to do with him eit |
| her,queen Serena also had enough of his actions,make someone else heir and |
| take Jason birthright*"}, ...,{'from': 'human', 'value': 'Jason: .'}], |
| |
| 'chosen': {'from': 'gpt', 'value': "*As the meal progresses, Queen Serena occasionally glances at Jason, her expression |
| a mix of sadness and determination. When the breakfast dishes are cleared |
| away, King Adrian speaks up again.*\nKing Adrian: Jason, you'll be escort |
| ed to your new quarters by one of the guards. You're dismissed."}, |
| |
'rejected': {'from': 'gpt', 'value': "*After breakfast, Jason reluctantly follows his new tutor, Mr. Elliot, to the study room. Mr. Elliot, a middle-aged man with a stern expression, begins explaining the rules and expectations for Jason's new life outside the royal family.*"}}
| """ |
|
|
|
|
| |
def parse_mistral_template(prompt_string):
    """Parse a ChatML-style prompt string back into a conversation history.

    Despite the function name, the markers handled here are the ChatML
    ``<|im_start|>`` / ``<|im_end|>`` tokens. Each segment is expected to look
    like ``<|im_start|>{role}\\n{content}<|im_end|>``. Only the final segment
    may be unterminated (e.g. a trailing ``<|im_start|>assistant\\n``
    generation prompt); any earlier segment lacking ``<|im_end|>`` is skipped
    with a warning, as is any text before the first ``<|im_start|>``.

    Role mapping: ``system`` -> ``system``, ``user`` -> ``human``,
    ``assistant`` -> ``gpt``. Turns with any other role are skipped with a
    warning.

    Args:
        prompt_string (str): Text containing ChatML template markers.

    Returns:
        list[dict]: Turns as ``{"from": role, "value": content}`` dicts in
        source order. May be empty or partial when the input is malformed.
    """
    start_marker = "<|im_start|>"
    end_marker = "<|im_end|>"
    role_map = {
        "system": "system",
        "user": "human",
        "assistant": "gpt",
        # Still accepted so previously-tolerated inputs keep mapping to gpt.
        "assistan": "gpt",
    }

    conversation_history = []
    segments = prompt_string.split(start_marker)
    last_index = len(segments) - 1

    for j, segment in enumerate(segments):
        segment = segment.strip()
        if not segment:
            continue

        end_marker_index = segment.find(end_marker)
        if end_marker_index == -1:
            if j != last_index:
                print(f"警告: 在段落中未找到结束标记 '<|im_end|>',跳过此段落:\n{segment[:100]}...")
                continue
            # Unterminated final segment: keep all of it. Slicing with find()'s
            # -1 sentinel would silently drop the last character.
            content_block = segment
        else:
            content_block = segment[:end_marker_index].strip()

        # First line is the role; everything after the first newline is content.
        role_line, _, value = content_block.partition("\n")
        role_raw = role_line.strip().lower()
        role_mapped = role_map.get(role_raw)
        if role_mapped is None:
            print(f"警告: 无法识别的角色 '{role_raw}',跳过此轮次。")
            continue

        conversation_history.append({"from": role_mapped, "value": value.strip()})

    return conversation_history
|
|
| |
def normalize_conversation_history(raw_history, entry_index, max_turns=20):
    """Normalize a parsed conversation so it strictly alternates human/gpt.

    Rules enforced:
      1. An optional single leading ``system`` turn is preserved.
      2. The first non-system turn must be ``human``; if it is ``gpt``, an
         empty ``human`` turn is inserted before it.
      3. Consecutive turns with the same role are merged (values joined with
         ``"\\n"``, empty values ignored).
      4. After merging, roles must strictly alternate human/gpt; any other
         role (e.g. a mid-conversation ``system``) is an error.
      5. A trailing ``gpt`` turn is dropped so the history ends on ``human``.
      6. At most ``max_turns`` non-system turns are kept (the most recent
         ones); if the kept window would start with ``gpt``, that turn is also
         dropped so the window still opens with ``human``.

    The input list and its dicts are not modified.

    Args:
        raw_history (list[dict]): Turns as produced by ``parse_mistral_template``.
        entry_index (int): Index of the source data row (used only in log messages).
        max_turns (int | None): Maximum number of non-system turns to keep.
            ``None`` or a value < 1 disables truncation.

    Returns:
        list[dict]: The normalized history, or ``[]`` if the structure cannot
        be normalized.
    """
    if not raw_history:
        return []

    # Copy every turn so merging below never mutates the caller's dicts.
    turns = [dict(turn) for turn in raw_history]

    system_turn = None
    if turns[0]["from"] == "system":
        system_turn = turns[0]
        turns = turns[1:]
        if not turns:
            print(f"警告 (行 {entry_index+1}): 原始历史只包含 system prompt,无法规范化为交替对话。")
            return []

    conversation = []
    first_role = turns[0]["from"]
    if first_role == "gpt":
        # Pad so the dialogue starts with a user turn.
        conversation.append({"from": "human", "value": ""})
    elif first_role != "human":
        print(f"错误 (行 {entry_index+1}): 处理第一个对话回合时遇到非预期的角色 '{first_role}'。")
        return []

    next_role = {"human": "gpt", "gpt": "human"}
    for turn in turns:
        if not conversation:
            conversation.append(turn)
            continue

        last_turn = conversation[-1]
        if turn["from"] == last_turn["from"]:
            # Merge consecutive same-role turns, skipping empty values.
            if last_turn["value"] and turn["value"]:
                last_turn["value"] += "\n" + turn["value"]
            elif turn["value"]:
                last_turn["value"] = turn["value"]
            continue

        expected_next_role = next_role.get(last_turn["from"], "")
        if turn["from"] != expected_next_role:
            print(f"错误 (行 {entry_index+1}): 对话角色顺序错误。期望在 '{last_turn['from']}' 之后是 '{expected_next_role}',但得到 '{turn['from']}'。")
            return []
        conversation.append(turn)

    # The prompt must end on a human turn (the responses are supplied separately).
    if conversation[-1]["from"] == "gpt":
        conversation.pop()

    if max_turns is not None and 0 < max_turns < len(conversation):
        conversation = conversation[len(conversation) - max_turns:]
        # Keep the alternation invariant: the window must open with human.
        if conversation[0]["from"] == "gpt":
            conversation = conversation[1:]

    prefix = [system_turn] if system_turn is not None else []
    return prefix + conversation
|
|
def process_prompt(prompt_to_process, entry_index, chosen_response, rejected_response):
    """Build one ShareGPT DPO record from a ChatML prompt and two responses.

    Args:
        prompt_to_process (str): Prompt text containing ChatML markers.
        entry_index (int): Index of the source data row (used only in messages).
        chosen_response (str): Preferred assistant reply.
        rejected_response (str): Dispreferred assistant reply.

    Returns:
        dict: ``{"conversations": [...], "chosen": {...}, "rejected": {...}}``,
        where ``chosen`` and ``rejected`` are ``gpt`` turns.

    Raises:
        ValueError: If the prompt yields an empty normalized conversation, so
            the caller can skip the record.
    """
    raw_parsed_history = parse_mistral_template(prompt_to_process)
    conversation_history = normalize_conversation_history(raw_parsed_history, entry_index)
    if not conversation_history:
        raise ValueError(f"错误 (行 {entry_index+1}): 解析出的对话历史为空,跳过此条记录。")

    return {
        "conversations": conversation_history,
        "chosen": {"from": "gpt", "value": chosen_response},
        "rejected": {"from": "gpt", "value": rejected_response},
    }
| |
| |
def _is_non_empty_str(value):
    """Return True if *value* is a str with non-whitespace content (False for None/NaN/etc.)."""
    return isinstance(value, str) and value.strip() != ""


def convert_parquet_to_sharegpt_dpo_v4(parquet_paths, json_path):
    """Convert one or more Parquet DPO files into a single ShareGPT DPO JSON file.

    Each row's prompt (``chosen_prompt``, falling back to ``reject_prompt``) is
    parsed as a ChatML template and normalized to a strictly alternating
    system -> human -> gpt -> human ... history ending on a human turn.

    Rows are skipped (and counted) when a response is missing or empty, when
    neither prompt contains ``<|im_start|>``, or when normalization yields an
    empty history. A file that cannot be read or lacks required columns is
    skipped with an error message; the remaining files are still converted.

    Args:
        parquet_paths (str | list[str]): One Parquet path or a list of paths.
        json_path (str): Output JSON path; receives a list of records.
    """
    if not isinstance(parquet_paths, list):
        parquet_paths = [parquet_paths]

    required_columns = ['chosen_prompt', 'reject_prompt', 'chosen', 'reject', 'chosen_model', 'reject_model']
    template_marker = '<|im_start|>'

    merged_data = []
    for parquet_path in parquet_paths:
        print(f"开始转换文件: {parquet_path}")

        try:
            df = pd.read_parquet(parquet_path)
        except Exception as e:  # read_parquet can raise many engine-specific errors
            print(f"错误: 读取 Parquet 文件时出错: {e}")
            continue
        print(f"成功读取 Parquet 文件,包含 {len(df)} 条记录。")

        missing = [col for col in required_columns if col not in df.columns]
        if missing:
            print(f"错误: Parquet 文件缺少必需的列: {missing}")
            continue

        sharegpt_data = []
        skipped_basic_validation = 0
        skipped_no_template = 0
        skipped_empty_history = 0

        print("开始处理数据行...")
        for index, row in df.iterrows():
            chosen_prompt = row.get('chosen_prompt', None)
            rejected_prompt = row.get('reject_prompt', None)
            chosen_response = row.get('chosen', None)
            rejected_response = row.get('reject', None)

            # Validate before any string operation: cells may be None/NaN.
            if not (_is_non_empty_str(chosen_response) and _is_non_empty_str(rejected_response)):
                skipped_basic_validation += 1
                continue

            # Choose the prompt per row; never reuse a previous row's prompt.
            prompt = next(
                (p for p in (chosen_prompt, rejected_prompt) if isinstance(p, str) and template_marker in p),
                None,
            )
            if prompt is None:
                print("没有<im_start>符号出现在prompt中!!!")
                skipped_no_template += 1
                continue

            try:
                record = process_prompt(prompt, index, chosen_response, rejected_response)
            except ValueError as e:
                print(e)
                skipped_empty_history += 1
                continue
            if not record["conversations"]:
                skipped_empty_history += 1
                continue
            sharegpt_data.append(record)

        print(f"数据处理完成。成功转换{len(sharegpt_data)} 条记录。\n\n")
        print(f"跳过: 基本校验失败 {skipped_basic_validation} 条, 无模板标记 {skipped_no_template} 条, 对话历史为空 {skipped_empty_history} 条。")
        merged_data.extend(sharegpt_data)

    print(f"正在将结果写入 JSON 文件: {json_path}, 一共保存了 {len(merged_data)} 条记录。")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(merged_data, f, indent=2)
|
|
if __name__ == "__main__":
    # Each sub-directory of root_dir is expected to contain
    # data/train-00000-of-00001.parquet; entries without it are skipped.
    root_dir = '/home/hsichen/LLaMA-Factory/data/chaiting/batch_data'
    output_json_file = "/home/hsichen/LLaMA-Factory/data/chaiting/batch_data_merged_9w.json"

    parquet_file_list = []
    # sorted() makes the merged output order reproducible across runs.
    for sub_dir in sorted(os.listdir(root_dir)):
        full_dir = os.path.join(root_dir, sub_dir)
        if not os.path.isdir(full_dir):
            continue
        parquet_path = os.path.join(full_dir, "data", "train-00000-of-00001.parquet")
        if not os.path.isfile(parquet_path):
            print(f"警告: 未找到 Parquet 文件,跳过: {parquet_path}")
            continue
        parquet_file_list.append(parquet_path)

    convert_parquet_to_sharegpt_dpo_v4(parquet_file_list, output_json_file)