2015年12月22日火曜日

動画配信サーバを作ってみた

Google、Apple、Amazon、IT関連の大企業はみんな動画配信サービスを行っています。
世の中に動画配信サーバの作り方みたいのがあまりないし、セキュアに配信しないといけないし、どうやってやっているのか勉強するために動画配信サーバを作ってみました。

まず、iPhoneのコンテンツ配信の仕様のドキュメント、Safari Web ContentGuideを見ます。

https://developer.apple.com/library/safari/documentation/AppleApplications/Reference/SafariWebContent/

pdf版もあるのでダウンロードするのが良いと思います。

読むのが大変なので要約すると、HTTPでVideoやAudioを配信するサーバはRangeヘッダーに対応しなくてはいけないと書かれています。


次に、サーバを立てます。
ApacheでもAnhttpdでもなんでもよいです。

次にサーバの設定でCGIを許可します。
許可の仕方はサーバごとに異なるので、サーバのドキュメントを見て設定してください。

ここからが重要です。
次に、nphスクリプトを作ります。
nphスクリプトというのはスクリプトのファイル名がnph-で始まるCGIスクリプトを指します。
nph-で始まると通常のCGIと何が違うのかというと、HTTPのレスポンスヘッダーをサーバ側で自動で送らないので、クライアント側で自由なレスポンスを返すことができます。

npHスクリプトをかくことで、Rangeヘッダーの時に必要な206 Partial Contentをサーバが返すことができるのです。


あとはスクリプトを作ってストリームを返せば動画配信サーバが完成

ということで最後にスクリプトと言っておきながらC言語でnphスクリプトを書いて見ました。

あとは環境変数のQUERY_STRINGでセッション値を取得するなどして、セキュアにストリームを配信すれば誰でも動画配信サービスをすることができます。

動画配信サーバってめちゃくちゃ簡単。
こんなにも簡単な技術であんなにお金を儲けられちゃうなんて。


npg-stream.c
----------------------------------------------
#include<stdio.h>
#include<stdlib.h>
#include<string.h>

#ifdef WIN32
#include<fcntl.h>
#include <io.h>
#endif

#ifdef _MSC_VER
#if _MSC_VER >= 1400
#pragma warning( disable : 4996 )
#pragma warning( disable : 4819 )
#endif
#endif


//
//  binarymode
//
static void setstdoutmode()
{
#ifdef WIN32
_setmode(_fileno(stdout), _O_BINARY);
#endif

}


//
// util
//

void mygetenv(char* buf,const char* moji, int sz)
{
char *p;
if (buf == NULL || moji==NULL || sz<1)return;
p = getenv(moji);
if (p == NULL)p = "";

buf[0] = 0;
strncpy(buf, p, sz);
buf[sz-1] = 0;

}
int mymin(int a, int b)
{
if (a < b)return a;
return b;
}

FILE* fzengo_open(const char* file, const char* mode, const char* path)
{
FILE* ret = NULL;
char buf[4096];
if (file == NULL || mode == NULL || path == NULL)return ret;

if (strlen(file) > 1000)return ret;
if (strlen(path) > 1000)return ret;

// 1
sprintf(buf, "%s/%s", path, file);
ret = fopen(buf, mode);
if (ret != NULL)return ret;

// 2
sprintf(buf, "../%s/%s", path,file);
ret = fopen(buf, mode);
if (ret != NULL)return ret;

// 3
sprintf(buf, "../../%s/%s", path, file);
ret = fopen(buf, mode);
if (ret != NULL)return ret;

// 4
ret = fopen(file, mode);
if (ret != NULL)return ret;

// 5
sprintf(buf, "../%s", file);
ret = fopen(buf, mode);
if (ret != NULL)return ret;

return ret;
}

FILE* fall_zengo_open(const char* file, const char* mode)
{
FILE* ret = NULL;

ret = fzengo_open(file, mode, "excel");
if (ret != NULL)return ret;
ret = fzengo_open(file, mode, "music");
if (ret != NULL)return ret;
ret = fzengo_open(file, mode, "video");
if (ret != NULL)return ret;
ret = fzengo_open(file, mode, "movie");
if (ret != NULL)return ret;

return ret;
}

const char* getmime(const char* name)
{
const char* ret = "application/octet-stream";

if (name == NULL)return ret;
if (strstr(name, ".mp3")) ret = "audio/mpeg";
if (strstr(name, ".m4v")) ret = "video/x-m4v";
return ret;
}

//
// print
//
static void print(char* p)
{
if (p == NULL)return;
printf("%s\r\n",p);
}

static void printresponse()
{
print("HTTP/1.1 200 OK");
print("Connection: close");
print("Pragma: no-cache");
print("Cache-Control: no-cache");
print("Accept-Ranges: bytes");
}


static void printresponse_range(int st, int ed, int fsz)
{
char buf[256];
//  causes proxy issue
print("HTTP/1.1 206 Partial Content");
print("Connection: close");
print("Pragma: no-cache");
print("Cache-Control: no-cache");
print("Accept-Ranges: bytes");
sprintf(buf, "Content-Range: bytes %d-%d/%d", st, ed, fsz);
print(buf);
}


static void printheader(const char* p)
{
char buf[256];
if (p == NULL)return;
sprintf(buf, "Content-Type: %s", p);
print(buf);
print("");
}

static void printheader_size(const char* p,int sz)
{
char buf[256];
if (p == NULL)return;
sprintf(buf, "Content-Type: %s", p);
print(buf);
sprintf(buf, "Content-Length: %d", sz);
print(buf);
print("");
}

static void printbody(char*p, int l)
{
if (p == NULL || l < 1)return;
fwrite(p, 1, l, stdout);
}


static void print_file(char *file)
{
FILE* fp;
char buf[1024 * 256];
int sz, fsz;
char* mime;

if (file == NULL)return;
fp = fall_zengo_open(file, "rb");
if (fp == NULL)return;

fseek(fp, 0, SEEK_END);
fsz = ftell(fp);
fseek(fp, 0, SEEK_SET);

printresponse(fsz);

mime = getmime(file);
printheader_size(mime, fsz);

do{
sz = fread(buf, 1, mymin(fsz, sizeof(buf)), fp);
if (sz < 1)break;
printbody(buf, sz);
fsz -= sz;
if (fsz < 1)break;
} while (1);
fclose(fp);

}

static void print_file_range(char *file,int st,int ed)
{
FILE* fp;
char buf[1024 * 256];
int sz, fsz,ssz;
char* mime;

if (file == NULL)return;
fp = fall_zengo_open(file, "rb");
if (fp == NULL)return;

fseek(fp, 0, SEEK_END);
fsz = ftell(fp);
fseek(fp, 0, SEEK_SET);

if (ed == -1)ed = fsz - 1;
ssz = ed - st + 1;

if (ed < 0 || st < 0)goto end;
if (st + ssz>fsz)goto end;


fseek(fp, st, SEEK_SET);
printresponse_range(st,ed,fsz);

mime = getmime(file);
printheader_size(mime, ssz);

do{
sz = fread(buf, 1, mymin(ssz, sizeof(buf)), fp);
if (sz < 1)break;
printbody(buf, sz);
ssz -= sz;
if (ssz < 1)break;
} while (1);

end:
fclose(fp);

}



//
// main
//
int main(int argc, char *argv[])
{
char env[1024];
char query[1024];
char buf[1024];
char* filename="hoge.m4v";
char* p;
int f = 0, st = -1, ed = -1;
int fno = 0;

setstdoutmode();

mygetenv(env,"HTTP_RANGE",sizeof(env));
mygetenv(query, "QUERY_STRING", sizeof(env));

p = strstr(query, "f=");
if (p)sscanf(p + 2, "%d", &fno);
if (fno < 1)fno = 1;


if (filename[0] == 0)strcpy(filename, "null");

p = strstr(env, "bytes=");
if (p){
p += 6;
sscanf(p, "%d-%d", &st, &ed);
}

if (st != -1)f = 1;

if (f){
print_file_range(filename, st, ed);
}
else{
print_file(filename);
}
return 0;
}

----------------------------------------------



Protocol Buffersを使ってC++でRPCをやってみた

Googleが作っているProtocolBuffersは、異なるPC間やプロセス間で関数コールやデータのやり取りをするライブラリです。いろいろな言語で動くので、サーバ間で分散処理を行うのにとても便利です。Googleのサーバ側の分散処理の最も重要な中核な技術といっても過言ではないと思います。ProtocolBuffersを制するものはサーバを制します。

しかしC++でこのProtocol Buffersを使ってRPCをやろうとするとちょっとめんどくさいからなのか、GoogleのProtocolBuffersの公式ドキュメントにもC++のRPCの完全なサンプルがありません。
みんなC++でProtocolBuffers使わないのかなぁ。ネットで検索しても世の中にあまりいいサンプルがありません。
なので、C++でProtocol Buffersを使ってTCP/IPでRPCをするサンプルを作ってみました。



まず、person.protoを定義します。
オプションにcc_generic_servicesをつけないとRPC用のServiceのヘッダーを生成してくれないので注意が必要です。


person.proto
-----------------------------

option cc_generic_services = true;

message Person {
  required int32 id = 1;
  required string name = 2;
}

message HelloResponse {
  required string message = 1;
}

service HelloService {
  rpc hello(Person) returns (HelloResponse);
}

-----------------------------
このファイルをprotocにかけて、person.pb.hとperson.pb.ccを生成します。




次にサーバ
main_server.cc
-----------------------------
#include "person.pb.h"
//#include "HelloRpcChannel.h"

#include "sock2_local.h"

static int myrecv(SOCKET sc,char* buf,int sz)
{
int ret = 0;
int readed = 0;
while (sz > 0){
ret = recv(sc, buf, sz, 0);
if (ret < 1)break;
sz -= ret;
buf += sz;
readed += sz;
}
return readed;
}

void hello(int id, const char* name, char* ret, int sz)
{
printf("id=%d\n", id);
printf("name=%s\n", name);
strcpy(ret, "hagehagehagehage");
}

int main(int argc, char* argv[]) {

SOCKET s=-1;
char buf[1024];
int sz = 0;
char*p;
int ret;
SOCKADDR_IN sa;
memset(&sa, 0, sizeof(sa));

winsock_init();

sa.sin_addr.s_addr = htonl(INADDR_ANY);
sa.sin_port = htons(12340);
sa.sin_family = PF_INET;

s = socket(PF_INET, SOCK_STREAM, 0);
if (s == -1)goto error;
ret = bind(s, (SOCKADDR*)&sa, sizeof(sa));
if (ret == -1){
printf("biund error\n");
goto error;
}
ret = listen(s, 20);
if (ret == -1){
printf("listen error\n");
goto error;
}
while (1){
char resbuf[256];
Person person;
HelloResponse res;
SOCKET sc;
SOCKADDR_IN sac;
memset(&sac, 0, sizeof(sac));
sz = sizeof(sac);
printf("before accept\n");
sc = accept(s,(SOCKADDR*)&sac,&sz);
printf("after accept\n");
if (sc == -1){
printf("accept error\n");
break;
}

ret = myrecv(sc, (char*)&sz, 4);
sz = ntohl(sz);
if (sz<1 || sz>1024)goto next;
printf("server recv data size=%d\n",sz);
myrecv(sc, buf, sz);
person.ParseFromArray(buf, sz);

resbuf[0] = 0;
hello(person.id(), person.name().c_str(), resbuf, sizeof(resbuf));
res.set_message(resbuf);
sz=res.ByteSize();
if (sz<1 || sz>1024)goto next;
printf("server send response %d\n",sz);
ret = htonl(sz);
send(sc, (char*)&ret, 4, 0);
res.SerializeToArray(buf, sz);
send(sc, buf, sz, 0);
next:
closesocket(sc);
}

error:
if(s!=-1)closesocket(s);
winsock_done();




return 0;
}
-----------------------------




最後にクライアント
main_client.cc
-----------------------------
#include "person.pb.h"

#include "sock2_local.h"


static int myrecv(SOCKET sc, char* buf, int sz)
{
int ret = 0;
int readed = 0;
while (sz > 0){
ret = recv(sc, buf, sz, 0);
if (ret < 1)break;
sz -= ret;
buf += sz;
readed += sz;
}
return readed;
}


class HelloRpcChannel :public google::protobuf::RpcChannel{
public:
HelloRpcChannel()
{
}
virtual ~HelloRpcChannel()
{
}
virtual void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done)
{
SOCKET s;
SOCKADDR_IN sa;
char buf[1024];
int sz;
int ret;

winsock_init();
s = socket(PF_INET, SOCK_STREAM, 0);
if (s == -1)goto error;

memset(&sa, 0, sizeof(sa));
sa.sin_port = htons(12340);
sa.sin_addr.s_addr = htonl(0x7f000001);
sa.sin_family = PF_INET;

ret = connect(s, (SOCKADDR*)&sa, sizeof(sa));
if (ret == -1){
printf("client connect NG\n");
goto error;
}
else{
printf("client connect OK\n");
}
sz = request->ByteSize();
if (sz<1 || sz>1000)goto error;

request->SerializeToArray(buf, sz);

printf("client request data size=%d\n", sz);
ret = htonl(sz);
ret = send(s, (char*)&ret, 4, 0);
ret = send(s, (char*)buf, sz, 0);
ret = myrecv(s, (char*)&sz, 4);
sz = ntohl(sz);
if (sz<1 || sz>1000)goto error;

ret = myrecv(s, buf, sz);
printf("client response data size=%d\n", sz);
response->ParseFromArray(buf, sz);
//printf("response Message=%s\n",esponse.message().c_str());
error:
if (s != -1)closesocket(s);
done->Run();
winsock_done();
}

};


class MyRpcController : public google::protobuf::RpcController{
public:
MyRpcController()
{
}
virtual ~MyRpcController()
{
}
virtual void Reset()
{
}
virtual bool Failed () const
{
return false;
}
virtual std::string ErrorText() const
{
return "";
}
virtual void StartCancel()
{
}
virtual void SetFailed(const std::string& reason)
{
}
virtual bool IsCanceled() const
{
return false;
}
virtual void NotifyOnCancel(google::protobuf::Closure* callback){
}
};

class MyClosure : public google::protobuf::Closure{
public:
MyClosure()
{
}
virtual ~MyClosure()
{
}
virtual void Run()
{
}
};


void hello(int id, const char* name, char* buf, int sz)
{
HelloRpcChannel hrc;

HelloService_Stub hss(&hrc);
Person req;
HelloResponse res;
MyRpcController con;
MyClosure clo;


if (name == NULL || buf == NULL || sz < 1)return;
buf[0] = 0;

req.set_id(123);
req.set_name("hogehoge");
hss.hello(&con, &req, &res, &clo);

strncpy(buf,res.message().c_str(),sz);
buf[sz - 1] = 0;
}


int main(int argc, char* argv[]) {
char buf[256];
hello(123, "hoge", buf, sizeof(buf));
printf("hello() return=>>%s<<\n ",buf);
return 0;
}
-----------------------------

DeepLearningのライブラリTensorFlowとかもこうやって分散処理させているのね。
多言語間のプロセス間通信って意外と簡単。
これで誰でも分散処理ができる。


2015年12月8日火曜日

Android NDKのatof()問題

AndroidにはCランタイムライブラリのatof()関数がある機種とない機種があります。
正確にいうとatof()関数がインライン展開される機種とされない機種があります。

これらの関数を使うプログラムでは、ランタイムライブラリとNDKのバージョンが合わず動かないことがあります。

D/dalvikvm(18948): Trying to load lib /data/app-lib/com.yomei.ndkshell/libndkshell_.so 0x41ed4ba8
E/dalvikvm(18948): dlopen("/data/app-lib/com.yomei.ndkshell/libndkshell_.so") failed: Cannot load library: soinfo_relocate(linker.cpp:975): cannot locate symbol "atof" referenced by "libndkshell_.so"...
W/System.err(18948): java.lang.UnsatisfiedLinkError: Cannot load library: soinfo_relocate(linker.cpp:975): cannot locate symbol "atof" referenced by "libndkshell_.so"...
W/System.err(18948):    at java.lang.Runtime.loadLibrary(Runtime.java:371)
W/System.err(18948):    at java.lang.System.loadLibrary(System.java:535)
W/System.err(18948):    at com.yomei.SomeJni.<clinit>(SomeJni.java:8)
W/System.err(18948):    at com.yomei.SomeMain.main(SomeMain.java:283)
W/System.err(18948):    at com.yomei.SomeMain$MainActivity$1.run(SomeMain.java:357)

こんな風に怒られてしまいます。

この問題を解決するために、ランタイムライブラリ側ではatofを必ず持っていないといけなく、NDK側ではインライン展開されるヘッダーファイルでないといけないのですが、一部の古いランタイムライブラリと最近のNDKの組み合わせではそうなっていない・・・。
Google自身も64ビット化の対応で大変でもう32ビットのルールのことを忘れてるでしょ?


ネットで解決方法を調べてみたのですが、どれもまだ正しくないというかなんというか。
すべての機種で動かいないのです。

ということで、問題を分析して解決方法を考えてみました。
わかってしまえば、解決方法はすごい簡単なのですが、自分のsoファイルにatof()関数を追加すればよいだけです。こんな感じ。

atof_.c
------------------------------
#include <stdio.h>
#include <ctype.h>
#include "atof_.h"

#ifdef ANDROID
double atof(char *s)
#else
double atof__(char *s)
#endif
{
double a = 0.0;
int e = 0;
int c;
while ((c = *s++) != '\0' && isdigit(c)) {
a = a*10.0 + (c - '0');
}
if (c == '.') {
while ((c = *s++) != '\0' && isdigit(c)) {
a = a*10.0 + (c - '0');
e = e-1;
}
}
if (c == 'e' || c == 'E') {
int sign = 1;
int i = 0;
c = *s++;
if (c == '+')
c = *s++;
else if (c == '-') {
c = *s++;
sign = -1;
}
while (isdigit(c)) {
i = i*10 + (c - '0');
c = *s++;
}
e += i*sign;
}
while (e > 0) {
a *= 10.0;
e--;
}
while (e < 0) {
a *= 0.1;
e++;
}
return a;
}

------------------------------


atof_.h
------------------------------
#ifndef ATOF__H_
#define ATOF__H_


#ifdef __cplusplus
extern "C" {
#endif


double atof(char *s);
double atof__(char *s);


#ifdef __cplusplus
}
#endif


#endif
------------------------------

これで自作アプリでatof()関数を使用しても最新のNDKと古い32ビットの機種の組み合わせでも動きます。