Volga-محاسبه در صورت تقاضا در AI/ML در زمان واقعی-نمای کلی و معماری


نویسنده (ها): آندری نویتسکی

در ابتدا منتشر شده در به سمت هوش مصنوعیبشر

TL ؛ دکتر ولگا یک موتور محاسبه پردازش/ویژگی در زمان واقعی است که برای AI مدرن متناسب است/مولکولبشر این برنامه برای پشتیبانی از انواع مختلف ویژگی ها ، از جمله پخش (آنلاین) ، دسته (آفلاین) و ویژگی های موجود در تقاضا از طریق یک فشار ترکیبی+معماری کشش: یک سفارشی طراحی شده است. موتور جریان (برای آنلاین+آفلاین) و یک لایه محاسباتی در صورت تقاضا (برای تقاضا). در این پست ، ما به عمق لایه محاسبات بر روی تقاضا ، ویژگی های مورد تقاضا ، استفاده از موارد و معماری شیرجه خواهیم زد.

محتوا:

  • چه چیزی است و برای چه چیزی است
  • نمونه
  • معماری
  • نمای کلی API
  • قسمت مفقود شده از اکوسیستم ری
  • چگونه جریان و تقاضا با هم کار می کند
  • مراحل بعدی

چه چیزی است و برای چه چیزی است

بیشتر سیستم های زمان واقعی بر روی جریان رویدادها (به عنوان مثال کلیک/خریدهای کاربر ، درخواست های سواری ، معاملات کارت اعتباری و غیره) کار می کنند و نمایندگی می کنند سیستم کاملاً محور: کلیه تحولات داده/منطق سفارشی می تواند به نوعی با رویدادی که باعث ایجاد آن شده است گره خورده باشد و این برای هر بخشی از سیستم صادق است. این نوع سیستم ها را می توان به تنهایی توسط یک موتور پردازش جریان انجام داد.

مولکول بارهای کاری کمی متفاوت است: آنها متعلق به یک کلاس از سیستم های مبتنی بر درخواست (در مورد ما ، ما در مورد درخواست های استنباط مدل صحبت می کنیم). این کلاس از سیستم ها شامل بیشتر برنامه های وب مدرن است که معماری آنها مبتنی بر مدل درخواست پاسخ است و با استفاده از مفهوم یک سرور (یا یک سرویس وب) ساخته شده است.

به طور کلی ، الگوی درخواست پاسخ نیز می تواند به یک سیستم کاملاً رویداد محور تبدیل شود که در آن هر درخواست یک رویداد جداگانه است (این یک جهت طراحی مناسب برای کشف است). با این حال ، در عمل ، سیستم های مبتنی بر درخواست معمولاً بدون تابعیت هستند و نیازهای متفاوتی برای مقیاس پذیری ، تأخیر ، توان ، در دسترس بودن داده ها و تحمل گسل دارند و در نتیجه نیازهای پشته زیرساختی متفاوتی را در مقایسه با آنچه موتور جریان ارائه می دهد ، ایجاد می کند.

در نتیجه ، در زمینه پردازش داده های زمان واقعی و تولید ویژگی ها ، بیشتر سیستم های مبتنی بر ML به لایه ای نیاز دارند که بتواند درخواست های ورودی را با تأخیر به حداقل برساند ، منطق محاسبات خودسرانه را انجام دهد ، و نتایج را در اسرع وقت ارائه دهد تا بتواند در سایر قسمت های سیستم (به عنوان مثال مدل خدمت) یا مستقیماً توسط کاربر استفاده شود-این همان چیزی است که ما می نامیم لایه محاسباتی در صورت تقاضابشر

نمونه

برخی از نمونه های سیستم های ML در زمان واقعی که نیاز به محاسبات زمان درخواست در صورت تقاضا دارند و نمی توانند فقط به یک موتور جریان متکی باشند ، ممکن است شامل موارد زیر باشد:

  • یک سیستم شخصی سازی جستجو که به مختصات GPS کاربر متکی است: داده ها فقط در زمان درخواست در دسترس است و باید بلافاصله برای نتایج مربوطه اداره شود.
  • بوها سیستم پیشنهادی، جایی که پاسخ ها به یک محاسبات گران قیمت متکی هستند (به عنوان مثال ، تعبیه محصول نقطه ، GPUعملیات مبتنی بر و غیره) و/یا ارتباط با خدمات شخص ثالث (به عنوان مثال ، پرس و جو مدل دیگری) -اداره این کار در یک موتور جریان باعث ایجاد تنگنا می شود و به یک طراحی بسیار دقیق نیاز دارد.

این بخشی است که بسیاری از موتورهای جریان “AI/ML آماده” از دست می دهند: پردازش زمان رویداد به تنهایی برای تأمین تمام نیازهای AI/ML در زمان واقعی کافی نیست. به همین دلیل ، ولگا معماری خود را به فشار دادن، جایی که موتور جریان پادشاه است ، و همچنین معرفی می کند قسمت کردن، توسط لایه محاسبات در صورت تقاضا ، که محاسبه زمان درخواست انجام می شود ، اداره می شود.

بیشتر سیستم عامل های مدرن ویژگی/داده های ML معماری مشابهی را اتخاذ می کنند (ویژگی های تقاضا در تکتونبا استخراج کنندگان ویژگی در رازیانهبا حل کننده ها در گچ).

مثال خوب دیگر این است خط لوله ویژگی در زمان واقعی سیستم پیشنهاد دهنده Pinterest، که همچنین جدایی بین محاسبات زمان رویداد ، توسط یک موتور جریان (FLINK) و محاسبه زمان درخواست ، توسط یک سرویس سفارشی انجام می شود.

خط لوله تولید ویژگی در زمان واقعی در Pinterest

معماری

به طور خلاصه ، در ولگا ، لایه محاسبات بر روی تقاضا مجموعه ای از کارگرانی است که برای اجرای منطق تعریف شده توسط کاربر خودسرانه استفاده می شود (آنچه ما آن را می نامیم ویژگی در صورت تقاضا) در زمان درخواست/استنتاج و خدمت به کاربر. ساخته شده است تا با موتور جریان Volga قابل تعامل باشد ، بنابراین کل سیستم می تواند DAG های محاسباتی دلخواه را اجرا کند که شامل اجرای در هر دو رویداد و درخواست است. بیایید نگاهی به قسمت های کار سیستم و چرخه عمر درخواست بیندازیم.

معماری لایه محاسبه در تقاضا

این اولین مؤلفه ای است که به بازی می رسد. در OnDemandCoordinator یک بازیگر مسئول ارکستر و ردیابی است OnDemandServers– بازیگران کارگر (بیشتر در زیر). در OnDemandCoordinator Isolation Isolation Logical Worker (پیکربندی که ویژگی های هر کارگر را بر عهده دارد) ، مقیاس بالا و پایین ، بررسی های بهداشتی و در صورت لزوم دوباره راه اندازی می شود.

مؤلفه بیرونی که درخواست های ورودی را بر عهده دارد و آنها را بین گره های خوشه ای توزیع می کند. این معمولاً یک منبع مبتنی بر ابر است (برای معیارهای ما ، ما از آنها استفاده کردیم AWS Application Balancer) ، اما در عمل ، می تواند هر نوع تنظیم دیگری باشد (به عنوان مثال ، nginx/metallb). توجه داشته باشید که متعادل کننده بار بخشی از ولگا نیست و یک الگوی استقرار به احتمال زیاد را نشان می دهد.

یک کارگر پایتون که منطق توصیف شده در ویژگی های تقاضا را انجام می دهد. روند کارگر نمونه ای از a را اجرا می کند سرور Starlette برای رسیدگی به درخواست های دریافتی ، هر یک به یک درگاه ثابت روی یک گره میزبان گوش می دهند. به این ترتیب ، سیستم عامل (فقط لینوکس) همه درخواستهای کارگران را در آن گره به کار می برد و بار را متعادل نگه می دارد.

هر کارگر با لیستی از تعاریف ویژگی که قرار است از آن استفاده می شود آغاز می شود (شروع توسط این کار انجام می شود OnDemandCoordinator). وقتی درخواست می رسد ، OnDemandServer پارس هایی که ویژگی های آن را هدف قرار می دهد ، قرار است DAG از همه ویژگی های وابسته را اجرا و گردآوری کند. به یاد داشته باشید که ولگا از دو نوع ویژگی پشتیبانی می کند: on_demand (که توسط لایه بر روی تقاضا انجام شده است) و pipeline (اداره شده توسط موتور جریان).

از آنجا که قدرتمندترین جنبه ولگا این است که از هر دو رویداد و درخواست محاسبه زمان پشتیبانی می کند ، on_demand ویژگی ها می توانند به هر دو دیگری بستگی داشته باشند on_demand ویژگی ها و همچنین pipeline ویژگی ها این واقعیت یک جریان ویژه را ایجاد می کند: ویژگی های DAG از نظر توپولوژیکی مرتب شده و به صورت سفارش اجرا می شود. on_demand ویژگی ها با استفاده از نتایج وابستگان خود به عنوان ورودی اجرا می شوند. در محیط تقاضا ، pipeline ویژگی ها به سادگی همانطور که در ذخیره سازی خوانده می شود ، رفتار می شود: جریان پایان به پایان ولگا این است که اجرای واقعی از pipeline ویژگی ها توسط موتور جریان استفاده می شود ، که نتایج اجرای خط لوله را به صورت ناهمزمان ذخیره سازی می نویسد. کارگر در صورت تقاضا به سادگی نتایج ویژگی خط لوله مربوطه را می خواند (روشی که آن را می خواند نیز قابل تنظیم است OnDemandDataConnector، اطلاعات بیشتر در مورد آن در زیر) و از آن به عنوان ورودی برای منطق تقاضا استفاده می کند.

ذخیره سازی انتزاعی است که بین قطعات فشار و کشش مشترک است: جریان کار نتیجه گیری خط لوله در ذخیره سازی ، کارگران در صورت تقاضا محاسبات ناهمزمان را بر اساس داده های مادی انجام می دهند و نتایج را ارائه می دهند. توجه داشته باشید که در محیط در صورت تقاضا ، ذخیره فقط خواندنی است (on_demand ویژگی ها نیازی به ذخیره چیزی ندارند).

ذخیره سازی یک رابط قابل تنظیم است که می تواند از یک باطن دلخواه استفاده کند (از طریق اجرای PipelineDataConnector وت OnDemandDataConnector). توجه داشته باشید که از آنجا که ما می توانیم Volga را در هر دو حالت آنلاین و آفلاین اجرا کنیم ، هر حالت دارای نیازهای ذخیره سازی متفاوتی است ، به عنوان مثال آنلاین نیاز به به حداقل رساندن تأخیر خواندن/نوشتن (Redis/Scylla) دارد ، آفلاین برای فروشگاه بهینه شده ظرفیت است (HDFS ، دریاچه ها): این چیزی است که کاربر باید در نظر بگیرد.

نمای کلی API

ویژگی های موجود در تقاضا با استفاده از on_demand دکوراتور و می تواند به ویژگی های خط لوله یا موارد دیگر بستگی داشته باشد on_demand ویژگی ها

from volga.api.source import source
from volga.api.on_demand import on_demand

# mock simple pipeline feature via streaming source
@source(TestEntity)
def test_feature() -> Connector:
return MockOnlineConnector.with_periodic_items(
items=[...]
period_s=1
)

# on-demand features
@on_demand(dependencies=[('test_feature', 'latest'])
def simple_feature(
dep: TestEntity,
multiplier: float = 1.0
) -> TestEntity:
"""Simple on-demand feature that multiplies the value"""
return TestEntity(
id=dep.id,
value=dep.value * multiplier,
timestamp=datetime.now()
)

در dependencies پارامتر ویژگی های وابسته را توصیف می کند. سفارش باید آرگومان های مربوطه را در عملکرد مطابقت دهد. توجه داشته باشید که وابستگی یک نوع 2 است: مقدار اول نام ویژگی وابسته است و مورد دوم آن query_name تعریف شده در OnDemandDataConnector (MockDataConnector در مورد ما): این تعیین می کند که چگونه ارزش ها را برای آنها واگذار می کنیم test_feature - در این حالت ، ما به سادگی آخرین را واکشی می کنیم (بیشتر در مورد نمایش داده های اتصال داده در زیر).

کارگران را شروع کنید و ویژگی های ثبت نام را برای ارائه خدمات:

# start coordinator first
coordinator = create_on_demand_coordinator(OnDemandConfig(
num_servers_per_node=2,
server_port=DEFAULT_ON_DEMAND_SERVER_PORT,
data_connector=OnDemandDataConnectorConfig(
connector_class=MockOnDemandDataConnector,
connector_args={}
)
))
ray.get(coordinator.start.remote())

# register 'simple_feature'
ray.get(coordinator.register_features.remote(
FeatureRepository.get_features_with_deps(['simple_feature'])
))

درخواست را با استفاده از کلیدهای مورد نیاز و ویژگی های پرس و جو در زمان واقعی تهیه کنید:

request = OnDemandRequest(
target_features=['simple_feature'],
feature_keys={
'simple_feature': [
{'id': 'test-id'},
{'id': 'test-id-1'},
{'id': 'test-id-2'}
]
},
udf_args={
'simple_feature': {'multiplier': 2.0}
}
)

client = OnDemandClient(DEFAULT_ON_DEMAND_CLIENT_URL)
response = self.loop.run_until_complete(client.request(request))
pprint(response.results)

...

OnDemandResponse(results={'simple_feature': [
[{'id': 'test-id', 'value': 4.0, 'timestamp': '2025-04-06T16:30:24.324526'}],
[{'id': 'test-id-1', 'value': 6.0, 'timestamp': '2025-04-06T16:30:24.324536'}],
[{'id': 'test-id-2', 'value': 8.0, 'timestamp': '2025-04-06T16:30:24.324541'}]
]}, server_id=11)

قسمت مفقود شده از اکوسیستم ری

یک خواننده دقیق ممکن است توجه داشته باشد که معماری در صورت تقاضا تا حدودی شبیه به آن است ری خدمت (مدل خدمات زیرساخت مورد استفاده ری). در واقع ، هر دو سیستم مبتنی بر درخواست هستند و برای یکدیگر مکمل هستند ، زیرا هر دو سیستم نمایانگر قسمتهای حیاتی جریان استنباط مدل پایان به پایان هستند: ابتدا به دست آوردن ویژگی ها و سپس استفاده از آنها برای استنتاج واقعی.

در حالی که ری بخش خدمت مدل را فراهم می کند ، سرویس/محاسبه ویژگی از دست رفته است ، و به کاربران نیاز دارد تا به لایه های ارائه دهنده داده های سفارشی اعتماد کنند ، که باعث افزایش چشمگیر پیچیدگی و هزینه های عملیاتی اجرای ML در زمان واقعی می شود.

لایه بر روی تقاضا برای پر کردن این نقطه طراحی شده است و به همراه مدل سرو ، به مرز اولیه کاربر برای سیستم های مدرن مبتنی بر ML تبدیل می شود. این امر به حرکت به سمت یک طراحی سیستم همگن تر ، از بین بردن وابستگی های خارج از کشور و با موتور جریان Volga کمک می کند تا پردازش داده های زمان واقعی را در بالای پرتو متحد کند.

چگونه جریان و تقاضا با هم کار می کند

در این بخش ذخیره مشترک بین موتور جریان (فشار) و قطعات تقاضا (کشش) و چگونگی ارتباط لایه بر روی آن با آن صحبت می شود. همه on_demand ویژگی های مستقیم یا غیرمستقیم به آن بستگی دارد pipeline نتایج ویژگی ها ، که در ذخیره سازی مشترک وجود دارد (این شامل صرفاً سرو است pipeline ویژگی ها) برای ساده کردن API تعریف ویژگی و مخفی کردن کنترل لایه داده از کاربر ، تصمیم گرفته شد تا همه منطق واکشی داده های مربوط به ذخیره سازی را از منطق ویژگی واقعی به یک کلاس جداگانه که می تواند در بین ویژگی های مختلف استفاده شود ، انتزاع کند: OnDemandDataConnector (به نمودار معماری در بالا مراجعه کنید).

پس از pipeline مشاغل می توانند از نظر معنایی نتایج متفاوتی به دست آورند ، روشی که ما برای آن واگذار می کنیم on_demand همچنین ویژگی ها باید برای انعکاس این معناشناسی قابل تنظیم باشند ، به عنوان مثال برخی از ویژگی ها به جدیدترین مقادیر نیاز دارند ، برخی از آنها به داده های پنجره تا زمان مشخص نیاز دارند ، برخی نیاز به انجام پرس و جوهای پیچیده تر مانند جستجوی نزدیکترین همسایه (RAGS) دارند. بیایید نگاهی بیندازیم InMemoryActorOnDemandDataConnector مورد استفاده در محیط DEV محلی (نشان دهنده رابط کاربری با InMemoryCacheActor):

class InMemoryActorOnDemandDataConnector(OnDemandDataConnector):

def __init__(self):
self.cache_actor = None

async def init(self):
self.cache_actor = get_or_create_in_memory_cache_actor()

def query_dict(self) -> Dict[str, Callable]:
return {
'latest': self.fetch_latest,
'range': self.fetch_range,
}

async def fetch_latest(
self,
feature_name: str,
keys: List[Dict[str, Any]]
) -> List[List[Any]]:
return await self.cache_actor.get_latest.remote(feature_name, keys)

async def fetch_range(
self,
feature_name: str,
keys: List[Dict[str, Any]],
start: Optional[Decimal],
end: Optional[Decimal]
) -> List[List[Any]]:
return await self.cache_actor.get_range.remote(
feature_name, keys, start, end
)

async def close(self):
pass

روش اصلی که کاربر برای تعریف آن نیاز دارد query_dict: این یک تابع واکشی دلخواه را به یک نام ساده که ما به آن منتقل می کنیم نقشه می کند on_demand دکوراتور هنگام ایجاد ویژگی ها (به یاد داشته باشید latest پارام در sample_feature مثال بالا). آرگومان های منتقل شده به این توابع با استفاده از همان نام های ARG به عنوان کلیدها از شیء درخواست جدا می شوند.

این جداسازی واکشی داده ها از منطق ویژگی ، کدهای پاک کننده و قابل استفاده مجدد و همچنین دسترسی ایمن ، کنترل شده و بهینه شده به لایه داده را امکان پذیر می کند-کد تعریف شده توسط کاربر قادر به چکش ذخیره سازی یا انجام هر کاری نامشخص نیست.

مراحل بعدی

  • ویژگی های درخواستی در حال حاضر فقط در حالت آنلاین کار می کنند. ولگا از محاسبه ویژگی های تقاضا در داده های تاریخی پشتیبانی نمی کند. این یک مشکل مهندسی جالب است که نیاز به تبدیل سیستم های مبتنی بر درخواست را به یک جریان رویداد (مناسب برای حالت آفلاین) و ایجاد خط لوله جریان برای اجرای کامل در موتور جریان دارد.
  • همانطور که ممکن است توجه داشته باشید ، ویژگی های درخواستی از درخواست کاربر پارامترهای کلی و پارامترهای اتصال داده دریافت می کنند. اگر بخواهیم آن را از ویژگی وابسته بگیریم چه می شود؟ این نیاز به ایجاد یک دارد arg_mapping برای ترسیم آرگومان ها به توابع و به روزرسانی منطق سفارش دهنده مجری.
  • برخی از ویژگی های تقاضا ممکن است به وضعیت محلی نیاز داشته باشد (به عنوان مثال اولیه کردن مشتری برای یک سرویس شخص ثالث).
  • تحمل گسل با بررسی های بهداشتی و راه اندازی مجدد باید اجرا شود.
  • اعدام فعلی روی یک حلقه Asyncio است. استخر نخ و استخر فرایند/بازیگر مورد نیاز است.

اگر علاقه مند به کمک به اینها و مشارکت کننده هستید ، بررسی کنید نقشه راه و احساس راحتی کنید!

در پست بعدی ، معیارهای آزمایش بار را اجرا خواهیم کرد و نشان می دهیم که چگونه لایه محاسبه بر روی تقاضا تحت بار درخواست بالایی انجام می شود.

با تشکر از خواندن! لطفا پروژه را در آن ستاره کنید گیتوب، به جامعه بپیوندید لاغر، وبلاگ را به اشتراک بگذارید و نظرات خود را ترک کنید.

منتشر شده از طریق به سمت هوش مصنوعی



منبع: https://towardsai.net/p/machine-learning/volga-on-demand-compute-in-real-time-ai-ml-overview-and-architecture